1.RabbitMQ组成

RabbitMQ结构

1.1 结构组成:

  1. producer:生产者,消息的生产者。

  2. exchange: 交换器,关键组件,灵活性所在。

  3. Queue:队列

  4. Consumer:消费者 ,处理生产者通过queue发来的消息。

通常我们对于消息队列的认识,主要由生产者、队列、消费者这三部分组成。因为RabbitMQ采用AMQP协议,这个协议很重要的一点就是:将生产者和消息队列解耦,所以引入了交换器Exchange。消费者之只关心把消息发给Exchange,并不需要考虑各种情况(队列是否存在、能否发送成功)。通过配置exchange,可以实现不同的效果。

1.2 exchange支持的类型

  1. Default模式:名称为空的direct模式
  2. fanout模式:类似广播,将message发送到所有的队列中
  3. direct模式:使用最多的模式
  4. topic模式:主题模式,相当在direct模式增加通配符功能,更加灵活
  5. headers模式:忽略程序中exchange和routing_key配置,采用header头信息参数来进行指定参数,性能较差,一般不用

2.六种典型工作模式

2.1 默认工作模式

最简单的工作模式,只使用生产者、队列、消费者。exchange采用Default模式,name为空的Direct模式

send1.py

import pika

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 设置队列名称
channel.queue_declare(queue='hello')

# 发送消息,只指定routing_key和body
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

# 关闭连接
connection.close()

receive1.py

import pika, sys, os

def main():
    # 获取连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
	
    # 声明队列名称
    channel.queue_declare(queue='hello')

    # 定义消息消费函数
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
	
    # 指定队列和消费函数的对应关系
    # 注意auto_ack处于打开状态,会自动返回确认消息
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    # 开始同步阻塞方式消费
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

运行效果

(先启动receive,然后再启动send):

# send
[x] Sent 'Hello World!'

# receive
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'

2.2 work queue 工作队列

相比于最基本的方式,使用同一种exchane模式,改进的地方在于:

1.consume侧,通过多个consume共同消费,主要用于某些耗时处理,通过负载均衡的模式,提高整体的负载能力。并且不同消费者之间可以指定prefetch_count数目,不仅仅只是简单的轮询,同时考虑consume的消费情况来分配消息,更好的合理。

2.ack返回机制,保证consume中断处理,消息不丢失,重新分发。

3.队列queue可以指定持久化duration

4.消息可以持久化机制(delivery_mode=2),保证rabbitmq本身程序中断,也不会丢失数据。说明即使设置也是有概率存在数据的可能,fsync机制。

send2.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 指定队列名称为task_queue(只是一个名称而已),同时队列是可以持久化的
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 声明消息持久化
    ))
print(" [x] Sent %r" % message)

# 关闭连接
connection.close()

receive2.py

import pika
import time

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 指定队列名称为task_queue(只是一个名称而已),同时队列是可以持久化的
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

# 定义消息处理函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # 手动返回ack确认信息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 指定最多一次接收的消息,prefetch_count=1, 代表不要在同一时间给一个工作者一个以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

运行效果

(开启两个receive,一个send。先开receive,然后开send)

# work1
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive2.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Done
 [x] Received 'Hello World!'
 [x] Done

# work2
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive2.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Done
 [x] Received 'Hello World!'
 [x] Done

# send
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send2.py
 [x] Sent 'Hello World!'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send2.py
 [x] Sent 'Hello World!'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials>
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send2.py
 [x] Sent 'Hello World!'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send2.py
 [x] Sent 'Hello World!'

2.3 publish/subscribe 发布/订阅

exchange_type=fanout

发布订阅模式和前面介绍的两种模式最大的差别在于,普通消息队列中一个消息在同一时间只可能被一个consumer消费。但是在发布订阅模式中,发布者publish广播的一条消息,订阅者subscriber只要订阅了对应的主题,就可以收到消息,一条消息会被多个订阅者消费。

