目录

  1. 概述
  2. 应用场景
  3. 核心概念
  4. 常用协议
    1. AMQP
    2. STOMP
    3. XMPP
    4. JMS
  5. 常用MQ
    1. RabbitMQ
      1. 基础概念
      2. 通信过程
      3. 示例(kombu)
        1. Hello World
        2. Work Queues(工作队列)
        3. Publish/Subscribe(发布/订阅)
        4. Routing(路由)
        5. Topic(主题)
        6. RPC(远程过程调用)
      4. 示例(oslo_messaging)
        1. RPC
        2. Notification

概述

什么是消息队列。可以将消息队列理解为一个使用队列来通信的组件。它的本质就是一个转发器,包含发布消息、存储消息、消费消息的过程。一个简单的消息队列模型如下。

消息队列消息队列

消息队列,简称 MQ(Message Queue),它其实就指消息中间件。现阶段流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ 和 Kafka。

应用场景

  • 异步处理:处理如短信下发、状态推送、用户注册、数据同步等功能,提高系统并发的能力。
  • 系统解耦:可在模块、服务、接口等不同粒度上实现解耦。
  • 重试补偿:在跨机器数据传输的整个过程中,只要任意一个环节出错,都会导致问题的产生,可以通过 MQ 的重试补偿机制去尽可能的处理掉这些异常。
  • 流量削峰:对于秒杀场景下的下单处理。服务器收到消息后,首先送入消息队列,然后按照消息队列的处理能力做处理。
  • 日志处理:可以定时将日志写入 MQ,并且主动订阅日志服务。

核心概念

  • Broker
    消息服务器,作为 server 提供消息核心服务。

  • Producer
    消息生产者,发布消息到消息队列。

  • Consumer
    消息消费者,从消息队列接收消息。

  • Queue
    消息队列,一个先进先出的消息存储区域。消息按照顺序发送接收,一旦消息被消费处理,该消息将从队列中删除。

  1. 本地队列。本地队列按照功能可以分为初始化队列、传输队列、目标队列和死信队列。
    • 初始化队列:用作消息触发功能。
    • 传输队列:是暂存待传的消息,条件许可的情况下,通过管道将消息传送到其他的队列管理器。
    • 目标队列:是消息的目的地,可以长期存放消息。
    • 死信队列:如果消息不能送达目标队列,也不能再路由出去,则被自动放入死信队列保存。
  2. 别名队列/远程队列。用来指定远端队列管理器的队列。使用了远程队列,程序就不知道目标队列的位置。
  3. 模型队列。模型队列定义了一套本地队列的属性结合,一旦打开模型队列,队列管理器会按照这些属性动态地创建出一个本地队列。
  • Topic
    主题,发布订阅模式下的消息统一汇集地,不同生产者向 topic 发送消息,由 MQ 服务器分发到不同的订阅者,实现消息的消费。

  • Message
    消息体。

常用协议

AMQP

AMQP 即 Advanced Message Queuing Protocol,是面向消息中间件提供的开放的应用层协议。AMQP 的定义特性是消息导向、队列、路由(包括点对点和发布订阅)、可靠性和安全性。

AMQP 规范了消息传递方和接收方的行为,以使信息在不同的提供商之间实现互操作性,就像 SMTP、HTTP、FTP 等协议可以创建交互系统一样。以前的中间件标准化发生在 API 级别(例如 JMS),JMS 在特定的 API 接口层面和实现行为上进行了统一,而 AMQP 则关注于各种消息如何以字节流的形式进行传递。因此使用了符合协议实现的任意应用程序之间可以保持对讯息的创建、传递,而与实现语言无关。

AMQP 对于实现有如下规定:

  1. 类型系统。类型系统用于定义允许处理实体表达和理解标准和扩展元数据的消息格式,它还用于定义在这些实体之间交换消息的通信原语,即AMQP帧。
  2. 对称的异步消息传递。
  3. 标准的、可扩展的消息格式。
  4. 标准的、可扩展的消息存储池。

STOMP

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,通常作用于发布-订阅的模型。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。

XMPP

