A walkthrough of the nova-compute periodic task

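Every compute node runs a nova-compute service process. When nova-scheduler places an instance, it filters on the resource information of all compute nodes, so each node's resource information has to be accurate. To guarantee that, nova-compute runs a periodic task that pulls the relevant information from the hypervisor and keeps writing it to the database, so the stored data is regularly brought back in line with reality.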

The OpenStack release I am reading here is Havana, which is fairly old; if you are interested, compare it against the latest release.

The entry point is the update_available_resource method of the ComputeManager class in nova/nova/compute/manager.py:

    @periodic_task.periodic_task
    def update_available_resource(self, context):
        """See driver.get_available_resource()

        Periodic process that keeps that the compute host's understanding of
        resource availability and usage in sync with the underlying hypervisor.

        :param context: security context
        """
        new_resource_tracker_dict = {}
        nodenames = set(self.driver.get_available_nodes())
        for nodename in nodenames:
            rt = self._get_resource_tracker(nodename)
            rt.update_available_resource(context)
            new_resource_tracker_dict[nodename] = rt

        # Delete orphan compute node not reported by driver but still in db
        compute_nodes_in_db = self._get_compute_nodes_in_db(context)

        for cn in compute_nodes_in_db:
            if cn.get('hypervisor_hostname') not in nodenames:
                LOG.audit(_("Deleting orphan compute node %s") % cn['id'])
                self.conductor_api.compute_node_delete(context, cn)

        self._resource_tracker_dict = new_resource_tracker_dict

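The decorator in front of the function shows that it runs periodically; I will come back to that later. The function first asks the driver for the nodes managed by this compute host and assigns them to nodenames, then passes each node to _get_resource_tracker. Let's look at that function first: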

    def _get_resource_tracker(self, nodename):
        rt = self._resource_tracker_dict.get(nodename)
        if not rt:
            if not self.driver.node_is_available(nodename):
                raise exception.NovaException(
                        _("%s is not a valid node managed by this "
                          "compute host.") % nodename)

            rt = resource_tracker.ResourceTracker(self.host,
                                                  self.driver,
                                                  nodename)
            self._resource_tracker_dict[nodename] = rt
        return rt
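The lookup above is a get-or-create cache keyed by node name. The following is a minimal standalone sketch of that pattern, not Nova code: the Tracker and Manager classes, the get_tracker name, and the use of ValueError are all assumptions made for illustration.

```python
class Tracker:
    """Hypothetical stand-in for ResourceTracker; holds only the node name."""

    def __init__(self, nodename):
        self.nodename = nodename


class Manager:
    """Hypothetical stand-in for ComputeManager with only the cache logic."""

    def __init__(self, valid_nodes):
        self._valid_nodes = set(valid_nodes)
        self._trackers = {}

    def get_tracker(self, nodename):
        # Reuse the cached tracker if one exists for this node.
        tracker = self._trackers.get(nodename)
        if tracker is None:
            # Refuse to create a tracker for a node this host does not manage.
            if nodename not in self._valid_nodes:
                raise ValueError("%s is not managed by this host" % nodename)
            tracker = Tracker(nodename)
            self._trackers[nodename] = tracker
        return tracker


manager = Manager(["node-1"])
first = manager.get_tracker("node-1")
second = manager.get_tracker("node-1")
print(first is second)
```

Asking twice for the same node returns the same object, which is why per-node state such as tracked instances can survive between periodic runs.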

Next, look at ResourceTracker in resource_tracker.py:

    class ResourceTracker(object):
        """Compute helper class for keeping track of resource usage as instances
        are built and destroyed.
        """

        def __init__(self, host, driver, nodename):
            self.host = host
            self.driver = driver
            self.pci_tracker = None
            self.nodename = nodename
            self.compute_node = None
            self.stats = importutils.import_object(CONF.compute_stats_class)
            self.tracked_instances = {}
            self.tracked_migrations = {}
            self.conductor_api = conductor.API()

So rt = self._get_resource_tracker(nodename) above merely obtains the ResourceTracker object for that compute node; its job is to query and manage the node's resource information.

Next, rt.update_available_resource(context) calls the ResourceTracker method of the same name, which updates the resource information of the given compute node:

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def update_available_resource(self, context):
        """Override in-memory calculations of compute node resource usage based
        on data audited from the hypervisor layer.

        Add in resource claims in progress to account for operations that have
        declared a need for resources, but not necessarily retrieved them from
        the hypervisor layer yet.
        """
        LOG.audit(_("Auditing locally available compute resources"))
        resources = self.driver.get_available_resource(self.nodename)

        if not resources:
            # The virt driver does not support this function
            LOG.audit(_("Virt driver does not support "
                        "'get_available_resource' Compute tracking is disabled."))
            self.compute_node = None
            return
        resources['host_ip'] = CONF.my_ip

        self._verify_resources(resources)

        self._report_hypervisor_resource_view(resources)

        if 'pci_passthrough_devices' in resources:
            if not self.pci_tracker:
                self.pci_tracker = pci_manager.PciDevTracker()
            self.pci_tracker.set_hvdevs(jsonutils.loads(resources.pop(
                'pci_passthrough_devices')))

        # Grab all instances assigned to this node:
        instances = instance_obj.InstanceList.get_by_host_and_node(
            context, self.host, self.nodename)

        # Now calculate usage based on instance utilization:
        self._update_usage_from_instances(resources, instances)

        # Grab all in-progress migrations:
        capi = self.conductor_api
        migrations = capi.migration_get_in_progress_by_host_and_node(context,
                self.host, self.nodename)

        self._update_usage_from_migrations(context, resources, migrations)

        # Detect and account for orphaned instances that may exist on the
        # hypervisor, but are not in the DB:
        orphans = self._find_orphaned_instances()
        self._update_usage_from_orphans(resources, orphans)

        # NOTE(yjiang5): Because pci device tracker status is not cleared in
        # this periodic task, and also because the resource tracker is not
        # notified when instances are deleted, we need remove all usages
        # from deleted instances.
        if self.pci_tracker:
            self.pci_tracker.clean_usage(instances, migrations, orphans)
            resources['pci_stats'] = jsonutils.dumps(self.pci_tracker.stats)
        else:
            resources['pci_stats'] = jsonutils.dumps({})

        self._report_final_resource_view(resources)

        if CONF.ignore_disk_over_commit:
            resources['disk_available_least'] = resources['free_disk_gb']

        self._sync_compute_node(context, resources)
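The core idea of the method above is to start from the hypervisor's totals and then recompute usage from what Nova itself knows about. The sketch below illustrates only the instance part of that accounting. It is a simplification written for this post, not the real _update_usage_from_instances: the apply_usage helper is hypothetical, and it assumes no reserved host memory or disk and ignores migrations, orphans, and PCI devices.

```python
def apply_usage(resources, instances):
    """Return a copy of the hypervisor view with usage recomputed from instances."""
    view = dict(resources)
    # Usage is derived from the flavor sizes of the instances on this node.
    view["memory_mb_used"] = sum(i["memory_mb"] for i in instances)
    view["local_gb_used"] = sum(i["root_gb"] + i["ephemeral_gb"] for i in instances)
    view["vcpus_used"] = sum(i["vcpus"] for i in instances)
    # Free amounts are simply total minus used (no reservations in this sketch).
    view["free_ram_mb"] = view["memory_mb"] - view["memory_mb_used"]
    view["free_disk_gb"] = view["local_gb"] - view["local_gb_used"]
    return view


hypervisor_view = {"memory_mb": 8192, "local_gb": 100, "vcpus": 8}
instances = [
    {"memory_mb": 2048, "root_gb": 10, "ephemeral_gb": 0, "vcpus": 1},
    {"memory_mb": 4096, "root_gb": 20, "ephemeral_gb": 5, "vcpus": 2},
]
final_view = apply_usage(hypervisor_view, instances)
print(final_view["free_ram_mb"], final_view["free_disk_gb"], final_view["vcpus_used"])
```

Because usage is summed from flavor sizes rather than measured, the result reflects what has been promised to instances, which is the number a scheduler filter cares about.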

A lot happens in update_available_resource. Start with the self.driver.get_available_resource call, which lives in nova/nova/virt/libvirt/driver.py:

    def get_available_resource(self, nodename):
        """Retrieve resource information.

        This method is called when nova-compute launches, and
        as part of a periodic task that records the results in the DB.

        :param nodename: will be put in PCI device
        :returns: dictionary containing resource info
        """

        # Temporary: convert supported_instances into a string, while keeping
        # the RPC version as JSON. Can be changed when RPC broadcast is removed
        stats = self.host_state.get_host_stats(refresh=True)
        stats['supported_instances'] = jsonutils.dumps(
            stats['supported_instances'])
        return stats

The stats it returns come from the get_host_stats method of self.host_state, so look at that next:

    def get_host_stats(self, refresh=False):
        """Return the current state of the host.

        If 'refresh' is True, run update the stats first.
        """
        if refresh or not self._stats:
            self.update_status()
        return self._stats
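get_host_stats is a small cache with a forced-refresh flag. A standalone sketch of the same pattern follows; the CachedStats class and the collect callback are hypothetical names, and the call counter exists only to make the caching visible.

```python
class CachedStats:
    """Hypothetical cache that recomputes only when empty or when asked to."""

    def __init__(self, collect):
        self._collect = collect  # callable returning a fresh stats dict
        self._stats = {}

    def get(self, refresh=False):
        # Same condition as get_host_stats: refresh on demand or on first use.
        if refresh or not self._stats:
            self._stats = self._collect()
        return self._stats


calls = []


def collect():
    calls.append(1)
    return {"vcpus": 4, "sample": len(calls)}


cache = CachedStats(collect)
cache.get()               # cache is empty, so collect() runs
cache.get()               # served from the cache
cache.get(refresh=True)   # forced refresh, collect() runs again
print(len(calls))
```

The periodic task always passes refresh=True, so in that path the cache never hides a stale value; the cached copy only matters for other callers that do not ask for a refresh.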

Next is update_status. Its docstring, "Retrieve status info from libvirt", says it all: this is where all of the node's resource information is fetched from libvirt.

    def update_status(self):
        """Retrieve status info from libvirt."""
        def _get_disk_available_least():
            """Return total real disk available least size.

            The size of available disk, when block_migration command given
            disk_over_commit param is FALSE.

            The size that deducted real instance disk size from the total size
            of the virtual disk of all instances.

            """
            disk_free_gb = disk_info_dict['free']
            disk_over_committed = (self.driver.
                    get_disk_over_committed_size_total())
            # Disk available least size
            available_least = disk_free_gb * (1024 ** 3) - disk_over_committed
            return (available_least / (1024 ** 3))

        LOG.debug(_("Updating host stats"))
        disk_info_dict = self.driver.get_local_gb_info()
        data = {}

        #NOTE(dprince): calling capabilities before getVersion works around
        # an initialization issue with some versions of Libvirt (1.0.5.5).
        # See: https://bugzilla.redhat.com/show_bug.cgi?id=1000116
        # See: https://bugs.launchpad.net/nova/+bug/1215593
        LOG.debug(_("get instance capabilities"))
        data["supported_instances"] = \
            self.driver.get_instance_capabilities()

        # NOTE(hzguanqiang): hard coding here to support ceph filter.
        if CONF.libvirt_images_type == 'rbd':
            ceph_pool = CONF.libvirt_images_rbd_pool
            data["supported_instances"].append(["ceph", ceph_pool])

        LOG.debug(_("get vcpu total"))
        data["vcpus"] = self.driver.get_vcpu_total()
        LOG.debug(_("get memory mb total"))
        data["memory_mb"] = self.driver.get_memory_mb_total()
        data["local_gb"] = disk_info_dict['total']
        LOG.debug(_("get vcpu used"))
        data["vcpus_used"] = self.driver.get_vcpu_used()
        LOG.debug(_("get memory mb used"))
        data["memory_mb_used"] = self.driver.get_memory_mb_used()
        data["local_gb_used"] = disk_info_dict['used']
        LOG.debug(_("get hypervisor type"))
        data["hypervisor_type"] = self.driver.get_hypervisor_type()
        LOG.debug(_("get hypervisor version"))
        data["hypervisor_version"] = self.driver.get_hypervisor_version()
        LOG.debug(_("get hypervisor hostname"))
        data["hypervisor_hostname"] = self.driver.get_hypervisor_hostname()
        LOG.debug(_("get cpu info"))
        data["cpu_info"] = self.driver.get_cpu_info()
        if not CONF.ignore_disk_over_commit:
            LOG.debug(_("get disk available least"))
            data['disk_available_least'] = _get_disk_available_least()
        else:
            LOG.debug(_("skip get disk available least"))
            data['disk_available_least'] = disk_info_dict['free']

        LOG.debug(_("get pci passthrough devices"))
        data['pci_passthrough_devices'] = \
            self.driver.get_pci_passthrough_devices()

        self._stats = data

        LOG.debug(_("Update host stats end"))
        return data

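Once the data dict is filled in, it is returned back up the call chain and ends up in the resources variable of ResourceTracker.update_available_resource.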
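Of the values collected, disk_available_least is the least obvious, so here is a worked example of the arithmetic in _get_disk_available_least. The function below is a standalone restatement written for this post. I am assuming that the over-committed size is the number of bytes instance disks could still grow by (virtual size minus space actually allocated), which is how I read the driver. Note that the original is Python 2 code, where / on integers truncates; this sketch runs on Python 3 and returns a float.

```python
GIB = 1024 ** 3


def disk_available_least(free_gb, over_committed_bytes):
    """Free disk in GiB after reserving room for over-committed instance disks."""
    available_bytes = free_gb * GIB - over_committed_bytes
    return available_bytes / GIB


# 100 GiB free on the host, instance disks may still grow by 30 GiB in total.
comfortable = disk_available_least(100, 30 * GIB)

# 10 GiB free, but instance disks may still grow by 25 GiB: the value is negative.
overcommitted = disk_available_least(10, 25 * GIB)

print(comfortable, overcommitted)
```

A negative value means the host has promised more disk than it has left, which is why a scheduler filter that checks this field will reject such a host.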

Finally, the self._sync_compute_node method below, in resource_tracker.py, writes it to the database:

    def _sync_compute_node(self, context, resources):
        """Create or update the compute node DB record."""
        if not self.compute_node:
            # we need a copy of the ComputeNode record:
            service = self._get_service(context)
            if not service:
                # no service record, disable resource
                return

            compute_node_refs = service['compute_node']
            if compute_node_refs:
                for cn in compute_node_refs:
                    if cn.get('hypervisor_hostname') == self.nodename:
                        self.compute_node = cn
                        if self.pci_tracker:
                            self.pci_tracker.set_compute_node_id(cn['id'])
                        break

        if not self.compute_node:
            # Need to create the ComputeNode record:
            resources['service_id'] = service['id']
            self._create(context, resources)
            if self.pci_tracker:
                self.pci_tracker.set_compute_node_id(self.compute_node['id'])
            LOG.info(_('Compute_service record created for %(host)s:%(node)s')
                    % {'host': self.host, 'node': self.nodename})

        else:
            # just update the record:
            self._update(context, resources, prune_stats=True)
            LOG.info(_('Compute_service record updated for %(host)s:%(node)s')
                    % {'host': self.host, 'node': self.nodename})
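Stripped of the service lookup and PCI handling, _sync_compute_node is a create-or-update with a locally cached record. The sketch below shows that shape using a plain dict in place of the database; the NodeRecordSync class and its return strings are hypothetical and exist only for illustration.

```python
class NodeRecordSync:
    """Hypothetical create-or-update helper; a dict stands in for the DB table."""

    def __init__(self, db, nodename):
        self.db = db
        self.nodename = nodename
        self.record = None  # cached record, playing the role of self.compute_node

    def sync(self, resources):
        if self.record is None:
            # First call: look for an existing record for this node.
            self.record = self.db.get(self.nodename)
        if self.record is None:
            # Nothing found: create the record.
            self.record = dict(resources)
            self.db[self.nodename] = self.record
            return "created"
        # Record already known: update it in place.
        self.record.update(resources)
        return "updated"


db = {}
syncer = NodeRecordSync(db, "node-1")
first_result = syncer.sync({"vcpus": 4, "vcpus_used": 0})
second_result = syncer.sync({"vcpus": 4, "vcpus_used": 1})
print(first_result, second_result, db["node-1"]["vcpus_used"])
```

Caching the record on the tracker means the lookup happens once; every later periodic run goes straight to the update branch.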

Back to the flow in the first listing: new_resource_tracker_dict[nodename] = rt stores the tracker that has just been updated.

As a result, new_resource_tracker_dict, which started out as {}, ends up holding an up-to-date ResourceTracker for every node.

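Finally, self._resource_tracker_dict = new_resource_tracker_dict assigns it to the member variable self._resource_tracker_dict, which holds the resource trackers of the compute nodes managed by the current nova-compute service. That member starts out as an empty dict in ComputeManager.__init__: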

    class ComputeManager(manager.SchedulerDependentManager):
        """Manages the running instances from creation to destruction."""

        RPC_API_VERSION = '2.49'

        def __init__(self, compute_driver=None, *args, **kwargs):
            """Load configuration options and connect to the hypervisor."""
            self.virtapi = ComputeVirtAPI(self)
            self.network_api = network.API()
            self.volume_api = volume.API()
            self._last_host_check = 0
            self._last_bw_usage_poll = 0
            self._last_vol_usage_poll = 0
            self._last_info_cache_heal = 0
            self._last_bw_usage_cell_update = 0
            self.compute_api = compute.API()
            self.compute_rpcapi = compute_rpcapi.ComputeAPI()
            self.network_rpcapi = network_rpcapi.NetworkAPI()
            self.conductor_api = conductor.API()
            self.compute_task_api = conductor.ComputeTaskAPI()
            self.is_neutron_security_groups = (
                openstack_driver.is_neutron_security_groups())
            self.consoleauth_rpcapi = consoleauth.rpcapi.ConsoleAuthAPI()
            self.cells_rpcapi = cells_rpcapi.CellsAPI()
            self._resource_tracker_dict = {}

When the next periodic run starts, the database is updated again and the member variable is replaced with the fresh new_resource_tracker_dict.

In short, this part just runs ComputeManager.update_available_resource periodically, writing the latest node resource information obtained from the hypervisor to the DB.
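To make the overall shape of one periodic pass concrete, here is a toy version that uses plain data structures instead of the driver, the trackers, and the database. Everything in it (the run_pass function, trackers represented as dicts with an update counter) is hypothetical; it only mirrors the control flow of ComputeManager.update_available_resource.

```python
def run_pass(driver_nodes, trackers, db_nodes):
    """One toy pass: refresh trackers for live nodes and find orphans in the DB.

    Returns (new_trackers, remaining_db_nodes, deleted_db_nodes).
    """
    nodenames = set(driver_nodes)
    new_trackers = {}
    for name in nodenames:
        # Get-or-create the tracker, then "update" it (here: bump a counter).
        tracker = trackers.get(name) or {"nodename": name, "updates": 0}
        tracker["updates"] += 1
        new_trackers[name] = tracker

    # DB rows for nodes the driver no longer reports are orphans.
    deleted = sorted(n for n in db_nodes if n not in nodenames)
    remaining = [n for n in db_nodes if n in nodenames]
    return new_trackers, remaining, deleted


old_trackers = {
    "a": {"nodename": "a", "updates": 3},
    "gone": {"nodename": "gone", "updates": 7},
}
new_trackers, remaining, deleted = run_pass(["a", "b"], old_trackers, ["a", "gone"])
print(sorted(new_trackers), remaining, deleted)
```

Building a new dict on every pass, rather than mutating the old one, is what drops trackers for nodes that have disappeared: "gone" is simply never copied over.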

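The periodic scheduling itself is implemented by a decorator, which lives in nova/openstack/common/periodic_task.py. Here it is; I have not fully worked through it yet.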

    def periodic_task(*args, **kwargs):
        """Decorator to indicate that a method is a periodic task.

        This decorator can be used in two ways:

            1. Without arguments '@periodic_task', this will be run on every cycle
               of the periodic scheduler.

            2. With arguments:
               @periodic_task(spacing=N [, run_immediately=[True|False]])
               this will be run on approximately every N seconds. If this number is
               negative the periodic task will be disabled. If the run_immediately
               argument is provided and has a value of 'True', the first run of the
               task will be shortly after task scheduler starts.  If
               run_immediately is omitted or set to 'False', the first time the
               task runs will be approximately N seconds after the task scheduler
               starts.
        """
        def decorator(f):
            # Test for old style invocation
            if 'ticks_between_runs' in kwargs:
                raise InvalidPeriodicTaskArg(arg='ticks_between_runs')

            # Control if run at all
            f._periodic_task = True
            f._periodic_external_ok = kwargs.pop('external_process_ok', False)
            if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
                f._periodic_enabled = False
            else:
                f._periodic_enabled = kwargs.pop('enabled', True)

            # Control frequency
            f._periodic_spacing = kwargs.pop('spacing', 0)
            f._periodic_immediate = kwargs.pop('run_immediately', False)
            if f._periodic_immediate:
                f._periodic_last_run = None
            else:
                f._periodic_last_run = timeutils.utcnow()
            return f

        # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
        # and without parens.
        #
        # In the 'with-parens' case (with kwargs present), this function needs to
        # return a decorator function since the interpreter will invoke it like:
        #
        #   periodic_task(*args, **kwargs)(f)
        #
        # In the 'without-parens' case, the original function will be passed
        # in as the first argument, like:
        #
        #   periodic_task(f)
        if kwargs:
            return decorator
        else:
            return decorator(args[0])
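The key point of the decorator is that it does not schedule anything by itself: it only tags the function with attributes, and something else later looks for those tags and decides when to call the function. The sketch below is a heavily reduced version written to make that visible. The Manager class and its run_periodic_tasks loop are assumptions for illustration and are much simpler than the real scheduler; unlike the original, every task here is allowed to run on the first pass.

```python
import time


def periodic_task(*args, **kwargs):
    """Reduced decorator: tag the function, usable with or without parens."""
    def decorator(f):
        f._periodic_task = True
        f._periodic_spacing = kwargs.pop("spacing", 0)
        f._periodic_last_run = None
        return f

    if kwargs:
        # @periodic_task(spacing=60): return the decorator to be applied to f.
        return decorator
    # @periodic_task: the function itself arrived as the first argument.
    return decorator(args[0])


class Manager:
    """Hypothetical manager with a toy loop that finds and runs tagged methods."""

    @periodic_task
    def every_cycle(self):
        return "every_cycle"

    @periodic_task(spacing=60)
    def every_minute(self):
        return "every_minute"

    def not_periodic(self):
        return "not_periodic"

    def run_periodic_tasks(self, now=None):
        now = time.time() if now is None else now
        ran = []
        for name, func in sorted(vars(type(self)).items()):
            # Only functions tagged by the decorator are considered.
            if not getattr(func, "_periodic_task", False):
                continue
            last = func._periodic_last_run
            # Skip tasks whose spacing has not elapsed since their last run.
            if last is not None and now - last < func._periodic_spacing:
                continue
            func._periodic_last_run = now
            ran.append(func(self))
        return ran


manager = Manager()
first = manager.run_periodic_tasks(now=1000)
second = manager.run_periodic_tasks(now=1030)
third = manager.run_periodic_tasks(now=1060)
print(first, second, third)
```

With spacing left at 0, as in the bare @periodic_task.periodic_task on update_available_resource, the task runs on every cycle of whatever loop drives the manager; with spacing=60 it is skipped until at least 60 seconds have passed since its last run.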

 

 
