目录

  1. 介绍
  2. 版本
  3. 服务启动
    1. WSGI App
      1. WSGI App 启动
      2. WSGI 配置文件
    2. APIRouter
    3. 插件加载
      1. NeutronManager初始化
        1. ml2插件加载
          1. TypeManager初始化
          2. ExtensionManager初始化
          3. MechanismManager初始化
          4. NeutronDbPluginV2初始化
          5. Dirver初始化
            1. Type_Driver初始化
            2. Extension_Driver初始化
            3. Mechanism_Driver初始化
          6. DHCP组件初始化
            1. RPC_Notifier初始化

介绍

neutron-server提供了一个公开的Neutron API的Web服务器,并将所有Web服务调用传递给Neutron插件进行处理。

版本

Rocky

服务启动

neutron-server的本质是一个Python Web Server Gateway Interface(WSGI),是通过eventlet来实现服务的异步并发模型的。实际工作时,通过serve_wsgi()方法来构造NeutronApiService实例,通过该实例生成eventlet.Greenpool()来运行WSGI App来响应客户端请求。

neutron的所有服务(services)的启动入口都定义在setup.cfg文件中的console_scripts。

neutron-server服务启动方法。

1
2
3
4
# setup.py
[entry_points]
console_scripts =
neutron-server = neutron.cmd.eventlet.server:main

查看neutron.cmd.eventlet.server.main()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
# neutron/cmd/eventlet/server/__init__.py

from neutron import server
from neutron.server import rpc_eventlet
from neutron.server import wsgi_eventlet


def main():
server.boot_server(wsgi_eventlet.eventlet_wsgi_server)


def main_rpc_eventlet():
server.boot_server(rpc_eventlet.eventlet_rpc_server)

查看wsgi_eventlet.eventlet_wsgi_server方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# neutron/server/wsgi_eventlet.py

def eventlet_wsgi_server():
# 启动wsigi服务
neutron_api = service.serve_wsgi(service.NeutronApiService)
start_api_and_rpc_workers(neutron_api)


def start_api_and_rpc_workers(neutron_api):
try:
worker_launcher = service.start_all_workers()

# 创建eventlet.GreenPool()
pool = eventlet.GreenPool()

# 创建api_thread用来监听api命令
# 当命令到达时,通过wsgi服务路由到neutron.api.v2.base中的Controller中去处理。
api_thread = pool.spawn(neutron_api.wait)

# 创建plugin_workers_thread与rpc_work相关联,监听topics中的消息队列的请求,
# 完成neutron内部组件之间的通信
plugin_workers_thread = pool.spawn(worker_launcher.wait)

# api and other workers should die together. When one dies,
# kill the other.
api_thread.link(lambda gt: plugin_workers_thread.kill())
plugin_workers_thread.link(lambda gt: api_thread.kill())

pool.waitall()
except NotImplementedError:
LOG.info("RPC was already started in parent process by "
"plugin.")

neutron_api.wait()

WSGI App

WSGI App 启动

查看NeutronApiService类和serve_wsgi()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# neutron/service.py

class NeutronApiService(WsgiService):
"""Class for neutron-api service."""
def __init__(self, app_name):
profiler.setup('neutron-server', cfg.CONF.host)
super(NeutronApiService, self).__init__(app_name)

@classmethod
def create(cls, app_name='neutron'):
# Setup logging early
config.setup_logging()
service = cls(app_name)
return service


def serve_wsgi(cls):

try:
service = cls.create()
service.start()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception('Unrecoverable error: please check log '
'for details.')

registry.publish(resources.PROCESS, events.BEFORE_SPAWN, service)
return service

查看NeutronApiService的基类WsgiService。上面serve_wsgi方法中service.start()调用就是WsgiService中的start()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# neutron/service.py

class WsgiService(object):
"""Base class for WSGI based services.

For each api you define, you must also define these flags:
:<api>_listen: The address on which to listen
:<api>_listen_port: The port on which to listen

"""

def __init__(self, app_name):
self.app_name = app_name
self.wsgi_app = None

def start(self):
self.wsgi_app = _run_wsgi(self.app_name)

def wait(self):
self.wsgi_app.wait()

查看_run_wsgi()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# neutron/service.py

# 通过调用load_paste_app()方法生成app,并调用run_wsgi_app()方法来启动app。
def _run_wsgi(app_name):
app = config.load_paste_app(app_name)
if not app:
LOG.error('No known API applications configured.')
return
return run_wsgi_app(app)


