博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq学习记录(七)交换机Exchange-topic
阅读量:6035 次
发布时间:2019-06-20

本文共 5629 字,大约阅读时间需要 18 分钟。

hot3.png

实现功能:一条消息发送给多个消费者

交换机模式:topic

相比于direct匹配模式,匹配routingKey时,topic模式下不仅支持完全匹配,还支持两种特殊的匹配方式

#:可以匹配一个或多个字符

*:可以匹配一个字符

637de24a52c8f0fef3806e89927c2dc086f.jpg

生产者:

package com.example.demo.queue.exchangeToQueue.topic;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Producer {	private static final String EXCHANGE_NAME = "exchange_topic"; 		public static void main(String[] args) {		Connection connection = null;		Channel channel = null;		try {			// 获取连接			connection = ConnectionUtil.getConnection();			// 创建通道			channel = connection.createChannel();			// 声明交换机			channel.exchangeDeclare(EXCHANGE_NAME, "topic");			String routingKey = "debug";			// 生产者发送的信息			String msg = "routingKey="+routingKey;			System.out.println("send msg : "+msg);			// 发送信息			channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());		} catch (Exception e) {			e.printStackTrace();		} finally {			// 关闭通道			try {				channel.close();			} catch (IOException e) {				e.printStackTrace();			} catch (TimeoutException e) {				e.printStackTrace();			}			// 关闭连接			try {				connection.close();			} catch (IOException e) {				e.printStackTrace();			}		}			}	}

消费者1:

package com.example.demo.queue.exchangeToQueue.topic;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer01 {	private static final String EXCHANGE_NAME = "exchange_topic"; 		private static final String QUEUE_NAME = "topic_exchange_to_queue_01";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			String routingKey = "debug";			// 绑定队列到交换机			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[1]:receive msg:"+msg);					System.out.println("[1]:deal msg successful. routingKey="+routingKey);				}							};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);		} catch (Exception e) {			e.printStackTrace();		}	}	}

消费者2:

package com.example.demo.queue.exchangeToQueue.topic;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer02 {	private static final String EXCHANGE_NAME = "exchange_topic"; 		private static final String QUEUE_NAME = "topic_exchange_to_queue_02";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			String routingKey = "debug.*";			// 绑定队列到交换机			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[2]:receive msg:"+msg);					System.out.println("[2]:deal msg successful. routingKey="+routingKey);				}			};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);		} catch (Exception e) {			e.printStackTrace();		}	}	}

消费者3:

package com.example.demo.queue.exchangeToQueue.topic;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer03 {	private static final String EXCHANGE_NAME = "exchange_topic"; 		private static final String QUEUE_NAME = "topic_exchange_to_queue_03";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			String routingKey = "debug.#";			// 绑定队列到交换机			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[3]:receive msg:"+msg);					System.out.println("[3]:deal msg successful. routingKey="+routingKey);				}			};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);		} catch (Exception e) {			e.printStackTrace();		}	}	}

和之前一样,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法

ab15f9d764bf0d51f24a2c5f3c6bc4bd499.jpg

接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。

生产者指定的routingKey为【debug】,而消费者指定的routingKey依次为【debug】【debug.*】【debug.#】,理论上应该只有消费者1才能收到消息。下面来展示结果:

消费者1终端:

8a7a88b1b104c250f849540a18f700988a4.jpg

消费者2终端:

4db6bd7b6c444acb351158683ebfe79cd16.jpg

消费者3终端:

468bc6e1effa3997455e37c8ac5b31d5284.jpg

我也看不懂了,消费者3怎么也收到了消息,晕那。。。。。。。。

 

转载于:https://my.oschina.net/u/3229807/blog/1860341

你可能感兴趣的文章
Windows安装Composer出现【Composer Security Warning】警告
查看>>
四 指针与数组 五 函数
查看>>
硬盘空间满了
查看>>
dutacm.club Water Problem(矩阵快速幂)
查看>>
深入JVM内核--GC算法和种类
查看>>
iOS的AssetsLibrary框架访问所有相片
查看>>
MySQLdb的安装
查看>>
读书笔记三
查看>>
数论 - 最小乘法逆元
查看>>
企业架构研究总结(22)——TOGAF架构开发方法(ADM)之信息系统架构阶段
查看>>
接口测试(三)--HTTP协议简介
查看>>
周志华《机器学习》课后答案——第4章.决策树
查看>>
frameset分帧问题
查看>>
特殊样式:ime-mode禁汉字,tabindex焦点
查看>>
linux
查看>>
Layout父元素点击不到的解决办法
查看>>
【面试次体验】堆糖前端开发实习生
查看>>
基于apache实现负载均衡调度请求至后端tomcat服务器集群的实现
查看>>
C#+QQEmail自动发送邮件
查看>>
[Hadoop]MapReduce多输出
查看>>