Python RabbitMQ原理和使用场景以及模式

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件。

一、RabbitMQ 原理简介

1. RabbitMQ 角色

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器;
  • 消费者:消息的接收方,用于处理数据和确认消息;
  • 代理:就是RabbitMQ本身,负责消息的传递。

2. 连接 RabbitMQ 方式

客户端通过 TCP 连接到 RabbitMQ Server。
连接成功后 RabbitMQ 会创建一个 AMQP 信道。
信道是创建在 TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

3. RabbitMQ 中的关键词

  • Producer (生产者):消息生产者
  • Consumer(消费者):消息的消费者
  • Connection(连接):就是一个TCP的连接,Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server 的。
  • Channel(信道):他是虚拟连接,他建立在 Connection 的 TCP 连接中。也就是说他是消息推送使用的通道;
  • Exchange(交换器):是生产者发布消息的通道,接收生产者消息并将消息路由到消息队列;
  • Queue(队列):是消费者接受消息的通道,用于存储生产者的消息;
  • RoutingKey(路由键):用于把生成者的数据分配到交换器上;
  • BindingKey(绑定键):用于把交换器的消息绑定到队列上;

4. RabbitMQ 消息持久化

RabbitMQ 会将持久化的消息写入磁盘上的持久化日志文件,等消息被消费之后,RabbitMQ 会把这条消息标识为等待垃圾回收。

RabbitMQ 默认情况下是关闭消息持久化的。需要在创建队列的时候设置。

设置如下:

  • 队列持久化 durable=True 消息的生产者和接收者都需要设置队列持久化
# durable=True 队列持久化 channel.queue_declare(queue='test', durable=True)

只做队列持久化是不行的还需要在加上消息持久化

  • 消息持久化 delivery_mode=2
channel.basic_publish( exchange="", routing_key="test", body="hello world", properties=pika.BasicProperties(delivery_mode=2,))
  • 消息持久化的缺点
    因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。

4. RabbitMQ 虚拟主机

  • 虚拟主机概念
虚拟主机是 RabbitMQ 创建的虚拟消息服务器。为了在一个 RabbitMQ 上实现多用户隔离。为此提供了一个虚拟主机(virtual hosts - vhosts)的概念。

二、RabbitMQ 模式

前三个说的是 RabbitMQ 的 Exchange 的类型。RPC 是消费者和生产者互通的一种方式。

1. Direct 直连交换机

当一个绑定了 routing_key = 1 的消息被发送给直连交换机时,交换机会把消息发送给绑定了routing_key = 1的队列。

直连交换机经常用来循环分发任务给多个消费者。然后消息的负载均衡是发生在消费者之间的,而不是队列之间。
如下例:

消息生产者

import pika config = pika.ConnectionParameters( host='127.0.0.1', credentials=pika.PlainCredentials('test','test'),)# 创建 MQ 连接 conn = pika.BlockingConnection(config) channel = conn.channel()# 在频道中创建一个队列 channel.exchange_declare(exchange='ceshi',type='direct')# 发送消息到指定队列# exchange 指定交换器# routing_key 设置路由键# body 发送的内容 channel.basic_publish( exchange='ceshi', routing_key='1', body='Hello World!') conn.close()

消息消费者

import pika config = pika.ConnectionParameters( host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('test','test'),)# 创建 MQ 连接 conn = pika.BlockingConnection(config) channel = conn.channel()# 如果使用exchange, 这里检测 exchange 是否存在,如不存在创建。存在检测是否正确且是否符合 exchange_type 类型 channel.exchange_declare(exchange='ceshi', exchange_type='direct')# 在频道中创建一个队列 channel.queue_declare(queue='hello')# 将队列绑定到指定的 exchange# routing_key 类似密钥,只接收 routing_key 正确的信息 channel.queue_bind(exchange='ceshi', queue='hello', routing_key='1')# 回调函数四个必须的参数 body 是传入的内容# channel: BlockingChannel# method: spec.Basic.Deliver# properties: spec.BasicProperties# body: str or unicodedefcallback(channel, method, properties, body):print channel print method print properties print body # 指定队列调用的函数# no_ack 参数 True 时处理完成后没有返回信息。False 时在处理完后应答 channel.basic_consume( callback, queue='hello', no_ack=False)print'waiting ...' channel.start_consuming()

