neutron-server is essentially a Python WSGI (Web Server Gateway Interface) service, and it uses eventlet to implement an asynchronous, concurrent service model. At run time, serve_wsgi() constructs a NeutronApiService instance, and that instance spawns an eventlet.GreenPool() to run the WSGI app and respond to client requests.
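As a rough illustration of this model (not Neutron's actual code), serving a WSGI app from an eventlet green pool looks like the sketch below; the port number and the trivial app function are placeholders:

import eventlet
import eventlet.wsgi

def app(environ, start_response):
    # Placeholder WSGI application.
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'hello from a green thread\n']

pool = eventlet.GreenPool(100)             # bounded pool of green threads
sock = eventlet.listen(('0.0.0.0', 9696))
# Each incoming connection is handled on a green thread from the pool.
eventlet.wsgi.server(sock, app, custom_pool=pool)

In the real startup path, the API thread and the plugin worker threads are spawned on such a pool and linked together, as the following excerpt shows.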
        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())
        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")
class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen
    """
# Build the app via load_paste_app() and start it via run_wsgi_app().
def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error('No known API applications configured.')
        return
    return run_wsgi_app(app)
def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info("Neutron service started, listening on %(host)s:%(port)s",
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
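For context, _get_api_workers() resolves how many API worker processes to start. A sketch consistent with this call, assuming the api_workers option falls back to the host CPU count via oslo.concurrency:

from oslo_concurrency import processutils

def _get_api_workers():
    # Use the configured worker count, else default to the CPU count.
    workers = cfg.CONF.api_workers
    if workers is None:
        workers = processutils.get_worker_count()
    return workers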
def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    # Load the WSGI (paste.deploy) configuration file.
    loader = wsgi.Loader(cfg.CONF)
    # Log the values of registered opts
    if cfg.CONF.debug:
        cfg.CONF.log_opt_values(LOG, logging.DEBUG)
    app = loader.load_app(app_name)
    return app
wsgi_opts = [
    cfg.StrOpt('api_paste_config', default="api-paste.ini",
               help='File name for the paste.deploy config for api service'),
    cfg.StrOpt('wsgi_log_format',
               default='%(client_ip)s "%(request_line)s" status: '
                       '%(status_code)s len: %(body_length)s time:'
                       ' %(wall_seconds).7f',
               help='A python format string that is used as the template to '
                    'generate log lines. The following values can be '
                    'formatted into it: client_ip, date_time, request_line, '
                    'status_code, body_length, wall_seconds.'),
    cfg.IntOpt('tcp_keepidle', default=600,
               help="Sets the value of TCP_KEEPIDLE in seconds for each "
                    "server socket. Not supported on OS X."),
    cfg.IntOpt('wsgi_default_pool_size', default=100,
               help="Size of the pool of greenthreads used by wsgi"),
    cfg.IntOpt('max_header_line', default=16384,
               help="Maximum line size of message headers to be accepted. "
                    "max_header_line may need to be increased when using "
                    "large tokens (typically those generated when keystone "
                    "is configured to use PKI tokens with big service "
                    "catalogs)."),
    cfg.BoolOpt('wsgi_keep_alive', default=True,
                help="If False, closes the client socket connection "
                     "explicitly."),
    cfg.IntOpt('client_socket_timeout', default=900,
               help="Timeout for client connections' socket operations. "
                    "If an incoming connection is idle for this number of "
                    "seconds it will be closed. A value of '0' means "
                    "wait forever."),
]
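These options are read from the [DEFAULT] section of neutron.conf. A sketch of how an operator might tune them (the values below are illustrative, not recommendations):

[DEFAULT]
api_paste_config = api-paste.ini
wsgi_default_pool_size = 200
max_header_line = 32768
client_socket_timeout = 600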
    # Same data structure as neutron.api.versions.Versions for API backward
    # compatibility
    version_info = {
        'id': 'v2.0',
        'status': 'CURRENT'
    }
    _load_version_info(version_info)
    # NOTE(blogan): Paste deploy handled the routing to the legacy extension
    # controller. If the extensions filter is removed from the api-paste.ini
    # then this controller will be routed to. This means operators had the
    # ability to turn off the extensions controller via the api-paste but
    # will not be able to turn it off with the pecan switch.
    extensions = ext_ctrl.ExtensionsController()
    @utils.expose()
    def _lookup(self, collection, *remainder):
        # if collection exists in the extension to service plugins map then
        # we are assuming that collection is the service plugin and
        # needs to be remapped.
        # Example: https://neutron.endpoint/v2.0/lbaas/loadbalancers
        if (remainder and
                manager.NeutronManager.get_resources_for_path_prefix(
                    collection)):
            collection = remainder[0]
            remainder = remainder[1:]
        controller = manager.NeutronManager.get_controller_for_resource(
            collection)
        if not controller:
            LOG.warning("No controller found for: %s - returning response "
                        "code 404", collection)
            pecan.abort(404)
        # Store resource and collection names in pecan request context so
        # that hooks can leverage them if necessary. The following code uses
        # attributes from the controller instance to ensure names have been
        # properly sanitized (eg: replacing dashes with underscores)
        request.context['resource'] = controller.resource
        request.context['collection'] = controller.collection
        # NOTE(blogan): initialize a dict to store the ids of the items
        # walked in the path for example: /networks/1234 would cause
        # uri_identifiers to contain: {'network_id': '1234'}
        # This is for backwards compatibility with legacy extensions that
        # defined their own controllers and expected kwargs to be passed in
        # with the uri_identifiers
        request.context['uri_identifiers'] = {}
        return controller, remainder
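To make the remapping concrete, here is a hypothetical trace of _lookup for two request paths (the resource names are examples):

# Hypothetical trace for GET /v2.0/lbaas/loadbalancers:
#   _lookup(collection='lbaas', remainder=('loadbalancers',))
#   'lbaas' is a registered path prefix, so the method shifts:
#   collection = 'loadbalancers', remainder = ()
#   -> returns the CollectionsController for 'loadbalancers'
#
# Hypothetical trace for GET /v2.0/networks/1234:
#   _lookup(collection='networks', remainder=('1234',))
#   -> returns the controller for 'networks'; later hooks fill
#      request.context['uri_identifiers'] = {'network_id': '1234'}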
def initialize_all():
    # Initialize the NeutronManager.
    manager.init()
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    ext_mgr.extend_resources("2.0", attributes.RESOURCES)
    # At this stage we have a fully populated resource attribute map;
    # build Pecan controllers and routes for all core resources
    plugin = directory.get_plugin()
    for resource, collection in RESOURCES.items():
        resource_registry.register_resource_by_name(resource)
        new_controller = res_ctrl.CollectionsController(collection, resource,
                                                        plugin=plugin)
        manager.NeutronManager.set_controller_for_resource(
            collection, new_controller)
        manager.NeutronManager.set_plugin_for_resource(collection, plugin)
    pecanized_resources = ext_mgr.get_pecan_resources()
    for pec_res in pecanized_resources:
        manager.NeutronManager.set_controller_for_resource(
            pec_res.collection, pec_res.controller)
        manager.NeutronManager.set_plugin_for_resource(
            pec_res.collection, pec_res.plugin)
    # Now build Pecan Controllers and routes for all extensions
    resources = ext_mgr.get_resources()
    # Extensions controller is already defined, we don't need it.
    resources.pop(0)
    for ext_res in resources:
        path_prefix = ext_res.path_prefix.strip('/')
        collection = ext_res.collection
        # Retrieving the parent resource. It is expected the format of
        # the parent resource to be:
        # {'collection_name': 'name-of-collection',
        #  'member_name': 'name-of-resource'}
        # collection_name does not appear to be used in the legacy code
        # inside the controller logic, so we can assume we do not need it.
        parent = ext_res.parent or {}
        parent_resource = parent.get('member_name')
        collection_key = collection
        if parent_resource:
            collection_key = '/'.join([parent_resource, collection])
        collection_actions = ext_res.collection_actions
        member_actions = ext_res.member_actions
        if manager.NeutronManager.get_controller_for_resource(
                collection_key):
            # This is a collection that already has a pecan controller, we
            # do not need to do anything else
            continue
        legacy_controller = getattr(ext_res.controller, 'controller',
                                    ext_res.controller)
        new_controller = None
        if isinstance(legacy_controller, base.Controller):
            resource = legacy_controller.resource
            plugin = legacy_controller.plugin
            attr_info = legacy_controller.attr_info
            member_actions = legacy_controller.member_actions
            pagination = legacy_controller.allow_pagination
            sorting = legacy_controller.allow_sorting
            # NOTE(blogan): legacy_controller and ext_res can both have
            # member_actions. The member_actions for ext_res are strictly
            # for routing, while member_actions for legacy_controller are
            # used for handling the request once the routing has found the
            # controller. They're always the same so we will just use the
            # ext_res member_action.
            new_controller = res_ctrl.CollectionsController(
                collection, resource, resource_info=attr_info,
                parent_resource=parent_resource,
                member_actions=member_actions,
                plugin=plugin, allow_pagination=pagination,
                allow_sorting=sorting,
                collection_actions=collection_actions)
            # new_controller.collection has replaced hyphens with underscores
            manager.NeutronManager.set_plugin_for_resource(
                new_controller.collection, plugin)
            if path_prefix:
                manager.NeutronManager.add_resource_for_path_prefix(
                    collection, path_prefix)
        else:
            new_controller = utils.ShimCollectionsController(
                collection, None, legacy_controller,
                collection_actions=collection_actions,
                member_actions=member_actions,
                action_status=ext_res.controller.action_status,
                collection_methods=ext_res.collection_methods)
    # Certain policy checks require that the extensions are loaded
    # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
    # properly initialized. This can only be claimed with certainty
    # once this point in the code has been reached. In the event
    # that the policies have been initialized before this point,
    # calling reset will cause the next policy check to
    # re-initialize with all of the required data in place.
    policy.reset()
@six.add_metaclass(profiler.TracedMeta)
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file
    and instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    """
    # TODO(armax): use of the singleton pattern for this class is vestigial,
    # and it is mainly relied on by the unit tests. It is safer to get rid
    # of it once the entire codebase (neutron + subprojects) has switched
    # entirely to using the plugins directory.
    _instance = None
    __trace_args__ = {"name": "rpc"}
    def __init__(self, options=None, config_file=None):
        # Store instances of already loaded plugins to avoid instantiating
        # the same plugin more than once
        self._loaded_plugins = {}
        # If no options have been provided, create an empty dict
        if not options:
            options = {}
        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)
        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        # The configured core plugin; in a typical deployment this is 'ml2'.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info("Loading core plugin: %s", plugin_provider)
        # NOTE(armax): keep hold of the actual plugin object
        plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                           plugin_provider)
        # load services from the core plugin first
        self._load_services_from_core_plugin(plugin)
        self._load_service_plugins()
        # Used by pecan WSGI
        self.resource_plugin_mappings = {}
        self.resource_controller_mappings = {}
        self.path_prefix_resource_mappings = defaultdict(list)
    @staticmethod
    def load_class_for_provider(namespace, plugin_provider):
        """Loads plugin using alias or class name

        :param namespace: namespace where alias is defined
        :param plugin_provider: plugin alias or class name
        :returns: plugin that is loaded
        :raises ImportError: if fails to load plugin
        """
        try:
            return runtime.load_class_by_alias_or_classname(namespace,
                                                            plugin_provider)
        except ImportError:
            with excutils.save_and_reraise_exception():
                LOG.error("Plugin '%s' not found.", plugin_provider)
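As an illustration, resolving the ML2 core plugin by its stevedore alias might look like this; the 'neutron.core_plugins' entry-point namespace comes from Neutron's setup.cfg, and the snippet is a sketch rather than code from this module:

# Sketch: resolve and instantiate the core plugin class from its alias.
plugin_class = NeutronManager.load_class_for_provider(
    'neutron.core_plugins', 'ml2')
plugin = plugin_class()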
            if network_type in self.drivers:
                LOG.error("Type driver '%(new_driver)s' ignored because"
                          " type driver '%(old_driver)s' is already"
                          " registered for type '%(type)s'",
                          {'new_driver': ext.name,
                           'old_driver': self.drivers[network_type].name,
                           'type': network_type})
            else:
                self.drivers[network_type] = ext
        LOG.info("Registered types: %s", self.drivers.keys())

    def _check_tenant_network_types(self, types):
        self.tenant_network_types = []
        for network_type in types:
            if network_type in self.drivers:
                self.tenant_network_types.append(network_type)
            else:
                LOG.error("No type driver for tenant network_type: %s. "
                          "Service terminated!", network_type)
                raise SystemExit(1)
        LOG.info("Tenant network_types: %s", self.tenant_network_types)
    def _check_external_network_type(self, ext_network_type):
        if ext_network_type and ext_network_type not in self.drivers:
            LOG.error("No type driver for external network_type: %s. "
                      "Service terminated!", ext_network_type)
            raise SystemExit(1)

    def initialize(self):
        for network_type, driver in self.drivers.items():
            LOG.info("Initializing driver for type '%s'", network_type)
            # Initialize each type driver.
            driver.obj.initialize()
    def _register_drivers(self):
        """Register all extension drivers.

        This method should only be called once in the ExtensionManager
        constructor.
        """
        for ext in self:
            self.ordered_ext_drivers.append(ext)
        LOG.info("Registered extension drivers: %s",
                 [driver.name for driver in self.ordered_ext_drivers])

    def initialize(self):
        # Initialize each driver in the list.
        for driver in self.ordered_ext_drivers:
            LOG.info("Initializing extension driver '%s'", driver.name)
            driver.obj.initialize()
class MechanismManager(stevedore.named.NamedExtensionManager):
    """Manage networking mechanisms using drivers."""
    def __init__(self):
        # Registered mechanism drivers, keyed by name.
        self.mech_drivers = {}
        # Ordered list of mechanism drivers, defining
        # the order in which the drivers are called.
        self.ordered_mech_drivers = []
    def _driver_not_found(self, names):
        msg = (_("The following mechanism drivers were not found: %s")
               % names)
        LOG.critical(msg)
        raise SystemExit(msg)
    def _driver_not_loaded(self, manager, entrypoint, exception):
        LOG.critical("The '%(entrypoint)s' entrypoint could not be"
                     " loaded for the following reason: '%(reason)s'.",
                     {'entrypoint': entrypoint,
                      'reason': exception})
        raise SystemExit(str(exception))
    def _register_mechanisms(self):
        """Register all mechanism drivers.

        This method should only be called once in the MechanismManager
        constructor.
        """
        for ext in self:
            self.mech_drivers[ext.name] = ext
            self.ordered_mech_drivers.append(ext)
        LOG.info("Registered mechanism drivers: %s",
                 [driver.name for driver in self.ordered_mech_drivers])
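The driver names registered here come from the [ml2] section of the plugin configuration (typically ml2_conf.ini). An illustrative example, with values chosen for demonstration rather than as defaults:

[ml2]
type_drivers = flat,vlan,vxlan
tenant_network_types = vxlan
extension_drivers = port_security
mechanism_drivers = openvswitch,l2population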
@registry.has_registry_receivers
class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
                        neutron_plugin_base_v2.NeutronPluginBaseV2,
                        rbac_mixin.RbacPluginMixin,
                        stattr_db.StandardAttrDescriptionMixin):
    """V2 Neutron plugin interface implementation using SQLAlchemy models.

    Whenever a non-read call happens the plugin will call an event
    handler class method (e.g., network_created()). The result is
    that this class can be sub-classed by other classes that add custom
    behaviors on certain events.
    """
    # This attribute specifies whether the plugin supports or not
    # bulk/pagination/sorting operations. Name mangling is used in
    # order to ensure it is qualified by class
    __native_bulk_support = True
    __native_pagination_support = True
    __native_sorting_support = True
    # This attribute specifies whether the plugin supports or not
    # filter validations. Name mangling is used in
    # order to ensure it is qualified by class
    __filter_validation_support = False
        # notify_nova_on_port_status_changes defaults to True and controls
        # whether Nova is notified when a port's status changes.
        if cfg.CONF.notify_nova_on_port_status_changes:
            # Import nova conditionally to support the use case of Neutron
            # being used outside of an OpenStack context.
            from neutron.notifiers import nova
            # NOTE(arosen) These event listeners are here to hook into when
            # port status changes and notify nova about their change.
            self.nova_notifier = nova.Notifier.get_instance()
            lib_db_api.sqla_listen(models_v2.Port, 'after_insert',
                                   self.nova_notifier.send_port_status)
            lib_db_api.sqla_listen(models_v2.Port, 'after_update',
                                   self.nova_notifier.send_port_status)
            lib_db_api.sqla_listen(
                models_v2.Port.status, 'set',
                self.nova_notifier.record_port_status_changed)
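lib_db_api.sqla_listen is essentially a wrapper around SQLAlchemy's event API; conceptually, the three hooks above amount to the following plain-SQLAlchemy sketch (notifier stands in for the Notifier instance):

from sqlalchemy import event

# Mapper-level events: fire after a Port row is inserted or updated.
event.listen(models_v2.Port, 'after_insert', notifier.send_port_status)
event.listen(models_v2.Port, 'after_update', notifier.send_port_status)
# Attribute-level event: fire whenever Port.status is assigned.
event.listen(models_v2.Port.status, 'set',
             notifier.record_port_status_changed)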
Driver initialization
Type_Driver initialization
# neutron/plugins/ml2/managers.py
# TypeManager
    def initialize(self):
        for network_type, driver in self.drivers.items():
            LOG.info("Initializing driver for type '%s'", network_type)
            driver.obj.initialize()
# ExtensionManager
    def initialize(self):
        # Initialize each driver in the list.
        for driver in self.ordered_ext_drivers:
            LOG.info("Initializing extension driver '%s'", driver.name)
            driver.obj.initialize()
    def add_agent_status_check_worker(self, function):
        # TODO(enikanorov): make interval configurable rather than computed
        # agent_down_time defaults to 75 seconds
        interval = max(cfg.CONF.agent_down_time // 2, 1)
        # add random initial delay to allow agents to check in after the
        # neutron server first starts. random to offset multiple servers
        initial_delay = random.randint(interval, interval * 2)
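With the default agent_down_time of 75 seconds, the numbers work out as follows:

# agent_down_time = 75 (default):
#   interval      = max(75 // 2, 1) = 37   # seconds between status checks
#   initial_delay = random.randint(37, 74) # staggers multiple servers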
        context = ncontext.get_admin_context()
        try:
            down_bindings = network.NetworkDhcpAgentBinding.get_down_bindings(
                context, cutoff)
            dhcp_notifier = self.agent_notifiers.get(
                constants.AGENT_TYPE_DHCP)
            dead_bindings = [b for b in
                             self._filter_bindings(context, down_bindings)]
            agents = self.get_agent_objects(
                context, {'agent_type': [constants.AGENT_TYPE_DHCP]})
            if not agents:
                # No agents configured so nothing to do.
                return
            active_agents = [agent for agent in agents if
                             self.is_eligible_agent(context, True, agent)]
            if not active_agents:
                LOG.warning("No DHCP agents available, "
                            "skipping rescheduling")
                return
            for binding in dead_bindings:
                LOG.warning("Removing network %(network)s from agent "
                            "%(agent)s because the agent did not report "
                            "to the server in the last %(dead_time)s "
                            "seconds.",
                            {'network': binding.network_id,
                             'agent': binding.dhcp_agent_id,
                             'dead_time': agent_dead_limit})
                # save binding object to avoid ObjectDeletedError
                # in case binding is concurrently deleted from the DB
                saved_binding = {'net': binding.network_id,
                                 'agent': binding.dhcp_agent_id}
                try:
                    # do not notify the agent if it is considered dead,
                    # so when it is restarted it won't see network delete
                    # notifications on its queue
                    self.remove_network_from_dhcp_agent(
                        context, binding.dhcp_agent_id,
                        binding.network_id, notify=False)
                except das_exc.NetworkNotHostedByDhcpAgent:
                    # measures against concurrent operation
                    LOG.debug("Network %(net)s already removed from DHCP "
                              "agent %(agent)s", saved_binding)
                    # still continue and allow concurrent scheduling attempt
                except Exception:
                    LOG.exception("Unexpected exception occurred while "
                                  "removing network %(net)s from agent "
                                  "%(agent)s", saved_binding)
                if cfg.CONF.network_auto_schedule:
                    self._schedule_network(
                        context, saved_binding['net'], dhcp_notifier)
        except Exception:
            # we want to be thorough and catch whatever is raised
            # to avoid loop abortion
            LOG.exception("Exception encountered during network "
                          "rescheduling")
    def __init__(self):
        self._rpc_pusher = resources_rpc.ResourcesPushRpcApi()
        self._setup_change_handlers()
        LOG.debug("ML2 OVO RPC backend initialized.")
    def _setup_change_handlers(self):
        """Setup all of the local callback listeners for resource changes."""
        resource_objclass_map = {
            resources.PORT: ports.Port,
            resources.SUBNET: subnet.Subnet,
            resources.NETWORK: network.Network,
            resources.SECURITY_GROUP: securitygroup.SecurityGroup,
            resources.SECURITY_GROUP_RULE: securitygroup.SecurityGroupRule,
        }
        self._resource_handlers = {
            res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher)
            for res, obj_class in resource_objclass_map.items()
        }
# neutron/api/rpc/handlers/resources_rpc.py
class ResourcesPushRpcApi(object):
    """Plugin-side RPC for plugin-to-agents interaction.

    This interface is designed to push versioned object updates to
    interested agents using fanout topics.

    This class implements the caller side of an rpc interface.
    The receiver side can be found below: ResourcesPushRpcCallback.
    """
        # NOTE(annp): uWSGI seems not happy with eventlet.GreenPool.
        # So switching to ThreadPool
        self._worker_pool = futurist.ThreadPoolExecutor()
        self.fts = []
        self._semantic_warned = False
        for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
                      events.AFTER_DELETE):
            registry.subscribe(self.handle_event, resource, event)
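Putting the pieces together: an AFTER_UPDATE event on a Port fires handle_event, and the change is eventually fanned out to agents through ResourcesPushRpcApi. A sketch of the caller side, where ctx and port_obj are placeholders and the push() arguments follow the upstream class:

from neutron.api.rpc.callbacks import events

# Sketch: push one updated Port object to all subscribed agents.
pusher = resources_rpc.ResourcesPushRpcApi()
pusher.push(ctx, [port_obj], events.UPDATED)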