XMPP(可扩展消息与存在协议,Extensible Messaging and Presence Protocol)是一种以XML为基础的开放式及时通信协议。多用于及时消息(IM)以及在线现场探测。适用于服务器之间的准及时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送及时消息,即使操作系统和浏览器不同,但XML编码格式占用带宽大。

JMS

JMS是基于JVM消息代理的规范,是对AMQP、MQTT、STOMP、XMPP等协议更高一层的抽象。JMS是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

常用MQ

常用MQ对比常用MQ对比

RabbitMQ

基础概念

  • Message
    Message消息,是MQ的基础,它由消息头和消息体构成。消息体是生产者传递给消费者的消息,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息是否需要持久化存储)等。

  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端端应用程序。

  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Routing Key
    路由键,提供交换机查看并根据键来决定如何分发消息到队列的一个键,路由键可以说是消息的目的地址。

  • Binding
    绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Connection
    连接RabbitMQ和应用服务器的TCP连接。

  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接。

  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定。

通信过程

RabbitMQ通信过程RabbitMQ通信过程

流程解析:

  1. 消息生产者连接到RabbitMQ Broker,创建Connection,开启Channel。
  2. 生产者声明交换机类型、名称、是否持久化等(大部分是在管理页面或者消费者端声明的)。
  3. 生产者发送消息,并指定消息是否持久化等属性和Routing Key。
  4. Exchange收到消息后,根据Routing Key路由到和当前交换器绑定的相匹配的队列里面。
  5. 消费者监听接收到消息之后开始业务处理,然后发送一个ack确认告知消息已经被消费(手动自动都有可能)。
  6. RabbitMQ Broker收到ack之后将对应的消息从队列中删除掉。

示例(kombu)

Hello World

P是Producer也就是消息的生产者,C是Consumer也就是消息的消费者,红色的框框是以“hello”命名的队列queue。生产者将消息传入hello队列,消费者从队列将消息取出。

send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
__file__ = 'send.py'
__author__ = 'king'
__time__ = '2022/03/28 11:23:00'
__version__ = '1.0'
'''

import kombu
import datetime

with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('hello')
message = f'helloworld, sent at {datetime.datetime.today()}'
simple_queue.put(message)
print(f'Sent: {message}')
simple_queue.close()

receive.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
__file__ = 'receive.py'
__author__ = 'king'
__time__ = '2022/03/28 14:56:22'
__version__ = '1.0'
'''

import kombu

with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('hello')
message = simple_queue.get(block=True, timeout=1)
print(f'Received: {message.payload}')
message.ack()
simple_queue.close()

Work Queues(工作队列)

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from kombu import Queue
from kombu import Connection

queue = Queue('task_queue', routing_key='task_queue')

with Connection('amqp://guest:guest@localhost:5672//') as connection:
producer = connection.Producer()
for i in range(1, 11):
producer.publish(
body=f'{i}th message',
routing_key='task_queue',
queue=queue,
declare=[queue],
)

worker.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from kombu import Queue
from kombu.mixins import ConsumerMixin
from kombu import Connection


queue = Queue('task_queue', routing_key='task_queue')


class Worker(ConsumerMixin):
def __init__(self, connection) -> None:
self.connection = connection

def get_consumers(self, Consumer, channel):
return [
Consumer(
queues=[queue],
callbacks=[self.process_task],
accept=['text/plain', 'json'],
prefetch_count=1,
)
]

def process_task(self, body, message):
print(body)
message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
try:
worker = Worker(connection=connection)
worker.run()
except KeyboardInterrupt:
print('bye bye')

Publish/Subscribe(发布/订阅)

emit_log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from kombu import Connection
from kombu import Exchange

exchange = Exchange('log', type='fanout')

with Connection('amqp://guest:guest@localhost:5672//') as connection:
producer = connection.Producer()
for i in range(10):
producer.publish(
body=f'INFO: {i + 1}th message',
routing_key='',
exchange=exchange,
declare=[exchange],
)

receive_logs.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from kombu import Queue
from kombu import Exchange
from kombu import Connection
from kombu.mixins import ConsumerMixin

