【RabbitMQ】工作模式实现

【RabbitMQ】工作模式实现

目录

一、Work Queues (工作队列模式)

简单模式在这个系列第一个文章,上手程序就是一个Simple (简单模式)的实现。

⼯作队列模式⽀持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收。

每个工作模式的实现,都先需要引入RabbitMQ的依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>

1.1 生产者

生产者:

  1. 创建连接
  2. 创建Channel
  3. 声明⼀个队列Queue
  4. 生产消息
  5. 释放资源
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducerDemo{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明信道 channel.queueDeclare("workQueues",true,false,true,null);//声明队列,发送消息for(int i =0; i <10; i++){String msg ="hello workQueues"+i; channel.basicPublish("","workQueues",null,msg.getBytes());}System.out.println("消息发送成功");//资源释放 channel.close(); connection.close();}}

1.2 消费者

我们创建两个消费者模拟。
消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源,为了造成两个消费者竞争,我们先不释放资源。
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("workQueues",false,false,true,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("workQueues",true,consumer);Thread.sleep(100);////释放资源//channel.close();//connection.close();}}

效果:

二、Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了⼀个Exchange⻆⾊。

Exchange 常⻅有三种类型,分别代表不同的路由规则,也就分别对应不同的⼯作模式:

  1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
  2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

2.1 生产者

生产者

  1. 创建连接
  2. 创建信道
  3. 声明交换机
  4. 声明两个队列
  5. 交换机与队列进行绑定
  6. 生产消息
  7. 释放资源

声明交换机的方法是Channel类下的exchangeDeclare方法

  • exchange – the name of the exchange,交换机名称
  • type – the exchange type ,交换机类型
  • durable – true if we are declaring a durable exchange (the exchange will survive a server restart),是否可持久化
  • autoDelete – true if the server should delete the exchange when it is no longer in use ,是否自动删除
  • internal – true if the exchange is internal, i.e. can’t be directly published to by a client,是否内部使用,内部使用客户端发不进去消息
  • arguments – other properties (construction arguments) for the exchange,参数
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("fanout.exchange",BuiltinExchangeType.FANOUT,false);//4. 声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null); channel.queueDeclare("fanout.queue2",true,false,false,null);//5.交换机与队列绑定 channel.queueBind("fanout.queue1","fanout.exchange",""); channel.queueBind("fanout.queue2","fanout.exchange","");//6. 生产消息for(int i =0; i <10; i++){String msg ="hello fanout"+i; channel.basicPublish("fanout.exchange","",null,msg.getBytes());}System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

2.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("fanout.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

三、Routing(路由模式)

Routing(路由模式):
队列和交换机的绑定,不能是任意的绑定了,⽽是要指定⼀个BindingKey (RoutingKey的⼀种),消息的发送⽅在向Exchange发送消息时,也需要指定消息的RoutingKey,Exchange也不再把消息交给每⼀个绑定的key,⽽是根据消息的RoutingKey进⾏判断,只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致,才会接收到消息。

3.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为direct
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("direct.exchange",BuiltinExchangeType.DIRECT,true);//4. 声明队列 channel.queueDeclare("direct.queue1",true,false,false,null); channel.queueDeclare("direct.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("direct.queue1","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","b"); channel.queueBind("direct.queue2","direct.exchange","c");//6. 生产消息String msgA ="hello direct routingKey is a"; channel.basicPublish("direct.exchange","a",null,msgA.getBytes());String msgB ="hello direct routingKey is b"; channel.basicPublish("direct.exchange","b",null,msgB.getBytes());String msgC ="hello direct routingKey is c"; channel.basicPublish("direct.exchange","c",null,msgC.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

3.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("direct.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("direct.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. topics 模式使⽤的交换机类型为topic(Routing模式使⽤的交换机类型为direct)
  2. topic 类型的交换机在匹配规则上进⾏了扩展,Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配)。

在topic类型的交换机在匹配规则上,有些要求:

  1. RoutingKey 是⼀系列由点( . )分隔的单词,⽐如 " stock.usd.nyse “,” nyse.vmw “,” quick.orange.rabbit "
  2. BindingKey 和 RoutingKey⼀样,也是点( . )分割的字符串。
  3. Binding Key中可以存在两种特殊字符串,⽤于模糊匹配
    3.1 * 表⽰⼀个单词
    3.2 # 表⽰多个单词(0-N个)

4.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为topic
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("topic.exchange",BuiltinExchangeType.TOPIC,true);//4. 声明队列 channel.queueDeclare("topic.queue1",true,false,false,null); channel.queueDeclare("topic.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("topic.queue1","topic.exchange","*.a.*"); channel.queueBind("topic.queue2","topic.exchange","*.*.b"); channel.queueBind("topic.queue2","topic.exchange","c.#");//6. 生产消息String msgA ="hello topic routingKey is word.a.word"; channel.basicPublish("topic.exchange","word.a.word",null,msgA.getBytes());String msgB ="hello topic routingKey is word.word.b"; channel.basicPublish("topic.exchange","word.word.b",null,msgB.getBytes());String msgC ="hello topic routingKey is c.word.word.word.word.b"; channel.basicPublish("topic.exchange","c.word.word.word.word.b",null,msgC.getBytes());String msgD ="hello topic routingKey is c.a.b"; channel.basicPublish("topic.exchange","c.a.b",null,msgD.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

4.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("topic.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("topic.queue1",consumer);//释放资源// channel.close();// connection.close();}}

结果:

五、RPC通信

RPC(Remote Procedure Call),即远程过程调⽤。它是⼀种通过⽹络从远程计算机上请求服务,⽽不需要了解底层⽹络的技术。类似于Http远程调⽤。
RabbitMQ实现RPC通信的过程,⼤概是通过两个队列实现⼀个可回调的过程。

⼤概流程如下:

  1. 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列。
  2. 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息,⼀旦收到响应,客⼾端会检查消息的 correlationId 属性,以确保它是所期望的响应。

5.1 客户端

客户端:

  1. 声明两个队列,包含回调队列 replyQueueName,声明本次请求的唯⼀标志 corrId
  2. 将 replyQueueName 和 corrId 配置到要发送的消息队列中
  3. 使⽤阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 阻塞队列有消息后,主线程被唤醒,打印返回内容
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeoutException;publicclassClient{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明队列 channel.queueDeclare("rpc.request.queue",true,false,false,null); channel.queueDeclare("rpc.response.queue",true,false,false,null);//4. 发送消息//设置请求标识String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().correlationId(correlationId).replyTo("rpc.response.queue").build();String msg ="hello rpc"; channel.basicPublish("","rpc.request.queue", properties,msg.getBytes());//5. 接收响应//使用阻塞队列存储响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Client 接收到消息: "+newString(body));//判断唯⼀标识正确, 放到阻塞队列中if(correlationId.equals(properties.getCorrelationId())){ response.offer(newString(body));}System.out.println("Client 接收到消息: "+newString(body));}}; channel.basicConsume("rpc.response.queue",true,consumer);// 获取响应的结果String result = response.take();System.out.println(" [RPC_Client] Result:"+ result);}}

5.2 服务器

服务器:

  1. 接收消息
  2. 根据消息内容进⾏响应处理,把应答结果返回到回调队列中
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//接收请求//每次接受一条 channel.basicQos(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Server 接收到请求: "+newString(body));//响应请求String response ="响应";AMQP.BasicProperties props =newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).replyTo("rpc.request.queue").build(); channel.basicPublish("","rpc.response.queue",props,response.getBytes());//手动确认 channel.basicAck(envelope.getDeliveryTag(),false);}}; channel.basicConsume("rpc.request.queue",false,consumer);}}

结果:

六、Publisher Confirms(发布确认)

消息中间件都会有消息丢失的问题发生,大概分为以下三种丢失情况:

  1. 生产者问题:因为应用故障,网络等问题,生产者没有成功向消息中间件发送消息;
  2. 消息中间件问题:生产者成功发送了消息,消息中间件自己原因导致消息丢失;
  3. 消费者问题:消费者消费消息时,处理出现问题,导致消费者 消费失败的 消息 从消息队列中删除了。

RabbitMQ针对上面三种情况给出的解决方案:

  1. 针对生产者问题:采取Publisher Confirms(发布确认)机制 解决;
  2. 针对消息中间件问题:通过持久化机制解决;
  3. 针对消费者问题:通过消息应答机制解决;

生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始);
一旦消息被投递到所有匹配的队列之后,RabbitMO就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出.
brokerl(消息中间件)回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,
此外 broker 也可以设置 channel.basicAck() 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理.

6.1 Publishing Messages Individually(单独确认)

跟生产者发送消息,只有调用Channel类confirmSelect()设置信道为confirm模式,和Channel类waitForConfirmsOrDie()方法等待手动确认。

privatestaticvoidpublishingMessagesIndividually()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue1",true,false,false,null);long start =System.currentTimeMillis();//发送消息for(int i =0; i <200; i++){String msg ="Publishing Messages Individually "+ i; channel.basicPublish("","publish.confirm.queue1",null,msg.getBytes());//等待确认 channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages Individually(单独确认) 发送200条消息耗时 "+(end - start));}}

结果:

可以发现,发送200条消息,耗时很⻓。

观察上⾯代码,会发现这种策略是每发送⼀条消息后就调⽤ channel.waitForConfirmsOrDie() ⽅法,之后 等待服务端的确认,这实际上是⼀种串⾏同步等待的⽅式。
尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调⽤Linux内核的fsync⽅法)。
但是发布确认机制是⽀持异步的。可以⼀边发送消息,⼀边等待消息确认。