注意\color{red}{注意}注意:在修改消费者的routing_key后,需要重新创建队列。

2. Fanout 扇型交换机(订阅者模式)

他回将消息发送给绑定到它身上的所有队列,而不理会绑定的路由键。

如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。

扇型交换机实现消息的广播。

以下几个应用场景体育比赛用它来给手机客户端发送比分数据广播各种状态和产品的更新在群聊时分发消息给群聊中的用户

消息生产者

#!coding=utf-8import pika config = pika.ConnectionParameters( host='127.0.0.1', credentials=pika.PlainCredentials('test','test'),) conn = pika.BlockingConnection(config) channel = conn.channel()# 修改 type 为 fanout channel.exchange_declare(exchange='ceshi2',type='fanout') channel.basic_publish( exchange='ceshi2', routing_key='', body='Hello World!') conn.close()

在生产者中只需要修改 exchange_declare 他的 typefanout 即可

消息消费者

#! coding=utf-8import pika config = pika.ConnectionParameters( host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('test','test'),) conn = pika.BlockingConnection(config) channel = conn.channel() channel.exchange_declare(exchange='ceshi2', exchange_type='fanout')# 订阅者模式# 生成随机 queue_name# 订阅者之间的 queue name 不能重复 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='ceshi2', queue=queue_name, routing_key='')defcallback(channel, method, properties, body):print channel print method print properties print body channel.basic_consume( callback, queue=queue_name, no_ack=False)print'waiting ...' channel.start_consuming()

3. Topic 主题交换机(模糊匹配)

通过发送者和接收者之间的 routing_key 相互匹配,将消息路由给一个或多个队列。

主题交换机通常用来实现消息的多路由广播

以下应用场景发布不同分类或者不同标签的新闻(例如,发送游戏类新闻和体育类新闻)系统中不同种类服务的调用(例如:发布系统和付费系统的调用)

例如:
接收者(消费者) routing_key='a.*'
发送者(生产者) routing_key = 'a.b.c.d'
结果:匹配失败

因为:
* 表示匹配一个单词
# 表示匹配0个或多个单词

消息生产者

#!coding=utf-8import pika config = pika.ConnectionParameters( host='127.0.0.1', credentials=pika.PlainCredentials('test','test'),) conn = pika.BlockingConnection(config) channel = conn.channel()# 修改 type 为 topic channel.exchange_declare(exchange='ceshi3',type='topic') channel.basic_publish( exchange='ceshi3', routing_key='a.b.c.d', body='Hello World!') conn.close()

消息消费者

#! coding=utf-8import pika config = pika.ConnectionParameters( host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('test','test'),) conn = pika.BlockingConnection(config) channel = conn.channel() channel.exchange_declare(exchange='ceshi3', exchange_type='topic') channel.queue_declare(queue='hello')# routing_key='a.*' 此时是无法接受到信息的,因为生产者发送的是 a.* a. 后面一个单词。修改 routing_key='a.#' 即可接受成功 channel.queue_bind(exchange='ceshi3', queue='hello', routing_key='a.*')defcallback(channel, method, properties, body):print channel print method print properties print body channel.basic_consume( callback, queue='hello', no_ack=False)print'waiting ...' channel.start_consuming()

4. RPC

生产者发送消息给消费者,并接收消费者处理完的结果
原理:生产者会创建一个新的队列,用来接收消费者返回的信息。生产者在发送消息的同时,还会发送 1步骤中创建的队列名和一个 correlation_id 用来验证当消费者处理完数据后,会把结果和correlation_id发送到 1步骤创建的队列中去生产者会使用一个循环 while 来监测返回结果 self.response生产者获取到数据后比对 correlation_id 是否一致,然后结束此次发送流程

消息生产者

