目录

  1. cliff
    1. 示例
  2. oslo.config
    1. 示例
    2. 配置选项支持的类型
    3. 自定义配置选项类型
    4. 使用配置文件和命令行选项指定配置选项
    5. 使用其他模块中已经注册过的配置选项
  3. oslo.db
    1. 使用SQLAlchemy的session和connection
    2. 使用通用的SQLAlchemy model类
    3. 不同DB后端的支持
  4. oslo.messaging
    1. Transport
    2. Target
    3. Server
    4. RPC Client
    5. Notifier
    6. Notification Listener
    7. 利用oslo.messaging来实现远程过程调用
    8. 利用oslo.messaging实现通知消息处理
  5. stevedore
    1. 插件
    2. 插件的实现
    3. 插件的注册
    4. 插件的载入
  6. taskflow
    1. 示例
    2. task
    3. retry
    4. engine
    5. task和flow的输入/输出
      1. requires
      2. rebind
      3. provides
  7. oslo.policy
  8. oslo.rootwarp
    1. 构造rootwrap shell脚本
    2. 调用rootwrap shell脚本
    3. rootwrap配置文件
    4. 定义Filter
  9. oslo.test
  10. oslo.versionedobjects

cliff

cliff(Command Line Interface Formulation Framework)可以用来帮助构建命令行程序。

开发者利用cliff框架可以构建诸如svn、git那样的支持多层命令的命令行程序。主程序只负责基本的命令行参数的解析,然后调用各个子命令去执行不同的操作。利用Python动态代码载入的特性,Cliff框架中的每个子命令可以和主程序分开来实现、打包和分发。

整个cliff框架主要包括以下4种不同类型的对象。

  • cliff.app.App:主程序对象,用来启动程序,并且负责一些对所有子命令都通用的操作,比如设置日志选项和输入/输出等。

  • cliff.commandmanager.CommandManager:主要用来载入每一个子命令插件。默认是通过Setuptools的entry points来载入的。

  • cliff.command.Command:用户可以实现Command的子类来实现不同的子命令,这些子命令被注册在Setuptools的entry points中,被CommandManager载入,每个子命令可以有自己的参数解析(一般使用argparse),同时要实现take_action()方法完成具体的命令。

  • cliff.interactive.InteractiveApp:实现交互式命令行,一般使用框架提供的默认实现。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# main.py

import sys
import logging
from cliff.app import App
from cliff.commandmanager import CommandManager


class MyApp(App):
log = logging.getLogger(__name__)

def __init__(self):
super(MyApp, self).__init__(
description='Cliff Demo App',
version='1.0',
command_manager=CommandManager('cliff.cliffdemo')
)

def initialize_app(self, argv):
self.LOG.debug('initialize_app')

def prepare_to_run_command(self, cmd):
self.LOG.debug('prepare_to_run_command %s' % cmd.__class__.__name__)

def clean_up(self, cmd, result, err):
self.LOG.debug('clean_up %s' % cmd.__class__.__name__)
if err:
self.LOG.debug('got an error: %s' % err)


def main(argv=sys.argv[1:]):
my_app = MyApp()
return my_app.run(argv)


if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))

上面是主程序代码,新建一个MyApp实例对象,并且调用其run方法运行。MyApp是cliff.app.App的子类,它的初始化函数的原型定义如下。

1
2
3
4
5
6
7
8
9
class cliff.app.App(description, 
version,
command_manager,
stdin=None,
stdout=None,
stderr=None,
interactive_app_factory=<class cliff.interactive.InteractiveApp>,
deferred_help=False):
pass

其中,stdin/stdout/stderr可以用来自定义用户自己的标准输入/输出/错误,command_manager必须指向一个cliff.commandmanager.CommandManager的对象实例,来载入各个子命令插件。

cliff.commandmanager.CommandManager类的初始化函数原型定义如下。

1
2
3
class cliff.commandmanager.CommandManager(namespace,
convert_underscores=True):
pass

其中,namespace用来指定Setuptool entry points的命名空间,CommandManager只会从这个命名空间中载入插件,convert_underscores参数指明是否需要把entry points中的下划线转为空格。

cliff.App类的方法initialize_app(),会在主程序解析完用户的命令行参数后被调用,而且只会被调用一次。
prepare_to_run_command()方法可以被用来做一些针对某个具体子命令的初始化工作,它将在该子命令执行之前被调用。
clean_up()方法会在具体某个子命令完成后被调用,用来进行一些清理工作。

具体某个子命令的实现通过继承cliff.command.Command来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# simple.py

import logging
from cliff.command import Command


class Simple(Command):

log = logging.getLogger(__name__)

def take_action(self, parsed_args):
self.log.info('sending greeting')
self.log.debug('debugging')
self.app.stdout.write('Hello world \n')

子命令的实际工作由take_action()来完成。这例子里,simple子命令向标准输出打印一个字符串,它的实现代码由cliff.commandmanager.CommandManager通过Setuptools entry points来载入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# setup.py

from setuptools import setup, find_packages

setup(
name='cliffdemo',
version='1.0',
install_requires=['cliff'],
namespace_packages=[],
packages=find_packages(),

entry_points={
'console_scripts': [
'cliffdemo = cliffdemo.main:main'
],
'cliff.cliffdemo': [
'simple = cliffdemo.simple:Simple'
]
}
)

