Distributed Processes

In interviews, whereas Java questions tend to dwell on algorithms, Python interviews often probe concepts such as metaclasses and closures; distributed processes are, of course, one of those topics.

Distributed processes are probably used most in web crawlers: one process acts as the master, while the other workers run their own tasks as distributed processes, communicating data with each other over the network.

This mainly relies on the managers submodule; the official documentation describes its usage:

16.6.2.7. Managers

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

multiprocessing.Manager()

Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.

Manager processes will be shutdown as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:
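As a quick illustration of multiprocessing.Manager(), here is a minimal sketch (in Python 3 syntax, since the examples later in this post use Python 2) that creates a shared dict on the manager's server process and accesses it through a proxy:

```python
from multiprocessing import Manager

# Manager() starts a server process; manager.dict() asks that server
# to create a shared dict and returns a proxy to it.
if __name__ == '__main__':
    manager = Manager()
    shared = manager.dict()
    shared['answer'] = 42        # the write is forwarded through the proxy
    print(shared['answer'])      # prints 42
    manager.shutdown()           # stop the manager's server process
```

Any process holding a proxy to this dict would see the same contents, since every operation is executed in the single server process.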


class multiprocessing.managers.BaseManager([address[, authkey]])

Create a BaseManager object.

Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.

address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.

authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a string.

start([initializer[, initargs]])

Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.

get_server()

Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server additionally has an address attribute.

connect()

Connect a local manager object to a remote manager process:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()

shutdown()

Stop the process used by the manager. This is only available if start() has been used to start the server process.

This can be called multiple times.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

A classmethod which can be used for registering a type or callable with the manager class.

typeid is a “type identifier” which is used to identify a particular type of shared object. This must be a string.

callable is a callable used for creating objects for this type identifier. If a manager instance will be created using the from_address() classmethod or if the create_method argument is False then this can be left as None.

proxytype is a subclass of BaseProxy which is used to create proxies for shared objects with this typeid. If None then a proxy class is created automatically.

exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). (If exposed is None then proxytype._exposed_ is used instead if it exists.) In the case where no exposed list is specified, all “public methods” of the shared object will be accessible. (Here a “public method” means any attribute which has a __call__() method and whose name does not begin with '_'.)

method_to_typeid is a mapping used to specify the return type of those exposed methods which should return a proxy. It maps method names to typeid strings. (If method_to_typeid is None then proxytype._method_to_typeid_ is used instead if it exists.) If a method's name is not a key of this mapping or if the mapping is None then the object returned by the method will be copied by value.

create_method determines whether a method should be created with name typeid which can be used to tell the server process to create a new shared object and return a proxy for it. By default it is True.

BaseManager instances also have one read-only property:

address

The address used by the manager.
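To make register()'s parameters concrete, here is a small sketch (Python 3 syntax; the Counter class and its typeid are made up for illustration) that registers a custom type with an exposed list and creates instances through the factory method that create_method=True generates:

```python
from multiprocessing.managers import BaseManager

# A made-up class to share through a manager.
class Counter:
    def __init__(self):
        self.value = 0
    def increment(self):
        self.value += 1
    def get(self):
        return self.value

class CounterManager(BaseManager):
    pass

# typeid is 'Counter'; callable creates the shared object inside the
# server process; exposed restricts which methods the proxy may call.
CounterManager.register('Counter', callable=Counter,
                        exposed=['increment', 'get'])

if __name__ == '__main__':
    manager = CounterManager()
    manager.start()           # spawn the manager's server process
    c = manager.Counter()     # factory method added by create_method=True
    c.increment()
    c.increment()
    print(c.get())            # prints 2
    manager.shutdown()
```

Since proxytype was left as None, a proxy class is generated automatically; calls like c.increment() are shipped to the server process via BaseProxy._callmethod().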

The tricky part here is that the Queues created on the Manager side are registered on the network, so that other machines on the network can access them. The Queues also live on the Manager side, which is where task data gets written into them. The Worker side only fetches the Queues over the network: when registering, it passes in just the Queue name (or identifier) matching the one on the Manager side, then connects over the network to fetch, process, and return data.

The documentation also gives a simple example of this usage:

16.6.2.7.2. Using a remote manager

It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it).

Running the following commands creates a server for a single shared queue which remote clients can access:

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

One client can access the server as follows:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Another client can also use it:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Local processes can also access that queue, using the code from above on the client to access it remotely:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Below is a complete example.

Manager

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import Queue
from multiprocessing.managers import BaseManager

task_queue = Queue.Queue()
result_queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

manager = QueueManager(address=('', 5000), authkey='lihui')
manager.start()
task = manager.get_task_queue()
result = manager.get_result_queue()

for i in range(10):
    print 'Put: %d => worker' % i
    task.put(i)

print 'Waiting for results ...'

for i in range(10):
    r = result.get(timeout=10)
    print 'Get: %s <= worker' % r

manager.shutdown()
print 'manager exit'

Worker

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time, Queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '192.168.0.9'
print 'Connect to server ', server_addr
m = QueueManager(address=(server_addr, 5000), authkey='lihui')
m.connect()

task = m.get_task_queue()
result = m.get_result_queue()

for i in range(10):
    try:
        n = task.get(timeout=1)
        print 'Get: %d <= manager' % n
        r = n * n
        time.sleep(1)
        print 'Put: %d => manager' % r
        result.put(r)
        print '#' * 30
    except Queue.Empty:
        print 'task queue is empty'
print 'worker exit'

Execution results:

First, on the Master side, the data is dispatched to the Worker through task_queue, and then it waits for the results:

lihui@MacBook  ~/PycharmProjects/Helloworld/Main  python manager.py
Put: 0 => worker
Put: 1 => worker
Put: 2 => worker
Put: 3 => worker
Put: 4 => worker
Put: 5 => worker
Put: 6 => worker
Put: 7 => worker
Put: 8 => worker
Put: 9 => worker
Waiting for results ...

Then, on the Worker side, data is received from task_queue, processed, and the results are sent back to the Master through result_queue:

lihui@MacBook  ~/PycharmProjects/Helloworld/Main  python worker.py
Connect to server  192.168.0.9
Get: 0 <= manager
Put: 0 => manager
##############################
Get: 1 <= manager
Put: 1 => manager
##############################
Get: 2 <= manager
Put: 4 => manager
##############################
Get: 3 <= manager
Put: 9 => manager
##############################
Get: 4 <= manager
Put: 16 => manager
##############################
Get: 5 <= manager
Put: 25 => manager
##############################
Get: 6 <= manager
Put: 36 => manager
##############################
Get: 7 <= manager
Put: 49 => manager
##############################
Get: 8 <= manager
Put: 64 => manager
##############################
Get: 9 <= manager
Put: 81 => manager
##############################
worker exit

Finally, we can see that the Master successfully receives all the results:

✘ lihui@MacBook  ~/PycharmProjects/Helloworld/Main  python manager.py
Put: 0 => worker
Put: 1 => worker
Put: 2 => worker
Put: 3 => worker
Put: 4 => worker
Put: 5 => worker
Put: 6 => worker
Put: 7 => worker
Put: 8 => worker
Put: 9 => worker
Waiting for results ...
Get: 0 <= worker
Get: 1 <= worker
Get: 4 <= worker
Get: 9 <= worker
Get: 16 <= worker
Get: 25 <= worker
Get: 36 <= worker
Get: 49 <= worker
Get: 64 <= worker
Get: 81 <= worker
manager exit

Key points to recap:

task_queue is the queue through which the manager sends tasks to the workers; result_queue is the queue through which the manager receives results back from the workers.

Registering these two queues exposes them to all worker hosts on the network.

In the register method, the first argument, typeid, can be thought of as the name of the queue's network-facing interface; the second argument, callable, binds it to the corresponding queue object.

When writing a multi-process program on a single machine, a Queue you create can be used directly. In a distributed multi-process environment, however, you must not add tasks by operating on the original task_queue directly, as that would bypass the QueueManager's encapsulation; tasks must be added through the Queue interface obtained via manager.get_task_queue() (as Liao Xuefeng puts it).

Bind an IP address and port, set an authentication key (authkey), and call start() to launch the manager's server process, which hosts the Queues.

A QueueManager can manage many Queues, and each Queue's network interface needs its own name of the form get_xxx_queue.
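These points can be sketched in one self-contained script (Python 3 syntax: the Queue module is now queue, and authkey must be bytes; here port 0 lets the OS pick a free port to avoid conflicts). For brevity it runs both the server and a client in the same process, and shows that all access goes through the registered get_task_queue proxy rather than the raw queue:

```python
import queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()

def get_task_queue():            # module-level so it can be registered
    return task_queue

class ServerManager(BaseManager):
    pass

class ClientManager(BaseManager):
    pass

# Server side: register the typeid with a callable returning the queue.
ServerManager.register('get_task_queue', callable=get_task_queue)
# Client side: register the same typeid, name only.
ClientManager.register('get_task_queue')

if __name__ == '__main__':
    server = ServerManager(address=('127.0.0.1', 0), authkey=b'lihui')
    server.start()               # server.address now holds the real port

    client = ClientManager(address=server.address, authkey=b'lihui')
    client.connect()
    q = client.get_task_queue()  # a proxy, not the raw task_queue
    q.put(3)

    # The server-side proxy sees the item the client put in:
    print(server.get_task_queue().get())   # prints 3
    server.shutdown()
```

Note that after server.start(), the live queue exists in the manager's server process; both proxies refer to that single queue, which is why the client's put is visible on the server side.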

Liao Xuefeng's Python tutorial covers this concept in detail in the chapter on processes and threads; see: https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/001386832973658c780d8bfa4c6406f83b2b3097aed5df6000#0

Most other write-ups online are based on that article as well, and so is this post.
