RabbitMQ是一个广泛使用的消息服务器,采用Erlang语言编写,是一种开源的实现 AMQP(高级消息队列协议)的消息中间件。
RabbitMQ一些信息
- RabbitMQ 可以部署在分布式和联合配置中,在易用性、扩展性、高可用性等方面表现不俗。
- AMQP协议(http://www.amqp.org),即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
- 特点:
- 可靠性(Reliability ):RabbitMQ 通过一些机制保证可靠性,如持久化、传输确认、发布确认。
- 灵活的路由(Flexible Routing):由 Exchange 将消息路由至消息队列。RabbitMQ 已经提供了一些内置的 Exchange 来实现典型的路由功能;对于较复杂的路由功能,则将多个Exchange 绑定在一起,或者通过插件机制实现自己的 Exchange。
- 消息集群(Custering ):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
- 高可用(Highly Available Queues):消息队列可以在集群中的机器上镜像存储,使得队列在部分节点出问题的情况下仍然可用。
- 多种协议支持(multi-protocol):RabbitMQ 支持多种消息队列协议,例如 STOMP、MOTT。多语言客户端(Many Clients ):RabbiMQ 几乎支持所有常用语言,例如 Java、.NET、Ruby。
- 管理界面(ManagementUl):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的多个方面。
- 跟踪机制(Tracing):RabbitMQ 提供了消息跟踪机制,如果消息异常,使用者就可以找出发生了什么。
- 插件机制(Plugin System ):RabbitMQ提供了许多插件,实现了多方面的扩展,用户也可以编写自己的插件。
准备工作
- 安装:
brew install rabbitmq
- 配置~/.bash_profile或者~/.profile环境变量
1
2
| export RABBIT_HOME= /usr/local/Cellar/rabbitmq/X.Y.Z
export PATH=$PATH:$RABBIT_HOME/sbin
|
- 安装成功后目录
/usr/local/Cellar/rabbitmq/X.Y.Z
- 启动服务:
rabbitmq-server
(rabbitmqctl stop
关闭) - 浏览器打开 RabbitMQ 的管理页面。(默认账户密码 guest/guest)
- 卸载:
1
2
3
4
5
6
7
8
9
10
11
| brew services stop rabbitmq
brew uninstall rabbitmq
# Now delete all node's data directories and configuration files.
# This assumes that Homebrew root is at /opt/homebrew
rm -rf /usr/local/Cellar/rabbitmq/
rm -rf/usr/local/opt/rabbitmq/
rm -rf /usr/local/var/lib/rabbitmq/
# the launch agent file
rm -f $HOME/Library/LaunchAgents/homebrew.mxcl.rabbitmq.plist
|
- 安装python库:
pip3 install pika
使用
1.典型的生产消费例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| #公共部分:链接并声明队列
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
#producer
channel.basic_publish(exchange='',
routing_key='hello', #routing_key=队列名称
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
#consumer
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello',
auto_ack=True, #auto_ack=True收到消息后消息可以被删除
on_message_callback=callback)#basic_consume不阻塞
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
2.阻塞的获取消息:
basic_get
。(basic_consume
不会阻塞当前调用)
1
2
3
4
5
6
7
8
9
10
| #producer
while True:
data =input()
channel.basic_publish(exchange='',routing_key='hello',body=data)
print(f'Put {data}')
#consumer
while True:
method_frame,header, body= channel.basic_get(queue='hello', auto_ack=True)
if body: print(f'get {body}')
|
3.优先队列
1
2
3
4
5
6
7
8
9
10
11
12
| # 队列声明时,声明最大优先级
channel.queue_declare(queue='hello',
arguments={'x-max-priority':100} #设置最大优先级为100
)
#消息发送时设置通过 properties 参数设置优先级
while True:
data, priority = input().split()
channel.basic_publish(exchange='',routing_key='hello',
properties = pika.BasicProperties( priority=int(priority) ), #设置发送消息的优先级
body=data)
print(f'Put {data}')
|
4.队列持久(重启后只要消息未被消费移除就不会丢失)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| # 公共部分:队列声明时,添加参数durable= True
channel.queue_declare(queue='hello', durable= True)
#producer:消息发送时设置通过 properties的deliver_mode=2
while True:
data, priority = input().split()
channel.basic_publish(exchange='',routing_key='hello',
properties = pika.BasicProperties( deliver_mode=2 ),
body=data)
print(f'Put {data}')
#consumer:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
print(type(ch), type(method), type(properties), type(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello2', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
5.实例:
producer产生一系列的requests.Request
对象序列化后存储到RabbitMQ
中,consumer获取队列反序列化后再实际执行请求
requests.Request
对象的序列化和反序了化,这里用到了pickle
(完全是多余的操作,直接将url存RabbitMQ远比Request对象消耗低速度快)- 序列化:
pickle.dumps(data)
- 反序列化:
pickle.loads(obj)
- 核心点
- producer:
Request
对象构造requests.Request('GET', url)
- consumer:
1
2
| session = requests.Session()
response = session.send(request.prepare())
|