Post

0408RabbitMQ

RabbitMQ是一个广泛使用的消息服务器,采用Erlang语言编写,是一种开源的实现 AMQP(高级消息队列协议)的消息中间件。

RabbitMQ一些信息

  • RabbitMQ 可以部署在分布式和联合配置中,在易用性、扩展性、高可用性等方面表现不俗。
  • AMQP协议(http://www.amqp.org),即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
  • 特点:
    • 可靠性(Reliability ):RabbitMQ 通过一些机制保证可靠性,如持久化、传输确认、发布确认。
    • 灵活的路由(Flexible Routing):由 Exchange 将消息路由至消息队列。RabbitMQ 已经提供了一些内置的 Exchange 来实现典型的路由功能;对于较复杂的路由功能,则将多个Exchange 绑定在一起,或者通过插件机制实现自己的 Exchange。
    • 消息集群(Custering ):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
    • 高可用(Highly Available Queues):消息队列可以在集群中的机器上镜像存储,使得队列在部分节点出问题的情况下仍然可用。
    • 多种协议支持(multi-protocol):RabbitMQ 支持多种消息队列协议,例如 STOMP、MOTT。多语言客户端(Many Clients ):RabbiMQ 几乎支持所有常用语言,例如 Java、.NET、Ruby。
    • 管理界面(ManagementUl):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的多个方面。
    • 跟踪机制(Tracing):RabbitMQ 提供了消息跟踪机制,如果消息异常,使用者就可以找出发生了什么。
    • 插件机制(Plugin System ):RabbitMQ提供了许多插件,实现了多方面的扩展,用户也可以编写自己的插件。

准备工作

  • 安装:brew install rabbitmq
    • 配置~/.bash_profile或者~/.profile环境变量
    1
    2
    
      export RABBIT_HOME= /usr/local/Cellar/rabbitmq/X.Y.Z
      export PATH=$PATH:$RABBIT_HOME/sbin
    
    • 安装成功后目录/usr/local/Cellar/rabbitmq/X.Y.Z
    • 启动服务:rabbitmq-serverrabbitmqctl stop 关闭)
    • 浏览器打开 RabbitMQ 的管理页面。(默认账户密码 guest/guest)
  • 卸载:
1
2
3
4
5
6
7
8
9
10
11
brew services stop rabbitmq

brew uninstall rabbitmq

# Now delete all node's data directories and configuration files.
# This assumes that Homebrew root is at /opt/homebrew
rm -rf /usr/local/Cellar/rabbitmq/
rm -rf/usr/local/opt/rabbitmq/
rm -rf /usr/local/var/lib/rabbitmq/
# the launch agent file
rm -f $HOME/Library/LaunchAgents/homebrew.mxcl.rabbitmq.plist
  • 安装python库:pip3 install pika

使用

1.典型的生产消费例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#公共部分:链接并声明队列
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

#producer
channel.basic_publish(exchange='',
                        routing_key='hello', #routing_key=队列名称
                        body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

#consumer
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    
channel.basic_consume(queue='hello', 
                            auto_ack=True, #auto_ack=True收到消息后消息可以被删除
                            on_message_callback=callback)#basic_consume不阻塞
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2.阻塞的获取消息:

basic_get。(basic_consume不会阻塞当前调用)

1
2
3
4
5
6
7
8
9
10
#producer
while True:
    data =input()
    channel.basic_publish(exchange='',routing_key='hello',body=data)
    print(f'Put {data}')

#consumer
while True:
    method_frame,header, body= channel.basic_get(queue='hello', auto_ack=True)
    if body: print(f'get {body}')

3.优先队列

1
2
3
4
5
6
7
8
9
10
11
12
# 队列声明时,声明最大优先级
channel.queue_declare(queue='hello', 
        arguments={'x-max-priority':100} #设置最大优先级为100
    )

#消息发送时设置通过 properties 参数设置优先级
while True:
    data, priority = input().split()
    channel.basic_publish(exchange='',routing_key='hello',
        properties = pika.BasicProperties( priority=int(priority) ), #设置发送消息的优先级
        body=data)
    print(f'Put {data}')

4.队列持久(重启后只要消息未被消费移除就不会丢失)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 公共部分:队列声明时,添加参数durable= True
channel.queue_declare(queue='hello', durable= True)

#producer:消息发送时设置通过 properties的deliver_mode=2
while True:
    data, priority = input().split()
    channel.basic_publish(exchange='',routing_key='hello',
        properties = pika.BasicProperties( deliver_mode=2 ),
        body=data)
    print(f'Put {data}')

#consumer:
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    print(type(ch), type(method), type(properties), type(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello2',  on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

5.实例:

producer产生一系列的requests.Request对象序列化后存储到RabbitMQ中,consumer获取队列反序列化后再实际执行请求

  • requests.Request对象的序列化和反序了化,这里用到了pickle(完全是多余的操作,直接将url存RabbitMQ远比Request对象消耗低速度快)
    • 序列化:pickle.dumps(data)
    • 反序列化:pickle.loads(obj)
  • 核心点
    • producer:Request对象构造requests.Request('GET', url)
    • consumer:
    1
    2
    
      session = requests.Session()
      response = session.send(request.prepare())
    
This post is licensed under CC BY 4.0 by the author.