exchange = Exchange('log', type='fanout')
queue = Queue('log', exchange=exchange)


class Worker(ConsumerMixin):
def __init__(self, connection) -> None:
self.connection = connection

def get_consumers(self, Consumer, channel):
return [
Consumer(
queues=[queue],
callbacks=[self.process_task],
accept=['text/plain', 'json'],
prefetch_count=1,
)
]

def process_task(self, body, message):
print(body)
message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
try:
worker = Worker(connection=connection)
worker.run()
except KeyboardInterrupt:
print('bye bye')

Routing(路由)

emit_log_direct.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from kombu import Exchange
from kombu import Connection

levels = ['INFO', 'ERROR', 'WARNING']
exchange = Exchange('direct_logs', type='direct')


with Connection('amqp://guest:guest@localhost:5672//') as connection:
producer = connection.Producer()
for i in range(1, 20):
level = levels[i % len(levels)]
log = f'{level}: {i}th message'
producer.publish(
body=log,
routing_key=level,
exchange=exchange,
declare=[exchange],
)

receive_logs_direct.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from kombu import Queue
from kombu import Exchange
from kombu import Connection
from kombu.mixins import ConsumerMixin

levels = ['INFO', 'ERROR', 'WARNING']
exchange = Exchange('direct_logs', type='direct')
queues = [
Queue(f'log.{level}', exchange=exchange, routing_key=level)
for level in levels
]


class Worker(ConsumerMixin):
def __init__(self, connection) -> None:
self.connection = connection

def get_consumers(self, Consumer, channel):
return [
Consumer(
queues=queues,
accept=['text/plain', 'json'],
callbacks=[self.process_task],
)
]

def process_task(self, body, message):
print(body)
message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
try:
worker = Worker(connection=connection)
worker.run()
except KeyboardInterrupt:
print('bye bye')

Topic(主题)

发送到主题交换机(topic exchange)的消息不可以携带任意的路由键(routing_key),它的路由键必须是一个由.分隔开的单词列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意,但是不要超过255字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

emit_log_topic.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from kombu import Exchange
from kombu import Connection

levels = ['INFO', 'ERROR', 'WARNING']
exchange = Exchange('topic_logs', type='topic')


with Connection('amqp://guest:guest@localhost:5672//') as connection:
producer = connection.Producer()
for i in range(1, 20):
level = levels[i % len(levels)]
log = f'{level}: {i}th message'
producer.publish(
body=log,
routing_key=f'log.{level}',
exchange=exchange,
declare=[exchange],
)

receive_logs_topic.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from kombu import Queue
from kombu import Exchange
from kombu import Connection
from kombu.mixins import ConsumerMixin

levels = ['INFO', 'ERROR', 'WARNING']
exchange = Exchange('topic_logs', type='topic')
queues = [
Queue(f'log.{level}', exchange=exchange, routing_key=f'*.{level}')
for level in levels
]


class Worker(ConsumerMixin):
def __init__(self, connection) -> None:
self.connection = connection

def get_consumers(self, Consumer, channel):
return [
Consumer(
queues=queues,
accept=['text/plain', 'json'],
callbacks=[self.process_task],
)
]

def process_task(self, body, message):
print(body)
message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
try:
worker = Worker(connection=connection)
worker.run()
except KeyboardInterrupt:
print('bye bye')

RPC(远程过程调用)

rpc_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from kombu import uuid
from kombu import Queue
from kombu import Consumer
from kombu import Connection
from kombu import producers


class FibonacciRpcClient(object):
def __init__(self):
self.connection = Connection('amqp://guest:guest@localhost:5672//')
self.queue = Queue('rpc_queue', routing_key='rpc_queue')

def call(self, n):
self.response = None
self.correlation_id = uuid()
with producers[self.connection].acquire(block=True) as producer:
producer.publish(
body={'n': n},
routing_key='rpc_queue',
queue=self.queue,
declare=[self.queue],
reply_to=self.queue.name,
correlation_id=self.correlation_id,
)
with Consumer(
self.connection,
on_message=self.on_response,
queues=[self.queue],
no_ack=True,
):
while self.response is None:
self.connection.drain_events()
return self.response