def run_wsgi_app(app):
server = wsgi.Server("Neutron")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=_get_api_workers())
LOG.info("Neutron service started, listening on %(host)s:%(port)s",
{'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
return server

查看load_paste_app()方法,从注释可以了解,该方法从配置文件构造并返回一个WSGI App。根据追踪,该配置文件主要是/etc/neutron/api-paste.ini,server通过解析该文件来指定WSGI App的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# neutron/common/config.py

def load_paste_app(app_name):
"""Builds and returns a WSGI app from a paste config file.

:param app_name: Name of the application to load
"""
# 加载WSGI的配置文件
loader = wsgi.Loader(cfg.CONF)

# Log the values of registered opts
if cfg.CONF.debug:
cfg.CONF.log_opt_values(LOG, logging.DEBUG)
app = loader.load_app(app_name)
return app

Loader类的load_app()方法中,调用paste模块库deploy来实现对api-paste.ini中配置信息的解析和app的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# oslo_service/wsgi.py

class Loader(object):
"""Used to load WSGI applications from paste configurations."""

def __init__(self, conf):
"""Initialize the loader, and attempt to find the config.

:param conf: Application config
:returns: None

"""
conf.register_opts(_options.wsgi_opts)
self.config_path = None

config_path = conf.api_paste_config
if not os.path.isabs(config_path):
self.config_path = conf.find_file(config_path)
elif os.path.exists(config_path):
self.config_path = config_path

if not self.config_path:
raise ConfigNotFound(path=config_path)

def load_app(self, name):
"""Return the paste URLMap wrapped WSGI application.

:param name: Name of the application to load.
:returns: Paste URLMap object wrapping the requested application.
:raises: PasteAppNotFound

"""
try:
LOG.debug("Loading app %(name)s from %(path)s",
{'name': name, 'path': self.config_path})
return deploy.loadapp("config:%s" % self.config_path, name=name)
except LookupError:
LOG.exception("Couldn't lookup app: %s", name)
raise PasteAppNotFound(name=name, path=self.config_path)

PasteDeployment是一种机制或者说是一种设计模式,它用于在应用WSGI Application和Server提供一个联系的桥梁,并且为用户提供一个接口,当配置好PasteDeployment之后,用户只需调用loadapp方法就可以使用现有的WSGI Application,而保持了WSGI Application对用户的透明性。

WSGI 配置文件

追踪配置文件加载。首先查看neutron/service.py文件。

1
2
3
# neutron/service.py

from neutron import wsgi

找到neutron/wsgi.py文件

1
2
3
# neutron/wsgi.py

wsgi_config.register_socket_opts()

register_socket_opts()方法注册WSGI和socket的配置项

1
2
3
4
5
# neutron/conf/wsgi.py

def register_socket_opts(cfg=cfg.CONF):
cfg.register_opts(socket_opts)
wsgi.register_opts(cfg)

oslo.serivce中的wsgi.py文件。

1
2
3
4
5
6
# oslo_service/wsgi.py

def register_opts(conf):
"""Registers WSGI config options."""
return conf.register_opts(_options.wsgi_opts)

_options.wsgi_opts定义WSGI的配置项。其中api_paste_config默认是api-paste.ini,一般位于/etc/neutron/目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# oslo_service/_options.py

wsgi_opts = [
cfg.StrOpt('api_paste_config',
default="api-paste.ini",
help='File name for the paste.deploy config for api service'),
cfg.StrOpt('wsgi_log_format',
default='%(client_ip)s "%(request_line)s" status: '
'%(status_code)s len: %(body_length)s time:'
' %(wall_seconds).7f',
help='A python format string that is used as the template to '
'generate log lines. The following values can be'
'formatted into it: client_ip, date_time, request_line, '
'status_code, body_length, wall_seconds.'),
cfg.IntOpt('tcp_keepidle',
default=600,
help="Sets the value of TCP_KEEPIDLE in seconds for each "
"server socket. Not supported on OS X."),
cfg.IntOpt('wsgi_default_pool_size',
default=100,
help="Size of the pool of greenthreads used by wsgi"),
cfg.IntOpt('max_header_line',
default=16384,
help="Maximum line size of message headers to be accepted. "
"max_header_line may need to be increased when using "
"large tokens (typically those generated when keystone "
"is configured to use PKI tokens with big service "
"catalogs)."),
cfg.BoolOpt('wsgi_keep_alive',
default=True,
help="If False, closes the client socket connection "
"explicitly."),
cfg.IntOpt('client_socket_timeout', default=900,
help="Timeout for client connections' socket operations. "
"If an incoming connection is idle for this number of "
"seconds it will be closed. A value of '0' means "
"wait forever."),
]

api-paste.ini文件的主要内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# /etc/api-paste.ini

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[composite:neutronversions_composite]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi neutronversions
keystone = cors http_proxy_to_wsgi neutronversions

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:http_proxy_to_wsgi]
paste.filter_factory = oslo_middleware.http_proxy_to_wsgi:HTTPProxyToWSGI.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.pecan_wsgi.app:versions_factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

[filter:osprofiler]
paste.filter_factory = osprofiler.web:WsgiMiddleware.factory

APIRouter

查看neutron/api/v2/router.py文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# neutron/api/v2/router.py

from neutron.pecan_wsgi import app as pecan_app


def APIRouter(**local_config):
return pecan_app.v2_factory(None, **local_config)


def _factory(global_config, **local_config):
return pecan_app.v2_factory(global_config, **local_config)


setattr(APIRouter, 'factory', _factory)

查看v2_factory()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# /neutron/peacn_wsig/app.py

def v2_factory(global_config, **local_config):
# Processing Order:
# As request enters lower priority called before higher.
# Response from controller is passed from higher priority to lower.
app_hooks = [
hooks.UserFilterHook(), # priority 90
hooks.ContextHook(), # priority 95
hooks.ExceptionTranslationHook(), # priority 100
hooks.BodyValidationHook(), # priority 120
hooks.OwnershipValidationHook(), # priority 125
hooks.QuotaEnforcementHook(), # priority 130
hooks.NotifierHook(), # priority 135
hooks.QueryParametersHook(), # priority 139
hooks.PolicyHook(), # priority 140
]
app = pecan.make_app(root.V2Controller(),
debug=False,
force_canonical=False,
hooks=app_hooks,
guess_content_type_from_ext=True)
startup.initialize_all()
return app

查看root.V2Controller()类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# neutron/pecan_wsgi/controllers/root.py

class V2Controller(object):

# Same data structure as neutron.api.versions.Versions for API backward
# compatibility
version_info = {
'id': 'v2.0',
'status': 'CURRENT'
}
_load_version_info(version_info)

# NOTE(blogan): Paste deploy handled the routing to the legacy extension
# controller. If the extensions filter is removed from the api-paste.ini
# then this controller will be routed to This means operators had
# the ability to turn off the extensions controller via tha api-paste but
# will not be able to turn it off with the pecan switch.
extensions = ext_ctrl.ExtensionsController()

@utils.expose(generic=True)
def index(self):
if not pecan.request.path_url.endswith('/'):
pecan.abort(404)

layout = []
for name, collection in _CORE_RESOURCES.items():
href = urlparse.urljoin(pecan.request.path_url, collection)
resource = {'name': name,
'collection': collection,
'links': [{'rel': 'self',
'href': href}]}
layout.append(resource)
return {'resources': layout}

@utils.when(index, method='HEAD')
@utils.when(index, method='POST')
@utils.when(index, method='PATCH')
@utils.when(index, method='PUT')
@utils.when(index, method='DELETE')
def not_supported(self):
pecan.abort(405)

@utils.expose()
def _lookup(self, collection, *remainder):
# if collection exists in the extension to service plugins map then
# we are assuming that collection is the service plugin and
# needs to be remapped.
# Example: https://neutron.endpoint/v2.0/lbaas/loadbalancers
if (remainder and
manager.NeutronManager.get_resources_for_path_prefix(
collection)):
collection = remainder[0]
remainder = remainder[1:]
controller = manager.NeutronManager.get_controller_for_resource(
collection)
if not controller:
LOG.warning("No controller found for: %s - returning response "
"code 404", collection)
pecan.abort(404)
# Store resource and collection names in pecan request context so that
# hooks can leverage them if necessary. The following code uses
# attributes from the controller instance to ensure names have been
# properly sanitized (eg: replacing dashes with underscores)
request.context['resource'] = controller.resource
request.context['collection'] = controller.collection
# NOTE(blogan): initialize a dict to store the ids of the items walked
# in the path for example: /networks/1234 would cause uri_identifiers
# to contain: {'network_id': '1234'}
# This is for backwards compatibility with legacy extensions that
# defined their own controllers and expected kwargs to be passed in
# with the uri_identifiers
request.context['uri_identifiers'] = {}
return controller, remainder

插件加载

查看startup.initialize_all()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# neutron/pecan_wsgi/startup.py

RESOURCES = {'network': 'networks',
'subnet': 'subnets',
'subnetpool': 'subnetpools',
'port': 'ports'}


def initialize_all():
# NeutronManager初始化
manager.init()
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
ext_mgr.extend_resources("2.0", attributes.RESOURCES)
# At this stage we have a fully populated resource attribute map;
# build Pecan controllers and routes for all core resources
plugin = directory.get_plugin()
for resource, collection in RESOURCES.items():
resource_registry.register_resource_by_name(resource)
new_controller = res_ctrl.CollectionsController(collection, resource,
plugin=plugin)
manager.NeutronManager.set_controller_for_resource(
collection, new_controller)
manager.NeutronManager.set_plugin_for_resource(collection, plugin)

pecanized_resources = ext_mgr.get_pecan_resources()
for pec_res in pecanized_resources:
manager.NeutronManager.set_controller_for_resource(
pec_res.collection, pec_res.controller)
manager.NeutronManager.set_plugin_for_resource(
pec_res.collection, pec_res.plugin)

# Now build Pecan Controllers and routes for all extensions
resources = ext_mgr.get_resources()
# Extensions controller is already defined, we don't need it.
resources.pop(0)
for ext_res in resources:
path_prefix = ext_res.path_prefix.strip('/')
collection = ext_res.collection
# Retrieving the parent resource. It is expected the format of
# the parent resource to be:
# {'collection_name': 'name-of-collection',
# 'member_name': 'name-of-resource'}
# collection_name does not appear to be used in the legacy code
# inside the controller logic, so we can assume we do not need it.
parent = ext_res.parent or {}
parent_resource = parent.get('member_name')
collection_key = collection
if parent_resource:
collection_key = '/'.join([parent_resource, collection])
collection_actions = ext_res.collection_actions
member_actions = ext_res.member_actions
if manager.NeutronManager.get_controller_for_resource(collection_key):
# This is a collection that already has a pecan controller, we
# do not need to do anything else
continue
legacy_controller = getattr(ext_res.controller, 'controller',
ext_res.controller)
new_controller = None
if isinstance(legacy_controller, base.Controller):
resource = legacy_controller.resource
plugin = legacy_controller.plugin
attr_info = legacy_controller.attr_info
member_actions = legacy_controller.member_actions
pagination = legacy_controller.allow_pagination
sorting = legacy_controller.allow_sorting
# NOTE(blogan): legacy_controller and ext_res both can both have
# member_actions. the member_actions for ext_res are strictly for
# routing, while member_actions for legacy_controller are used for
# handling the request once the routing has found the controller.
# They're always the same so we will just use the ext_res
# member_action.
new_controller = res_ctrl.CollectionsController(
collection, resource, resource_info=attr_info,
parent_resource=parent_resource, member_actions=member_actions,
plugin=plugin, allow_pagination=pagination,
allow_sorting=sorting, collection_actions=collection_actions)
# new_controller.collection has replaced hyphens with underscores
manager.NeutronManager.set_plugin_for_resource(
new_controller.collection, plugin)
if path_prefix:
manager.NeutronManager.add_resource_for_path_prefix(
collection, path_prefix)
else:
new_controller = utils.ShimCollectionsController(
collection, None, legacy_controller,
collection_actions=collection_actions,
member_actions=member_actions,
action_status=ext_res.controller.action_status,
collection_methods=ext_res.collection_methods)

manager.NeutronManager.set_controller_for_resource(
collection_key, new_controller)

# Certain policy checks require that the extensions are loaded
# and the RESOURCE_ATTRIBUTE_MAP populated before they can be
# properly initialized. This can only be claimed with certainty
# once this point in the code has been reached. In the event
# that the policies have been initialized before this point,
# calling reset will cause the next policy check to
# re-initialize with all of the required data in place.
policy.reset()

NeutronManager初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# neutron/manager.py

@six.add_metaclass(profiler.TracedMeta)
class NeutronManager(object):
"""Neutron's Manager class.

Neutron's Manager class is responsible for parsing a config file and
instantiating the correct plugin that concretely implements
neutron_plugin_base class.
"""
# TODO(armax): use of the singleton pattern for this class is vestigial,
# and it is mainly relied on by the unit tests. It is safer to get rid
# of it once the entire codebase (neutron + subprojects) has switched
# entirely to using the plugins directory.
_instance = None
__trace_args__ = {"name": "rpc"}

def __init__(self, options=None, config_file=None):
# Store instances of already loaded plugins to avoid instantiate same
# plugin more than once
self._loaded_plugins = {}
# If no options have been provided, create an empty dict
if not options:
options = {}

msg = validate_pre_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)

