【RabbitMQ】工作模式实现

【RabbitMQ】工作模式实现

目录

一、Work Queues (工作队列模式)

简单模式在这个系列第一个文章,上手程序就是一个Simple (简单模式)的实现。

⼯作队列模式⽀持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收。

每个工作模式的实现,都先需要引入RabbitMQ的依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>

1.1 生产者

生产者:

  1. 创建连接
  2. 创建Channel
  3. 声明⼀个队列Queue
  4. 生产消息
  5. 释放资源
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducerDemo{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明信道 channel.queueDeclare("workQueues",true,false,true,null);//声明队列,发送消息for(int i =0; i <10; i++){String msg ="hello workQueues"+i; channel.basicPublish("","workQueues",null,msg.getBytes());}System.out.println("消息发送成功");//资源释放 channel.close(); connection.close();}}

1.2 消费者

我们创建两个消费者模拟。
消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源,为了造成两个消费者竞争,我们先不释放资源。
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("workQueues",false,false,true,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("workQueues",true,consumer);Thread.sleep(100);////释放资源//channel.close();//connection.close();}}

效果:

二、Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了⼀个Exchange⻆⾊。

Exchange 常⻅有三种类型,分别代表不同的路由规则,也就分别对应不同的⼯作模式:

  1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
  2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

2.1 生产者

生产者

  1. 创建连接
  2. 创建信道
  3. 声明交换机
  4. 声明两个队列
  5. 交换机与队列进行绑定
  6. 生产消息
  7. 释放资源

声明交换机的方法是Channel类下的exchangeDeclare方法

  • exchange – the name of the exchange,交换机名称
  • type – the exchange type ,交换机类型
  • durable – true if we are declaring a durable exchange (the exchange will survive a server restart),是否可持久化
  • autoDelete – true if the server should delete the exchange when it is no longer in use ,是否自动删除
  • internal – true if the exchange is internal, i.e. can’t be directly published to by a client,是否内部使用,内部使用客户端发不进去消息
  • arguments – other properties (construction arguments) for the exchange,参数
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("fanout.exchange",BuiltinExchangeType.FANOUT,false);//4. 声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null); channel.queueDeclare("fanout.queue2",true,false,false,null);//5.交换机与队列绑定 channel.queueBind("fanout.queue1","fanout.exchange",""); channel.queueBind("fanout.queue2","fanout.exchange","");//6. 生产消息for(int i =0; i <10; i++){String msg ="hello fanout"+i; channel.basicPublish("fanout.exchange","",null,msg.getBytes());}System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

2.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("fanout.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

三、Routing(路由模式)

Routing(路由模式):
队列和交换机的绑定,不能是任意的绑定了,⽽是要指定⼀个BindingKey (RoutingKey的⼀种),消息的发送⽅在向Exchange发送消息时,也需要指定消息的RoutingKey,Exchange也不再把消息交给每⼀个绑定的key,⽽是根据消息的RoutingKey进⾏判断,只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致,才会接收到消息。

3.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为direct
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("direct.exchange",BuiltinExchangeType.DIRECT,true);//4. 声明队列 channel.queueDeclare("direct.queue1",true,false,false,null); channel.queueDeclare("direct.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("direct.queue1","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","b"); channel.queueBind("direct.queue2","direct.exchange","c");//6. 生产消息String msgA ="hello direct routingKey is a"; channel.basicPublish("direct.exchange","a",null,msgA.getBytes());String msgB ="hello direct routingKey is b"; channel.basicPublish("direct.exchange","b",null,msgB.getBytes());String msgC ="hello direct routingKey is c"; channel.basicPublish("direct.exchange","c",null,msgC.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

3.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("direct.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("direct.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. topics 模式使⽤的交换机类型为topic(Routing模式使⽤的交换机类型为direct)
  2. topic 类型的交换机在匹配规则上进⾏了扩展,Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配)。

在topic类型的交换机在匹配规则上,有些要求:

  1. RoutingKey 是⼀系列由点( . )分隔的单词,⽐如 " stock.usd.nyse “,” nyse.vmw “,” quick.orange.rabbit "
  2. BindingKey 和 RoutingKey⼀样,也是点( . )分割的字符串。
  3. Binding Key中可以存在两种特殊字符串,⽤于模糊匹配
    3.1 * 表⽰⼀个单词
    3.2 # 表⽰多个单词(0-N个)

4.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为topic
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("topic.exchange",BuiltinExchangeType.TOPIC,true);//4. 声明队列 channel.queueDeclare("topic.queue1",true,false,false,null); channel.queueDeclare("topic.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("topic.queue1","topic.exchange","*.a.*"); channel.queueBind("topic.queue2","topic.exchange","*.*.b"); channel.queueBind("topic.queue2","topic.exchange","c.#");//6. 生产消息String msgA ="hello topic routingKey is word.a.word"; channel.basicPublish("topic.exchange","word.a.word",null,msgA.getBytes());String msgB ="hello topic routingKey is word.word.b"; channel.basicPublish("topic.exchange","word.word.b",null,msgB.getBytes());String msgC ="hello topic routingKey is c.word.word.word.word.b"; channel.basicPublish("topic.exchange","c.word.word.word.word.b",null,msgC.getBytes());String msgD ="hello topic routingKey is c.a.b"; channel.basicPublish("topic.exchange","c.a.b",null,msgD.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

4.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("topic.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("topic.queue1",consumer);//释放资源// channel.close();// connection.close();}}

结果:

五、RPC通信

RPC(Remote Procedure Call),即远程过程调⽤。它是⼀种通过⽹络从远程计算机上请求服务,⽽不需要了解底层⽹络的技术。类似于Http远程调⽤。
RabbitMQ实现RPC通信的过程,⼤概是通过两个队列实现⼀个可回调的过程。

⼤概流程如下:

  1. 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列。
  2. 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息,⼀旦收到响应,客⼾端会检查消息的 correlationId 属性,以确保它是所期望的响应。

5.1 客户端

客户端:

  1. 声明两个队列,包含回调队列 replyQueueName,声明本次请求的唯⼀标志 corrId
  2. 将 replyQueueName 和 corrId 配置到要发送的消息队列中
  3. 使⽤阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 阻塞队列有消息后,主线程被唤醒,打印返回内容
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeoutException;publicclassClient{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明队列 channel.queueDeclare("rpc.request.queue",true,false,false,null); channel.queueDeclare("rpc.response.queue",true,false,false,null);//4. 发送消息//设置请求标识String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().correlationId(correlationId).replyTo("rpc.response.queue").build();String msg ="hello rpc"; channel.basicPublish("","rpc.request.queue", properties,msg.getBytes());//5. 接收响应//使用阻塞队列存储响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Client 接收到消息: "+newString(body));//判断唯⼀标识正确, 放到阻塞队列中if(correlationId.equals(properties.getCorrelationId())){ response.offer(newString(body));}System.out.println("Client 接收到消息: "+newString(body));}}; channel.basicConsume("rpc.response.queue",true,consumer);// 获取响应的结果String result = response.take();System.out.println(" [RPC_Client] Result:"+ result);}}

5.2 服务器

服务器:

  1. 接收消息
  2. 根据消息内容进⾏响应处理,把应答结果返回到回调队列中
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//接收请求//每次接受一条 channel.basicQos(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Server 接收到请求: "+newString(body));//响应请求String response ="响应";AMQP.BasicProperties props =newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).replyTo("rpc.request.queue").build(); channel.basicPublish("","rpc.response.queue",props,response.getBytes());//手动确认 channel.basicAck(envelope.getDeliveryTag(),false);}}; channel.basicConsume("rpc.request.queue",false,consumer);}}

结果:

六、Publisher Confirms(发布确认)

消息中间件都会有消息丢失的问题发生,大概分为以下三种丢失情况:

  1. 生产者问题:因为应用故障,网络等问题,生产者没有成功向消息中间件发送消息;
  2. 消息中间件问题:生产者成功发送了消息,消息中间件自己原因导致消息丢失;
  3. 消费者问题:消费者消费消息时,处理出现问题,导致消费者 消费失败的 消息 从消息队列中删除了。

RabbitMQ针对上面三种情况给出的解决方案:

  1. 针对生产者问题:采取Publisher Confirms(发布确认)机制 解决;
  2. 针对消息中间件问题:通过持久化机制解决;
  3. 针对消费者问题:通过消息应答机制解决;

生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始);
一旦消息被投递到所有匹配的队列之后,RabbitMO就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出.
brokerl(消息中间件)回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,
此外 broker 也可以设置 channel.basicAck() 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理.

6.1 Publishing Messages Individually(单独确认)

跟生产者发送消息,只有调用Channel类confirmSelect()设置信道为confirm模式,和Channel类waitForConfirmsOrDie()方法等待手动确认。

privatestaticvoidpublishingMessagesIndividually()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue1",true,false,false,null);long start =System.currentTimeMillis();//发送消息for(int i =0; i <200; i++){String msg ="Publishing Messages Individually "+ i; channel.basicPublish("","publish.confirm.queue1",null,msg.getBytes());//等待确认 channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages Individually(单独确认) 发送200条消息耗时 "+(end - start));}}

结果:

可以发现,发送200条消息,耗时很⻓。

观察上⾯代码,会发现这种策略是每发送⼀条消息后就调⽤ channel.waitForConfirmsOrDie() ⽅法,之后 等待服务端的确认,这实际上是⼀种串⾏同步等待的⽅式。
尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调⽤Linux内核的fsync⽅法)。
但是发布确认机制是⽀持异步的。可以⼀边发送消息,⼀边等待消息确认。