在Setuptools entry points的命名空间cliff.demo中,定义了命令simple所对应的插件实现是Simple类。Cliff主程序解析用户的输入后,会通过这里所定义的对应关系调用不同的实现类。

simple命令执行结果如下。

oslo.config

oslo.config用于解析命令行和配置文件中的配置项。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
service.py

import os
from oslo_config import cfg

opts = [
cfg.StrOpt(
'host',
default='127.0.0.1',
help='host of node'
),
cfg.IntOpt(
'collector_workers',
default=2,
help='Number of workers for collector service.'
),
]

# 注册配置选项
cfg.CONF.register_opts(opts)

# 将配置选项注册为命令行选项
cli_opts = [
cfg.StrOpt(
'os-tenant-id',
deprecated_group='DEFAULT',
default=os.environ.get('OS_TENANT_ID', ''),
help='Tenant ID to use for Openstack service access.'
),
cfg.BoolOpt(
'insecure',
default=False,
help='xxx'
),
]

cfg.CONF.register_cli_opts(cli_opts, group='service_credentials')

配置选项支持的类型

配置选项支持的类型如下。

类名 说明
oslo_config.cfg.StrOpt 字符串类型
oslo_config.cfg.BoolOpt 布尔型
oslo_config.cfg.IntOpt 整数类型
oslo_config.cfg.FloatOpt 浮点数类型
oslo_config.cfg.ListOpt 字符串列表类型
oslo_config.cfg.DictOpt 字典类型,字典中的值需要是字符串类型。
oslo_config.cfg.MultiStrOpt 可以分多次配置的字符串列表
oslo_config.cfg.IPOpt IP地址类型
oslo_config.cfg.HostnameOpt 域名类型
oslo_config.cfg.URIOpt URI类型

定义后的配置项,必须要注册才能使用。
配置项还可以注册为命令行选项,之后,可以从命令行读取配置选项的值,并覆盖从配置文件中读取的值。
注册配置选项时,可以把某些配置选项注册在一个特定的组下。默认的组是“DEFAULT”。

自定义配置选项类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14

from oslo_config import cfg
from oslo_config import types

PortType = types.Integer(1, 65535)

common_opts = [
cfg.Opt(
'bind_port',
type=PortType(),
default=8080,
help='Port number to listen on.'
)
]

相比于前面的方法,这种定义配置选项的方式能够更好地支持选项值得合法性检查,同时也能支持自定义选项类型。

使用配置文件和命令行选项指定配置选项

为了正确使用oslo.config,应用程序一般需要在启动的时候初始化。

1
2
3
from olso_config import cfg

cfg.CONF(sys.argv[1:], project='xyz')

初始化之后,才能正常解析配置文件和命令行选项。最终用户可以用默认的命令行选项“–config-file”或者“–config-dir”来指定配置文件名或者位置。如果没有明确指定,默认按照下面的顺序来寻找配置文件。

1
~/.xyz/xyz.conf ~/xyz.conf /etc/xyz/xyz.conf /etc/xyz.conf

配置文件一般采用类似.ini文件的格式,其中每一个Section对应oslo.config中定义的一个配置选项组,Section[DEFAULT]对应了默认组“DEFAULT”。

1
2
3
4
5
6
7
[DEFULT]
host = 127.0.0.1
collector_workers = 3

[service_credentials]
insecure = True
os-tenant-id = xyz123

用命令行指定配置选项值时,如果是定义在某个选项组中的选项,命令行选项名中需要包括该组名作为前缀。

1
--service_credentials-os-tenant-id abc123

使用其他模块中已经注册过的配置选项

对于已经注册过的配置选项,开发者可以直接访问。

1
2
3
4
from oslo_config import cfg

host = cfg.CONF.host
tenant_id = cfg.CONF.service_credentials.os-tenant-id

还可以使用import_opt来申明在别的模块中定义的配置选项。

1
2
3
4
from oslo_config import cfg

cfg.CONF.import_opt('host', 'service')
host = cfg.CONF.host

oslo.db

oslo.db是针对SQLAlchemy访问的抽象。

使用SQLAlchemy的session和connection

oslo.db提供了oslo_db.sqlachemy.enginefacade模块来获取session和connection,有两种方法来使用enginefacade,即函数装饰器(decorator)和上下文管理器(context mangager)。这两种调用方式都需要提供一个上下文对象。上下文对象可以是任何Python类。这样做的目的是提供一个统一规范的session使用模式,避免调用者使用不当造成数据库事务(transaction)的滥用和嵌套。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from oslo_db.sqlalchemy import enginefacade


class SomeClass(object):
def __init__(self, x, y):
self.x = x
self.y = y


class MyContext(object):
pass
'User-defined context class'


def read(context):
with enginefacade.reader.using(context) as session:
return session.query(SomeClass).all()


def write(context, x, y):
with enginefacade.writer.using(context) as session:
session.add(SomeClass(x, y))


def main():
context = MyContext()
results = read(context)
write(context, 0, 0)

当使用装饰器模式的时候需要对context对象做特殊处理,调用transaction_context_provider装饰context对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from oslo_db.sqlalchemy import enginefacade


class SomeClass(object):
def __init__(self, x, y):
self.x = x
self.y = y


@enginefacade.transaction_context_provider
class MyContext(object):
pass
'User-defined context class'


@enginefacade.reader
def read(context):
return context.session.query(SomeClass).all()