# NOTE(jkoelker) Testing for the subclass with the __subclasshook__
# breaks tach monitoring. It has been removed
# intentionally to allow v2 plugins to be monitored
# for performance metrics.

# ml2
plugin_provider = cfg.CONF.core_plugin
LOG.info("Loading core plugin: %s", plugin_provider)
# NOTE(armax): keep hold of the actual plugin object

# CORE_PLUGINS_NAMESPACE = 'neutron.core_plugins'
plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
plugin_provider)
directory.add_plugin(lib_const.CORE, plugin)
msg = validate_post_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)

# load services from the core plugin first
self._load_services_from_core_plugin(plugin)
self._load_service_plugins()
# Used by pecan WSGI
self.resource_plugin_mappings = {}
self.resource_controller_mappings = {}
self.path_prefix_resource_mappings = defaultdict(list)

ml2插件加载

_get_plugin_instance()方法,该方法加载neutron的核心插件ml2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@staticmethod
def load_class_for_provider(namespace, plugin_provider):
"""Loads plugin using alias or class name

:param namespace: namespace where alias is defined
:param plugin_provider: plugin alias or class name
:returns: plugin that is loaded
:raises ImportError: if fails to load plugin
"""

try:
return runtime.load_class_by_alias_or_classname(namespace,
plugin_provider)
except ImportError:
with excutils.save_and_reraise_exception():
LOG.error("Plugin '%s' not found.", plugin_provider)

