Jinyun's Notes

没什么天赋,爱好也不多,但愿坚持做些喜欢的事情

0%

RabbitMQ 消息队列

概念

消息队列(MQ - Message Queue):是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

RabbitMQ 是指实现了高级消息队列协议(AMQP:Advanced Message Queuing Protocol)的开源消息代理软件(亦称面向消息的中间件)。

RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ 是个轻量级和易于部署在云端场所。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合的配置中,以满足高规模,高可用性要求。

安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 添加 erlang yum 源
$ sudo vim /etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

// 更新 yum 源
$ sudo yum -y update

// 安装 erlang
$ sudo yum install -y erlang

// 安装 rabbitmq-server
$ sudo yum install -y rabbitmq-server

// 如果中途出现 file /usr/lib64/erlang/bin/epmd from install of erlang-erts-R16B-03.18.el7.x86_64 conflicts with file from package erlang-20.2.2-1.el7.centos.x86_64 这种错误,请移除 erlang-20.2.2-1.el7.centos.x86_64 包再试
$ sudo yum remove erlang-20.2.2-1.el7.centos.x86_64

配置

配置文件

1
2
$ find / -name "rabbitmq.config.example"
$ sudo cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

开放防火墙端口

1
2
3
4
5
$ sudo firewall-cmd --permanent --zone=public --add-port=15672/tcp
$ sudo firewall-cmd --permanent --zone=public --add-port=5672/tcp
$ sudo systemctl restart firewalld.service

$ sudo firewall-cmd --reload

RabbitMQ 服务

1
2
3
4
5
6
7
8
// 开启
$ sudo systemctl start rabbitmq-server.service

// 重启
$ sudo systemctl restart rabbitmq-server.service

// 停止
$ sudo systemctl stop rabbitmq-server.service

开启 Web 界面

1
2
3
4
5
// 开启
$ sudo rabbitmq-plugins enable rabbitmq_management

// 禁用
$ sudo rabbitmq-plugins disable rabbitmq_management

访问 your-server-ip:15672 就能看到 RabbitMQ Web 界面,默认的用户名:guest,密码:guest

实例

基于 Python 的发布订阅模式。

安装依赖

1
2
3
4
5
// 更新 pip
$ pip install --upgrade pip

// 安装 pika 库
$ pip install pika

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# producer.py

import pika
import sys

# 建立到代理服务器的连接
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

# 获取信道
channel = connection.channel()

# 声明交换机
channel.exchange_declare(exchange='HelloExchange',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False)

# 创建消息
message = ' '.join(sys.argv[1:]) or "🔥 info: Hello World!"

# 发布消息
channel.basic_publish(exchange='HelloExchange',
routing_key='hello',
body=message)

print(' [😭] Sent %r' % message)

# 关闭连接
connection.close()

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# customer.py

import pika
import sys

# 建立到代理服务器的连接
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

# 获取信道
channel = connection.channel()

# 声明交换机
channel.exchange_declare(exchange='HelloExchange',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False)

# 声明队列
channel.queue_declare(queue='HelloQueue')

# 通过键『hello』将队列和交换机绑定
channel.queue_bind(exchange='HelloExchange',
routing_key='hello',
queue='HelloQueue')

print(' [🤷‍♂️] Waiting for logs. To exit press CTRL+C')


# 用户处理传入的消息函数
def callback(channel, method, header, body):
# 消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)

if body.decode() == 'quit':
# 停止消费
channel.stop_consuming()
return
else:
print(" [😄] %r" % body.decode())
return


# 订阅消费者
channel.basic_consume(callback, queue='HelloQueue')

# 开始消费
channel.start_consuming()

测试

分别打开两个终端窗口,假设 A 窗口为消费窗口,运行 python ./customer.py,B 窗口为生产窗口,运行 python ./producer.py 你输入的字符串,观察运行情况。

本笔记是笔者在学习和工作中的一些整理,如对您有用,请鼓励我继续写作