@enginefacade.writer
def write(context, x, y):
context.session.add(SomeClass(x, y))


def main():
context = MyContext()
results = read(context)
write(context, 0, 0)

管理员可以通过配置文件来配置oslo.db的许多选项,比如:

1
2
[database]
connection = mysql://root:root@localhost/ceilometer?charset=utf8

用户可以在使用数据库之前调用oslo_db.sqlalchemy.enginefacade.configure()方法来改变已有的配置。

常用配置选项。

常用配置选项常用配置选项

使用通用的SQLAlchemy model类

1
2
3
4
5
6
7
8
9
10

from oslo_db.sqlalchemy.models import TimestampMixin, ModelBase
from oslo_db.sqlalchemy.models import types
from oslo_db.sqlalchemy.models import Column


class Project(TimestampMixin, ModelBase):
id = Column(types.Integer, primary_key=True)
name = Column(types.String)

oslol_db.sqlalchemy.models定义了两种Mixin:TimestampMixin和SoftDeleteMixin。使用TimestampMixin时SQLAlchemy model中会多出两列create_at和update_at,分别表示记录的创建时间和上一次修改时间。

SoftDeleteMixin支持使用soft delete功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
from oslo_db.sqlalchemy.models import ModelBase, SoftDeleteMixin
from oslo_db.sqlalchemy.models import types
from oslo_db.sqlalchemy.models import Column
from oslo_db.sqlalchemy.utils import model_query

class Bar(SoftDeleteMixin, ModelBase):
id = Column(types.Integer, primary_key=True)
name = Column(types.String)


count = model_query(Bar).find(id=1).soft_delete()
if count == 0:
raise Exception("0 entries were soft deleted")

不同DB后端的支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from oslo_config import cfg
from oslo_db import api as db_api

# 定义不同backend所对应的实现,如果配置选项conf.database.backend的值为sqlalchemy,
# 就用project.db.sqlalchemy.api模块中的实现。

_BACKEND_MAPPING = {
'sqlalchemy': 'project.db.sqlalchemy.api'
}

IMPL = db_api.DBAPI.from_config(cfg.CONF,
backend_mapping=_BACKEND_MAPPING)


def get_engine():
return IMPL.get_engine()


def get_session():
return IMPL.get_session()


# DB-API method
def do_something(id):
return IMPL.do_something(id)

不同backend具体实现时,需要定义如下函数返回具体DB API的实现类。

1
2
def get_backend():
return MyImplementationClass

oslo.messaging

oslo.messaging库为OpenStack各个项目使用RPC和事件通知(Event Notification)提供了一套统一的接口。

为了支持不同的RPC后端实现,oslo.messaging对如下的对象进行统一。

Transport

Transport主要实现RPC底层的通信(如Socket)以及时间循环、多线程等其他功能。用户可以通过URL来获得指向不同transport实现的句柄。URL格式如下:

1
transport://user:pass@host1:port[,hostN:portN]/virtual_host

目前支持的Transport有rabbit、qpid与zmq,分别对应不同的后端消息总线。用户可以使用oslo.messaging.get_transport函数来获得transport对象实例的句柄。

Target

Target封装了指定一个消息最终目的地的所有信息。

Target 参数Target 参数

在不同的应用场景下,构造Target对象需要不同的参数:
创建一个RPC服务器时,需要topic和server参数,exchange参数可选;
指定一个endpoint的target时,namespace和version是可选的;
客户端发送消息时,需要topic参数,其他可选。

Server

一个RPC服务器可以暴露多个endpoint,每个endpoint包含一组方法,这组方法可以被客户端通过某种Transport对象远程调用。创建Server对象时,需要指定Transport、Target和一组endpoint。

RPC Client

通过RPC Client,可以远程调用RPC Server上的方法。远程调用时,需要提供一个字典对象来指明调用的上下文,调用方法的名字和传递给调用方法的参数(用字典表示)。

有cast和call两种远程调用方式。
通过cast方式调用,请求发送后就直接返回。
通过call方式调用,需要等待响应从服务器返回。

Notifier

Notifier用来通过Transport发送通知消息。通知消息遵循如下的格式。

1
2
3
4
5
6
7
8
{ 
'message_id': six.text_type(uuid.uuid4()), # 消息id
'publisher_id': 'compute.host1' # 发送者id
'timestamp': timeutils.utcnow(), # 时间戳
'priority': 'WARN', # 通知优先级
'event_type': 'compute.create_instance', # 通知类型
'payload': {'instance_id': '123'} # 通知内容
}

用户可以在不同的优先级上发送通知,包括sample、critical、error、warn、info、debug和audit等。

Notification Listener

Notification Listener和Server类似,一个Notification Listener对象可以暴露多个endpoint,每个endpoint包含一组方法。但是与Server对象中的endpoint不同的是,这里的endpoint中的方法对应通知消息的不同优先级。

1
2
3
4
5
6
from oslo import messaging

Class ErrorEndpoint(object):
def error(self, ctxt, pulisher_id, event_type, payload, metadata):
do_something(payload)
return messaging.NotificationResult.HANDLED

endpoint中的方法如果返回messaging.NotificationResult.HANDLED或者None,表示这个通知消息已经被处理;
如果返回messaging.NotificationResult.REQUEUE,表示这个通知消息要重新进入消息队列。

利用oslo.messaging来实现远程过程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# server.py 服务器端

from oslo_config import cfg
import oslo_messaging as messaging
import time


