记得以前在看APUE《UNIX环境高级编程》的时候,最让我云里雾里的就是信号量那部分的内容,在完善Tempest的过程中,突然偶遇了Lock,不过这里是python多线程用到的,我印象中python的解释器是单线程的,因此之前multi threading一直支持得不好,当然不知道现在如何
因为目前主要完善Tempest里有关云主机部分的用例,由于做了相当多定制化的修改,而且havana版的tempest,因此很多功能都有改进,需要完善
首先看到这个case
@attr(type='gate') def test_aggregate_add_host_list(self): # Add an host to the given aggregate and list. self.useFixture(fixtures.LockFixture('availability_zone')) aggregate_name = data_utils.rand_name(self.aggregate_name_prefix) resp, aggregate = self.client.create_aggregate(aggregate_name) self.addCleanup(self.client.delete_aggregate, aggregate['id']) self.client.add_host(aggregate['id'], self.host) self.addCleanup(self.client.remove_host, aggregate['id'], self.host) resp, aggregates = self.client.list_aggregates() aggs = filter(lambda x: x['id'] == aggregate['id'], aggregates) self.assertEqual(1, len(aggs)) agg = aggs[0] self.assertEqual(aggregate_name, agg['name']) self.assertEqual(None, agg['availability_zone']) self.assertIn(self.host, agg['hosts'])
在执行到这个testcase的时候,报了一个错误
lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin master ●✚ nosetests -sv test_aggregates.py:AggregatesAdminTestJSON.test_aggregate_create_update_with_az tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az ... ERROR ====================================================================== ERROR: tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az ---------------------------------------------------------------------- _StringException: pythonlogging:'': {{{ 2017-01-03 21:16:13,758 Got semaphore "availability_zone" 2017-01-03 21:16:13,759 Attempting to grab file lock "availability_zone" }}} Traceback (most recent call last): File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/api/compute/admin/test_aggregates.py", line 115, in test_aggregate_create_update_with_az self.useFixture(fixtures.LockFixture('availability_zone')) File "/usr/local/lib/python2.7/site-packages/testtools/testcase.py", line 679, in useFixture reraise(*exc_info) File "/usr/local/lib/python2.7/site-packages/testtools/testcase.py", line 666, in useFixture fixture.setUp() File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/fixture/lockutils.py", line 51, in setUp self.mgr.__enter__() File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 17, in __enter__ return self.gen.next() File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py", line 186, in lock fileutils.ensure_tree(local_lock_path) File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/fileutils.py", line 37, in ensure_tree os.makedirs(path) File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 150, in makedirs makedirs(head, mode) File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 150, in makedirs makedirs(head, mode) File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 157, in makedirs mkdir(name, mode) OSError: [Errno 13] Permission denied: '/opt/stack' -------------------- >> begin captured logging << -------------------- tempest.openstack.common.lockutils: DEBUG: Got semaphore "availability_zone" tempest.openstack.common.lockutils: DEBUG: Attempting to grab file lock "availability_zone" --------------------- >> end captured logging << --------------------- ---------------------------------------------------------------------- Ran 1 test in 3.609s FAILED (errors=1)
顺着报错信息查找原因,令人惊奇的是,这个case并没有开始调用API就已经在self.useFixture这行出错了
首先看下useFixture的作用
def useFixture(self, fixture): """Use fixture in a test case. The fixture will be setUp, and self.addCleanup(fixture.cleanUp) called. :param fixture: The fixture to use. :return: The fixture, after setting it up and scheduling a cleanup for it. """ try: fixture.setUp() except: exc_info = sys.exc_info() try: gather_details(fixture.getDetails(), self.getDetails()) except: # Report the setUp exception, then raise the error during # gather_details. self._report_traceback(exc_info) raise else: # Gather_details worked, so raise the exception setUp # encountered. reraise(*exc_info) else: self.addCleanup(fixture.cleanUp) self.addCleanup( gather_details, fixture.getDetails(), self.getDetails()) return fixture
这里其实有两个动作,一个是setUp了fixture,中间的异常处理先不管,然后就是cleanUp了fixture,也就是资源的创建和销毁工作,这里的useFixture有兴趣的可以搜下testtools.TestCase里它的使用方法
既然知道了它是创建,那么看下useFixture的参数传的是个啥玩意
这里要注意的是虽然参数是fixtures.LockFixture,但是fixtures.py是找不到的,原因是import的时候换了个名
from tempest.common import tempest_fixtures as fixtures
因此可以在tempest_fixtures.py里找到
from tempest.openstack.common.fixture import lockutils class LockFixture(lockutils.LockFixture): def __init__(self, name): super(LockFixture, self).__init__(name, 'tempest-')
调用的时候,传了一个参数availability_zone,也就是传给构造函数里的name,这里和另一个参数tempest-一起调用了基类的构造函数,继续看看想干什么
class LockFixture(fixtures.Fixture): """External locking fixture. This fixture is basically an alternative to the synchronized decorator with the external flag so that tearDowns and addCleanups will be included in the lock context for locking between tests. The fixture is recommended to be the first line in a test method, like so:: def test_method(self): self.useFixture(LockFixture) ... or the first line in setUp if all the test methods in the class are required to be serialized. Something like:: class TestCase(testtools.testcase): def setUp(self): self.useFixture(LockFixture) super(TestCase, self).setUp() ... This is because addCleanups are put on a LIFO queue that gets run after the test method exits. (either by completing or raising an exception) """ def __init__(self, name, lock_file_prefix=None): self.mgr = lockutils.lock(name, lock_file_prefix, True)
这里name依旧是availability_zone,tempest-赋给了lock_file_prefix,第三个参数是True,调用了lockutils.lock,看这命名应该就是返回了锁,这就说得通了,useFixture参数是一把锁,创建这个锁的原因当然就是多线程同步,不出意外上面两个参数合并成了锁的名字,如此说来肯定还有其它地方也会加锁,而且和此处锁是同一把,作用就是防止并发,更具体一点就是避免会相互造成影响的几个testcase并发执行导致异常,因此加锁同步执行
看下具体lockutils.lock方法
@contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is True, in which case, it'll yield an InterProcessLock instance. :param lock_file_prefix: The lock_file_prefix argument is used to provide lock files on disk with a meaningful prefix. :param external: The external keyword argument denotes whether this lock should work across multiple processes. This means that if two different workers both run a a method decorated with @synchronized('mylock', external=True), only one of them will execute at a time. :param lock_path: The lock_path keyword argument is used to specify a special location for external lock files to live. If nothing is set, then CONF.lock_path is used as a default. """ with _semaphores_lock: try: sem = _semaphores[name] except KeyError: sem = threading.Semaphore() _semaphores[name] = sem with sem: LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) # NOTE(mikal): I know this looks odd if not hasattr(local.strong_store, 'locks_held'): local.strong_store.locks_held = [] local.strong_store.locks_held.append(name) try: if external and not CONF.disable_process_locking: LOG.debug(_('Attempting to grab file lock "%(lock)s"'), {'lock': name}) # We need a copy of lock_path because it is non-local local_lock_path = lock_path or CONF.lock_path if not local_lock_path: raise cfg.RequiredOptError('lock_path') if not os.path.exists(local_lock_path): fileutils.ensure_tree(local_lock_path) LOG.info(_('Created lock path: %s'), local_lock_path) def add_prefix(name, prefix): if not prefix: return name sep = '' if prefix.endswith('-') else '-' return '%s%s%s' % (prefix, sep, name) # NOTE(mikal): the lock name cannot contain directory # separators lock_file_name = add_prefix(name.replace(os.sep, '_'), lock_file_prefix) lock_file_path = os.path.join(local_lock_path, lock_file_name) try: lock = InterProcessLock(lock_file_path) with lock as lock: LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), {'lock': name, 'path': lock_file_path}) yield lock finally: LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), {'lock': name, 'path': lock_file_path}) else: yield sem finally: local.strong_store.locks_held.remove(name)
看这长篇大论,先看下注释,lock_file_prefix说了半天,说明最后锁会保存在磁盘里,external,如果是True,那么每次加锁只能让一个线程执行,这里传的参数的确是True
前面还有一个装饰器@contextlib.contextmanager,这里不细看了,涉及到的流程相关,比较复杂
顺便吐槽一下pycharm,本来是看中可以直接贴上带高亮颜色的CODE,结果贴出来格式老是乱的,还是老实地用VIM看得了
接着重新回到nose执行的错误地点,可以看到是fileutils.ensure_tree(local_lock_path)这里返回的,看上去是目录不存在要重新创建
def ensure_tree(path): """Create a directory (and any ancestor directories required) :param path: Directory to create """ try: os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST: if not os.path.isdir(path): raise else: raise
看到这里,就明白了,这个用例真正错误的原因是local_lock_path,也就是锁的目录不存在,但是想mkdir的时候,又没有权限,我这里执行tempest的地点是本地MAC上,因此会失败,但是需要了解的是为什么偏偏要创建/opt/stack,像是单节点devstack的配置
OSError: [Errno 13] Permission denied: '/opt/stack'
因此查看下local_lock_path是哪里来的
local_lock_path = lock_path or CONF.lock_path
lock_path传进来的是空,因此看下配置文件里的lock_path
# lock/semaphore base directory lock_path=/opt/stack/data/tempest
这里配置了一个绝对路径,而无法直接mkdir,因此导致锁存放的目录创建失败,因此case执行失败,解决办法就是修改目录,最好和执行的tempest目录放一起
# lock/semaphore base directory lock_path=/Users/lihui/work/cloud/openstack/tempest-ci/tempest
修改之后,试试这个用例,就不会报错了
lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin master ●✚ nosetests -sv test_aggregates.py:AggregatesAdminTestJSON.test_aggregate_create_update_with_az tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az ... ok ---------------------------------------------------------------------- Ran 1 test in 8.911s OK
此时可以看到配置的目录下多了一个空文件,这就是保存在disk上的锁
lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest master ●✚ ls -l tempest-availability_zone -rw-r--r-- 1 lihui staff 0 1 3 23:25 tempest-availability_zone lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest master ●✚ lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest master ●✚ lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest master ●✚ file tempest-availability_zone tempest-availability_zone: empty
顺便看下日志,可以看到有创建和释放锁
lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin master ●✚ grep tempest-availability_zone tempest.log --color | tail -n 2 2017-01-03 23:25:08.070 98920 DEBUG tempest.openstack.common.lockutils [-] Got file lock "availability_zone" at /Users/lihui/work/cloud/openstack/tempest-ci/tempest/tempest-availability_zone lock /Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py:206 2017-01-03 23:25:09.495 98920 DEBUG tempest.openstack.common.lockutils [-] Released file lock "availability_zone" at /Users/lihui/work/cloud/openstack/tempest-ci/tempest/tempest-availability_zone lock /Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py:210
都是在这里执行的
try: lock = InterProcessLock(lock_file_path) with lock as lock: LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), {'lock': name, 'path': lock_file_path}) yield lock finally: LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), {'lock': name, 'path': lock_file_path})
这里通过InterProcessLock创建锁,来自
if os.name == 'nt': import msvcrt InterProcessLock = _WindowsLock else: import fcntl InterProcessLock = _PosixLock
用的Linux,走else
class _PosixLock(_InterProcessLock): def trylock(self): fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) def unlock(self): fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
这也是一个类,依旧啥都没干,那就看它的基类,起码要知道参数传进去干了啥
class _InterProcessLock(object): """Lock implementation which allows multiple locks, working around issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does not require any cleanup. Since the lock is always held on a file descriptor rather than outside of the process, the lock gets dropped automatically if the process crashes, even if __exit__ is not executed. There are no guarantees regarding usage by multiple green threads in a single process here. This lock works only between processes. Exclusive access between local threads should be achieved using the semaphores in the @synchronized decorator. Note these locks are released when the descriptor is closed, so it's not safe to close the file descriptor while another green thread holds the lock. Just opening and closing the lock file can break synchronisation, so lock files must be accessed only using this abstraction. """ def __init__(self, name): self.lockfile = None self.fname = name def __enter__(self): self.lockfile = open(self.fname, 'w')
看到这里,其实参数lock_file_path啥都没干,就是给成员变量附了一个值,返回了一个_PosixLock类型的lock
上面只是返回了lock类对象,还没有真正写入磁盘,下面才是
with lock as lock:
通过一个迭代生成器,不停返回这个lock给前面的useFixture,同时不停写入日志信息,上面传入的文件名参数实际上只是为了写入日志lock的标识符
因此这里的逻辑就是所有加了下面这行的case,无法并发执行,会加锁同步一个一个case执行
self.useFixture(fixtures.LockFixture('availability_zone'))
搜一下整个目录,的确有很多地方
lihui@MacBook ~/work/cloud/openstack/tempest-ci/tempest master ●✚ grep fixtures.LockFixture ./* -R ./api/compute/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates_negative.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_aggregates_negative.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/admin/test_hosts.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates_negative.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_aggregates_negative.py: self.useFixture(fixtures.LockFixture('availability_zone')) ./api/compute/v3/admin/test_hosts.py: self.useFixture(fixtures.LockFixture('availability_zone'))
因此,如果存在多个case之间必须同步执行,而不能并发的情况,可以相应地添加useFixture方法