Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

    • Spring

    • Mybatis

    • SpringBoot

    • MQ

      • RabbitMQ的了解安装和使用
      • 简单队列详解
        • (一)RabbitMQ的使用教程
        • (二)Rabbit的简单队列
          • 2.1 新建工具类
          • 2.2 创建生产者
          • 2.3 创建消费者
        • (三)简单队列的不足
      • 工作队列详解
      • 发布-订阅模型详解
      • routing路由模式和Topic主题模式
      • RabbitMQ消息确认机制
    • Zookeeper

    • netty

  • 分布式与微服务

  • 开发经验大全

  • 版本新特性

  • Java
  • 框架的艺术
  • MQ
CodeEase
2023-11-11
目录

简单队列详解

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)RabbitMQ的使用教程

RabbitMQ的官网提供了RabbitMQ的六种创建消息传递应用程序的方式https://www.rabbitmq.com/getstarted.html

2-1.png

分别是简单队列、工作队列、Publish/Subscribe订阅模式、routing路由模式、Topics主题模式和RPC,首先介绍简单队列。maven项目依赖如下:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.28</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# (二)Rabbit的简单队列

简单队列的模型结构如下图所示:

2-2.png

简单队列主要由三大结构组成:

P:producer生产者 发送消息的部分

红色部分:一个队列

C:consumer消费者 用于获取队列中的消息

简单队列即生产者将消息发送到队列中,当消费者要用的时候直接在队列中取即可。通过实际操作来模拟简单队列的运行流程

# 2.1 新建工具类

首先写一个连接工具类,用户获取rabbitmq的连接:

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

需要注意的是连接工厂中设置的端口不是rabbitmq可视化界面的端口,而是AMQP协议的端口,这个端口号可在rabbitmq的可视化界面中查看到:

2-3.png

# 2.2 创建生产者

public class Send {
    private static final String QUEUE_NAME="simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="Hello rabbitmq";
        //发布队列
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        channel.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

这段代码创建了一个名为simple_queue的队列,并且发布了一条msg到队列中,可在rabbitmq的可视化界面看到详情:

2-4.png

标记1处表示队列的名称,标记2处代表此时队列中的消息数。

上面一段代码中有几个方法的参数介绍一下:

channel.queueDeclare(String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments);
queue:队列名称
durable:是否设置持久化,队列中的消息是存放在内存中的,理论上如果队列挂掉消息也就消失了,
因此持久化参数可以让队列消息持久化到硬盘
exclusive:设置是否排他,即如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见
autoDelete:设置是否自动删除,为true则设置队列为自动删除
arguments:设置队列的其他一些参数
1
2
3
4
5
6
7

其中现在不懂的参数会在接下来几种模式的学习中学到。

channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
exchange:交换器名称
routingKey:路由键
props:有14个应用程序标识号 - 生成消息的应用程序标识符
body:真正要发送的消息
1
2
3
4
5

# 2.3 创建消费者

public class Receive {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //创建消费者监听方法
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive:" + msg);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

通过重写DefaultConsumer的handleDelivery方法来获取队列中的消息,并通过channel.basicConsume来监听队列,一旦队列中有消息存在就取出。

# (三)简单队列的不足

简单队列耦合性高,一个消费者对应于一个生产者,无法处理多个消费者同时调用消息队列的情况。

要变换队列名称时,需要同时该表消费者和生产者中队列的名称,麻烦并且容易遗漏。

上次更新: 2025/04/29, 17:22:06
RabbitMQ的了解安装和使用
工作队列详解

← RabbitMQ的了解安装和使用 工作队列详解→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式