def _get_plugin_class(self, namespace, plugin_provider):
return self.load_class_for_provider(namespace, plugin_provider)

def _get_plugin_instance(self, namespace, plugin_provider):
plugin_class = self._get_plugin_class(namespace, plugin_provider)
plugin_inst = self._loaded_plugins.get(plugin_class)
if not plugin_inst:
plugin_inst = plugin_class()
self._loaded_plugins[plugin_class] = plugin_inst
return plugin_inst

setup.cfg文件中的entry_points里定义neutron.core_plugins的实现类。

1
2
3
[entry_points]
neutron.core_plugins =
ml2 = neutron.plugins.ml2.plugin:Ml2Plugin

Ml2Plugin()类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# neutron/plugins/ml2/plugin.py

@resource_extend.has_resource_extenders
@registry.has_registry_receivers
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_mac_db.DVRDbMixin,
external_net_db.External_net_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
addr_pair_db.AllowedAddressPairsMixin,
vlantransparent_db.Vlantransparent_db_mixin,
extradhcpopt_db.ExtraDhcpOptMixin,
address_scope_db.AddressScopeDbMixin,
subnet_service_type_mixin.SubnetServiceTypeMixin):

@resource_registry.tracked_resources(
network=models_v2.Network,
port=models_v2.Port,
subnet=models_v2.Subnet,
subnetpool=models_v2.SubnetPool,
security_group=sg_models.SecurityGroup,
security_group_rule=sg_models.SecurityGroupRule)
def __init__(self):
# First load drivers, then initialize DB, then initialize drivers