send3.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 声明name='logs'的exchange,exchange的方式为fanout(扇出,给所有已知的队列发送消息)
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# producer发送消息,指定消息发送到name='logs'的exchange上面,不指定routing_key
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)

# 关闭连接
connection.close()

receive3.py

import pika

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 声明name='logs'的exchange,exchange的方式为fanout(扇出,给所有已知的队列发送消息)
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 创建随机名称的队列,并且在consumer关闭连接之后,删除临时队列
result = channel.queue_declare(queue='', exclusive=True)
# 获取临时队列名称
queue_name = result.method.queue

# 将exchange和queue进行绑定
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

# 定义消息处理函数
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

# 声明consumer消费过程中队列名称、回调函数,自动返回ack确认信息
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

# 开始消费
channel.start_consuming()

运行效果

首先开两个receive,进入监听状态。然后在执行2次(多次也可以)send

# work 1
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive3.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 
# work 2
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive3.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'

# send(发送了两次消息)
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send3.py
 [x] Sent 'info: Hello World!'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send3.py
 [x] Sent 'info: Hello World!'

work1 和 work2 都获得了相同的详细内容

2.4 routing 路由

exchange_type=direct

路由模式相比上一个发布/订阅更加具有灵活性,可以根据消息本身的特性,补充exchange_key来进行二次指定。

send4.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
# 获取通道
channel = connection.channel()

# 指定交换器name=direct_logs,并指定交换类型为'direct'(直接模式)
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 利用severity来作为routing_key,默认为'info',可以手动指定
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 指定消息发送中的交换器和routing_key
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))

# 关闭连接
connection.close()

receive4.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
# 获取通道
channel = connection.channel()

# 指定交换器name=direct_logs,并指定交换类型为'direct'(直接模式)
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 创建随机名称的队列,并且在consumer关闭连接之后,删除临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# 根据运行时,指定的severities,逐个进行绑定(exchange想用,只是routing_key有变化)    
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

# 定义消息处理函数
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

# 消息消费声明,指定随机队列名,处理函数,自动返回ack信息
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

# 开始消费
channel.start_consuming()

运行效果

开启两个receive,指定不同的消息类型。开启send,发送不同类型的消息

# receive 1,指定接收warning和error类型的消息,实际接收了2条消息
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive4.py warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'warning':b'warning msg'
 [x] 'error':b'error msg'

# receive 2,指定接收info、warning、error三个类型的消息,实际接收了3条消息
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive4.py info warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':b'info msg'
 [x] 'warning':b'warning msg'
 [x] 'error':b'error msg'
 
# send, 发送了三条不同类型的数据,分别为info、warning、error,各一条
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send4.py info "info msg"
 [x] Sent 'info':'info msg'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send4.py warning "warning msg"
 [x] Sent 'warning':'warning msg'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send4.py error "error msg"
 [x] Sent 'error':'error msg'

2.5 topics 主题

exchange_type=topic

主题模式是一个非常灵活的模式。routing_key采用<celerity>.<colour>.<species>的模式。相比于direct模式,主题模式引入了特殊符号*(代表一个单词)和#(代表0或者多个单词),起到通配符的作用。

send5.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 声明交换器,name='topic_logs',类型='topic'
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 手动输入或者采用默认值'anonymous.info'作为routing_key
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 指定消息发送中的交换器和routing_key
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))

# 关闭连接
connection.close()

receive5.py

import pika
import sys

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取连接
channel = connection.channel()

# 指定交换器name=topic_logs,并指定交换类型为'topic'(主题模式)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建随机名称的队列,并且在consumer关闭连接之后,删除临时队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# 根据运行时,指定的binding_keys,逐个进行绑定(exchange想用,只是routing_key有变化)    
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

# 定义消息处理函数
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

# 指定消息消费,对应随机队列名称、处理函数callback,自动返回ack信息
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