class ServerControllerEndpoint(object):
target = messaging.Target(
namespace='control',
version='2.0'
)

def __init__(self, server):
self.server = server

def stop(self, context):
if self.server:
self.server.stop()


class TestEndpoint(object):
def test(self, context, arg):
print arg
return arg


transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server-1')
endpoints = [
ServerControllerEndpoint(None),
TestEndpoint()
]
server = messaging.get_rpc_server(
transport=transport,
target=target,
endpoints=endpoints,
executor='blocking'
)

try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print 'Stop server.'

server.wait()

上面定义了两个endpoint:ServerControlEndpoint与TestEndpoint。这两个endpoint中的方法stop()和test()都可以被客户端远程调用。

创建rpc server对象之前,先创建transport和target对象,这里使用get_transport()函数来获得transport对象的句柄。

get_transport 参数get_transport 参数

conf对象里,除了包含transport_url项外,还可以包含control_exchange项。control_exchange用来指明topic所属的默认范围,默认值为“openstack”。用户可以使用oslo.messaging.set_transport_defaults()函数来修改默认值。

此处构建的Target对象是用来建立RPC server的,所以需要指明topic和server参数。用户定义的endpoint对象也可以包含一个target属性,用来指明这个endpoint所支持的特定的namespace和version。

这里使用get_rpc_server()函数创建server对象,然后调用server对象的start()方法开始接收远程调用。

get_rpc_server 参数get_rpc_server 参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# client.py 客户端

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test')
client = messaging.RPCClient(
transport=transport,
target=target
)

ret = client.call(
ctxt={},
method='test',
arg={'password': 'Hello world'}

)
cctxt = client.prepare(
namespace='control',
version='2.0'
)

cctxt.cast({}, 'stop')

这里target对象构造时,必要的参数只有topic,创建RPCClient对象时,可以接收的参数如下。

RPCClient 参数RPCClient 参数

远程调用时,需要传入调用上下文、调用方法的名字和传给调用方法的参数。

Target对象的属性在RPCClient对象构造以后,还可以通过prepare()方法修改。用户可以修改的属性包括exchange、topic、namespace、version、server、fanout、timeout、version_cap和retry。修改后target属性只在prepare()方法返回的对象中有效。

利用oslo.messaging实现通知消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# notification_listener.py 消息通知处理

from oslo_config import cfg
import oslo_messaging as messaging


class NotificationEndpoint(object):
filter_rule = messaging.NotificationFilter(
publisher_id='^compute.*'
)

def warn(self, context, publisher_id, event_type, payload, metadata):
print context, publisher_id, event_type, payload, metadata


class ErrorEndpoint(object):
filter_rule = messaging.NotificationFilter(
publisher_id='^instance\..*\.start$',
context={'ctxt_key': 'regexp'}
)

def error(self, context, publisher_id, event_type, payload, metadata):
print context, publisher_id, event_type, payload, metadata


transport = messaging.get_transport(cfg.CONF)
targets = [
messaging.Target(topic='notifications'),
messaging.Target(topic='notifications_test')
]
endpoints = [
NotificationEndpoint(),
ErrorEndpoint()
]
pool = 'listener-workers'
listener = messaging.get_notification_listener(
transport=transport,
targets=targets,
endpoints=endpoints,
pool=pool
)
listener.start()
listener.wait()

通知消息处理的endpoint对象和远程过程调用的endpoint对象不同,对象定义的方法需要和通知消息的优先级一一对应。我们可以为不同的endpoint对象指定所对应的target对象。

最后调用get_notification_listener()函数构造notification listener对象。

get_notification_listener 参数get_notification_listener 参数

相对应的发送消息通知的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# notifier_send.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(
transport=transport,
driver='messaging',
topics=['notifications']
)

notifier_2 = notifier.prepare(publisher_id='compute')
notifier_2.error(
ctxt={},
event_type='my_type',
payload={'content': 'Hello world'}
)

发送消息时,首先要构造Notifier对象。

Notifier 参数Notifier 参数

初始化Notifier对象的操作比较复杂,所以可以用prepare()方法修改已经创建的Notifier对象,prepare()方法返回的是新的Notifier对象的实例。

prepare 参数prepare 参数

最后可以调用Notifier对象的不同方法(error、warn、info等)发送不同优先级的消息通知。

stevedore

利用Python语言的特性,运行时动态载入代码变得更加容易。很多Python应用程序利用这样的特性在运行时发现和载入所谓的“插件”(plugin),使得自己更易于扩展。Python库stevedore就是在Setuptools的entry points基础上,构造了一层抽象层,使开发者可以更容易地在运行时发现和载入插件。

插件

entry points的每一个命名空间里,可以包含多个entry point项。stevedore要求每一项都符合如下格式:

1
name = module:importable

左边是插件的名字,右边是它的具体实现,中间用等号分隔开。插件的具体实现用“模块:可导入的对象”的形式来指定。

1
2
3
4
5
6
7
8
ceilometer.compute.virt =
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
hyperv = ceilometer.compute.virt.hyperv.inspector:HyperVInspector
vsphere = ceilometer.compute.virt.vmware.inspector:VsphereInspector
xenapi = ceilometer.compute.virt.xenapi.inspector:XenapiInspector

ceilometer.hardware.inspectors =
snmp = ceilometer.hardware.inspector.snmp:SNMPInspector