# TypeManager初始化
self.type_manager = managers.TypeManager()
# ExtensionManager初始化
self.extension_manager = managers.ExtensionManager()
# MechanismManager初始化
self.mechanism_manager = managers.MechanismManager()

NeutronDbPluginV2初始化
super(Ml2Plugin, self).__init__()

# type_driver初始化
self.type_manager.initialize()

# extension_driver初始化
self.extension_manager.initialize()

# mechanism_driver初始化
self.mechanism_manager.initialize()

# DHCP组件初始化
self._setup_dhcp()

# rpc_notifier初始化
self._start_rpc_notifiers()
self.add_agent_status_check_worker(self.agent_health_check)
self.add_workers(self.mechanism_manager.get_workers())
self._verify_service_plugins_requirements()
LOG.info("Modular L2 Plugin initialization complete")

TypeManager初始化

TypeManager类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# neutron/plugins/ml2/managers.py

class TypeManager(stevedore.named.NamedExtensionManager):
"""Manage network segment types using drivers."""

def __init__(self):
# Mapping from type name to DriverManager
self.drivers = {}

LOG.info("Configured type driver names: %s",
cfg.CONF.ml2.type_drivers)
super(TypeManager, self).__init__('neutron.ml2.type_drivers',
cfg.CONF.ml2.type_drivers,
invoke_on_load=True)
LOG.info("Loaded type driver names: %s", self.names())

# 注册type_driver
self._register_types()

# tenant_network_type和external_network_type
# 必须是conf.ml2.type_drivers中定义的
# 检查tenant_network_types
self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)
# 检查external_network_type,默认为空
self._check_external_network_type(cfg.CONF.ml2.external_network_type)

def _register_types(self):
for ext in self:

# type_driver对应实现类的get_type()
# 例如:vxlan的实现类是neutron.plugins.ml2.drivers.type_vxlan:VxlanTypeDriver
# VxlanTypeDriver中的get_type()方法返回的是vxlan。
network_type = ext.obj.get_type()

if network_type in self.drivers:
LOG.error("Type driver '%(new_driver)s' ignored because"
" type driver '%(old_driver)s' is already"
" registered for type '%(type)s'",
{'new_driver': ext.name,
'old_driver': self.drivers[network_type].name,
'type': network_type})
else:
self.drivers[network_type] = ext
LOG.info("Registered types: %s", self.drivers.keys())

def _check_tenant_network_types(self, types):
self.tenant_network_types = []
for network_type in types:
if network_type in self.drivers:
self.tenant_network_types.append(network_type)
else:
LOG.error("No type driver for tenant network_type: %s. "
"Service terminated!", network_type)
raise SystemExit(1)
LOG.info("Tenant network_types: %s", self.tenant_network_types)

def _check_external_network_type(self, ext_network_type):
if ext_network_type and ext_network_type not in self.drivers:
LOG.error("No type driver for external network_type: %s. "
"Service terminated!", ext_network_type)
raise SystemExit(1)

def initialize(self):
for network_type, driver in self.drivers.items():
LOG.info("Initializing driver for type '%s'", network_type)
# 初识化每一个type_driver
driver.obj.initialize()

type_drivers插件的初始化方法,在setup.cfg文件中的entry_points里面定义。

1
2
3
4
5
6
7
8
[entrypoint]
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver
vlan = neutron.plugins.ml2.drivers.type_vlan:VlanTypeDriver
geneve = neutron.plugins.ml2.drivers.type_geneve:GeneveTypeDriver
gre = neutron.plugins.ml2.drivers.type_gre:GreTypeDriver
vxlan = neutron.plugins.ml2.drivers.type_vxlan:VxlanTypeDriver
ExtensionManager初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# neutron/plugins/ml2/managers.py

class ExtensionManager(stevedore.named.NamedExtensionManager):
"""Manage extension drivers using drivers."""

def __init__(self):
# Ordered list of extension drivers, defining
# the order in which the drivers are called.
self.ordered_ext_drivers = []