# 开始消费
channel.start_consuming()

运行效果

首先开两个(或者多个receive),然后再启动send,发送类型的消息看效果

# receive 1, 使用'#'接收所有的信息
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive5.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'test.info':b'info msg'
 [x] 'test.warning':b'warning msg'
 [x] 'test.error':b'error msg'

# receive 2, 接收routing_key符合'*.info'格式的数据
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive5.py "*.info"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'test.info':b'info msg'
 
# 发送三条不同route_key格式的数据
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send5.py "test.info" "info msg"
 [x] Sent 'test.info':'info msg'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send5.py "test.warning" "warning msg"
 [x] Sent 'test.warning':'warning msg'
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials>
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send5.py "test.error" "error msg"
 [x] Sent 'test.error':'error msg'

2.6 RPC模式

exchange_type=Default

使用消息实现RPC(远程调用)的效果,实际上就是两个队列来实现,一发送一消费,整个过程类似于发送请求,处理请求以后,返回对应的结果。主要使用在一些需要调用结果的场景。

send6.py

import pika
import uuid
import time


class FibonacciRpcClient(object):

    # 初始化
    def __init__(self):
        # 创建连接
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        # 获取channel
        self.channel = self.connection.channel()

        # 创建随机队列,断开连接以后销毁队列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 声明消息回传以后的处理,指定队列为刚才创建的临时队列,处理函数为on_response
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        # 初始化返回值
        self.response = None

    # 消息处理函数
    def on_response(self, ch, method, props, body):
        # 只有返回的消息id,与之前发送的消息一致,才处理
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        # 通过uuid生成唯一消息id
        self.corr_id = str(uuid.uuid4())
        # 未指定exchange,将消息发送的queue='rpc_queue',并指定回调处理队列名称、消息id
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))

        # 循环等待计算结果(有风险,rpc处理失败,无返回,这里会循环等待)
        while self.response is None:
            self.connection.process_data_events()

        return int(self.response) if self.response else None


# 定义调用rpc类
fibonacci_rpc = FibonacciRpcClient()

# 调用消息队列实现类rpc功能,计算fib(30)的值
print(" [x] Requesting fib(30), time={0}".format(time.time()))
response = fibonacci_rpc.call(30)
print(" [.] Got {0}, time={1}".format(response, time.time()))

receive6.py

import pika
import time

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# 获取通道
channel = connection.channel()

# 队列声明,队列名称为'rpc_queue'
channel.queue_declare(queue='rpc_queue')


# 斐波那契计算函数
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


# 队列消息回传处理函数(和receive的功能类似)
def on_request(ch, method, props, body):
    n = int(body)

    # 先调用菲波那切函数计算出结果
    print(" [.] fib({0}), time={1}".format(n, time.time()))
    response = fib(n)

    # 然后作为消费者,把计算结果,按照传递过来的参数,设置routing_key和消息id
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),
                     body=str(response))

    # 手动提交ack确认信息
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 指定最多一次接收的消息,prefetch_count=1, 代表不要在同一时间给一个工作者一个以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

# 开始消费
print(" [x] Awaiting RPC requests")
channel.start_consuming()

运行效果

先运行receive,然后在执行send

# receive
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\receive6.py
 [x] Awaiting RPC requests
 [.] fib(30), time=1600334411.0510812
 
# send
PS D:\workspace\local\python\python3_test\rabbitmq\tutorials> python .\send6.py
 [x] Requesting fib(30), time=1600334411.0500834
 [.] Got 832040, time=1600334411.3492827

通过时间点信息和打印提示信息可以知道,消息在send一侧,然后在receive一侧,计算完斐波那契数值,最后又返回到send侧。实现类RPC功能。

参考资料列表:

  1. 官网-getstarted
  2. 知乎-理解 RabbitMQ Exchange

备注:
更多最新精彩博客,请访问:聂发俊的技术博客