RabbitMQ

OpenStack里很多服务都是通过RabbitMQ来进行通信,因此很有必要了解下

下面这张图来自网上,已经不清楚版权到底属于哪位了

RabbitMQ.png

从图中可以看到RabbitMQ是在生产者和消费者中间,相当于一个缓冲区

这里可以看到有三个名词:Exchange,RoutingKey和Queue

Queue:消息队列,RabbitMQ内部储存消息的

Exchange:可以看到生产者将消息发出之后,并没有直接到达Queue,而是中间经历了一次Exchange,就把它当做是交换机,它来决定将收到的消息分别路由到哪一个Queue上

BindingKey:Exchange和Queue之间连接的规则通过RabbitMQ的Binding动作来实现,通过binding将Exchange和Queue关联起来,这样才知道如何将消息路由到正确的Queue,在binding的同时,还会指定一个BindingKey

RoutingKey:生产者再将消息发送给Exchange的时候,会指定一个RoutingKey,来指定这个消息的路由规则,而RoutingKey和BindingKey匹配的时候,消息就会被路由到对应绑定的Queue中去

还有两个隐藏的名词:

Connection:一个TCP连接,生产者和消费者都是通过TCP连接到RabbitMQ

Channels:虚拟连接,建立在TCP连接之上,数据流动都在这里进行

下面是模仿网上一位帅哥用python的pika模块来完成生产者和消费者这些动作

首先,安装rabbitmq server并启动,安装慢得等的我花儿都谢了

$ brew install rabbitmq
==> Downloading https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-mac-standalone-3.5.6.tar.gz
######################################################################## 100.0%
==> /usr/bin/unzip -qq -j /usr/local/Cellar/rabbitmq/3.5.6/plugins/rabbitmq_management-3.5.6.ez rabbitmq_management-3.5.6/priv/www/cli
==> Caveats
Management Plugin enabled by default at http://localhost:15672

Bash completion has been installed to:
  /usr/local/etc/bash_completion.d

To have launchd start rabbitmq at login:
  ln -sfv /usr/local/opt/rabbitmq/*.plist ~/Library/LaunchAgents
Then to load rabbitmq now:
  launchctl load ~/Library/LaunchAgents/homebrew.mxcl.rabbitmq.plist
Or, if you don't want/need launchctl, you can just run:
  rabbitmq-server
==> Summary
🍺  /usr/local/Cellar/rabbitmq/3.5.6: 1039 files, 28M, built in 1115 seconds

手动启动进程服务

$ sudo /usr/local/Cellar/rabbitmq/3.5.6/sbin/rabbitmq-server start
Password:

              RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

然后生产者,首先是建立TCP连接,然后是建立channel,由于rabbitmq此时运行的机器和脚本运行机器同节点,因此参数为本地IP,接着创建一个queue名为myqueue,最后生产者能接触到的只能到达Exchange,这里使用默认的空字符Exchange,默认Exchange可以指定routing_key为queue的名字,最后是发送的内容

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='myqueue')
channel.basic_publish(exchange='',routing_key='myqueue',body=‘Congratulations!')
print 'Send Congratulations!'
connection.close()

最后是消费者,首先同样建立TCP连接,然后建立channel,接着这里也创建一个queue,原因是假如没有queue,生产者的message就会被丢弃,消费者接收不到任何message,所以生产者和消费者都创建queue,反正rabbitmq会轮循一个个定位,接着定义一个回调函数处理接收的message,最后循环接收

#!/usr/bin/env python
import pika

def callback(ch, method, properties, body):
    print 'Receive %r' %body

connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue = 'myqueue')
print 'Waiting for messages!'
channel.basic_consume(callback, queue = 'myqueue', no_ack = True)
channel.start_consuming()

最后执行的结果:

生产者

~ on  master! ⌚ 1:21:58
$ python send.py
Send Congratulations!

~ on  master! ⌚ 1:22:20
$ python send.py
Send Congratulations!

~ on  master! ⌚ 1:22:23
$ python send.py
Send Congratulations!

消费者

$ python recv.py
Waiting for messages!
Receive 'Congratulations!'
Receive 'Congratulations!'
Receive 'Congratulations!'

发表评论