LOG.info("Configured extension driver names: %s",
cfg.CONF.ml2.extension_drivers)
super(ExtensionManager, self).__init__('neutron.ml2.extension_drivers',
cfg.CONF.ml2.extension_drivers,
invoke_on_load=True,
name_order=True)
LOG.info("Loaded extension driver names: %s", self.names())
self._register_drivers()

def _register_drivers(self):
"""Register all extension drivers.

This method should only be called once in the ExtensionManager
constructor.
"""
for ext in self:
self.ordered_ext_drivers.append(ext)
LOG.info("Registered extension drivers: %s",
[driver.name for driver in self.ordered_ext_drivers])

def initialize(self):
# Initialize each driver in the list.
for driver in self.ordered_ext_drivers:
LOG.info("Initializing extension driver '%s'", driver.name)
driver.obj.initialize()

extension_driver的初始化方法定义在setup.cfg文件中的entry_points里面。

1
2
3
4
5
6
7
8
neutron.ml2.extension_drivers =
test = neutron.tests.unit.plugins.ml2.drivers.ext_test:TestExtensionDriver
testdb = neutron.tests.unit.plugins.ml2.drivers.ext_test:TestDBExtensionDriver
port_security = neutron.plugins.ml2.extensions.port_security:PortSecurityExtensionDriver
qos = neutron.plugins.ml2.extensions.qos:QosExtensionDriver
dns = neutron.plugins.ml2.extensions.dns_integration:DNSExtensionDriverML2
data_plane_status = neutron.plugins.ml2.extensions.data_plane_status:DataPlaneStatusExtensionDriver
dns_domain_ports = neutron.plugins.ml2.extensions.dns_integration:DNSDomainPortsExtensionDriver
MechanismManager初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# neutron/plugins/ml2/managers.py

class MechanismManager(stevedore.named.NamedExtensionManager):
"""Manage networking mechanisms using drivers."""

def __init__(self):
# Registered mechanism drivers, keyed by name.
self.mech_drivers = {}
# Ordered list of mechanism drivers, defining
# the order in which the drivers are called.
self.ordered_mech_drivers = []

LOG.info("Configured mechanism driver names: %s",
cfg.CONF.ml2.mechanism_drivers)
super(MechanismManager, self).__init__(
'neutron.ml2.mechanism_drivers',
cfg.CONF.ml2.mechanism_drivers,
invoke_on_load=True,
name_order=True,
on_missing_entrypoints_callback=self._driver_not_found,
on_load_failure_callback=self._driver_not_loaded
)
LOG.info("Loaded mechanism driver names: %s", self.names())
self._register_mechanisms()
self.host_filtering_supported = self.is_host_filtering_supported()
if not self.host_filtering_supported:
LOG.info("No mechanism drivers provide segment reachability "
"information for agent scheduling.")

def _driver_not_found(self, names):
msg = (_("The following mechanism drivers were not found: %s")
% names)
LOG.critical(msg)
raise SystemExit(msg)

def _driver_not_loaded(self, manager, entrypoint, exception):
LOG.critical("The '%(entrypoint)s' entrypoint could not be"
" loaded for the following reason: '%(reason)s'.",
{'entrypoint': entrypoint,
'reason': exception})
raise SystemExit(str(exception))

def _register_mechanisms(self):
"""Register all mechanism drivers.

This method should only be called once in the MechanismManager
constructor.
"""
for ext in self:
self.mech_drivers[ext.name] = ext
self.ordered_mech_drivers.append(ext)
LOG.info("Registered mechanism drivers: %s",
[driver.name for driver in self.ordered_mech_drivers])

mechanism_driver的初始化方法定义在setup.cfg文件中的entry_points里。

1
2
3
4
5
6
7
8
9
10
neutron.ml2.mechanism_drivers =
logger = neutron.tests.unit.plugins.ml2.drivers.mechanism_logger:LoggerMechanismDriver
test = neutron.tests.unit.plugins.ml2.drivers.mechanism_test:TestMechanismDriver
linuxbridge = neutron.plugins.ml2.drivers.linuxbridge.mech_driver.mech_linuxbridge:LinuxbridgeMechanismDriver
macvtap = neutron.plugins.ml2.drivers.macvtap.mech_driver.mech_macvtap:MacvtapMechanismDriver
openvswitch = neutron.plugins.ml2.drivers.openvswitch.mech_driver.mech_openvswitch:OpenvswitchMechanismDriver
l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
sriovnicswitch = neutron.plugins.ml2.drivers.mech_sriov.mech_driver.mech_driver:SriovNicSwitchMechanismDriver
fake_agent = neutron.tests.unit.plugins.ml2.drivers.mech_fake_agent:FakeAgentMechanismDriver
faulty_agent = neutron.tests.unit.plugins.ml2.drivers.mech_faulty_agent:FaultyAgentMechanismDriver
NeutronDbPluginV2初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# neutron/db/db_base_plugin_v2.py

@registry.has_registry_receivers
class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
neutron_plugin_base_v2.NeutronPluginBaseV2,
rbac_mixin.RbacPluginMixin,
stattr_db.StandardAttrDescriptionMixin):
"""V2 Neutron plugin interface implementation using SQLAlchemy models.

Whenever a non-read call happens the plugin will call an event handler
class method (e.g., network_created()). The result is that this class
can be sub-classed by other classes that add custom behaviors on certain
events.
"""

