routing路由模式和Topic主题模式
作者:鱼仔
博客首页: https://codeease.top
公众号:Java鱼仔
# (一)routing路由模式
在前面一篇博客中讲到了exchange的类型,其中direct类型的exchange就是用于routing路由模式。direct类型的交换机是指:交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。
如上图所示,当生产者发送消息的路由键为error时,两个队列均可以收到消息;当生产者发送消息的路由键为info或warning时,只有第二个队列可收到消息。
# (二)路由模式实践
# 2.1 工具类
工具类和之前都一样,不做介绍了
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//设置VHOSTS
factory.setVirtualHost("/vhosts_sdxb");
//设置用户名
factory.setUsername("user_sdxb");
factory.setPassword("123456");
return factory.newConnection();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.2 生产者
与前面几种模式不同的地方已经通过注解标出,其中交换机类型选择成direct,并且按需要设置路由键
public class Sent {
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//交换机类型选择direct
channel.exchangeDeclare(EXCHANGENAME,"direct");
String msg="hello world";
//设置路由键(routingkey)
String routingkey="error";
channel.basicPublish(EXCHANGENAME,routingkey,null,msg.getBytes());
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2.3 消费者一
在队列绑定时设置队列的路由键
public class Receive1 {
private static final String QUEUENAME="routing_queue1";
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
//在队列绑定时设置路由键
channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUENAME, true, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 2.4 消费者二
在第二个消费者的队列中设置多个路由键
public class Receive2 {
private static final String QUEUENAME="routing_queue2";
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
//在队列绑定时设置多个路由键
channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
channel.queueBind(QUEUENAME, EXCHANGENAME, "info");
channel.queueBind(QUEUENAME, EXCHANGENAME, "warning");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUENAME, true, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 2.5运行结果
当路由键为error时,两个消费者均输出消息。当路由键为info或warning时,只有第二个消费者输出消息。
# (三)Topic主题模式
Topic主题模式和routing路由模式类似,只不过这里的交换机使用的是topic类型,topic类型的交换机和direct的不同就在于topic可以匹配通配符。*代表匹配一个元素,#代表匹配一个或多个元素
以上图为例,Q1队列的路由键为*.orange.,Q2队列的路由键为.*.rabbit和lazy.#,当消息的路由键为quick.orange.rabbit时,两个队列均能收到,当消息的路由键为lazy.orange.male.rabbit时,只有Q2队列能收到。
# (四)主题模式实践
在routing路由模式的代码基础上修改
# 4.1 生产者
//交换机类型选择direct
channel.exchangeDeclare(EXCHANGENAME,"topic");
//设置路由键(routingkey)
String routingkey="lazy.brown.fox";
1
2
3
4
2
3
4
# 4.2 消费者一
channel.queueBind(QUEUENAME, EXCHANGENAME, "*.orange.*");
1
# 4.3 消费者二
channel.queueBind(QUEUENAME, EXCHANGENAME, "*.*.rabbit");
channel.queueBind(QUEUENAME, EXCHANGENAME, "lazy.#");
1
2
2
修改以上几处地方就实现了上述topic模型
上次更新: 2024/07/23, 15:40:55