在上一篇文章中,我们已经了解到了RabbitMQ中的基本操作了,现在让我们开始实战演习吧!
最简单的模型
从上图可以看出,最简单的模型就是:一个生产者、一个队列、一个消费者。
注意:其实,在Rabbitmq中,生产者是不能直接发送消息到队列中的,当发送消息时,不指定交换器的话,Rabbitmq会默认使用一个交换器来路由消息到队列中。(前面的博客中,我已经解释过了,不清楚的,可以再去看看!)
(1)获取连接
/**
* 描述:
*
* @author liuzhuo
* @create 2019-04-25 15:31
*/
public class ConnectionUtil {
private static String USERNAME = "gakkij";
private static String PASSWORD = "gakkij";
private static String IPADDRESS = "192.168.69.200";
private static int PORT = 5672;
private static String VHOST = "/";
public static Connection openConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置相关的属性值
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
//设置地址
connectionFactory.setHost(IPADDRESS);
//设置端口号
connectionFactory.setPort(PORT);
//设置虚拟主机
connectionFactory.setVirtualHost(VHOST);
//根据连接工厂创建连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
(2)生产者
package com.liuzhuo.rabbitmq.simplest;
import com.liuzhuo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 描述:
*
* @author liuzhuo
* @create 2019-04-25 15:45
*/
public class Productor {
public static void main(String[] args) throws Exception {
/**
* 注意,这里是最简单的消息队列的使用,没有使用交换机的例子。
* 生产者"直接"把消息发送到队列中,发送消息时的routingKey必须与队列queue的名字相同才行。
*/
Connection connection = null;
Channel channel = null;
//获取信道
connection = ConnectionUtil.openConnection();
channel = connection.createChannel();
//消息
String message = "simplest queue!";
//发送消息到队列中
for (int i = 0; i < 5; i++) {
//没有指定交换器,使用的的空字符串,路由键是:"simple_queue"
channel.basicPublish("", "simple_queue", null, message.getBytes());
}
System.out.println("发送消息完毕!");
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}
(3)消费者
package com.liuzhuo.rabbitmq.simplest;
import com.liuzhuo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 描述:
*
* @author liuzhuo
* @create 2019-04-25 15:57
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
//持久化队列、非排他队列、非自动删除队列,队列参数为null
channel.queueDeclare("simple_queue", true, false, false, null);
while (true) {
//不是自动应答,需要手动应答
Boolean autoAck = false;
channel.basicConsume("simple_queue", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("==========================");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("exchange: " + envelope.getExchange());
System.out.println("routingKey: " + envelope.getRoutingKey());
System.out.println("deliveryTag: " + envelope.getDeliveryTag());
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
//envelope.getDeliveryTag():消息的标识。
//multiple:false,只应答该条消息。true的话,应答所有比当前消息标识小的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(4)运行结果
先运行消费者:
查看Rabbitmq的管控台:
运行生产者:
再看消费者的控制台:
五条消息已经从生产者发送到队列中,然后消费者从队列中将消息给消费了。
上述中,我们提到了,自动应答和手动应答的作用,如果是手动应答的话,需要我们人工来应答已经成功消费的消息,否则消息是不会从队列中删除的,如果在生产中,忘记了人工应答的话,会导致消息重复消费的,这里需要重点提醒大家!
现在,我们将手动应答的那条代码注释掉:
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("==========================");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("exchange: " + envelope.getExchange());
System.out.println("routingKey: " + envelope.getRoutingKey());
System.out.println("deliveryTag: " + envelope.getDeliveryTag());
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
//将这条手动应答的代码注释掉!!!
//channel.basicAck(envelope.getDeliveryTag(), false);
}
再次运行,消费者,和生产者:
似乎和刚刚没有什么区别?其实有很大的区别,现在的这五条消息根本没有删除,不信打开Rabbitmq的管控台:
所以,请大家注意:当你使用手动应答时,不要忘记主动编写应答的代码,不要忘记了,否则会重复消费消息的!!!工作队列模型
上面的简单模式,只有一个消费者,现在我们尝试多个消费者的情况:
工作队列模型时,生产者发送到队列中的消息,会均摊到各个消费者中,而不是将队列中的每个消息都发给所有的消费者。意思就是,如果现在队列中有:3,2,1,三条消息,1这条首先发送的消息,要么被Consumer1消费,要么被Consumer2消费,不会同时被这两个消费者消费!
(1)生产者:
public class Productor {
public static void main(String[] args) throws IOException, TimeoutException {
/**
* work模式的消息队列,会存在多个消费者,每个消费者是均摊来消费消息的
* 即:每个消费者轮询来消费一个消息,不会考虑消费者的性能问题,会导致
* 性能强的消费者会空闲,性能差的消费者一直忙于消费消息。
*
* 我们可以通过设置: channel.basicQos(prefetchCount);来控制消费者的强度。
* prefetchCount:越大说明这个消费者的消费能力越强,否则,反之。
*/
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queue", true, false, false, null);
//循环十次来发送消息。
for (int i = 0; i < 10; i++) {
String message = "work_queue_" + i;
channel.basicPublish("", "work_queue", null, message.getBytes());
}
channel.close();
connection.close();
}
}
(2)消费者1:
public class Worker01 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
channel.basicConsume("work_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
//确认应答
channel.basicAck(envelope.getDeliveryTag(), false);
//故意让这个消费者耗时,来代表这个消费者的业务难度高!!!
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
(3)消费者2:
public class Worker02 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
channel.basicConsume("work_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
//故意让这个消费者耗时短,来代表这个消费者的业务难度低
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
我们能看到,消费者1的业务逻辑复杂些,通过睡眠时间来代表的!按照我们之前的理论,多个消费者连接同一个队列的话,队列中的消息会均摊到每个消费者中,由于消费者1的业务处理花的时间交久些,消费者2的业务逻辑简单些,那样的话,消费者2处理的消息照理应该消费更多的消息,因为处理速度更快,接受到的消息会更多,下面我们来运行程序,看看是否符合我们的推理
如果先运行消费者的话,会出现异常,因为队列是在生产者中声明的,所以先运行生产者,声明:”work_queue”成功后,再运行两个消费者,最后再运行一次生产者:
首先看到管控台:
再运行两个消费者:
最后运行生产者:
查看两个消费者的控制台:
我们发现,为啥消费者1和消费者2,都是消费五条消息,而不是消费者2消费更多的消息呢?
这是因为,在Rabbitmq中,还有一个 basicQos属性,当不指定 basicQos属性的时候,Rabbitmq不管消费者的性能多强,队列中的消息都会均摊到每个消费者中,不管你是否是消费完毕了当前的消息。
现在,我们在消费者1中,添加:channel.basicQos(1);
public class Worker01 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//这个消费者的性能差,我们就让它消费的慢一些。
//说明最多发给这个消费者的消息就是一个,必须等这个消息消费完毕后,才会发送第二个。
channel.basicQos(1);
channel.basicConsume("work_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
//故意让这个消费者耗时,来代表这个消费者的业务难度高
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
channel.basicQos(1)
:表示当前的消费者,只能处理一个消息,当前的消息没有处理完的话,Rabbitmq是不会将队列中的消息再发给这个消费者的,怎么衡量消费者是否已经消费当前消息呢?就是应答处理,当Broker收到了消息应答,那么表示当前的消息已经处理完了,就可以发送下一条消息了。
在消费者2中,添加:channel.basicQos(3)
public class Worker02 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//这个消费者的性能强,我们就让它消费的快一些。
//说明最多发给这个消费者的消息是三条,不管当前消费者是否消费完毕当前消息。
channel.basicQos(3);
channel.basicConsume("work_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息为:" + new String(body, "utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
//故意让这个消费者耗时,来代表这个消费者的业务难度低
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
channel.basicQos(3)
:表示Rabbitmq发送给这个消费者的没有应答的消息为三条,消费者可以慢慢处理这三条消息,当处理完当前的消息后,Rabbitmq会继续发送下一条消息,意思就是这个消费者中可以暂存三条消息。
再次,测试:
现在,大家应该明白了,channel.basicQos(prefetchCount)
的作用了。
发布与订阅模型
上面的演示中,都没有涉及到交换器(其实涉及到了默认的交换器),在我们平时的开发中,都是需要涉及到交换器的,现在开始我们的交换器的旅途吧。
发布与订阅模型,为了解决队列不能广播的问题,上述中,队列中的消息只能均摊到每个消费者中,不能将队列中的消息广播到与之有关联的消费者中,而交换器可以实现广播的功能。
这里就需要使用:“fanout”类型的交换器了。
(1)生产者
public class Productor {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明交换机,fanout类型的交换器,持久化
channel.exchangeDeclare("fanout_exchange", "fanout", true);
String message = "fanout_message";
//fanout类型的交换机,不需要指定routingKey,因为是广播的方式。
channel.basicPublish("fanout_exchange", "", null, message.getBytes());
System.out.println("发出消息:" + message);
channel.close();
connection.close();
}
}
(2)消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("fanout_queue01", true, false, false, null);
//绑定交换机与队列,不需要指定路由键!!!
//将fanout_queue01队列 与 fanout_exchange交换器 绑定到了一起。
channel.queueBind("fanout_queue01", "fanout_exchange", "");
while (true) {
//自动应答,false,不会自动应答,即不会自动消除消息。
Boolean autoAck = false;
channel.basicConsume("fanout_queue01", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("fanout_queue01_接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(3)消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("fanout_queue02", true, false, false, null);
//绑定交换机与队列,不需要指定路由键!!!
channel.queueBind("fanout_queue02", "fanout_exchange", "");
while (true) {
//自动应答,false,不会自动应答,即不会自动消除消息。
Boolean autoAck = false;
channel.basicConsume("fanout_queue02", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("fanout_queue02_接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(4)测试:
能看到,两个队列中都收到了 “fanout_message” 这条消息。
总结:
1)发送消息到 fanout类型的交换器时,不需要指定路由键。
2)绑定队列到 fanout类型的交换器时,不需要指定绑定键。
3)fanout类型的交换器,实现了交换器级别的广播,将消息发送到所有与之绑定的队列中。
路由模型
路由模型是使用:direct类型的交换器来实现的,路由键必须与绑定键完全匹配才行。
(1)生产者
public class Productor {
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 使用交换机的模式:direct。
* direc:一个交换机的一个routingKey匹配一个队列,是一对一的关系。
* 注意交换机还是可以绑定多个队列的,只是通过routingKey匹配唯一的队列而已。
*
* 生产者只需声明交换机即可,消费者那里声明队列,然后设置routingKye来匹配队列。
*
* 生产者发送消息时,需要指定routingKey。发送的消息经过交换机后,会转发消息到与
* routingKey匹配的队列中
*
* 发送消息时,使用不同的routingKey就可以发送消息到不同的队列中。
*/
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("direct_exchange", "direct",true);
String message = "gakki_message";
//发送消息,指定 交换机 和 路由键。
channel.basicPublish("direct_exchange", "gakki", null, message.getBytes());
System.out.println("发送消息:" + message);
channel.close();
connection.close();
}
}
(2)消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("gakki_queue", true, false, false, null);
//绑定交换机与队列
channel.queueBind("gakki_queue", "direct_exchange", "gakki");
while (true) {
Boolean autoAck = false;
channel.basicConsume("gakki_queue", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("gakki_queue队列接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(3)消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("liuzhuo_queue", true, false, false, null);
//绑定交换机与队列
channel.queueBind("liuzhuo_queue", "direct_exchange", "liuzhuo");
while (true) {
//自动应答,false,不会自动应答,即不会自动消除消息。
Boolean autoAck = false;
channel.basicConsume("liuzhuo_queue", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("liuzhuo_queue队列接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(4)测试:
修改生产者的代码:
//声明交换机
channel.exchangeDeclare("direct_exchange", "direct", true);
String message = "liuzhuo_message";
//发送消息,指定 交换机 和 路由键。
channel.basicPublish("direct_exchange", "liuzhuo", null, message.getBytes());
System.out.println("发送消息:" + message);
总结:
1)direct类型的交换器,需要路由键与绑定键完全匹配才行。
2)direct类型的交换器,可以连接多个队列,指定不同的绑定键即可。
主题模型
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
主题模型就是使用 topic类型的交换器,如下所示:
(1)生产者
public class Productor {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//创建topic类型的交换机
channel.exchangeDeclare("topic_exchange", "topic", true);
//创建消息
String message = "news.liuzhuo.djtopic";
//发送消息,需要路由键
channel.basicPublish("topic_exchange", "news.liuzhuo.djtopic", null, message.getBytes());
System.out.println("发送的消息为:" + message);
channel.close();
connection.close();
}
}
(2)消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("topic_queue01", true, false, false, null);
//绑定交换机与队列,需要指定路由键,路由键可以设置通配符: *.liuzhuo.*
channel.queueBind("topic_queue01", "topic_exchange", "*.liuzhuo.*");
while (true) {
//自动应答,false,不会自动应答,即不会自动消除消息。
Boolean autoAck = false;
channel.basicConsume("topic_queue01", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("topic_queue01:接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(3)消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("topic_queue02", true, false, false, null);
//绑定交换机与队列,需要指定路由键
channel.queueBind("topic_queue02", "topic_exchange", "*.*.client");
channel.queueBind("topic_queue02", "topic_exchange", "com.#");
while (true) {
//自动应答,false,不会自动应答,即不会自动消除消息。
Boolean isAsk = false;
channel.basicConsume("topic_queue02", isAsk, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("topic_queue02:接受到的消息为:" + new String(body, "utf-8"));
System.out.println("=================");
//确认应对,如果不确认消息的话,队列里面的消息是不会消除的!!!
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}
(4)测试:
修改生产者代码:
//创建消息
String message = "news.liuzhuo.client";
//发送消息,需要路由键
channel.basicPublish("topic_exchange", "news.liuzhuo.client", null, message.getBytes());
运行测试:
修改生产者代码:
String message = "news.gakki.client";
//发送消息,需要路由键
channel.basicPublish("topic_exchange", "news.gakki.client", null, message.getBytes());
运行测试:
修改生产者代码:
String message = "com.xxxx";
//发送消息,需要路由键
channel.basicPublish("topic_exchange", "com.xxxx", null, message.getBytes());
运行测试:
topic类型的交换器,我就演示到这里了,大家可以自行测试~~~
RPC模型
Remote procedure call (RPC):远程过程调用。
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值 replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和 correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到 replyTo 指定的 Queue,同时带上correlationId属性
- 客户端之前已订阅 replyTo指定的 Queue,从中收到服务器的应答消息后,根据其中的 correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
服务器:
public class Consumer {
public static void main(String[] args) {
Connection connection = null;
try {
connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("rpc_queue", true, false, false, null);
channel.basicConsume("rpc_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=================");
System.out.println("rpc_queue中接收到的信息:" + new String(body, "utf-8"));
System.out.println("=================");
System.out.println("发送应答信息之前:");
System.out.println("ReplyTo:" + properties.getReplyTo());
System.out.println("CorrelationId:" + properties.getCorrelationId());
//发送应答的信息
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
String replytoMessage = "服务端消费完消息后,向客户端发送自己的应答信息!!!";
channel.basicPublish("", properties.getReplyTo(), props, replytoMessage.getBytes());
System.out.println("发送应答信息之后:");
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
//连接不关闭,一直处于打开状态!
}
}
}
客户端:
public class Productor {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = ConnectionUtil.openConnection();
channel = connection.createChannel();
//声明反馈队列
channel.queueDeclare("replyTo_queue", true, false, false, null);
//UUID
String uuid = UUID.randomUUID().toString();
String message = "rpc_message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().
replyTo("replyTo_queue").
correlationId(uuid).build();
channel.basicPublish("", "rpc_queue", properties, message.getBytes());
System.out.println("发出消息:" + message);
//接收应答信息
channel.basicConsume("replyTo_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
while (true) {
if (properties.getCorrelationId().equals(uuid)) {
System.out.println(new String(body, "utf-8"));
break;
}
System.out.println("correlationId:" + properties.getCorrelationId());
System.out.println("correlationId不是我声明的!!!");
}
}
});
Thread.sleep(30000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
先运行服务器端,再运行客户端:
最开始的服务器端:
客户端:
再看服务器端:
在我们的客户端中:
handleDelivery
函数中使用了 死循环来同步,如果没有接收到 我们一开始发送的 correlationId的话,就会一直死循环,不会进行其他操作了,只要返回回来的correlationId和当初设置的correlationId相等的话,就会跳出死循环,进行下一步操作,强行让异步的消息中间件变成了同步的处理流程。