目录 介绍 版本 插件加载 服务启动 收集监控数据
介绍 Neutron-metering-agent用来监控公网IP的流量。文档
版本 Rocky
插件加载 neutron-server在启动时会加载ml2和service_plugins,service_plugins中就包括插件metering。
[entry_points]
neutron.service_plugins =
    metering = neutron.services.metering.metering_plugin:MeteringPlugin
MeteringPlugin中定义了metering的rpc_listener,时刻监听从neutron-metering-agent发送过来的消息。
服务启动 文件setup.cfg中定义neutron-metering-agent服务启动方法。
[entry_points]
console_scripts =
    neutron-metering-agent = neutron.cmd.eventlet.services.metering_agent:main
可以看到neutron-metering-agent的启动方法是位于neutron.cmd.eventlet.services.metering_agent的main方法。
from neutron.services.metering.agents import metering_agent


def main():
    """Console-script entry point: hand off to the metering agent's main()."""
    metering_agent.main()
紧接着找到python文件neutron/services/metering/agents/metering_agent.py。
def main():
    """Configure and launch the neutron-metering-agent service.

    Registers the metering-agent and agent-state options, initialises
    configuration from the command line, sets up logging and privsep, then
    creates the service and blocks until the launcher returns.
    """
    conf = cfg.CONF

    # Option registration has to happen before the command line is parsed.
    metering_agent.register_metering_agent_opts()
    config.register_agent_state_opts_helper(conf)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    config.setup_privsep()

    # Dotted path of the manager class the service will use.
    manager_path = ('neutron.services.metering.agents.'
                    'metering_agent.MeteringAgentWithStateReport')
    server = neutron_service.Service.create(
        binary='neutron-metering-agent',
        topic=topics.METERING_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager_path,
    )

    launcher = service.launch(cfg.CONF, server, restart_method='mutate')
    launcher.wait()
创建服务过程中,会指定neutron-metering-agent的manager为neutron.services.metering.agents.metering_agent.MeteringAgentWithStateReport。来看一下MeteringAgentWithStateReport的代码。
class MeteringAgentWithStateReport(MeteringAgent):
    """Metering agent that additionally reports its own state.

    On top of ``MeteringAgent`` this class keeps an ``agent_state`` dict and,
    when ``cfg.CONF.AGENT.report_interval`` is non-zero, sends it through
    ``PluginReportStateAPI`` at that fixed interval.
    """

    def __init__(self, host, conf=None):
        """Build the agent state and start the heartbeat loop if enabled.

        :param host: host name stored in the reported agent state.
        :param conf: optional configuration object passed to the base class.
        """
        super(MeteringAgentWithStateReport, self).__init__(host=host,
                                                           conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)

        configurations = {
            'metering_driver': self.conf.driver,
            'measure_interval': self.conf.measure_interval,
            'report_interval': self.conf.report_interval,
        }
        self.agent_state = {
            'binary': 'neutron-metering-agent',
            'host': host,
            'topic': topics.METERING_AGENT,
            'configurations': configurations,
            # Removed again after the first successful report.
            'start_flag': True,
            'agent_type': constants.AGENT_TYPE_METERING,
        }

        interval = cfg.CONF.AGENT.report_interval
        # The first report is flagged with use_call=True; later ones are not.
        self.use_call = True
        if not interval:
            # A zero/empty interval means no heartbeat loop is created.
            return

        self.heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        self.heartbeat.start(interval=interval)

    def _report_state(self):
        """Send ``agent_state`` to the server; called by the heartbeat loop.

        After a successful report ``start_flag`` is dropped from the state and
        ``use_call`` is cleared. An ``AttributeError`` stops the heartbeat for
        good; any other exception is logged and the next tick retries.
        """
        try:
            self.state_rpc.report_state(
                self.context, self.agent_state, self.use_call)
            self.agent_state.pop('start_flag', None)
            self.use_call = False
        except AttributeError:
            LOG.warning("Neutron server does not support state report. "
                        "State report for this agent will be disabled.")
            self.heartbeat.stop()
        except Exception:
            LOG.exception("Failed reporting state!")

    def agent_updated(self, context, payload):
        """Log the payload of an agent update coming from the server side."""
        LOG.info("agent_updated by server side %s!", payload)
MeteringAgentWithStateReport的基类是MeteringAgent,neutron-metering-agent最主要的部分都是由MeteringAgent来实现的。
class MeteringAgent(MeteringPluginRpc, manager.Manager):
    """Agent manager that periodically runs ``_metering_loop``.

    Only the constructor is shown here. It loads the drivers, prepares an
    admin context and the per-agent bookkeeping dicts, and starts a
    fixed-interval loop that calls ``_metering_loop`` every
    ``conf.measure_interval``.
    """

    def __init__(self, host, conf=None):
        """Initialise agent state and start the metering loop.

        :param host: host name this agent runs on.
        :param conf: configuration object; falls back to ``cfg.CONF``.
        """
        self.conf = conf or cfg.CONF
        self._load_drivers()
        self.context = context.get_admin_context_without_session()
        self.metering_loop = loopingcall.FixedIntervalLoopingCall(
            self._metering_loop
        )
        measure_interval = self.conf.measure_interval
        self.last_report = 0
        self.host = host

        # Bookkeeping read by the loop callback. It is created BEFORE the
        # loop is started so the callback can never observe a half-built
        # agent, regardless of when the looping call first gets scheduled.
        self.label_tenant_id = {}
        self.routers = {}
        self.metering_infos = {}

        self.metering_loop.start(interval=measure_interval)
        super(MeteringAgent, self).__init__(host=host)