#! coding=utf-8import pika import uuid classRpcClient(object):def__init__(self): config = pika.ConnectionParameters( host='127.0.0.1', credentials=pika.PlainCredentials('test','test'),) self.conn = pika.BlockingConnection(config) self.channel = self.conn.channel()# 在频道中创建一个队列 result = self.channel.queue_declare(exclusive=True)# 生成队列名 self.queue_name = result.method.queue # 指定队列要调用的函数 self.channel.basic_consume( self.on_request, no_ack=True, queue=self.queue_name )defon_request(self, channel, method, props, body):if self.corr_id == props.correlation_id: self.response = body defcall(self, message): self.response =None self.corr_id =str(uuid.uuid4)# 发送消息到指定队列 self.channel.basic_publish( exchange='', routing_key='rpc',# 消费者返回处理结果需要的队列信息(reply_to)和相关 id(correlation_id).# 以便消费者知道返回给那个生产者 properties=pika.BasicProperties( reply_to=self.queue_name, correlation_id=self.corr_id ), body=message )while self.response isNone:# 是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态 self.conn.process_data_events()return self.response fibonacci_rpc = RpcClient()print"等待处理结果" response = fibonacci_rpc.call('Hello World!')print"处理完返回信息: %s"% response 

消息消费者

#! coding=utf-8import pika import time config = pika.ConnectionParameters( host='127.0.0.1', credentials=pika.PlainCredentials('test','test'),) conn = pika.BlockingConnection(config) channel = conn.channel() channel.exchange_declare(exchange='rpc') channel.queue_declare(queue='rpc')defon_request(channel, method, props, body):print'处理中, 收到内容:%s'% body # 发送消息到指定队列# exchange 指定 exchange# routing_key 设置为队列的名称# body 发送的内容 channel.basic_publish( exchange='',# props.reply_to 是生产者发送过来 接收处理结果的 指定队列 routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body='处理完成')# 告知 rabbit 消息已经处理完 channel.basic_ack(delivery_tag=method.delivery_tag)# 使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,只有工作者完成任务之后,才会再次接收到任务 channel.basic_qos(prefetch_count=1)# 接收的结果调用 on_request 函数处理 channel.basic_consume(on_request, queue='rpc')print("waiting....")# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming() conn.close()

三、RabbitMQ 使用场景

1. 任务相互依赖

例:

三个任务,任务1、任务2、任务3。他们之间有相互的制约。执行任务1的前提是要有任务2的结果。执行任务2需要任务3的结果。

一般会使用crontab来做计划任务。预估一下每个任务的完成时间。然后制定任务。当数量变大处理时间变成,就需要经常修改crontab任务。

使用 RabbitMQ 后每个任务结束时只需要发送一个结束信息即可

如:任务2订阅任务3的信息。当任务3完成后发送一个完成消息。任务2接收到完成消息后开始执行,在执行结束后发送任务2完成消息。任务1订阅任务2的消息,然后执行。

优点:
  • 任务执行时间发生变化,不需要修改 crontab 任务
  • 每个任务之间的预留时间没有了

2. 下发任务后不关心结果

例:

有三个用户A、B、C 他们分别发送文章,但后台会根据用户的级别做不同的操作。
A 是普通用户,系统发布。
B 是 VIP 用户,系统发布和推荐给关注这部分内容的客户。
C 是黑卡客户,系统发布、推荐给关注这部分内容的客户、在这个分类中置顶这篇文章。
文章发布服务只关心是否成功,剩下的操作都不关心。可以使用 RabbitMQ 服务将其他操作分离。

优点:
  • 减少发布时间,只操作发布,其他操作由别的系统执行。
  • 解耦,系统之间不在直接关联。
  • 新增发布以后的其他操作不需要在修改发布系统代码。

3. 任务执行时间很长还需要等待结果

例:

有一个操作 A。用户执行这个任务后,又非常需要结果。

解决方法:
  1. 用户调用操作 A。
  2. 操作 A 直接返回调用成功。此时只是调用成功。
  3. 操作 A 告诉后台应该执行什么程序。
  4. 后台执行完成后,将完成消息发送给 RabbitMQ。
  5. 用户订阅 RabbitMQ 中的操作 A 的消息。

为什么不是后台直接将结果发送给用户呢。因为一旦增加了订阅用户,就需要修改后台程序,这样很恶心。

四、开启 WEB 服务

RabbitMQ 自带管理后台,安装后需要配置开启

进入 RabbitMQ 安装目录中的 sbin 目录执行

