实现功能:一条消息发送给多个消费者
交换机模式:topic
相比于direct匹配模式,匹配routingKey时,topic模式下不仅支持完全匹配,还支持两种特殊的匹配方式
#:可以匹配零个或多个单词(routingKey中的单词以"."分隔),例如【debug.#】可以匹配【debug】【debug.a】【debug.a.b】
*:只能匹配恰好一个单词(routingKey中的单词以"."分隔),例如【debug.*】可以匹配【debug.a】,但不能匹配【debug】或【debug.a.b】
生产者:
package com.example.demo.queue.exchangeToQueue.topic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publishes a single message to the {@code exchange_topic} exchange using the
 * routing key {@code debug}, then releases the channel and the connection.
 */
public class Producer {

    /** Name of the exchange the message is published to. */
    private static final String EXCHANGE_NAME = "exchange_topic";

    /**
     * Declares the exchange with type {@code "topic"}, publishes one message
     * and closes all resources.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            // Obtain a connection from the project helper.
            connection = ConnectionUtil.getConnection();
            // Open a channel on that connection.
            channel = connection.createChannel();
            // Declare the exchange as a topic exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = "debug";
            // Payload sent by the producer.
            String msg = "routingKey=" + routingKey;
            System.out.println("send msg : " + msg);

            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
            // so the platform default charset must not be relied upon here.
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Either reference is still null when the corresponding setup step
            // failed, so both helpers guard against null before closing.
            closeChannel(channel);
            closeConnection(connection);
        }
    }

    /** Closes the channel if one was opened, printing any failure. */
    private static void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if one was opened, printing any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
消费者1:
package com.example.demo.queue.exchangeToQueue.topic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Consumer 1: binds the queue {@code topic_exchange_to_queue_01} to the
 * {@code exchange_topic} exchange with the binding key {@code debug} and
 * prints every message it receives.
 */
public class Consumer01 {

    /** Exchange the queue is bound to; this class does not declare it. */
    private static final String EXCHANGE_NAME = "exchange_topic";

    /** Queue this consumer reads from. */
    private static final String QUEUE_NAME = "topic_exchange_to_queue_01";

    /**
     * Declares and binds the queue, then registers a consumer on it.
     * The connection is intentionally left open on success so that deliveries
     * keep arriving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // Obtain a connection from the project helper.
            connection = ConnectionUtil.getConnection();
            // Open a channel on that connection.
            final Channel channel = connection.createChannel();
            // Declare the queue (durable=true, exclusive=false, autoDelete=false, no extra arguments).
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // Literal binding key without wildcards.
            final String routingKey = "debug";
            // Bind the queue to the exchange; the exchange is not declared in this class.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

            // Consumer callback: decode the body as UTF-8 and print it.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1]:receive msg:" + msg);
                    System.out.println("[1]:deal msg successful. routingKey=" + routingKey);
                }
            };

            // Start consuming with automatic acknowledgement (autoAck=true).
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: do not leave the connection open behind us.
            closeConnection(connection);
        }
    }

    /** Closes the connection if one was opened, printing any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
消费者2:
package com.example.demo.queue.exchangeToQueue.topic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Consumer 2: binds the queue {@code topic_exchange_to_queue_02} to the
 * {@code exchange_topic} exchange with the binding key {@code debug.*} and
 * prints every message it receives.
 */
public class Consumer02 {

    /** Exchange the queue is bound to; this class does not declare it. */
    private static final String EXCHANGE_NAME = "exchange_topic";

    /** Queue this consumer reads from. */
    private static final String QUEUE_NAME = "topic_exchange_to_queue_02";

    /**
     * Declares and binds the queue, then registers a consumer on it.
     * The connection is intentionally left open on success so that deliveries
     * keep arriving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // Obtain a connection from the project helper.
            connection = ConnectionUtil.getConnection();
            // Open a channel on that connection.
            final Channel channel = connection.createChannel();
            // Declare the queue (durable=true, exclusive=false, autoDelete=false, no extra arguments).
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // In a topic exchange '*' stands for exactly one dot-separated word, so this
            // key matches e.g. "debug.a" but not the bare routing key "debug".
            final String routingKey = "debug.*";
            // Bind the queue to the exchange; the exchange is not declared in this class.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

            // Consumer callback: decode the body as UTF-8 and print it.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2]:receive msg:" + msg);
                    System.out.println("[2]:deal msg successful. routingKey=" + routingKey);
                }
            };

            // Start consuming with automatic acknowledgement (autoAck=true).
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: do not leave the connection open behind us.
            closeConnection(connection);
        }
    }

    /** Closes the connection if one was opened, printing any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
消费者3:
package com.example.demo.queue.exchangeToQueue.topic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Consumer 3: binds the queue {@code topic_exchange_to_queue_03} to the
 * {@code exchange_topic} exchange with the binding key {@code debug.#} and
 * prints every message it receives.
 */
public class Consumer03 {

    /** Exchange the queue is bound to; this class does not declare it. */
    private static final String EXCHANGE_NAME = "exchange_topic";

    /** Queue this consumer reads from. */
    private static final String QUEUE_NAME = "topic_exchange_to_queue_03";

    /**
     * Declares and binds the queue, then registers a consumer on it.
     * The connection is intentionally left open on success so that deliveries
     * keep arriving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // Obtain a connection from the project helper.
            connection = ConnectionUtil.getConnection();
            // Open a channel on that connection.
            final Channel channel = connection.createChannel();
            // Declare the queue (durable=true, exclusive=false, autoDelete=false, no extra arguments).
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // In a topic exchange '#' stands for zero or more dot-separated words, so this
            // key matches "debug.a" and "debug.a.b" and also the bare routing key "debug".
            final String routingKey = "debug.#";
            // Bind the queue to the exchange; the exchange is not declared in this class.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

            // Consumer callback: decode the body as UTF-8 and print it.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[3]:receive msg:" + msg);
                    System.out.println("[3]:deal msg successful. routingKey=" + routingKey);
                }
            };

            // Start consuming with automatic acknowledgement (autoAck=true).
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: do not leave the connection open behind us.
            closeConnection(connection);
        }
    }

    /** Closes the connection if one was opened, printing any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
和之前一样,涉及到了交换机,而在消费者类中并没有声明交换机,所以需要先执行一次生产者类的main方法来创建交换机,否则消费者绑定队列时会因为交换机不存在而失败
接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。
生产者指定的routingKey为【debug】,而消费者指定的routingKey依次为【debug】【debug.*】【debug.#】,理论上应该只有消费者1才能收到消息。下面来展示结果:
消费者1终端:
消费者2终端:
消费者3终端:
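实际结果与上面的预期不一致:消费者3也收到了消息。原因在于topic模式下的通配符是按"单词"(以"."分隔)匹配的,而不是按字符匹配:【#】可以匹配零个或多个单词,所以【debug.#】同样能匹配【debug】,消费者3会收到消息;【*】必须恰好匹配一个单词,所以【debug.*】只能匹配【debug.xxx】这种形式,无法匹配【debug】,消费者2收不到消息。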