6.2 Publishing Messages in Batches(批量确认)

每发送⼀批消息后,调⽤ channel.waitForConfirms ⽅法,等待服务器的确认返回。

跟单独确认区别就是,发送到一定消息再进行等待确认。

privatestaticvoidpublishingMessagesInBatches()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue2",true,false,false,null);long start =System.currentTimeMillis();//发送消息int batchSize =100;int flag =0;for(int i =0; i <200; i++){String msg ="Publishing Messages in Batches "+ i; channel.basicPublish("","publish.confirm.queue2",null,msg.getBytes());//批量 等待确认if(flag == batchSize){ channel.waitForConfirmsOrDie(5000); flag =0;} flag++;}if(flag >0){ channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages in Batches(批量确认) 发送200条消息耗时 "+(end - start));}}

结果:

相⽐于单独确认策略,批量确认极⼤地提升了confirm的效率,
缺点是出现Basic.Nack或者超时时,我们不清楚具体哪条消息出了问题。客⼾端需要将这⼀批次的消息全部重发,这会带来明显的重复消息数量。
当消息经常丢失时,批量确认的性能应该是不升反降的。

6.3 Handling Publisher Confirms Asynchronously(异步确认)

提供⼀个回调⽅法,服务端确认了⼀条或者多条消息后客⼾端会回这个⽅法进⾏处理。

