In interviews, whereas Java candidates mostly get asked about algorithms, Python candidates are often asked about concepts such as metaclasses and closures, and distributed processes are naturally one of those topics.
Distributed processes are probably used most in crawlers: one node acts as the master, the other workers run as distributed processes handling their own work, and they exchange data with each other over the network.
The key piece is the managers submodule of multiprocessing; the official documentation describes its usage as follows:
16.6.2.7. Managers
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
multiprocessing.Manager()
Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.
Manager processes will be shutdown as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:
class multiprocessing.managers.BaseManager([address[, authkey]])
Create a BaseManager object.
Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.
address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.
authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a string.
start([initializer[, initargs]])
Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.
get_server()
Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method. Server additionally has an address attribute.
connect()
Connect a local manager object to a remote manager process.
shutdown()
Stop the process used by the manager. This is only available if start() has been used to start the server process. This can be called multiple times.
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
A classmethod which can be used for registering a type or callable with the manager class.
typeid is a "type identifier" which is used to identify a particular type of shared object. This must be a string.
callable is a callable used for creating objects for this type identifier. If a manager instance will be created using the from_address() classmethod or if the create_method argument is False then this can be left as None.
proxytype is a subclass of BaseProxy which is used to create proxies for shared objects with this typeid. If None then a proxy class is created automatically.
exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). (If exposed is None then proxytype._exposed_ is used instead if it exists.) In the case where no exposed list is specified, all "public methods" of the shared object will be accessible. (Here a "public method" means any attribute which has a __call__() method and whose name does not begin with '_'.)
method_to_typeid is a mapping used to specify the return type of those exposed methods which should return a proxy. It maps method names to typeid strings. (If method_to_typeid is None then proxytype._method_to_typeid_ is used instead if it exists.) If a method's name is not a key of this mapping or if the mapping is None then the object returned by the method will be copied by value.
create_method determines whether a method should be created with name typeid which can be used to tell the server process to create a new shared object and return a proxy for it. By default it is True.
BaseManager instances also have one read-only property:
address
The address used by the manager.
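The get_server() and connect() entries above are each followed by a short snippet in the official docs that did not survive into this excerpt. Here is a minimal sketch of both calls; the port 50000, authkey 'abc' and loopback address are placeholder values, and the server and client parts would run in separate processes:

# --- server side ---
from multiprocessing.managers import BaseManager

manager = BaseManager(address=('', 50000), authkey='abc')
server = manager.get_server()      # the Server object under the manager's control
print server.address               # Server also exposes the address it listens on
server.serve_forever()             # blocks and serves requests

# --- client side (a different process) ---
# m = BaseManager(address=('127.0.0.1', 50000), authkey='abc')
# m.connect()                      # attach this local manager object to the running server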
The slightly tricky part is that the Queues created on the Manager side are registered on the network, so other machines on the network can access these registered Queues. The Queues are also started on the Manager side, so task data can be written into them there. The Worker side only fetches the Queues from the network: when registering, it only passes in the same Queue names (identifiers) as on the Manager side, then connects over the network to fetch the data, process it, and send back the results.
The documentation also gives a simple usage example:
16.6.2.7.2. Using a remote manager
It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it).
Running the following commands creates a server for a single shared queue which remote clients can access:
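The commands the docs refer to are not reproduced in this excerpt; a minimal Python 2 sketch of such a server, with port 50000 and authkey 'abracadabra' as placeholder values:

import Queue
from multiprocessing.managers import BaseManager

queue = Queue.Queue()                     # the single shared queue

class QueueManager(BaseManager):
    pass

# expose the queue over the network under the typeid 'get_queue'
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 50000), authkey='abracadabra')
s = m.get_server()
s.serve_forever()                         # serve requests until killed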
One client can access the server as follows:
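Again the original snippet is omitted here; a sketch of a client that puts an item into the shared queue, assuming the server above is reachable at the placeholder host 'foo.bar.org':

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# register only the typeid; the actual queue lives in the server process
QueueManager.register('get_queue')
m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
m.connect()
queue = m.get_queue()                     # a proxy for the remote queue
queue.put('hello')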
Another client can also use it:
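And a sketch of a second client that reads the item back out (same placeholder host, port and authkey):

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')
m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
m.connect()
queue = m.get_queue()
print queue.get()                         # prints 'hello' put by the first client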
Local processes can also access that queue, using the code from above on the client to access it remotely:
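The docs' local-process variant is likewise omitted here; a sketch of the idea, where a local process writes into an ordinary multiprocessing.Queue that is then served to remote clients by the same kind of manager (the LocalWorker class name and the address values are placeholders):

from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager

class LocalWorker(Process):
    """A process on the same machine that uses the queue object directly."""
    def __init__(self, q):
        self.q = q
        super(LocalWorker, self).__init__()
    def run(self):
        self.q.put('local hello')

queue = Queue()
w = LocalWorker(queue)
w.start()

class QueueManager(BaseManager):
    pass

# the very same queue is also exposed to remote clients over the network
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 50000), authkey='abracadabra')
s = m.get_server()
s.serve_forever()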
Below is a complete example.
Manager
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import Queue
from multiprocessing.managers import BaseManager

# The two queues live on the manager side
task_queue = Queue.Queue()
result_queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

# Expose the two queues on the network under their typeid names
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# Bind port 5000 on all interfaces and set the authkey
manager = QueueManager(address=('', 5000), authkey='lihui')
manager.start()

# Always go through the manager's proxies, never the raw queue objects
task = manager.get_task_queue()
result = manager.get_result_queue()

for i in range(10):
    print 'Put: %d => worker' % i
    task.put(i)

print 'Waiting for results ...'
for i in range(10):
    r = result.get(timeout=10)
    print 'Get: %s <= worker' % r

manager.shutdown()
print 'manager exit'
Worker
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time, Queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# Register only the typeids; the real queues live on the manager side
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Address of the machine running manager.py
server_addr = '192.168.0.9'
print 'Connect to server ', server_addr
m = QueueManager(address=(server_addr, 5000), authkey='lihui')
m.connect()

# These are network proxies for the manager's queues
task = m.get_task_queue()
result = m.get_result_queue()

for i in range(10):
    try:
        n = task.get(timeout=1)
        print 'Get: %d <= manager' % n
        r = n * n
        time.sleep(1)
        print 'Put: %d => manager' % r
        result.put(r)
        print '#' * 30
    except Queue.Empty:
        print 'task queue is empty'

print 'worker exit'
Execution results:
First, on the Master side, the data is dispatched to the Workers through task_queue, and the Master waits for the results to come back:
lihui@MacBook ~/PycharmProjects/Helloworld/Main python manager.py
Put: 0 => worker
Put: 1 => worker
Put: 2 => worker
Put: 3 => worker
Put: 4 => worker
Put: 5 => worker
Put: 6 => worker
Put: 7 => worker
Put: 8 => worker
Put: 9 => worker
Waiting for results ...
Then, on the Worker side, the data is received from task_queue, processed, and sent back to the Master through result_queue:
lihui@MacBook ~/PycharmProjects/Helloworld/Main python worker.py
Connect to server  192.168.0.9
Get: 0 <= manager
Put: 0 => manager
##############################
Get: 1 <= manager
Put: 1 => manager
##############################
Get: 2 <= manager
Put: 4 => manager
##############################
Get: 3 <= manager
Put: 9 => manager
##############################
Get: 4 <= manager
Put: 16 => manager
##############################
Get: 5 <= manager
Put: 25 => manager
##############################
Get: 6 <= manager
Put: 36 => manager
##############################
Get: 7 <= manager
Put: 49 => manager
##############################
Get: 8 <= manager
Put: 64 => manager
##############################
Get: 9 <= manager
Put: 81 => manager
##############################
worker exit
Finally, you can see that the Master successfully receives all of these results:
✘ lihui@MacBook ~/PycharmProjects/Helloworld/Main python manager.py
Put: 0 => worker
Put: 1 => worker
Put: 2 => worker
Put: 3 => worker
Put: 4 => worker
Put: 5 => worker
Put: 6 => worker
Put: 7 => worker
Put: 8 => worker
Put: 9 => worker
Waiting for results ...
Get: 0 <= worker
Get: 1 <= worker
Get: 4 <= worker
Get: 9 <= worker
Get: 16 <= worker
Get: 25 <= worker
Get: 36 <= worker
Get: 49 <= worker
Get: 64 <= worker
Get: 81 <= worker
manager exit
To summarize the key points:
task_queue is the queue through which the manager sends tasks to the workers (manager -> worker); result_queue is the queue through which the manager receives results back from the workers (worker -> manager).
Registering these two queues exposes them to every worker host on the network.
In the register method, the first parameter typeid can be thought of as the name of the queue's calling interface, and the second parameter callable binds it to the corresponding queue object.
When writing a multiprocessing program on a single machine, the Queue you create can be used directly. In a distributed multiprocessing environment, however, you must not add tasks by operating on the original task_queue directly, because that would bypass the QueueManager wrapper; tasks must be added through the Queue interface obtained via manager.get_task_queue() (quoting Liao Xuefeng), as the short sketch after these points shows.
Bind an IP address and port, set an authkey, and the start() method launches the manager process that serves the Queues.
A QueueManager manages many Queues, and each Queue's network calling interface needs its own name of the form get_xxx_queue.
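To make the point about bypassing the QueueManager wrapper concrete, here is a minimal sketch (Python 2, using the same names and authkey as the Manager example above):

import Queue
from multiprocessing.managers import BaseManager

task_queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue', callable=lambda: task_queue)
manager = QueueManager(address=('', 5000), authkey='lihui')
manager.start()

# Wrong in the distributed case: this writes to the plain local Queue object
# in this process and bypasses the manager's server process, so networked
# workers will never see the item.
# task_queue.put(0)

# Right: the proxy returned by get_task_queue() forwards put() to the queue
# held inside the manager's server process, which is what workers connect to.
manager.get_task_queue().put(0)

manager.shutdown()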
This topic is covered in more detail in the "Processes and Threads" chapter of Liao Xuefeng's Python tutorial: https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/001386832973658c780d8bfa4c6406f83b2b3097aed5df6000#0
Most other write-ups online are essentially based on that article, and this post is no exception.