# This attribute specifies whether the plugin supports or not
# bulk/pagination/sorting operations. Name mangling is used in
# order to ensure it is qualified by class
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
# This attribute specifies whether the plugin supports or not
# filter validations. Name mangling is used in
# order to ensure it is qualified by class
__filter_validation_support = False

def has_native_datastore(self):
return True

def __new__(cls, *args, **kwargs):
model_query.register_hook(
models_v2.Port,
"port",
query_hook=None,
filter_hook=_port_filter_hook,
result_filters=None)
return super(NeutronDbPluginV2, cls).__new__(cls, *args, **kwargs)

def __init__(self):
# 设置IP管理后端
self.set_ipam_backend()

# notify_nova_on_port_status_changes默认为True
# 在port状态更新时是否给Nova发送通知
if cfg.CONF.notify_nova_on_port_status_changes:
# Import nova conditionally to support the use case of Neutron
# being used outside of an OpenStack context.
from neutron.notifiers import nova
# NOTE(arosen) These event listeners are here to hook into when
# port status changes and notify nova about their change.
self.nova_notifier = nova.Notifier.get_instance()
lib_db_api.sqla_listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
lib_db_api.sqla_listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
lib_db_api.sqla_listen(
models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)

Dirver初始化
Type_Driver初始化
1
2
3
4
5
6
7
# neutron/plugins/ml2/managers.py
# TypeManager

def initialize(self):
for network_type, driver in self.drivers.items():
LOG.info("Initializing driver for type '%s'", network_type)
driver.obj.initialize()

例如type_driver vxlan的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# neutron/plugins/ml2/drivers/type_vxlan.py

class VxlanTypeDriver(type_tunnel.EndpointTunnelTypeDriver):

def __init__(self):
super(VxlanTypeDriver, self).__init__(
vxlan_obj.VxlanAllocation, vxlan_obj.VxlanEndpoint)

def get_type(self):
return p_const.TYPE_VXLAN

def initialize(self):
try:
# 初始化vxlan的vni范围
self._initialize(cfg.CONF.ml2_type_vxlan.vni_ranges)
except n_exc.NetworkTunnelRangeError:
LOG.exception("Failed to parse vni_ranges. "
"Service terminated!")
raise SystemExit()

Extension_Driver初始化
1
2
3
4
5
6
7
8
# neutron/plugins/ml2/managers.py
# ExtensionManager

def initialize(self):
# Initialize each driver in the list.
for driver in self.ordered_ext_drivers:
LOG.info("Initializing extension driver '%s'", driver.name)
driver.obj.initialize()

例如extension_driver qos的初始化。

1
2
3
4
5
6
7
# neutron/plugins/ml2/extensions/qos.py

class QosExtensionDriver(api.ExtensionDriver):

def initialize(self):
self.core_ext_handler = qos_core.QosCoreResourceExtension()
LOG.debug("QosExtensionDriver initialization complete")
Mechanism_Driver初始化
1
2
3
4
5
6
7
# neutron/plugins/ml2/managers.py
# MechanismManager

def initialize(self):
for driver in self.ordered_mech_drivers:
LOG.info("Initializing mechanism driver '%s'", driver.name)
driver.obj.initialize()
DHCP组件初始化
1
2
3
4
5
6
7
8
9
10
# neutron/plugins/ml2/plugin.py
# Ml2Plugin

def _setup_dhcp(self):
"""Initialize components to support DHCP."""
self.network_scheduler = importutils.import_object(
# 默认是neutron.scheduler.dhcp_agent_scheduler.WeightScheduler
cfg.CONF.network_scheduler_driver
)
self.add_periodic_dhcp_agent_status_check()

查看WeightScheduler。

1
2
3
4
5
6
# neutron/scheduler/dhcp_agent_scheduler.py

class WeightScheduler(base_scheduler.BaseWeightScheduler, AutoScheduler):

def __init__(self):
super(WeightScheduler, self).__init__(DhcpFilter())

查看WeightScheduler的基类BaseWeightScheduler。

1
2
3
4
5
6
7
8
9
10
11
12
13
# neutron/scheduler/base_scheduler.py

class BaseWeightScheduler(BaseScheduler):
"""Choose agents based on load."""

def __init__(self, resource_filter):
self.resource_filter = resource_filter

def select(self, plugin, context, resource_hostable_agents,
resource_hosted_agents, num_agents_needed):
chosen_agents = sorted(resource_hostable_agents,
key=attrgetter('load'))[0:num_agents_needed]
return chosen_agents

查看add_periodic_dhcp_agent_status_check()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# neutron/db/agentschedulers_db.py
# DhcpAgentSchedulerDbMixin

def add_periodic_dhcp_agent_status_check(self):
# allow_automatic_dhcp_failover默认为True
# 当Agent离线时,是否自动将网络移除
if not cfg.CONF.allow_automatic_dhcp_failover:
LOG.info("Skipping periodic DHCP agent status check because "
"automatic network rescheduling is disabled.")
return

