Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

    • Spring

    • Mybatis

    • SpringBoot

    • MQ

      • RabbitMQ的了解安装和使用
      • 简单队列详解
      • 工作队列详解
      • 发布-订阅模型详解
      • routing路由模式和Topic主题模式
        • (一)routing路由模式
        • (二)路由模式实践
          • 2.1 工具类
          • 2.2 生产者
          • 2.3 消费者一
          • 2.4 消费者二
          • 2.5运行结果
        • (三)Topic主题模式
        • (四)主题模式实践
          • 4.1 生产者
          • 4.2 消费者一
          • 4.3 消费者二
      • RabbitMQ消息确认机制
    • Zookeeper

    • netty

  • 分布式与微服务

  • 开发经验大全

  • 版本新特性

  • Java
  • 框架的艺术
  • MQ
CodeEase
2023-11-11
目录

routing路由模式和Topic主题模式

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)routing路由模式

在前面一篇博客中讲到了exchange的类型,其中direct类型的exchange就是用于routing路由模式。direct类型的交换机是指:交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。

5-1.png

如上图所示,当生产者发送消息的路由键为error时,两个队列均可以收到消息;当生产者发送消息的路由键为info或warning时,只有第二个队列可收到消息。

# (二)路由模式实践

# 2.1 工具类

工具类和之前都一样,不做介绍了

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.2 生产者

与前面几种模式不同的地方已经通过注解标出,其中交换机类型选择成direct,并且按需要设置路由键

public class Sent {
    private static final String EXCHANGENAME="routing_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //交换机类型选择direct
        channel.exchangeDeclare(EXCHANGENAME,"direct");
        String msg="hello world";
        //设置路由键(routingkey)
        String routingkey="error";
        channel.basicPublish(EXCHANGENAME,routingkey,null,msg.getBytes());
        channel.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2.3 消费者一

在队列绑定时设置队列的路由键

public class Receive1 {
    private static final String QUEUENAME="routing_queue1";
    private static final String EXCHANGENAME="routing_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //在队列绑定时设置路由键
        channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive:" + msg);
            }
        };
        //监听队列
        channel.basicConsume(QUEUENAME, true, consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2.4 消费者二

在第二个消费者的队列中设置多个路由键

public class Receive2 {
    private static final String QUEUENAME="routing_queue2";
    private static final String EXCHANGENAME="routing_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //在队列绑定时设置多个路由键
        channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
        channel.queueBind(QUEUENAME, EXCHANGENAME, "info");
        channel.queueBind(QUEUENAME, EXCHANGENAME, "warning");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive:" + msg);
            }
        };
        //监听队列
        channel.basicConsume(QUEUENAME, true, consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 2.5运行结果

当路由键为error时,两个消费者均输出消息。当路由键为info或warning时,只有第二个消费者输出消息。

# (三)Topic主题模式

Topic主题模式和routing路由模式类似,只不过这里的交换机使用的是topic类型,topic类型的交换机和direct的不同就在于topic可以匹配通配符。*代表匹配一个元素,#代表匹配一个或多个元素

5-2.png

以上图为例,Q1队列的路由键为*.orange.,Q2队列的路由键为.*.rabbit和lazy.#,当消息的路由键为quick.orange.rabbit时,两个队列均能收到,当消息的路由键为lazy.orange.male.rabbit时,只有Q2队列能收到。

# (四)主题模式实践

在routing路由模式的代码基础上修改

# 4.1 生产者

//交换机类型选择direct
channel.exchangeDeclare(EXCHANGENAME,"topic");
//设置路由键(routingkey)
String routingkey="lazy.brown.fox";
1
2
3
4

# 4.2 消费者一

channel.queueBind(QUEUENAME, EXCHANGENAME, "*.orange.*");
1

# 4.3 消费者二

channel.queueBind(QUEUENAME, EXCHANGENAME, "*.*.rabbit");
channel.queueBind(QUEUENAME, EXCHANGENAME, "lazy.#");
1
2

修改以上几处地方就实现了上述topic模型

上次更新: 2025/04/29, 17:22:06
发布-订阅模型详解
RabbitMQ消息确认机制

← 发布-订阅模型详解 RabbitMQ消息确认机制→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式