6.2 Publishing Messages in Batches(批量确认)

每发送⼀批消息后,调⽤ channel.waitForConfirms ⽅法,等待服务器的确认返回。

跟单独确认区别就是,发送到一定消息再进行等待确认。

privatestaticvoidpublishingMessagesInBatches()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue2",true,false,false,null);long start =System.currentTimeMillis();//发送消息int batchSize =100;int flag =0;for(int i =0; i <200; i++){String msg ="Publishing Messages in Batches "+ i; channel.basicPublish("","publish.confirm.queue2",null,msg.getBytes());//批量 等待确认if(flag == batchSize){ channel.waitForConfirmsOrDie(5000); flag =0;} flag++;}if(flag >0){ channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages in Batches(批量确认) 发送200条消息耗时 "+(end - start));}}

结果:

相⽐于单独确认策略,批量确认极⼤地提升了confirm的效率,
缺点是出现Basic.Nack或者超时时,我们不清楚具体哪条消息出了问题。客⼾端需要将这⼀批次的消息全部重发,这会带来明显的重复消息数量。
当消息经常丢失时,批量确认的性能应该是不升反降的。

6.3 Handling Publisher Confirms Asynchronously(异步确认)

提供⼀个回调⽅法,服务端确认了⼀条或者多条消息后客⼾端会回这个⽅法进⾏处理。

