OpenStack里很多服务都是通过RabbitMQ来进行通信,因此很有必要了解下
下面这张图来自网上,已经不清楚版权到底属于哪位了
从图中可以看到RabbitMQ是在生产者和消费者中间,相当于一个缓冲区
这里可以看到有三个名词:Exchange,RoutingKey和Queue
Queue:消息队列,RabbitMQ内部储存消息的
Exchange:可以看到生产者将消息发出之后,并没有直接到达Queue,而是中间经历了一次Exchange,就把它当做是交换机,它来决定将收到的消息分别路由到哪一个Queue上
BindingKey:Exchange和Queue之间连接的规则通过RabbitMQ的Binding动作来实现,通过binding将Exchange和Queue关联起来,这样才知道如何将消息路由到正确的Queue,在binding的同时,还会指定一个BindingKey
RoutingKey:生产者再将消息发送给Exchange的时候,会指定一个RoutingKey,来指定这个消息的路由规则,而RoutingKey和BindingKey匹配的时候,消息就会被路由到对应绑定的Queue中去
还有两个隐藏的名词:
Connection:一个TCP连接,生产者和消费者都是通过TCP连接到RabbitMQ
Channels:虚拟连接,建立在TCP连接之上,数据流动都在这里进行
下面是模仿网上一位帅哥用python的pika模块来完成生产者和消费者这些动作
首先,安装rabbitmq server并启动,安装慢得等的我花儿都谢了
$ brew install rabbitmq ==> Downloading https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-mac-standalone-3.5.6.tar.gz ######################################################################## 100.0% ==> /usr/bin/unzip -qq -j /usr/local/Cellar/rabbitmq/3.5.6/plugins/rabbitmq_management-3.5.6.ez rabbitmq_management-3.5.6/priv/www/cli ==> Caveats Management Plugin enabled by default at http://localhost:15672 Bash completion has been installed to: /usr/local/etc/bash_completion.d To have launchd start rabbitmq at login: ln -sfv /usr/local/opt/rabbitmq/*.plist ~/Library/LaunchAgents Then to load rabbitmq now: launchctl load ~/Library/LaunchAgents/homebrew.mxcl.rabbitmq.plist Or, if you don't want/need launchctl, you can just run: rabbitmq-server ==> Summary 🍺 /usr/local/Cellar/rabbitmq/3.5.6: 1039 files, 28M, built in 1115 seconds
手动启动进程服务
$ sudo /usr/local/Cellar/rabbitmq/3.5.6/sbin/rabbitmq-server start Password: RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log ###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log ########## Starting broker... completed with 10 plugins.
然后生产者,首先是建立TCP连接,然后是建立channel,由于rabbitmq此时运行的机器和脚本运行机器同节点,因此参数为本地IP,接着创建一个queue名为myqueue,最后生产者能接触到的只能到达Exchange,这里使用默认的空字符Exchange,默认Exchange可以指定routing_key为queue的名字,最后是发送的内容
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) channel = connection.channel() channel.queue_declare(queue='myqueue') channel.basic_publish(exchange='',routing_key='myqueue',body=‘Congratulations!') print 'Send Congratulations!' connection.close()
最后是消费者,首先同样建立TCP连接,然后建立channel,接着这里也创建一个queue,原因是假如没有queue,生产者的message就会被丢弃,消费者接收不到任何message,所以生产者和消费者都创建queue,反正rabbitmq会轮循一个个定位,接着定义一个回调函数处理接收的message,最后循环接收
#!/usr/bin/env python import pika def callback(ch, method, properties, body): print 'Receive %r' %body connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) channel = connection.channel() channel.queue_declare(queue = 'myqueue') print 'Waiting for messages!' channel.basic_consume(callback, queue = 'myqueue', no_ack = True) channel.start_consuming()
最后执行的结果:
生产者
~ on master! ⌚ 1:21:58 $ python send.py Send Congratulations! ~ on master! ⌚ 1:22:20 $ python send.py Send Congratulations! ~ on master! ⌚ 1:22:23 $ python send.py Send Congratulations!
消费者
$ python recv.py Waiting for messages! Receive 'Congratulations!' Receive 'Congratulations!' Receive 'Congratulations!'