工作队列详解
作者:鱼仔
博客首页: https://codeease.top
公众号:Java鱼仔
# (一)RabbitMQ工作队列模型结构
工作队列的模型相比简单队列增加了消费者的数量。
生产者提供消息到消息队列中,消费者可以去获取队列中的消息。在工作队列中默认采用轮询分发的方式将消息分发给消费者。所谓轮询分发,就是指不管消费者处理消息的速度是快是慢,都按照顺序轮流把消息发给消费者。
# (二)工作队列实践(轮询分发)
使用工作队列的代码和简单队列基本一致,只不过多了几个消费者
# 2.1 创建工具类
创建工具类的代码在前一篇博客中已经讲到了,这里直接贴上代码:
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//设置VHOSTS
factory.setVirtualHost("/vhosts_sdxb");
//设置用户名
factory.setUsername("user_sdxb");
factory.setPassword("123456");
return factory.newConnection();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.2 创建生产者
public class Send {
private static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = ConnectionUtil.getConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.创建队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i=0;i<50;i++){
String msg="i="+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 2.3 创建消费者一
为了体现消费者处理消息的快慢,我在两个消费者中分别设置线程休眠1s和2s
public class Receive1 {
private static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建消费者监听方法
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println(msg);
try {
//设置睡眠实践1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 2.4 创建消费者二
public class Receive2 {
private static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println(msg);
try {
//设置睡眠时间2s
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
分别将两个消费者运行起来,然后运行生产者发送50条消息,可以发现虽然两个消费者处理消息的能力有快有慢,但是得到的消息都是25条,下面展示消费者1获取的消息部分截图。
# (三)公平分发(Fair dispatch)
在某些场景下轮询分发是不合理的,因此工作队列还有公平分发的方式,所谓公平分发,就是能者多劳,处理消息快的人获得消息多,处理消息慢的人获得消息少。公平分发的实现只需要对代码做一些修改:
# 3.1 修改生产者
对于生产者,只需要对通道增加一条限制,限制通道发送给同一个消费者不得超过一条消息,也就是只有当消费者处理完一条消息以后才会发第二条消息给它。使用channel.basicQos();方法,设置参数为1表示限制一次不超过1条消息。
public class Send {
private static final String QUEUE_NAME="work_queue_fair";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//限制通道发送给同一个消费者不得超过一条消息
int prefenchCount=1;
channel.basicQos(prefenchCount);
for (int i=0;i<50;i++){
String msg="i="+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 3.2 修改消费者
对于消费者,需要修改三处地方,第一处和生产者一样修改通道的限制信息;第二处关闭消费者的自动应答;第三处设置手动回执,即处理完一条消息后手动发送处理完成的指令给队列。
//保证一次只分发一次
channel.basicQos(1);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
//关闭自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
2
3
4
5
6
7
以下是修改后的消费者代码
public class Receive1 {
private static final String QUEUE_NAME="work_queue_fair";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//保证一次只分发一次
channel.basicQos(1);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println(msg);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//关闭自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
设置完之后工作队列就变成了公平分发方式,测试结果:
# 3.3 关于自动应答
在前面修改消费者代码的时候,我们关闭了自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
2
这是basicConsume的第二个参数
当autoAck=true时,表示开启自动应答,一旦rabbitmq将队列中的消息发送给消费者,这个消息就会从队列中消失。但是如果此时消费者挂掉了,那么这条消息也就彻底消失了。
当autoAck=false时,关闭自动应答,rabbitmq将队列中的消息发送给消费者,只有当消费者返回确认之后,队列中的消息才会被删除。