发布-订阅模型详解
作者:鱼仔
博客首页: https://codeease.top
公众号:Java鱼仔
# (一)发布-订阅模型(Publish/Subscribe)
发布订阅模型的结构图如下所示:
和前两个的模型结构不同,在发布订阅模型中多了一个X(exchange),exchange是一个交换机,生产者不是直接将消息发送给队列,而是先发送给交换机。消费者可以通过队列去订阅这个交换机,每个消费者对应于自己的一个队列。
这个结构就好像我们订阅微信公众号一样,作者将文章发送到自己的公众号上,只有订阅过该公众号的人才能收到消息。因此这个模型被称为发布-订阅模型。
# (二)发布-订阅模型实践
发布订阅模型中多了交换机的存在,而我们在rabbitmq的可视化界面中就见到过exchange
继续通过代码展示该模型:
# 2.1 工具类
工具类和之前都一样,不做介绍了
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//设置VHOSTS
factory.setVirtualHost("/vhosts_sdxb");
//设置用户名
factory.setUsername("user_sdxb");
factory.setPassword("123456");
return factory.newConnection();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.2 生产者
public class Sent {
private static final String EXCHANGE="ps_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机,设置类型为fanout
channel.exchangeDeclare(EXCHANGE,"fanout");
String msg="hello world";
channel.basicPublish(EXCHANGE,"",null,msg.getBytes());
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
生产者在发布订阅模型中不再绑定队列,而是绑定交换机。exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。
# 2.3 消费者一
public class Receive1 {
private static final String QUEUE_NAME="ps_queue1";
private static final String EXCHANGE="ps_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列和交换机绑定
channel.queueBind(QUEUE_NAME,EXCHANGE,"");
//保证一次只分发一次
channel.basicQos(1);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println(msg);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//关闭自动回复
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
消费者的代码和之前一样,唯一的区别是增加了队列和交换机的绑定channel.queueBind();
# 2.4 消费者二
public class Receive2 {
private static final String QUEUE_NAME="ps_queue2";
private static final String EXCHANGE="ps_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列和交换机绑定
channel.queueBind(QUEUE_NAME,EXCHANGE,"");
//保证一次只分发一次
channel.basicQos(1);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println(msg);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//关闭自动回复
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 2.5 运行项目
由于此时rabbitmq中不存在名称为ps_exchange的交换机,因此我们可以手动在rabbitmq的可视化界面中创建,也可以运行一次生产者来创建交换机。接着运行两个消费者和生产者,可以看到生产者发送出去的消息被消费者收到。
观察此时的可视化界面,可以看到该交换机上已经绑定了两个队列:
# (三)Exchange类型介绍
exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。
# 3.1 Fanout(不处理路由键)
直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。即交换机将消息从生产者获取之后,直接发给订阅的队列。
# 3.2 Direct(处理路由键)
交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。
# 3.3 Topic(可以有通配符)
Topic和Direct类似,只不过Direct要求路由键完全相同,但是Topic可以使用通配符进行匹配,如#,*
# 3.4 header(根据header匹配)
在发布消息的时候就需要传入header值,其中的header就是binding时的arguments参数