rabbitmq-plugins enable rabbitmq_management 

http://localhost:15672/

用户名密码均为guest

Read more

【动态规划】数位DP的原理、模板(封装类)

【动态规划】数位DP的原理、模板(封装类)

本文涉及知识点 C++动态规划 复杂但相对容易理解的解法 上界、下界的位数一样都为N。如果不一样,拆分一样。比如:[10,200],拆分[10,99]和[100,200]。由于要枚举到 1 ∼ N 1\sim N 1∼N,故实际复杂度是N倍。 动态规划的状态表示 dp[n][m][m1],n表示已经处理最高n位,m表示上下界状态:0非上下界,1下界,2上界,3上下界。m1是自定义状态。 某题范围是[110,190],处理一位后:1是上下界,无其它合法状态。处理二位后,11是下界,19是上界, 12 ∼ 18 12

By Ne0inhk

(CPO_SVR)2024最新算法豪冠猪算法优化SVR实现数据回归预测(区别于SVM的数据分类)(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭:行百里者,半于九十。 📋📋📋本文内容如下:🎁🎁🎁  ⛳️赠与读者 👨‍💻做科研,涉及到一个深在的思想系统,需要科研者逻辑缜密,踏实认真,但是不能只是努力,很多时候借力比努力更重要,然后还要有仰望星空的创新点和启发点。建议读者按目录次序逐一浏览,免得骤然跌入幽暗的迷宫找不到来时的路,它不足为你揭示全部问题的答案,但若能解答你胸中升起的一朵朵疑云,也未尝不会酿成晚霞斑斓的别一番景致,万一它给你带来了一场精神世界的苦雨,那就借机洗刷一下原来存放在那儿的“躺平”上的尘埃吧。      或许,雨过云收,神驰的天地更清朗.......🔎🔎🔎 💥第一部分——内容介绍 基于冠豪猪优化算法(CPO)的支持向量回归(SVR)数据回归预测研究 摘要:本文提出一种基于冠豪猪优化算法(CPO)优化支持向量回归(SVR)的混合模型(CPO-SVR),用于解决多输入单输出(MISO)回归预测任务中的非

By Ne0inhk
HDFS SafeMode深度解析:原理、触发机制与运维实践

HDFS SafeMode深度解析:原理、触发机制与运维实践

HDFS SafeMode深度解析:原理、触发机制与运维实践 * 引言 * 一、什么是SafeMode? * 1.1 基本概念 * 1.2 SafeMode的核心作用 * 二、SafeMode的工作原理 * 2.1 整体流程 * 2.2 安全阈值的计算 * 2.3 块报告与健康度评估 * 三、何时需要进入SafeMode? * 3.1 正常启动过程(自动进入) * 3.2 管理员手动进入(运维操作) * 场景一:执行重要维护操作 * 场景二:集群恢复和数据校验 * 场景三:强制退出长时间SafeMode * 3.3 异常触发情况 * 场景一:副本严重不足 * 场景二:DataNode批量掉线 * 场景三:元数据不一致 * 四、

By Ne0inhk
【LeetCode原地复写零】:双指针+逆向填充,O(n)时间O(1)空间最优解!

【LeetCode原地复写零】:双指针+逆向填充,O(n)时间O(1)空间最优解!

🎁个人主页:User_芊芊君子 🎉欢迎大家点赞👍评论📝收藏⭐文章 🔍系列专栏:Java.数据结构 【前言】 本文聚焦 LeetCode“原地复写零”经典题目,核心需求是在固定长度数组中复写每个 0并右移其余元素,且需满足原地修改、不使用额外数组空间的约束。正向遍历易导致后续元素被覆盖,为此本文详解双指针+逆向填充的优雅解法,高效破解这一核心难点。 文章目录: * 一、复写零 * 二、思路分析 * 1.找到复写的最后一个数 * 2.开始从后往前复写 * 三、代码展示 * 四、时间和空间复杂度分析 * 五、总结 一、复写零 二、思路分析 复写零这道题是让在原数组修改,如果从前向后遍历,后面的元素会被覆盖,所以我们要找到被复写的最后一个元素,然后从后往前复写。运用双指针+逆向填充 1.

By Ne0inhk