示例中显示了两个不同的entry points的命名空间,”ceilometer.compute.virt“和”ceilometer.hardware.inspectors“,分别注册了4个和1个插件。

根据每个插件在entry point中名字和具体实现的数量之间的对应关系不同,stevedore提供了多种不同的类来帮助开发者发现和载入插件。

插件名称: 具体实现 建议选用stevedore中的类
1: 1 stevedore.driver.DriverManager
1: n stevedore.hook.HookManager
m: n stevedore.extension.ExtensionManager

使用stevedore来帮助程序动态载入插件的过程主要分为3个部分:插件的实现、插件的注册和插件的载入。

插件的实现

Ceilometer的inspector驱动,为不从不同类型hypervisor中获取相关数据提供统一的接口以供compute agent使用。下面是它的基类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ceilometer/compute/virt/inspector.py

class Inspector(object):

def __init__(self, conf):
self.conf = conf

def inspect_instance(self, instance, duration):
"""Inspect the CPU statistics for an instance.

:param instance: the target instance
:param duration: the last 'n' seconds, over which the value should be
inspected
:return: the instance stats
"""
raise ceilometer.NotImplementedError

ceilometer/compute/virt/hyperv/inspector.py,ceilometer/compute/virt/libvirt/inspector.py,ceilometer/compute/virt/vmware/inspector.py和ceilometer/compute/virt/xenapi/inspector.py分别为hyperv、kvm、vsphere和xenapi4种不同hypervisor的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

from ceilometer.compute.virt import inspector as virt_inspector

class LibvirtInspector(virt_inspector.Inspector):

def __init__(self, conf):
super(LibvirtInspector, self).__init__(conf)
# NOTE(sileht): create a connection on startup
self.connection

@property
def connection(self):
return libvirt_utils.refresh_libvirt_connection(self.conf, self)

@libvirt_utils.raise_nodata_if_unsupported
@libvirt_utils.retry_on_disconnect
def inspect_instance(self, instance, duration=None):
domain = self._get_domain_not_shut_off_or_raise(instance)

memory_used = memory_resident = None
memory_swap_in = memory_swap_out = None
memory_stats = domain.memoryStats()
# Stat provided from libvirt is in KB, converting it to MB.
if 'available' in memory_stats and 'unused' in memory_stats:
memory_used = (memory_stats['available'] -
memory_stats['unused']) / units.Ki
if 'rss' in memory_stats:
memory_resident = memory_stats['rss'] / units.Ki
if 'swap_in' in memory_stats and 'swap_out' in memory_stats:
memory_swap_in = memory_stats['swap_in'] / units.Ki
memory_swap_out = memory_stats['swap_out'] / units.Ki

# TODO(sileht): stats also have the disk/vnic info
# we could use that instead of the old method for Queen
stats = self.connection.domainListGetStats([domain], 0)[0][1]
cpu_time = 0
current_cpus = stats.get('vcpu.current')
# Iterate over the maximum number of CPUs here, and count the
# actual number encountered, since the vcpu.x structure can
# have holes according to
# https://libvirt.org/git/?p=libvirt.git;a=blob;f=src/libvirt-domain.c
# virConnectGetAllDomainStats()
for vcpu in six.moves.range(stats.get('vcpu.maximum', 0)):
try:
cpu_time += (stats.get('vcpu.%s.time' % vcpu) +
stats.get('vcpu.%s.wait' % vcpu))
current_cpus -= 1
except TypeError:
# pass here, if there are too many holes, the cpu count will
# not match, so don't need special error handling.
pass

if current_cpus:
# There wasn't enough data, so fall back
cpu_time = stats.get('cpu.time')

return virt_inspector.InstanceStats(
cpu_number=stats.get('vcpu.current'),
cpu_time=cpu_time,
memory_usage=memory_used,
memory_resident=memory_resident,
memory_swap_in=memory_swap_in,
memory_swap_out=memory_swap_out,
cpu_cycles=stats.get("perf.cpu_cycles"),
instructions=stats.get("perf.instructions"),
cache_references=stats.get("perf.cache_references"),
cache_misses=stats.get("perf.cache_misses"),
memory_bandwidth_total=stats.get("perf.mbmt"),
memory_bandwidth_local=stats.get("perf.mbml"),
cpu_l3_cache_usage=stats.get("perf.cmt"),
)

插件的注册

上述的插件需要在Setuptools的相关文件中注册后,才能被stevedore库所认识。

1
2
3
4
5
6
7
8
# setup.cfg

ceilometer.compute.virt =
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
hyperv = ceilometer.compute.virt.hyperv.inspector:HyperVInspector
vsphere = ceilometer.compute.virt.vmware.inspector:VsphereInspector
xenapi = ceilometer.compute.virt.xenapi.inspector:XenapiInspector

插件的载入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ceilometer/compute/virt/inspector.py

def get_hypervisor_inspector(conf):
try:
namespace = 'ceilometer.compute.virt'
mgr = driver.DriverManager(namespace,
conf.hypervisor_inspector,
invoke_on_load=True,
invoke_args=(conf, ))
return mgr.driver
except ImportError as e:
LOG.error("Unable to load the hypervisor inspector: %s" % e)
return Inspector(conf)

Ceilometer的compute agent通过调用函数get_hypervisor_inspector来载入具体的某一个插件。此处由于插件和具体实现之间是一对一的关系,所以选用了stevedore的DriverManager类。

