概念
消息队列(MQ - Message Queue
):是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
RabbitMQ
是指实现了高级消息队列协议(AMQP:Advanced Message Queuing Protocol
)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ
是部署最广泛的开源消息代理。RabbitMQ
是个轻量级和易于部署在云端场所。它支持多种消息传递协议。RabbitMQ
可以部署在分布式和联合的配置中,以满足高规模,高可用性要求。
安装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| // 添加 erlang yum 源 $ sudo vim /etc/yum.repos.d/rabbitmq-erlang.repo [rabbitmq-erlang] name=rabbitmq-erlang baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7 gpgcheck=1 gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc repo_gpgcheck=0 enabled=1
// 更新 yum 源 $ sudo yum -y update
// 安装 erlang $ sudo yum install -y erlang
// 安装 rabbitmq-server $ sudo yum install -y rabbitmq-server
// 如果中途出现 file /usr/lib64/erlang/bin/epmd from install of erlang-erts-R16B-03.18.el7.x86_64 conflicts with file from package erlang-20.2.2-1.el7.centos.x86_64 这种错误,请移除 erlang-20.2.2-1.el7.centos.x86_64 包再试 $ sudo yum remove erlang-20.2.2-1.el7.centos.x86_64
|
配置
配置文件
1 2
| $ find / -name "rabbitmq.config.example" $ sudo cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
|
开放防火墙端口
1 2 3 4 5
| $ sudo firewall-cmd --permanent --zone=public --add-port=15672/tcp $ sudo firewall-cmd --permanent --zone=public --add-port=5672/tcp $ sudo systemctl restart firewalld.service 或 $ sudo firewall-cmd --reload
|
RabbitMQ
服务
1 2 3 4 5 6 7 8
| // 开启 $ sudo systemctl start rabbitmq-server.service
// 重启 $ sudo systemctl restart rabbitmq-server.service
// 停止 $ sudo systemctl stop rabbitmq-server.service
|
开启 Web
界面
1 2 3 4 5
| // 开启 $ sudo rabbitmq-plugins enable rabbitmq_management
// 禁用 $ sudo rabbitmq-plugins disable rabbitmq_management
|
访问 your-server-ip:15672
就能看到 RabbitMQ Web 界面,默认的用户名:guest
,密码:guest
实例
基于 Python
的发布订阅模式。
安装依赖
1 2 3 4 5
| // 更新 pip $ pip install --upgrade pip
// 安装 pika 库 $ pip install pika
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
import pika import sys
credentials = pika.PlainCredentials('guest', 'guest') parameters = pika.ConnectionParameters('localhost', credentials=credentials) connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='HelloExchange', exchange_type='fanout', passive=False, durable=True, auto_delete=False)
message = ' '.join(sys.argv[1:]) or "🔥 info: Hello World!"
channel.basic_publish(exchange='HelloExchange', routing_key='hello', body=message)
print(' [😭] Sent %r' % message)
connection.close()
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
import pika import sys
credentials = pika.PlainCredentials('guest', 'guest') parameters = pika.ConnectionParameters('localhost', credentials=credentials) connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='HelloExchange', exchange_type='fanout', passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue='HelloQueue')
channel.queue_bind(exchange='HelloExchange', routing_key='hello', queue='HelloQueue')
print(' [🤷♂️] Waiting for logs. To exit press CTRL+C')
def callback(channel, method, header, body): channel.basic_ack(delivery_tag=method.delivery_tag)
if body.decode() == 'quit': channel.stop_consuming() return else: print(" [😄] %r" % body.decode()) return
channel.basic_consume(callback, queue='HelloQueue')
channel.start_consuming()
|
测试
分别打开两个终端窗口,假设 A 窗口为消费窗口,运行 python ./customer.py
,B 窗口为生产窗口,运行 python ./producer.py 你输入的字符串
,观察运行情况。