简单队列详解
作者:鱼仔
博客首页: https://codeease.top
公众号:Java鱼仔
# (一)RabbitMQ的使用教程
RabbitMQ的官网提供了RabbitMQ的六种创建消息传递应用程序的方式https://www.rabbitmq.com/getstarted.html
分别是简单队列、工作队列、Publish/Subscribe订阅模式、routing路由模式、Topics主题模式和RPC,首先介绍简单队列。maven项目依赖如下:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# (二)Rabbit的简单队列
简单队列的模型结构如下图所示:
简单队列主要由三大结构组成:
P:producer生产者 发送消息的部分
红色部分:一个队列
C:consumer消费者 用于获取队列中的消息
简单队列即生产者将消息发送到队列中,当消费者要用的时候直接在队列中取即可。通过实际操作来模拟简单队列的运行流程
# 2.1 新建工具类
首先写一个连接工具类,用户获取rabbitmq的连接:
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//设置VHOSTS
factory.setVirtualHost("/vhosts_sdxb");
//设置用户名
factory.setUsername("user_sdxb");
factory.setPassword("123456");
return factory.newConnection();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
需要注意的是连接工厂中设置的端口不是rabbitmq可视化界面的端口,而是AMQP协议的端口,这个端口号可在rabbitmq的可视化界面中查看到:
# 2.2 创建生产者
public class Send {
private static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="Hello rabbitmq";
//发布队列
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
这段代码创建了一个名为simple_queue的队列,并且发布了一条msg到队列中,可在rabbitmq的可视化界面看到详情:
标记1处表示队列的名称,标记2处代表此时队列中的消息数。
上面一段代码中有几个方法的参数介绍一下:
channel.queueDeclare(String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments);
queue:队列名称
durable:是否设置持久化,队列中的消息是存放在内存中的,理论上如果队列挂掉消息也就消失了,
因此持久化参数可以让队列消息持久化到硬盘
exclusive:设置是否排他,即如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见
autoDelete:设置是否自动删除,为true则设置队列为自动删除
arguments:设置队列的其他一些参数
1
2
3
4
5
6
7
2
3
4
5
6
7
其中现在不懂的参数会在接下来几种模式的学习中学到。
channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
exchange:交换器名称
routingKey:路由键
props:有14个应用程序标识号 - 生成消息的应用程序标识符
body:真正要发送的消息
1
2
3
4
5
2
3
4
5
# 2.3 创建消费者
public class Receive {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建消费者监听方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
通过重写DefaultConsumer的handleDelivery方法来获取队列中的消息,并通过channel.basicConsume来监听队列,一旦队列中有消息存在就取出。
# (三)简单队列的不足
简单队列耦合性高,一个消费者对应于一个生产者,无法处理多个消费者同时调用消息队列的情况。
要变换队列名称时,需要同时该表消费者和生产者中队列的名称,麻烦并且容易遗漏。
上次更新: 2024/07/23, 15:40:55