Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

    • Spring

    • Mybatis

    • SpringBoot

    • MQ

      • RabbitMQ的了解安装和使用
      • 简单队列详解
      • 工作队列详解
      • 发布-订阅模型详解
        • (一)发布-订阅模型(Publish/Subscribe)
        • (二)发布-订阅模型实践
          • 2.1 工具类
          • 2.2 生产者
          • 2.3 消费者一
          • 2.4 消费者二
          • 2.5 运行项目
        • (三)Exchange类型介绍
          • 3.1 Fanout(不处理路由键)
          • 3.2 Direct(处理路由键)
          • 3.3 Topic(可以有通配符)
          • 3.4 header(根据header匹配)
      • routing路由模式和Topic主题模式
      • RabbitMQ消息确认机制
    • Zookeeper

    • netty

  • 分布式与微服务

  • 开发经验大全

  • 版本新特性

  • Java
  • 框架的艺术
  • MQ
CodeEase
2023-11-11
目录

发布-订阅模型详解

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)发布-订阅模型(Publish/Subscribe)

发布订阅模型的结构图如下所示:

4-1.png

和前两个的模型结构不同,在发布订阅模型中多了一个X(exchange),exchange是一个交换机,生产者不是直接将消息发送给队列,而是先发送给交换机。消费者可以通过队列去订阅这个交换机,每个消费者对应于自己的一个队列。

这个结构就好像我们订阅微信公众号一样,作者将文章发送到自己的公众号上,只有订阅过该公众号的人才能收到消息。因此这个模型被称为发布-订阅模型。

# (二)发布-订阅模型实践

发布订阅模型中多了交换机的存在,而我们在rabbitmq的可视化界面中就见到过exchange

4-2.png

继续通过代码展示该模型:

# 2.1 工具类

工具类和之前都一样,不做介绍了

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.2 生产者

public class Sent {
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,设置类型为fanout
        channel.exchangeDeclare(EXCHANGE,"fanout");
        String msg="hello world";
        channel.basicPublish(EXCHANGE,"",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

生产者在发布订阅模型中不再绑定队列,而是绑定交换机。exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

# 2.3 消费者一

public class Receive1 {
    private static final String QUEUE_NAME="ps_queue1";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

消费者的代码和之前一样,唯一的区别是增加了队列和交换机的绑定channel.queueBind();

# 2.4 消费者二

public class Receive2 {
    private static final String QUEUE_NAME="ps_queue2";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 2.5 运行项目

由于此时rabbitmq中不存在名称为ps_exchange的交换机,因此我们可以手动在rabbitmq的可视化界面中创建,也可以运行一次生产者来创建交换机。接着运行两个消费者和生产者,可以看到生产者发送出去的消息被消费者收到。

观察此时的可视化界面,可以看到该交换机上已经绑定了两个队列:

4-3.png

# (三)Exchange类型介绍

exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

# 3.1 Fanout(不处理路由键)

直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。即交换机将消息从生产者获取之后,直接发给订阅的队列。

# 3.2 Direct(处理路由键)

交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。

# 3.3 Topic(可以有通配符)

Topic和Direct类似,只不过Direct要求路由键完全相同,但是Topic可以使用通配符进行匹配,如#,*

# 3.4 header(根据header匹配)

在发布消息的时候就需要传入header值,其中的header就是binding时的arguments参数

上次更新: 2025/04/29, 17:22:06
工作队列详解
routing路由模式和Topic主题模式

← 工作队列详解 routing路由模式和Topic主题模式→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式