DriverManager 参数DriverManager 参数

taskflow

通过TaskFlow库,可以更容易地控制任务(Task)的执行。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class CallJim(task.Task):
def execute(self, jim_number, *args, **kwargs):
print 'Calling Jim %s.' % jim_number

def revert(self, jim_number, *args, **kwargs):
print 'Calling %s and apologizing.' % jim_number


class CallJoe(task.Task):
def execute(self, joe_number, *args, **kwargs):
print 'Calling Joe %s.' % joe_number

def revert(self, joe_number, *args, **kwargs):
print 'Calling %s and apologizing.' % joe_number


class CallHY(task.Task):
def execute(self, hy_number, *args, **kwargs):
raise IOError('HY not at home right now.')


flow = lf.Flow('simple-linear').add(
CallJim(),
CallJoe(),
CallHY()
)

try:
engines.run(
flow=flow,
engine_conf={'engine': 'serial'},
store=dict(jim_number=444,
joe_number=555,
hy_number=666)
)
except Exception as e:
print 'Flow failed: %s' % e

task

这个示例首先定义了三个task:CallJim、CallJoe和CallHY。在TaskFlow库中,task是拥有执行(execute)和回滚(revert)功能的最小单位(TaskFlow中最小的单位是atom,其他所有类包括Task都是Atom类的子类)。在Task类中,允许开发者定义自己的execute函数和revert函数,分别用来执行task和回退task到之前一次的执行结果。

然后新新建一个线性流flow,并在其中顺序加入上述3个task对象。TaskFlow中的流flow用来关联各个task,并且规范这些task之间的执行和回滚顺序。

TaskFlow中所支持的流类型。

流类型 说明
linear_flow.Flow 线性流,流中的task/flow按加入顺序执行,按加入顺序的倒序回滚。
unordered_flow.Flow 无顺序流,流中的task/flow的执行和回滚可以按任意顺序。
graph_flow.Flow 图流,流中的task/flow按照显式指定的依赖关系,或者通过其间provides和requires属性之间的隐含依赖关系,来执行或回滚。

这个示例中,由于采用的是线性流,所以这个流中task的执行顺序为:CallJim -> CallJoe -> CallHY,回滚顺序是其倒序。

retry

流中不仅可以加入任务,还可以嵌套加入其他的流。此外,流还可以通过retry来控制当错误发生时,该如何重试。

TaskFlow支持的retry类型。

Retry类型 说明
AlwaysRevert 错误发生时,回滚子流。
AlwaysRevertAll 错误发生时,回滚所有子流。
Times 错误发生时,重试子流。
ForEach 每次错误发生时,为子流中的atom提供一个新的值,然后重试,直到成功或者此retry中定义的值用光为止。
ParameterizedForEach 类似ForEach,但是是从后台存储中获取重试的值

示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

class EchoTask(task.Task):
def execute(self, name, *args, **kwargs):
print 'my name is %s' % name

def revert(self, name, *args, **kwargs):
print 'sorry, my name is %s' % name


flow = lf.Flow('f1').add(
EchoTask('t1'),
lf.Flow(
'f2',
retry=retry.ForEach(
values=['a', 'b', 'c'],
name='r1',
provides='value'
)
).add(
EchoTask('t2'),
EchoTask('t3', requires='value')
),
EchoTask('t4')

)

上面的示例,构造了一个线性流f1,它按顺序执行任务t1、子线性流f2、和任务t4。子流f2按序执行任务t2和t3。

子流f2定义了ForEach类型的retry“r1”。当任务t2或者t3失败时,子流t2首先会回滚。然后“r1”会指导子流f2使用值“a”来重新运行。如果再次失败,子流f2回滚后会再次使用“b”运行;仍然失败后回滚使用值“c”运行。如果值“c”也运行失败,由于“r1”中能够提供的值已经被完全用完,子流f2回滚后不会重新运行。

engine

TaskFlow库中的engine用来载入一个flow,然后驱动flow中的task/flow运行。可以通过engine_conf来指明不同的engine类型。

engine类型 说明
‘serial’ 所有的task都在调用engine.run的那个线程中运行。
‘parallel’ task可能会被调度到不同的线程中并发运行。
‘worker-based’ task会被调度到不同的worker中运行。
一个worker是一个单独的专门用来运行某些特定task的进程,
这个worker进程可以在远程机器上,利用AMQP来通信。

task和flow的输入/输出

示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow import engines


class Adder(task.Task):
def execute(self, x, y, *args, **kwargs):
return x + y


flow = gf.Flow('root').add(
lf.Flow('nested_linear').add(
# 从后台存储中读取名为y3和y4的参数值,并以参数x,y传递给execute方法
# x2 = y3 + y4
Adder('add2', provides='x2', rebind=['y3', 'y4']),
# x1 = y1 + y2
Adder('add1', provides='x1', rebind=['y1', 'y2']),
),
# x5 = x1 + x3
Adder('add5', provides='x5', rebind=['x1', 'x3']),
# x3 = x1 + x2
Adder('add3', provides='x3', rebind=['x1', 'x2']),
# x4 = x2 + y5
Adder('add4', provides='x4', rebind=['x2', 'y5']),
# x6 = x5 + x4
Adder('add6', provides='x6', rebind=['x5', 'x4']),
# x7 = x6 + x6
Adder('add7', provides='x7', rebind=['x6', 'x6'])
)