异步confirm⽅法的编程实现最为复杂。Channel 接⼝提供了⼀个⽅法 addConfirmListener,这个⽅法可以添加ConfirmListener 回调接⼝。

ConfirmListener 接⼝中包含两个⽅法:handleAck(long deliveryTag, boolean multiple) handleNack(long deliveryTag, boolean multiple) ,分别对应处理RabbitMQ发送给⽣产者的 ack 和 nack。

deliveryTag 表⽰发送消息的序号,multiple 表⽰是否批量确认。我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合。
当收到RabbitMQ的confirm 回调时,从集合中删除对应的消息。当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号。

我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合。

  1. 当收到ack时,从序列中删除该消息的序号。如果为批量确认消息,表⽰⼩于等于当前序号deliveryTag的消息都收到了,则清除对应集合
  2. 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作。
privatestaticvoidhandlingPublisherConfirmsAsynchronously()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue3",true,false,false,null);//有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号SortedSet<Long> confirmSet =Collections.synchronizedSortedSet(newTreeSet<>());long start =System.currentTimeMillis();//监听 channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}//根据业务处理}});//发送消息int flag =0;for(int i =0; i <200; i++){String msg ="Handling Publisher Confirms Asynchronously "+ i;//得到下次发送消息的序号, 从1开始long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("","publish.confirm.queue3",null,msg.getBytes());//存入集合 confirmSet.add(nextPublishSeqNo);}while(confirmSet.isEmpty()){Thread.sleep(20);}long end =System.currentTimeMillis();System.out.println("Handling Publisher Confirms Asynchronously(异步确认) 发送200条消息耗时 "+(end - start));}}

结果:

Read more

汇川机器人软件RobotLab常规操作

汇川机器人软件RobotLab常规操作

一.权限管理注意事项 1.1 软件登录权限管理 连接上软件后,修改轴参数、点位数据需要权限。点击人物图标,登录对应的权限,管理员权限登录密码6个0。 1.2机器人控制权限管理 点击“锁”,打开机器人控制权配置页面。 选择“InoRoboLabt”,机器人受编程软件控制,使用软件可手动移动点位、示教位置信息。 选择“远程IO单元”,机器人受外部设备控制如PLC、上位机,机器人进入自动模式,收到交互信号就按照程序执行。 选择“远程以太网客户端”,机器人受远程客户短控制,用于查找问题、远程调试。 二、 使用过渡点注意事项 程序中点到点直线运动会有机构干涉或有安全风险时,使用过渡点在运动规避风险。 使用过渡点时,注意指令的工具坐标系,选择正确的Wobj工具好,否则运动出错有撞机风险。 如下图所示为例,wobj0为A工位,wobj1为B工位,注意在“轴控制面板”中选择对应工具坐标号 三、使用全局点位移动注意事项 双击左侧“P.