异步confirm⽅法的编程实现最为复杂。Channel 接⼝提供了⼀个⽅法 addConfirmListener,这个⽅法可以添加ConfirmListener 回调接⼝。

ConfirmListener 接⼝中包含两个⽅法:handleAck(long deliveryTag, boolean multiple) handleNack(long deliveryTag, boolean multiple) ,分别对应处理RabbitMQ发送给⽣产者的 ack 和 nack。

deliveryTag 表⽰发送消息的序号,multiple 表⽰是否批量确认。我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合。
当收到RabbitMQ的confirm 回调时,从集合中删除对应的消息。当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号。

我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合。

  1. 当收到ack时,从序列中删除该消息的序号。如果为批量确认消息,表⽰⼩于等于当前序号deliveryTag的消息都收到了,则清除对应集合
  2. 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作。
privatestaticvoidhandlingPublisherConfirmsAsynchronously()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue3",true,false,false,null);//有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号SortedSet<Long> confirmSet =Collections.synchronizedSortedSet(newTreeSet<>());long start =System.currentTimeMillis();//监听 channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}//根据业务处理}});//发送消息int flag =0;for(int i =0; i <200; i++){String msg ="Handling Publisher Confirms Asynchronously "+ i;//得到下次发送消息的序号, 从1开始long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("","publish.confirm.queue3",null,msg.getBytes());//存入集合 confirmSet.add(nextPublishSeqNo);}while(confirmSet.isEmpty()){Thread.sleep(20);}long end =System.currentTimeMillis();System.out.println("Handling Publisher Confirms Asynchronously(异步确认) 发送200条消息耗时 "+(end - start));}}

结果:

Read more

保姆级教程:Windows本地部署Ollama+OpenClaw,打造你的AI赚钱系统(APP开发/量化/小说/剪辑)