store = {
'y1': 1,
'y2': 3,
'y3': 5,
'y4': 7,
'y5': 9
}

result = engines.run(
flow=flow,
store=store,
engine_conf='serial'
)

print 'single thread engine result %s' % result

result = engines.run(
flow=flow,
store=store,
engine_conf='parallel'
)

print 'multi thread engine result %s' % result

上面的示例中,定义了一个Task对象Adder,作用是完成一个加法。接下去生成一个图类型的流root,其中的task都通过provides和rebind来指明它们的输出和输入。

在engine运行时,通过store参数为流root提供所需要的输入参数,engine会把store的值保存在后台存储中:在执行各个task的过程中,各个task的输入都是从后台存储中获取,输出都保存在后台存储中。这个程序的输出结果如下:

1
2
Single thread engine result {'x1': 4, 'y5': 9, 'y4': 7, 'y1': 1, 'x2': 12, 'x3': 16, 'y3': 5, 'y2': 3, 'x6': 41, 'x7': 82, 'x4': 21, 'x5': 20}.
Multi thread engine result {'x1': 4, 'y5': 9, 'y4': 7, 'y1': 1, 'x2': 12, 'x3': 16, 'y3': 5, 'y2': 3, 'x6': 41, 'x7': 82, 'x4': 21, 'x5': 20}.

TaskFlow中的Task和Retry都是Atom的子类。对于任何一种Atom对象,都可以通过requires属性来了解它所要求的输入参数,和通过provides属性来了解它能够提供的输出结果的名字。requires和provides的类型都是包含参数名称的集合(set)。

requires

Task对象的requires可以由execute方法获得。比如示例中的Adder对象,由于execute方法的参数是execute(self, x, y),所以它的requires为:

1
2
3
Adder().requires

OrderedSet(['x', 'y'])

注意,execute方法的可选参数和*args和**kwargs并不会出现在requires中。

1
2
3
4
5
6
7
class MyTask(task.Task):
def execute(self, spam, eggs=(), *args, **kwargs):
return spam + eggs


MyTask().requires
OrderedSet(['spam'])
1
2
3
4
5
6
7
class UniTask(task.Task):
def execute(self, *args, **kwargs):
pass


UniTask().requires
OrderedSet([])

此外,也可以在创建Task时明确指定它的输入参数要求,这些参数在调用execute()方法时可以通过kwargs获得:

1
2
3
4
5
6
7
8
class Dog(task.Task):
def execute(self, food, *args, **kwargs):
pass


dog = Dog(requires=('water', 'grass'))
dog.requires
OrderedSet(['food', 'water', 'grass'])

rebind

在有些情况下,传递给某个task的输入参数名称和其所需要的参数名不同,这个时候可以通过rebind来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from taskflow import task


class SpawnVMTask(task.Task):
def execute(self, vm_name, vm_image_id, *args, **kwargs):
pass


# engine执行下面这个task时,会从后台存储中获取到名为'name'的参数值
# 然后把它当做vm_name参数传递给task的execute()方法
s = SpawnVMTask(rebind={'vm_name': 'name'})
s.requires
OrderedSet(['name', 'vm_image_id'])

# engine执行下面这个task时,会从后台存储中获取到名为'name', 'image_id'
# 和'admin_key_name'的参数值,把name和image_id分别当做vm_name
# 和vm_image_id参数,把admin_key_name当做args参数中的某一项传递
# 给task的execute()方法
s = SpawnVMTask(rebind=('name', 'image_id', 'admin_key_name'))
s.requires
OrderedSet(['name', 'image_id', 'admin_key_name'])

provides

task的输出结果一般是指其execute()方法的返回值。但是由于Python返回值是没有名字的,所有需要通过Task对象的provides属性指明返回值以什么名称存入后台存储中。根据execute()返回值类型的不同,provides可以有不同的方式指定。

  • 如果execute()方法返回的是一个单一的值。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19

    from taskflow import task
    from taskflow import engines
    from taskflow.patterns import linear_flow as lf


    class TheAnswerReturningTask(task.Task):
    def execute(self, *args, **kwargs):
    return 24


    t = TheAnswerReturningTask(provides='the_answer')
    flow = lf.Flow('linear').add(t)
    result = engines.run(
    flow=flow,
    engine_conf='serial'
    )
    result
    {'the_answer': 24}
  • 如果execute()方法返回元组tuple。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from taskflow import task
    from taskflow import engines
    from taskflow.patterns import linear_flow as lf


    class BitsAndPiecesTask(task.Task):
    def execute(self, *args, **kwargs):
    return 'Bits', 'Pieces'


    t = BitsAndPiecesTask(provides=('bits', 'pieces'))
    flow = lf.Flow('linear').add(t)
    result = engines.run(
    flow=flow,
    engine_conf='serial'
    )
    result
    {'bits': 'Bits', 'pieces': 'Pieces'}
  • 如果execute()方法返回字典。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from taskflow import task
    from taskflow import engines
    from taskflow.patterns import linear_flow as lf


    class BitsAndPiecesTask(task.Task):
    def execute(self, *args, **kwargs):
    return {'Bits': 123, 'Pieces': 321}


    t = BitsAndPiecesTask(provides={'Bits', 'Pieces'})
    flow = lf.Flow('linear').add(t)
    result = engines.run(
    flow=flow,
    engine_conf='serial'
    )
    result
    {'Bits': 123, 'Pieces': 321}