By Ne0inhk
【Part 4 XR综合技术分享】第一节|技术上的抉择:三维实时渲染与VR全景视频的共生

【Part 4 XR综合技术分享】第一节|技术上的抉择:三维实时渲染与VR全景视频的共生

《VR 360°全景视频开发》专栏 将带你深入探索从全景视频制作到Unity眼镜端应用开发的全流程技术。专栏内容涵盖安卓原生VR播放器开发、Unity VR视频渲染与手势交互、360°全景视频制作与优化,以及高分辨率视频性能优化等实战技巧。 📝 希望通过这个专栏,帮助更多朋友进入VR 360°全景视频的世界! Part 4|XR综合技术分享 最后一Part了,我将分享一些关于当前常用的XR综合技术,内容涵盖三维实时渲染与全景视频的共生、多模态交互体验的融合,以及AI如何深度赋能XR应用,推动智能化发展。同时畅想通向全感知XR智能沉浸时代的未来,探索如何通过更先进的技术不断提升用户体验。毕竟,360°全景视频仅是XR应用中的冰山一角。 第一节|技术上的抉择:三维实时渲染与VR全景视频的共生 文章目录 * 《VR 360°全景视频开发》专栏 * Part 4|XR综合技术分享 * 第一节|技术上的抉择:三维实时渲染与VR全景视频的共生 * 1、VR内容形态的分化与融合 * 1.1 三维实时渲染的发展 * 1.2

By Ne0inhk
【大模型应用篇】用 OpenClaw + 飞书打造 7x24 小时服务器运维机器人

【大模型应用篇】用 OpenClaw + 飞书打造 7x24 小时服务器运维机器人

前言 本文基于OpenClaw,也是最近超火的可在本地运行的AI Agent网关,记录从零搭建通过飞书对话管理服务器运维机器人的全过程。该机器人支持随时随地通过飞书查看服务器状态、检索日志、管理进程,其核心机制在于:由OpenClaw将聊天平台(飞书等)的消息路由至大模型,模型调用本地工具(如Shell、文件系统、浏览器)执行相应任务,最终将结果自动返回至飞书会话中,实现自动化运维交互。 架构概览 飞书 App (WebSocket 长连接)         ↕ OpenClaw Gateway (服务器上 systemd 常驻)         ↕ AI 模型 (DeepSeek v3.2/GLM 4.7)         ↕ 服务器 Shell (受白名单限制的命令执行) 核心组件: * OpenClaw Gateway:Agent 网关,管理会话、工具调用、渠道连接 * 飞书插件:通过

By Ne0inhk

多模态融合:结合RetinaFace+CurricularFace与语音识别构建智能交互系统

多模态融合:结合RetinaFace+CurricularFace与语音识别构建智能交互系统 你是否也遇到过这样的问题:团队想做一个能“看脸”又能“听声”的智能交互系统,比如门禁系统既能识别人脸又能验证声音,或者客服机器人能通过摄像头和麦克风同时感知用户情绪?听起来很酷,但真正动手时却发现——人脸模型和语音模型像是两个世界的东西,部署方式五花八门,环境依赖冲突不断,GPU资源调度混乱,最后集成起来像拼图一样费劲。 别担心,这正是我们今天要解决的问题。作为一名在AI领域摸爬滚打多年的技术老兵,我最近也在帮一个创新团队搭建类似的多模态系统。他们原本打算分别用两套服务器跑人脸识别和语音识别,结果不仅成本翻倍,数据同步还经常出错。后来我们换了个思路:用统一的AI镜像平台,把RetinaFace + CurricularFace 和语音识别模型一起部署到同一个GPU环境中,实现了“一次部署、多模态协同”。 这篇文章就是我实战经验的完整复盘。我会带你从零开始,一步步搭建这个融合视觉与听觉的智能交互系统。即使你是AI新手,只要跟着操作,也能在5分钟内完成核心功能的部署,并理解背后的运行逻辑。更

By Ne0inhk