self.add_agent_status_check_worker(
self.remove_networks_from_down_agents
)

def add_agent_status_check_worker(self, function):
# TODO(enikanorov): make interval configurable rather than computed
# agent_down_time默认是75秒
interval = max(cfg.CONF.agent_down_time // 2, 1)
# add random initial delay to allow agents to check in after the
# neutron server first starts. random to offset multiple servers
initial_delay = random.randint(interval, interval * 2)

check_worker = neutron_worker.PeriodicWorker(function, interval,
initial_delay)
self.add_worker(check_worker)


def remove_networks_from_down_agents(self):
"""Remove networks from down DHCP agents if admin state is up.

Reschedule them if configured so.
"""

agent_dead_limit = self.agent_dead_limit_seconds()
self.wait_down_agents('DHCP', agent_dead_limit)
cutoff = self.get_cutoff_time(agent_dead_limit)

context = ncontext.get_admin_context()
try:
down_bindings = network.NetworkDhcpAgentBinding.get_down_bindings(
context, cutoff)
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
dead_bindings = [b for b in
self._filter_bindings(context, down_bindings)]
agents = self.get_agent_objects(
context, {'agent_type': [constants.AGENT_TYPE_DHCP]})
if not agents:
# No agents configured so nothing to do.
return
active_agents = [agent for agent in agents if
self.is_eligible_agent(context, True, agent)]
if not active_agents:
LOG.warning("No DHCP agents available, "
"skipping rescheduling")
return
for binding in dead_bindings:
LOG.warning("Removing network %(network)s from agent "
"%(agent)s because the agent did not report "
"to the server in the last %(dead_time)s "
"seconds.",
{'network': binding.network_id,
'agent': binding.dhcp_agent_id,
'dead_time': agent_dead_limit})
# save binding object to avoid ObjectDeletedError
# in case binding is concurrently deleted from the DB
saved_binding = {'net': binding.network_id,
'agent': binding.dhcp_agent_id}
try:
# do not notify agent if it considered dead
# so when it is restarted it won't see network delete
# notifications on its queue
self.remove_network_from_dhcp_agent(context,
binding.dhcp_agent_id,
binding.network_id,
notify=False)
except das_exc.NetworkNotHostedByDhcpAgent:
# measures against concurrent operation
LOG.debug("Network %(net)s already removed from DHCP "
"agent %(agent)s",
saved_binding)
# still continue and allow concurrent scheduling attempt
except Exception:
LOG.exception("Unexpected exception occurred while "
"removing network %(net)s from agent "
"%(agent)s",
saved_binding)

if cfg.CONF.network_auto_schedule:
self._schedule_network(
context, saved_binding['net'], dhcp_notifier)
except Exception:
# we want to be thorough and catch whatever is raised
# to avoid loop abortion
LOG.exception("Exception encountered during network "
"rescheduling")

RPC_Notifier初始化
1
2
3
4
5
6
7
8
9
10
11
# neutron/plugins/ml2/plugin.py
# Ml2Plugin

@log_helpers.log_method_call
def _start_rpc_notifiers(self):
"""Initialize RPC notifiers for agents."""
self.ovo_notifier = ovo_rpc.OVOServerRpcInterface()
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# neutron/plugins/ml2/ovo_rpc.py

class OVOServerRpcInterface(object):
"""ML2 server-side RPC interface.

Generates RPC callback notifications on ML2 object changes.
"""

def __init__(self):
self._rpc_pusher = resources_rpc.ResourcesPushRpcApi()
self._setup_change_handlers()
LOG.debug("ML2 OVO RPC backend initialized.")

def _setup_change_handlers(self):
"""Setup all of the local callback listeners for resource changes."""
resource_objclass_map = {
resources.PORT: ports.Port,
resources.SUBNET: subnet.Subnet,
resources.NETWORK: network.Network,
resources.SECURITY_GROUP: securitygroup.SecurityGroup,
resources.SECURITY_GROUP_RULE: securitygroup.SecurityGroupRule,
}
self._resource_handlers = {
res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher)
for res, obj_class in resource_objclass_map.items()
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# neutron/api/rpc/handlers/resources_rpc.py

class ResourcesPushRpcApi(object):
"""Plugin-side RPC for plugin-to-agents interaction.

This interface is designed to push versioned object updates to interested
agents using fanout topics.

This class implements the caller side of an rpc interface. The receiver
side can be found below: ResourcesPushRpcCallback.
"""

def __init__(self):
target = oslo_messaging.Target(
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)

Neutron Callback System

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# neutron/plugins/ml2/ovo_rpc.py

class _ObjectChangeHandler(object):
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
self._obj_class = object_class
self._resource_push_api = resource_push_api
self._resources_to_push = {}

# NOTE(annp): uWSGI seems not happy with eventlet.GreenPool.
# So switching to ThreadPool
self._worker_pool = futurist.ThreadPoolExecutor()
self.fts = []

self._semantic_warned = False
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)