oslo.policy

oslo.policy用于控制用户的权限,能够执行什么样的操作。

OpenStack的每个项目都有一个/etc//policy.yaml文件,通过这个配置文件来实现对用户的权限管理。

将policy操作的公共部分提取出来,就形成了oslo.policy库,它会负责policy的验证和rules的管理。

policy的验证,其实就是对字典key和value的判断,如果匹配成功,则通过policy,否则失败。

各个工程的API通过policy来检测用户身份群权限的规则,例如有些API只有管理员权限可以执行,有些普通用户可以执行,在代码中的体现就是判断context的project_id和user_id是不是合法类型的。

Nova API示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

def authorize(context, action, target, do_raise=True, exc=None):
"""Verifies that the action is valid on the target in this context.

:param context: nova context
:param action: string representing the action to be checked
this should be colon separated for clarity.
i.e. ``compute:create_instance``,
``compute:attach_volume``,
``volume:attach_volume``
:param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}``
:param do_raise: if True (the default), raises PolicyNotAuthorized;
if False, returns False
:param exc: Class of the exception to raise if the check fails.
Any remaining arguments passed to :meth:`authorize` (both
positional and keyword arguments) will be passed to
the exception class. If not specified,
:class:`PolicyNotAuthorized` will be used.

:raises nova.exception.PolicyNotAuthorized: if verification fails
and do_raise is True. Or if 'exc' is specified it will raise an
exception of that type.

:return: returns a non-False value (not necessarily "True") if
authorized, and the exact value False if not authorized and
do_raise is False.
"""
init()
credentials = context.to_policy_values()
if not exc:
exc = exception.PolicyNotAuthorized
try:
result = _ENFORCER.authorize(action, target, credentials,
do_raise=do_raise, exc=exc, action=action)
except policy.PolicyNotRegistered:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Policy not registered'))
except Exception:
with excutils.save_and_reraise_exception():
LOG.debug('Policy check for %(action)s failed with credentials '
'%(credentials)s',
{'action': action, 'credentials': credentials})
return result

对应/etc/nova/policy.json文件内容如下:

1
2
3
4
5
6
{
"context_is_admin": "role:admin",
"admin_or_owner": "is_admin:True or project_id:%(project_id)s",
"default": "rule:admin_or_owner"
......
}

从上面的例子可以看到,nova policy的rule是“is_admin:True or project_id:%(project_id)s”,需要验证policy验证是不是admin用户或者project_id是不是匹配。

oslo.rootwarp

oslo.rootwarp可以让其他OpenStack服务以root身份执行shell命令。一般来说,OpenStack的服务都是以非特权用户的身份运行的,但是当它们需要以root身份运行某些shell命令时,就需要利用到oslo.rootwrap的功能。

oslo.rootwarp首先从配置文件所定义的Filter文件目录中读取所有Filter定义,然后检查要运行的shell命令是否和Filter中的定义相匹配,匹配则运行,不匹配就不运行。

构造rootwrap shell脚本

使用rootwrap需要在一个单独的Python进程中以root身份调用Python函数oslo.rootwrap.cmd.main()。可以通过Setuptools中的console script来构造这样一个脚本。

以nova为例。

1
2
3
4
5
6
7
8
9
10
# setup.cfg

console_scripts =
nova-api = nova.cmd.api:main
nova-compute = nova.cmd.compute:main
nova-conductor = nova.cmd.conductor:main
nova-console = nova.cmd.console:main
nova-manage = nova.cmd.manage:main
nova-policy = nova.cmd.policy:main
nova-rootwrap = oslo_rootwrap.cmd:main

可以看到构造一个名为nova-rootwrap的shell脚本时,会调用oslo.rootwrap.cmd.main()函数。运行“python setup.py install”之后,nova-rootwrap脚本就会被生成。

调用rootwrap shell脚本

rootwrap的shell脚本需要以sudo方式调用。

1
sudo nova-rootwrap /etc/nova/rootwrap.conf COMAND_LINE

其中/etc/nova/rootwrap.conf是oslo.rootwrap的配置文件名,COMAND_LINE是希望root用户身份运行的shell命令。

由于rootwrap shell 脚本需要以sudo方式运行,所以还需要配置sudoers文件:

1
nova ALL = (root) NOPASSWD: /var/lib/kolla/venv/bin/nova-rootwrap /etc/nova/rootwrap.conf *

rootwrap配置文件

rootwrap配置文件是以INI的文件格式存放的。

rootwrap 配置选项rootwrap 配置选项

定义Filter

Filter定义文件一般以.filter后缀结尾,放在配置选项filters_path所指定的目录中。这些定义文件以ini格式存放,Filter的定义存放在[Filter]节中。定义的格式如下。

1
Filter名: Filter类, [Filter类参数1, Filter类参数2, ...]

rootwrap目前所支持的Filter类型。

rootwrap 所支持的Filter类型rootwrap 所支持的Filter类型

oslo.test

oslo.test库提供单元测试的基础框架。

oslo.versionedobjects

在项目的不断迭代和升级中,数据库结构和API接口的改动不可避免,如果没有一个版本控制的概念在里面,新旧不同模块之间交互就很容易出现问题。oslo.versionedobjects库提供一种通用的自带版本的对象模型,自带序列化功能,可以很容易地和oslo.messaging结合进行远程调用。