def on_response(self, message):
if message.properties['correlation_id'] == self.correlation_id:
self.response = message.payload.get('result')


client = FibonacciRpcClient()
print(' [x] Requesting fib(30)')
result = client.call(30)
print(f' [✔️] Got {result!r}')

rpc_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from kombu import Queue
from kombu import Connection
from kombu.mixins import ConsumerProducerMixin


def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)


class Worker(ConsumerProducerMixin):
def __init__(self, connection) -> None:
self.connection = connection

def get_consumers(self, Consumer, channel):
return [
Consumer(
queues=[Queue('rpc_queue')],
accept={'text/plain', 'application/json'},
on_message=self.on_request,
prefetch_count=1,
),
]

def on_request(self, message):
n = message.payload.get('n')
print(f' [.] fib({n})')
result = fib(n)
self.producer.publish(
{'result': result},
exchange='',
routing_key=message.properties['reply_to'],
correlation_id=message.properties['correlation_id'],
serializer='json',
retry=True,
)
message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
try:
worker = Worker(connection=connection)
worker.run()
except KeyboardInterrupt:
print('bye bye')

示例(oslo_messaging)

RPC

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import oslo_messaging as messaging
from oslo_config import cfg

transport_url = 'rabbit://guest:guest@127.0.0.1:5672/'

context = {}
transport = messaging.get_rpc_transport(conf=cfg.CONF, url=transport_url)
target = messaging.Target(topic='test')
client = messaging.RPCClient(transport=transport, target=target)
client = client.prepare()

result = client.call(context, 'add', x=1, y=2)
print(f'result is {result}')

client.cast(context, 'test')

result = client.call(context, 'fib', n=10)
print(f'fib(10) = {result}')

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import oslo_messaging as messaging
from oslo_config import cfg

transport_url = 'rabbit://guest:guest@127.0.0.1:5672/'


class Endpoint(object):
def _fib(self, n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return self._fib(n - 1) + self._fib(n - 2)

def fib(self, context, n):
print(n)
return self._fib(n)

def test(self, context):
print('testing')

def add(self, context, x, y):
result = x + y
print(f'result is {result}')
return result


transport = messaging.get_rpc_transport(conf=cfg.CONF, url=transport_url)
target = messaging.Target(topic='test', server='server-1')
endpoints = [
Endpoint()
]
server = messaging.get_rpc_server(transport, target, endpoints=endpoints)
try:
print('Start Server')
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print('Stopping Server')
server.stop()
server.wait()

Notification

send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from oslo_config import cfg
import oslo_messaging as messaging

transport_url = "rabbit://guest:guest@127.0.0.1:5672/"

transport = messaging.get_notification_transport(cfg.CONF, transport_url)

notifier = messaging.Notifier(
transport,
publisher_id="local",
driver="messaging",
topics=["notifications"],
)

client = notifier.prepare()
client.info({}, event_type="my_type", payload={"content": "Hello World"})
client.error({}, event_type="error_type", payload={"content": "Hello World"})

receiver.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from oslo_config import cfg
import oslo_messaging as messaging


class NotificationEndpoint(object):
filter_rule = messaging.NotificationFilter(publisher_id="^local.*")

def info(self, context, publish_id, event_type, payload, metadata):
print("Info")
print(context, publish_id, event_type, payload, metadata)

def warn(self, context, publish_id, event_type, payload, metadata):
print("Warn")
print(context, publish_id, event_type, payload, metadata)

def error(self, context, publish_id, event_type, payload, metadata):
print("Error")
print(context, publish_id, event_type, payload, metadata)


transport_url = "rabbit://guest:guest@127.0.0.1:5672/"

transport = messaging.get_notification_transport(cfg.CONF, transport_url)
target = messaging.Target(topic="notifications")
listener = messaging.get_notification_listener(
transport,
targets=[target],
endpoints=[NotificationEndpoint()],
pool='listener'
)
listener.start()
listener.wait()