其中self.metering_loop是负责监控流量的定时任务,来看一下定时执行的任务_metering_loop。
def _metering_loop(self):
    """One tick of the metering loop.

    Always collects fresh counters. When at least ``conf.report_interval``
    has elapsed since ``last_report``, it also sends the metering
    notification, purges stale metering info and records the new report
    timestamp.
    """
    self._add_metering_infos()

    now = timeutils.utcnow_ts()
    elapsed = now - self.last_report
    if elapsed < self.conf.report_interval:
        # Not time to report yet; keep accumulating.
        return

    self._metering_notification()
    self._purge_metering_info()
    self.last_report = now
MeteringAgent还有一个_sync_routers_task方法。它是一个周期性任务,由于设置了run_immediately=True,服务启动后会立即执行一次,之后周期性地同步router的信息。
@periodic_task.periodic_task(run_immediately=True)
def _sync_routers_task(self, context):
    """Reconcile the locally known routers with the synced router list.

    Routers known to the agent but absent from the synced data are forgotten
    and removed through the driver; the remaining synced routers, if any, are
    passed to ``_update_routers``.

    :param context: context handed to the driver and to ``_update_routers``.
    """
    routers = self._get_sync_data_metering(self.context)

    server_ids = {router['id'] for router in routers} if routers else set()
    stale_ids = set(self.routers) - server_ids
    for router_id in stale_ids:
        del self.routers[router_id]
        self._invoke_driver(context, router_id, 'remove_router')

    if routers:
        self._update_routers(context, routers)
收集监控数据 监控数据的收集主要由MeteringAgent中的_add_metering_infos方法负责。
def _add_metering_infos(self):
    """Rebuild the label-to-tenant map and record fresh traffic counters.

    ``label_tenant_id`` is rebuilt from scratch from the metering labels of
    every known router. Then the counters returned by
    ``_get_traffic_counters`` are fed, label by label, to
    ``_add_metering_info``.
    """
    self.label_tenant_id = {}
    for router in self.routers.values():
        tenant_id = router['tenant_id']
        labels = router.get(n_const.METERING_LABEL_KEY, [])
        # Every label of a router is attributed to that router's tenant.
        self.label_tenant_id.update(
            (label['id'], tenant_id) for label in labels)

    counters = self._get_traffic_counters(self.context,
                                          self.routers.values())
    if counters:
        for label_id, counter in counters.items():
            self._add_metering_info(label_id,
                                    counter['pkts'],
                                    counter['bytes'])
其中_get_traffic_counters用来获取监控的数据。它本身只是通过_invoke_driver调用metering驱动的get_traffic_counters方法,下面依次是这两个方法的代码。
def _get_traffic_counters(self, context, routers):
    """Return the driver's traffic counters for ``routers``.

    Thin delegate: the work is done by the driver's
    ``get_traffic_counters`` method, reached through ``_invoke_driver``.
    """
    LOG.debug("Get router traffic counters")
    counters = self._invoke_driver(context, routers, 'get_traffic_counters')
    return counters
@log_helpers.log_method_call
def get_traffic_counters(self, context, routers):
    """Sum the per-label packet and byte counters of the given routers.

    Routers the driver does not track are skipped. If reading a label's
    chain raises ``RuntimeError`` the failure is logged, that label is
    skipped, and the router is dropped from ``self.routers`` once all
    routers have been processed (presumably so a later sync re-adds it;
    confirm against the sync path).

    :param context: request context (not used in this method's body).
    :param routers: iterable of router dicts carrying at least ``'id'``.
    :returns: dict mapping label id to ``{'pkts': int, 'bytes': int}``.
    """
    totals = {}
    failed_router_ids = set()

    for router in routers:
        managed = self.routers.get(router['id'])
        if not managed:
            continue

        for label_id in managed.metering_labels:
            try:
                chain_name = iptables_manager.get_chain_name(
                    WRAP_NAME + LABEL + label_id, wrap=False)
                # NOTE(review): zero=True presumably resets the counters
                # after reading; confirm in the iptables manager.
                sample = managed.iptables_manager.get_traffic_counters(
                    chain_name, wrap=False, zero=True)
            except RuntimeError:
                LOG.exception('Failed to get traffic counters, '
                              'router: %s', router)
                failed_router_ids.add(router['id'])
                continue

            if not sample:
                continue

            # The same label may be seen on several routers; accumulate.
            label_total = totals.setdefault(label_id,
                                            {'pkts': 0, 'bytes': 0})
            label_total['pkts'] += sample['pkts']
            label_total['bytes'] += sample['bytes']

    for router_id in failed_router_ids:
        del self.routers[router_id]

    return totals