摘要:想用AI搞钱但卡在技术门槛?本文手把手教你用一台Windows电脑,零成本本地部署Ollama大模型+OpenClaw智能中枢,赋予AI开发APP、量化分析、编写小说、剪辑辅助等“赚钱技能”。全程无需编程基础,跟着鼠标点、照着命令敲,即可拥有24小时待命的AI员工。 一、写在前面 很多朋友对AI变现跃跃欲试,却常被这些问题劝退: * 云端部署太贵,API调用怕浪费钱 * 技术文档看不懂,不知道从哪下手 * 数据隐私担忧,不敢把敏感资料上传 其实,你手头那台Windows电脑完全能胜任!本文将带你搭建一套完全本地化、免费、可扩展的AI生产力系统,让AI帮你写代码、分析表格、生成文案、处理视频,真正把AI变成你的“赚钱工具”。 系统架构: * 本地大脑:Ollama + DeepSeek模型,负责理解任务、生成内容 * 智能中枢:OpenClaw(原名OpenClaude),负责调用各类工具(Skill) * 赚钱技能:通过安装Skill包,让AI具备特定领域的实操能力 适用人群:

By Ne0inhk

在CodeBuddy中使用自定义AI接口,轻松对接GPT5-Code等大模型,实现AI编程自由!

使用过CodeBuddy的朋友都知道,新用户赠送的大模型使用额度太少了,跑一天就没了,根本就支撑不住我们的Vibe Coding! 比如痴狂哥最近做的项目,没到半天额度就满了:不写一行代码!我用 AI 打造了一款 AI 客户端!(开源) 所以,今天痴狂哥就给大家公布一个自用的方法,让CodeBuddy编辑器能够使用我们自定义的AI接口和大模型,能够无限地愉快自动化编程!实现真正的AI编程自由! 实现效果 1. 让CodeBuddy使用自定义大模型接口,轻松对接如GPT5-Codex等大模型 2. 不消耗用户的使用量,无限免费Vibe Coding! 好了,我们开始吧! 准备工作 1. CodeBuddy海外版(截至至文章发布日期的最新版本0.2.4) 2. Python3(脚本环境) 3. Reqable(抓包工具) 以上软件自行前往官网下载安装。 第一步:重定向大模型接口 将CodeBuddy请求大模型的接口地址,重定向到我们自己的任意大模型API! 安装完毕之后,我们首先打开Reqable,初始化配置,装好证书后启动代理

By Ne0inhk
openJiuwen集成蓝耘AI模型深度解析:从架构设计到企业级Agent实战部署

openJiuwen集成蓝耘AI模型深度解析:从架构设计到企业级Agent实战部署

前言 在人工智能技术从单纯的感知智能向认知智能演进的浪潮中,大语言模型(LLM)的成熟催生了AI Agent(人工智能体)这一全新的应用形态。AI Agent不再局限于传统的单指令执行,而是演进为具备自主感知、推理规划、决策执行能力的智能实体。在这一技术变革背景下,openJiuwen作为一个致力于提供灵活、强大且易用能力的开源Agent平台应运而生。本文将深度剖析openJiuwen的技术架构、核心优势,并基于真实的服务器部署环境,详细拆解从底层环境搭建到上层复杂智能体构建的全过程。 一、 Agentic AI时代的基础设施:openJiuwen概览 openJiuwen的定位不仅是一个开发工具,而是面向生产级应用的Agent全生命周期管理平台。它旨在解决当前大模型应用落地过程中面临的开发门槛高、协同调度难、运行稳定性差等痛点。通过提供标准化的开发框架与高可靠的运行引擎,openJiuwen支持开发者快速构建能够处理各类简单或复杂任务的AI Agent,并实现多Agent间的协同交互。 作为核心代码资产的入口,开发者能在这里查看项目的 Readme 文档、分支管理和最新提交

By Ne0inhk
免费开源AI工具:CoPaw与OpenFang整理

免费开源AI工具:CoPaw与OpenFang整理

CoPaw 和 OpenFang,两者软件本体都免费开源,但模型 API 可能产生费用。 CoPaw(阿里云) * 软件本身:完全免费开源(Apache 2.0),无会员、无广告、无功能限制 * 本地部署:免费,仅需 Python 环境,可跑本地模型(Ollama 等),零 API 费用 * 云端部署:魔搭创空间有免费测试额度;长期使用按云资源(CPU/GPU/ 存储)计费 * 模型 API:调用通义千问、OpenAI、DeepSeek 等按官方标准按量付费  CoPaw GitHub 地址 https://github.com/agentscope-ai/CoPaw OpenFang(

By Ne0inhk