A Chance Encounter with Thread Locks

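I remember that when I was reading APUE (Advanced Programming in the UNIX Environment), the part on semaphores was what confused me most. While improving Tempest, I unexpectedly ran into Lock again, this time the kind used for Python multithreading. My impression is that the Python interpreter runs only one thread at a time (CPython's GIL lets just one thread execute Python bytecode at once), so multithreading has long been poorly supported; I'm not sure how things stand now.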

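I'm currently focused on the Tempest test cases for cloud servers (instances). We have made quite a lot of custom modifications, and this is the Havana release of Tempest, so many features have changed since then and the tests need to be brought up to date.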

First, look at this test case:

    @attr(type='gate')
    def test_aggregate_add_host_list(self):
        # Add an host to the given aggregate and list.
        self.useFixture(fixtures.LockFixture('availability_zone'))
        aggregate_name = data_utils.rand_name(self.aggregate_name_prefix)
        resp, aggregate = self.client.create_aggregate(aggregate_name)
        self.addCleanup(self.client.delete_aggregate, aggregate['id'])
        self.client.add_host(aggregate['id'], self.host)
        self.addCleanup(self.client.remove_host, aggregate['id'], self.host)

        resp, aggregates = self.client.list_aggregates()
        aggs = filter(lambda x: x['id'] == aggregate['id'], aggregates)
        self.assertEqual(1, len(aggs))
        agg = aggs[0]
        self.assertEqual(aggregate_name, agg['name'])
        self.assertEqual(None, agg['availability_zone'])
        self.assertIn(self.host, agg['hosts'])

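Running test_aggregate_create_update_with_az, another test in the same file that starts with the same LockFixture line, failed with this error: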

lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin   master ●✚  nosetests -sv test_aggregates.py:AggregatesAdminTestJSON.test_aggregate_create_update_with_az
tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az ... ERROR

======================================================================
ERROR: tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az
----------------------------------------------------------------------
_StringException: pythonlogging:'': {{{
2017-01-03 21:16:13,758 Got semaphore "availability_zone"
2017-01-03 21:16:13,759 Attempting to grab file lock "availability_zone"
}}}

Traceback (most recent call last):
  File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/api/compute/admin/test_aggregates.py", line 115, in test_aggregate_create_update_with_az
    self.useFixture(fixtures.LockFixture('availability_zone'))
  File "/usr/local/lib/python2.7/site-packages/testtools/testcase.py", line 679, in useFixture
    reraise(*exc_info)
  File "/usr/local/lib/python2.7/site-packages/testtools/testcase.py", line 666, in useFixture
    fixture.setUp()
  File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/fixture/lockutils.py", line 51, in setUp
    self.mgr.__enter__()
  File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py", line 186, in lock
    fileutils.ensure_tree(local_lock_path)
  File "/Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/fileutils.py", line 37, in ensure_tree
    os.makedirs(path)
  File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 150, in makedirs
    makedirs(head, mode)
  File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 150, in makedirs
    makedirs(head, mode)
  File "/usr/local/Cellar/python/2.7.10/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 157, in makedirs
    mkdir(name, mode)
OSError: [Errno 13] Permission denied: '/opt/stack'

-------------------- >> begin captured logging << --------------------
tempest.openstack.common.lockutils: DEBUG: Got semaphore "availability_zone"
tempest.openstack.common.lockutils: DEBUG: Attempting to grab file lock "availability_zone"
--------------------- >> end captured logging << ---------------------

----------------------------------------------------------------------
Ran 1 test in 3.609s

FAILED (errors=1)

Following the traceback to find the cause, the surprising part is that the test failed on the self.useFixture line, before it had made a single API call.

First, let's look at what useFixture (from testtools.TestCase) does:

    def useFixture(self, fixture):
        """Use fixture in a test case.

        The fixture will be setUp, and self.addCleanup(fixture.cleanUp) called.

        :param fixture: The fixture to use.
        :return: The fixture, after setting it up and scheduling a cleanup for
           it.
        """
        try:
            fixture.setUp()
        except:
            exc_info = sys.exc_info()
            try:
                gather_details(fixture.getDetails(), self.getDetails())
            except:
                # Report the setUp exception, then raise the error during
                # gather_details.
                self._report_traceback(exc_info)
                raise
            else:
                # Gather_details worked, so raise the exception setUp
                # encountered.
                reraise(*exc_info)
        else:
            self.addCleanup(fixture.cleanUp)
            self.addCleanup(
                gather_details, fixture.getDetails(), self.getDetails())
            return fixture

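There are really two actions here. First the fixture is set up (ignore the exception handling in the middle for now); then, if setUp succeeds, fixture.cleanUp is registered with addCleanup so that it runs after the test finishes. In other words, resources are created now and destroyed later. If you're interested, look up how useFixture is used with testtools.TestCase.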
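To make the ordering concrete, here is a stripped-down sketch built on the standard library's unittest, whose TestCase also has addCleanup. RecordingFixture and use_fixture are hypothetical names for illustration, not the testtools API, and the detail-gathering part of the real useFixture is left out:

```python
import unittest


class RecordingFixture:
    """Hypothetical stand-in for a fixtures.Fixture that records its calls."""

    def __init__(self, name, events):
        self.name = name
        self.events = events

    def setUp(self):
        self.events.append("setUp " + self.name)

    def cleanUp(self):
        self.events.append("cleanUp " + self.name)


def use_fixture(test, fixture):
    """Simplified useFixture: set up now, schedule cleanUp for later."""
    fixture.setUp()  # if this raises, no cleanup is registered
    test.addCleanup(fixture.cleanUp)
    return fixture


def run_demo():
    events = []

    class Demo(unittest.TestCase):
        def test_order(self):
            use_fixture(self, RecordingFixture("lock", events))
            use_fixture(self, RecordingFixture("tmpdir", events))
            events.append("test body")

    result = unittest.TestResult()
    Demo("test_order").run(result)
    assert result.wasSuccessful()
    return events


print(run_demo())
# ['setUp lock', 'setUp tmpdir', 'test body', 'cleanUp tmpdir', 'cleanUp lock']
```

Cleanups run last-in, first-out once the test method has finished, so the fixture set up first is cleaned up last. That is why the LockFixture docstring quoted below recommends making it the first line of a test: the lock is then released only after every other cleanup has run.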

Now that we know it sets something up, let's see what exactly is being passed to useFixture.

Note that although the argument is fixtures.LockFixture, you won't find a matching fixtures.py, because the module is renamed on import:

from tempest.common import tempest_fixtures as fixtures

So the class lives in tempest_fixtures.py:

from tempest.openstack.common.fixture import lockutils


class LockFixture(lockutils.LockFixture):
    def __init__(self, name):
        super(LockFixture, self).__init__(name, 'tempest-')

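The call passes one argument, availability_zone, which becomes name in the constructor; name and a second argument, 'tempest-', are then passed to the base class constructor. Let's keep going to see what that does: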

class LockFixture(fixtures.Fixture):
    """External locking fixture.

    This fixture is basically an alternative to the synchronized decorator with
    the external flag so that tearDowns and addCleanups will be included in
    the lock context for locking between tests. The fixture is recommended to
    be the first line in a test method, like so::

        def test_method(self):
            self.useFixture(LockFixture)
                ...

    or the first line in setUp if all the test methods in the class are
    required to be serialized. Something like::

        class TestCase(testtools.testcase):
            def setUp(self):
                self.useFixture(LockFixture)
                super(TestCase, self).setUp()
                    ...

    This is because addCleanups are put on a LIFO queue that gets run after the
    test method exits. (either by completing or raising an exception)
    """
    def __init__(self, name, lock_file_prefix=None):
        self.mgr = lockutils.lock(name, lock_file_prefix, True)

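Here name is still availability_zone, 'tempest-' becomes lock_file_prefix, and the third argument (external) is True; all three are passed to lockutils.lock. Judging by the name, this returns a lock, which makes sense: what useFixture receives is a lock, and the reason for creating one is, of course, to synchronize tests that run concurrently. Presumably the two arguments above are combined into the lock's name. That implies other places take this very same lock, and its job is to prevent concurrency: more specifically, it stops several test cases that interfere with one another from running at the same time and failing, by making them run one after another.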
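The guess about the name can be checked against the add_prefix helper defined inside lockutils.lock (listed in full below). Lifted out as a standalone function, with lock_file_name as a hypothetical wrapper that also applies the os.sep replacement lock() performs, it turns ('availability_zone', 'tempest-') into tempest-availability_zone:

```python
import os


def add_prefix(name, prefix):
    # Same logic as the nested helper inside lockutils.lock().
    if not prefix:
        return name
    sep = '' if prefix.endswith('-') else '-'
    return '%s%s%s' % (prefix, sep, name)


def lock_file_name(name, lock_file_prefix):
    # Hypothetical wrapper: directory separators in the name become '_',
    # as lock() does before adding the prefix.
    return add_prefix(name.replace(os.sep, '_'), lock_file_prefix)


print(lock_file_name('availability_zone', 'tempest-'))  # tempest-availability_zone
print(lock_file_name('availability_zone', 'tempest'))   # tempest-availability_zone
print(lock_file_name('availability_zone', None))        # availability_zone
```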

Here is lockutils.lock itself:

@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
    """Context based lock

    This function yields a `threading.Semaphore` instance (if we don't use
    eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
    True, in which case, it'll yield an InterProcessLock instance.

    :param lock_file_prefix: The lock_file_prefix argument is used to provide
      lock files on disk with a meaningful prefix.

    :param external: The external keyword argument denotes whether this lock
      should work across multiple processes. This means that if two different
      workers both run a a method decorated with @synchronized('mylock',
      external=True), only one of them will execute at a time.

    :param lock_path: The lock_path keyword argument is used to specify a
      special location for external lock files to live. If nothing is set, then
      CONF.lock_path is used as a default.
    """
    with _semaphores_lock:
        try:
            sem = _semaphores[name]
        except KeyError:
            sem = threading.Semaphore()
            _semaphores[name] = sem

    with sem:
        LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})

        # NOTE(mikal): I know this looks odd
        if not hasattr(local.strong_store, 'locks_held'):
            local.strong_store.locks_held = []
        local.strong_store.locks_held.append(name)

        try:
            if external and not CONF.disable_process_locking:
                LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
                          {'lock': name})

                # We need a copy of lock_path because it is non-local
                local_lock_path = lock_path or CONF.lock_path
                if not local_lock_path:
                    raise cfg.RequiredOptError('lock_path')

                if not os.path.exists(local_lock_path):
                    fileutils.ensure_tree(local_lock_path)
                    LOG.info(_('Created lock path: %s'), local_lock_path)

                def add_prefix(name, prefix):
                    if not prefix:
                        return name
                    sep = '' if prefix.endswith('-') else '-'
                    return '%s%s%s' % (prefix, sep, name)

                # NOTE(mikal): the lock name cannot contain directory
                # separators
                lock_file_name = add_prefix(name.replace(os.sep, '_'),
                                            lock_file_prefix)

                lock_file_path = os.path.join(local_lock_path, lock_file_name)

                try:
                    lock = InterProcessLock(lock_file_path)
                    with lock as lock:
                        LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
                                  {'lock': name, 'path': lock_file_path})
                        yield lock
                finally:
                    LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
                              {'lock': name, 'path': lock_file_path})
            else:
                yield sem

        finally:
            local.strong_store.locks_held.remove(name)

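That's a long function, so start with the docstring. The description of lock_file_prefix tells us the lock ends up as a file on disk. As for external: when it is True, the lock works across multiple processes, so only one worker at a time can run the locked code. The argument passed here is indeed True.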
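Before any file is involved, lock() takes an in-process threading.Semaphore looked up by name in a module-level dict guarded by _semaphores_lock. The pattern can be sketched with the standard library alone; get_semaphore and max_concurrency are illustrative helpers, not part of lockutils. Such a semaphore only serializes threads inside one process, which is why external=True adds a file lock on top:

```python
import threading
import time

_semaphores = {}
_semaphores_lock = threading.Lock()


def get_semaphore(name):
    # One Semaphore per name, created lazily under a global lock so two
    # threads cannot race to create different semaphores for the same name.
    with _semaphores_lock:
        sem = _semaphores.get(name)
        if sem is None:
            sem = threading.Semaphore()  # initial value 1: one holder at a time
            _semaphores[name] = sem
        return sem


def max_concurrency(name, workers=5):
    """Run threads that share one lock name and report the peak overlap."""
    active = 0
    peak = 0
    counter_lock = threading.Lock()

    def work():
        nonlocal active, peak
        with get_semaphore(name):
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # pretend to call the API
            with counter_lock:
                active -= 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


print(get_semaphore('availability_zone') is get_semaphore('availability_zone'))  # True
print(max_concurrency('availability_zone'))  # 1
```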

There is also a decorator in front, @contextlib.contextmanager. I won't dig into it here, since it changes the control flow and is fairly involved.
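In short, the decorator turns the generator function into a context manager: __enter__ runs the body up to yield, which is why the traceback above passes through contextlib.py's __enter__ and self.gen.next(), and __exit__ resumes it after yield. A minimal sketch, with a hypothetical traced_lock generator driven by hand the way the fixture's setUp calls self.mgr.__enter__():

```python
import contextlib


@contextlib.contextmanager
def traced_lock(name, events):
    # Everything before `yield` runs in __enter__ ...
    events.append('acquire ' + name)
    try:
        yield name
    finally:
        # ... and everything after it runs in __exit__.
        events.append('release ' + name)


events = []
mgr = traced_lock('availability_zone', events)
value = mgr.__enter__()          # runs the generator up to `yield`
events.append('test body using ' + value)
mgr.__exit__(None, None, None)   # resumes the generator after `yield`
print(events)
# ['acquire availability_zone', 'test body using availability_zone', 'release availability_zone']
```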

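A side gripe about PyCharm: I picked it because I could paste code with syntax highlighting straight into the post, but the formatting kept coming out mangled, so I'll just stick to reading the code in Vim.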

Back to where nose reported the error: it was raised from fileutils.ensure_tree(local_lock_path), which is called to create the lock directory because it doesn't exist yet.

def ensure_tree(path):
    """Create a directory (and any ancestor directories required)

    :param path: Directory to create
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            if not os.path.isdir(path):
                raise
        else:
            raise

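Now it's clear. The real cause of the failure is local_lock_path: the lock directory doesn't exist, and when the code tries to create it with mkdir, it lacks permission. I'm running Tempest locally on my Mac, so this fails. What still needs explaining is why it insists on creating /opt/stack, which looks like a single-node devstack configuration: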
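The error names /opt/stack rather than the full lock_path because os.makedirs creates missing parents from the top down: for /opt/stack/data/tempest it recursed twice (the two makedirs frames at line 150), and the first actual mkdir was /opt/stack, directly under the existing /opt. A small helper makes this visible; first_missing_ancestor is a hypothetical name, not part of Tempest:

```python
import os
import tempfile


def first_missing_ancestor(path):
    """Return the directory os.makedirs(path) would create first, or None."""
    path = os.path.abspath(path)
    missing = None
    # Walk upward until we reach a directory that already exists.
    while not os.path.exists(path):
        missing = path
        parent = os.path.dirname(path)
        if parent == path:  # reached the filesystem root
            break
        path = parent
    return missing


# On the Mac in this post, first_missing_ancestor('/opt/stack/data/tempest')
# would give '/opt/stack'. The same shape, reproduced in a temporary directory:
with tempfile.TemporaryDirectory() as base:
    target = os.path.join(base, 'stack', 'data', 'tempest')
    demo_ok = first_missing_ancestor(target) == os.path.join(os.path.abspath(base), 'stack')
print(demo_ok)  # True
```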

OSError: [Errno 13] Permission denied: '/opt/stack'

So let's find out where local_lock_path comes from:

local_lock_path = lock_path or CONF.lock_path

No lock_path is passed in (it is None), so CONF.lock_path is used. Here is lock_path in the configuration file:

# lock/semaphore base directory
lock_path=/opt/stack/data/tempest

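An absolute path is configured here that the current user can't create, so creating the lock directory fails and the test case fails with it. The fix is to point it at a different directory, preferably one alongside the Tempest checkout you run from: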
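To catch this before a whole run fails, lock_path can be checked up front. A minimal sketch, assuming Python 3 (for exist_ok); check_lock_path is a hypothetical helper, not a Tempest option or API:

```python
import os
import tempfile


def check_lock_path(lock_path):
    """Hypothetical preflight check: can this user create and use lock_path?"""
    try:
        os.makedirs(lock_path, exist_ok=True)  # same goal as ensure_tree()
    except OSError as exc:
        return 'cannot create %s: %s' % (lock_path, exc.strerror)
    if not os.access(lock_path, os.W_OK):
        return 'not writable: %s' % lock_path
    return None  # usable as lock_path


with tempfile.TemporaryDirectory() as base:
    good = check_lock_path(os.path.join(base, 'tempest-locks'))
    blocker = os.path.join(base, 'plain-file')
    open(blocker, 'w').close()
    bad = check_lock_path(os.path.join(blocker, 'locks'))  # parent is a file

print(good)  # None
print(bad is not None)  # True
```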

# lock/semaphore base directory
lock_path=/Users/lihui/work/cloud/openstack/tempest-ci/tempest

After the change, run the test case again and it no longer errors:

lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin   master ●✚  nosetests -sv test_aggregates.py:AggregatesAdminTestJSON.test_aggregate_create_update_with_az
tempest.api.compute.admin.test_aggregates.AggregatesAdminTestJSON.test_aggregate_create_update_with_az ... ok

----------------------------------------------------------------------
Ran 1 test in 8.911s

OK

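The configured directory now contains an empty file: this is the lock kept on disk. The file itself holds no data; the lock is held on its open file descriptor.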
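The same thing can be reproduced in a few lines (POSIX only). This sketch mirrors the two steps _PosixLock takes, shown further down: open the file with mode 'w', then call fcntl.lockf with LOCK_EX | LOCK_NB. The temporary directory stands in for lock_path:

```python
import fcntl
import os
import tempfile

with tempfile.TemporaryDirectory() as lock_dir:
    path = os.path.join(lock_dir, 'tempest-availability_zone')
    with open(path, 'w') as lockfile:  # creates the file, writes nothing
        fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
        size_while_locked = os.path.getsize(path)
        fcntl.lockf(lockfile, fcntl.LOCK_UN)
    still_there = os.path.exists(path)  # the file outlives the lock

print(size_while_locked, still_there)  # 0 True
```

Nothing is written to the file and it is not deleted on release, which matches the docstring's remark that the lock "does not require any cleanup".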

lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest   master ●✚  ls -l tempest-availability_zone
-rw-r--r--  1 lihui  staff  0  1  3 23:25 tempest-availability_zone
 lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest   master ●✚  file tempest-availability_zone
tempest-availability_zone: empty

The log also shows the lock being acquired and released:

lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest/api/compute/admin   master ●✚  grep tempest-availability_zone tempest.log --color | tail -n 2
2017-01-03 23:25:08.070 98920 DEBUG tempest.openstack.common.lockutils [-] Got file lock "availability_zone" at /Users/lihui/work/cloud/openstack/tempest-ci/tempest/tempest-availability_zone lock /Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py:206
2017-01-03 23:25:09.495 98920 DEBUG tempest.openstack.common.lockutils [-] Released file lock "availability_zone" at /Users/lihui/work/cloud/openstack/tempest-ci/tempest/tempest-availability_zone lock /Users/lihui/work/cloud/openstack/tempest-ci/tempest/openstack/common/lockutils.py:210

Both messages come from this block:

                try:
                    lock = InterProcessLock(lock_file_path)
                    with lock as lock:
                        LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
                                  {'lock': name, 'path': lock_file_path})
                        yield lock
                finally:
                    LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
                              {'lock': name, 'path': lock_file_path})

The lock is created via InterProcessLock, which is defined as:

if os.name == 'nt':
    import msvcrt
    InterProcessLock = _WindowsLock
else:
    import fcntl
    InterProcessLock = _PosixLock

On Linux, and on macOS where I'm running this, os.name is 'posix', so the else branch is taken:

class _PosixLock(_InterProcessLock):
    def trylock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

    def unlock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)

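_PosixLock itself only implements trylock and unlock with fcntl.lockf; to see what happens to the argument passed in, we have to look at its base class: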
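The LOCK_NB flag makes trylock fail immediately instead of blocking when another process already holds the lock. Because fcntl record locks belong to the process, the check needs a second process; a second lockf from the same process would simply succeed. A POSIX-only sketch, where CHILD and child_try_lock are illustrative helpers (assumes Python 3.7+ for capture_output):

```python
import fcntl
import os
import subprocess
import sys
import tempfile

# Child process: try the same non-blocking exclusive lock and report the outcome.
CHILD = r'''
import fcntl, sys
with open(sys.argv[1], 'w') as f:
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        print('busy')
    else:
        print('acquired')
'''


def child_try_lock(path):
    out = subprocess.run([sys.executable, '-c', CHILD, path],
                         capture_output=True, text=True, check=True)
    return out.stdout.strip()


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, 'tempest-availability_zone')
    with open(path, 'w') as f:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # parent holds the lock
        while_held = child_try_lock(path)
        fcntl.lockf(f, fcntl.LOCK_UN)
        after_release = child_try_lock(path)

print(while_held, after_release)  # busy acquired
```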

class _InterProcessLock(object):
    """Lock implementation which allows multiple locks, working around
    issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
    not require any cleanup. Since the lock is always held on a file
    descriptor rather than outside of the process, the lock gets dropped
    automatically if the process crashes, even if __exit__ is not executed.

    There are no guarantees regarding usage by multiple green threads in a
    single process here. This lock works only between processes. Exclusive
    access between local threads should be achieved using the semaphores
    in the @synchronized decorator.

    Note these locks are released when the descriptor is closed, so it's not
    safe to close the file descriptor while another green thread holds the
    lock. Just opening and closing the lock file can break synchronisation,
    so lock files must be accessed only using this abstraction.
    """

    def __init__(self, name):
        self.lockfile = None
        self.fname = name

    def __enter__(self):
        self.lockfile = open(self.fname, 'w')

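So the constructor does nothing with lock_file_path except store it in a member variable (self.fname); InterProcessLock(lock_file_path) simply returns a _PosixLock object. The file is only opened in __enter__, with open(self.fname, 'w').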

In other words, that line only creates the lock object; nothing has touched the disk yet. That happens here:

                   with lock as lock:

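Entering the with statement calls __enter__, which creates or opens the lock file and then takes an exclusive lock on it through trylock (the excerpt above stops right after the open call). The generator then yields the lock once, through the fixture back to useFixture, and logs the acquisition and the release around it. So the file name passed in is not just an identifier for the log messages: it is the path of the file the lock is actually held on, i.e. the empty tempest-availability_zone file seen earlier.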
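Putting the pieces together, here is a hedged sketch of what a complete lock class of this shape might look like (POSIX only). The retry-with-sleep loop in __enter__ and the __exit__ body are assumptions filling in the part the excerpt cuts off, and SketchPosixLock is a hypothetical name, not the oslo code:

```python
import errno
import fcntl
import os
import tempfile
import time


class SketchPosixLock(object):
    """Hypothetical merge of _PosixLock and _InterProcessLock."""

    def __init__(self, name):
        self.lockfile = None
        self.fname = name  # only stored; nothing touches the disk yet

    def trylock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

    def unlock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)

    def __enter__(self):
        self.lockfile = open(self.fname, 'w')  # the file is created here
        while True:
            try:
                self.trylock()
                return self
            except OSError as e:
                if e.errno not in (errno.EACCES, errno.EAGAIN):
                    self.lockfile.close()
                    raise
                time.sleep(0.01)  # assumed: another process holds it, retry

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.unlock()
        self.lockfile.close()
        return False


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, 'tempest-availability_zone')
    lock = SketchPosixLock(path)
    created_before_enter = os.path.exists(path)
    with lock as held:
        created_inside = os.path.exists(path)
        same_object = held is lock

print(created_before_enter, created_inside, same_object)  # False True True
```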

So the logic is that no two test cases containing the following line can run concurrently; the lock makes them run one at a time:

self.useFixture(fixtures.LockFixture('availability_zone'))
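The effect of a shared lock name can be reproduced outside Tempest: several processes that take an exclusive lock on the same file run their critical sections strictly one after another. A POSIX-only sketch, where each worker stands in for one test case holding the availability_zone lock; WORKER, run_workers and is_serialized are illustrative names:

```python
import os
import subprocess
import sys
import tempfile

# Each worker blocks on the same lock file (LOCK_EX without LOCK_NB waits),
# then logs start/end around a short pretend test.
WORKER = r'''
import fcntl, sys, time
lock_path, log_path, name = sys.argv[1:4]
with open(lock_path, 'w') as lockfile:
    fcntl.lockf(lockfile, fcntl.LOCK_EX)
    with open(log_path, 'a') as log:
        log.write('start %s\n' % name)
        log.flush()
        time.sleep(0.05)
        log.write('end %s\n' % name)
    fcntl.lockf(lockfile, fcntl.LOCK_UN)
'''


def run_workers(count=4):
    with tempfile.TemporaryDirectory() as d:
        lock_path = os.path.join(d, 'tempest-availability_zone')
        log_path = os.path.join(d, 'events.log')
        procs = [subprocess.Popen([sys.executable, '-c', WORKER,
                                   lock_path, log_path, str(i)])
                 for i in range(count)]
        for p in procs:
            assert p.wait() == 0
        with open(log_path) as f:
            return f.read().splitlines()


def is_serialized(lines):
    # Serialized means every "start X" is immediately followed by "end X".
    if len(lines) % 2:
        return False
    for start, end in zip(lines[0::2], lines[1::2]):
        if not (start.startswith('start ') and end == 'end ' + start[6:]):
            return False
    return True


lines = run_workers()
print(len(lines), is_serialized(lines))  # 8 True
```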

Searching the whole tree shows this line in quite a few places:

lihui@MacBook  ~/work/cloud/openstack/tempest-ci/tempest   master ●✚  grep fixtures.LockFixture ./* -R
./api/compute/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates_negative.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_aggregates_negative.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/admin/test_hosts.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates_negative.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_aggregates_negative.py:        self.useFixture(fixtures.LockFixture('availability_zone'))
./api/compute/v3/admin/test_hosts.py:        self.useFixture(fixtures.LockFixture('availability_zone'))

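So when several test cases must run one after another rather than concurrently, add the same useFixture line, with the same lock name, to each of them.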

 
