目录

  1. 介绍
  2. 版本
  3. 插件加载
  4. 服务启动
  5. 收集监控数据

介绍

neutron-metering-agent用来监控公网IP的流量,详情可参考官方文档。

版本

Rocky

插件加载

neutron-server在启动时会加载ml2和service_plugins,service_plugins中就包括插件metering。

# setup.cfg

[entry_points]
neutron.service_plugins =
    metering = neutron.services.metering.metering_plugin:MeteringPlugin

MeteringPlugin中定义了metering的rpc_listener,时刻监听从neutron-metering-agent发送过来的消息。

服务启动

文件setup.cfg中定义neutron-metering-agent服务启动方法。

# setup.cfg

[entry_points]
console_scripts =
    neutron-metering-agent = neutron.cmd.eventlet.services.metering_agent:main

可以看到neutron-metering-agent的启动方法是位于neutron.cmd.eventlet.services.metering_agent的main方法。

# neutron/cmd/eventlet/services/metering_agent.py

from neutron.services.metering.agents import metering_agent


def main():
    """Entry point of the console script: delegate to the agent module."""
    metering_agent.main()

紧接着找到python文件neutron/services/metering/agents/metering_agent.py。

# neutron/services/metering/agents/metering_agent.py

def main():
    """Configure and launch the neutron-metering-agent service.

    Registers the agent's options, initialises configuration from the
    command-line arguments, sets up logging and privsep, creates the service
    and then waits on the launcher.
    """
    conf = cfg.CONF

    # Register the configuration options the service needs.
    metering_agent.register_metering_agent_opts()
    config.register_agent_state_opts_helper(conf)

    # Initialise configuration from the command-line arguments.
    cli_args = sys.argv[1:]
    common_config.init(cli_args)

    # Set up logging and privsep.
    config.setup_logging()
    config.setup_privsep()

    # Create the service; the manager class is passed as a dotted path.
    manager_path = ('neutron.services.metering.agents.'
                    'metering_agent.MeteringAgentWithStateReport')
    server = neutron_service.Service.create(
        binary='neutron-metering-agent',
        topic=topics.METERING_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager_path)

    # Launch the service and wait on the launcher.
    launcher = service.launch(cfg.CONF, server, restart_method='mutate')
    launcher.wait()

创建服务过程中,会指定neutron-metering-agent的manager为neutron.services.metering.agents.metering_agent.MeteringAgentWithStateReport。来看一下MeteringAgentWithStateReport的代码。

# neutron/services/metering/agents/metering_agent.py
class MeteringAgentWithStateReport(MeteringAgent):
    """MeteringAgent that also reports its own state to the server over RPC."""

    def __init__(self, host, conf=None):
        """Build the agent state and, if configured, start the heartbeat.

        :param host: host name recorded in the reported agent state.
        :param conf: optional configuration object, forwarded to
            MeteringAgent.
        """
        super(MeteringAgentWithStateReport, self).__init__(host=host,
                                                           conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)

        configurations = {
            # Driver; defaults to neutron.services.metering.drivers.noop.
            # noop_driver.NoopMeteringDriver.
            'metering_driver': self.conf.driver,
            # Measurement interval, 30 seconds by default.
            'measure_interval': self.conf.measure_interval,
            # Interval for sending the metering data, 300 seconds by default.
            'report_interval': self.conf.report_interval,
        }
        self.agent_state = {
            'binary': 'neutron-metering-agent',
            'host': host,
            'topic': topics.METERING_AGENT,
            'configurations': configurations,
            'start_flag': True,
            'agent_type': constants.AGENT_TYPE_METERING,
        }

        # Interval of the state-report heartbeat (read from the AGENT group,
        # not the metering report_interval above).
        heartbeat_interval = cfg.CONF.AGENT.report_interval
        # The first report is sent with use_call=True; _report_state switches
        # it to False after a successful report.
        self.use_call = True
        if heartbeat_interval:
            # The heartbeat attribute only exists when an interval is set.
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=heartbeat_interval)

    def _report_state(self):
        """Send the agent state to the server once.

        After a successful report, 'start_flag' is dropped from the state and
        use_call is set to False. An AttributeError stops the heartbeat; any
        other exception is only logged.
        """
        try:
            self.state_rpc.report_state(self.context, self.agent_state,
                                        self.use_call)
            self.agent_state.pop('start_flag', None)
            self.use_call = False
        except AttributeError:
            # This means the server does not support report_state
            LOG.warning("Neutron server does not support state report. "
                        "State report for this agent will be disabled.")
            self.heartbeat.stop()
        except Exception:
            LOG.exception("Failed reporting state!")

    def agent_updated(self, context, payload):
        """Log the update payload received from the server side."""
        LOG.info("agent_updated by server side %s!", payload)

MeteringAgentWithStateReport的基类是MeteringAgent,neutron-metering-agent最主要的部分都是由MeteringAgent来实现的。

# neutron/services/metering/agents/metering_agent.py
class MeteringAgent(MeteringPluginRpc, manager.Manager):
    """Metering agent manager (excerpt: only the constructor is shown)."""

    def __init__(self, host, conf=None):
        """Load the drivers and start the fixed-interval metering loop.

        :param host: host name, stored on the instance and passed to the
            parent constructor.
        :param conf: optional configuration object; cfg.CONF is used when it
            is falsy.
        """
        self.conf = conf if conf else cfg.CONF
        # Load the driver(s).
        self._load_drivers()
        self.context = context.get_admin_context_without_session()

        # Create the timed task that runs _metering_loop.
        loop = loopingcall.FixedIntervalLoopingCall(self._metering_loop)
        self.metering_loop = loop
        interval = self.conf.measure_interval
        # last_report starts at 0 and is set before the loop is started.
        self.last_report = 0
        self.metering_loop.start(interval=interval)
        self.host = host

        # State filled in by the other agent methods.
        self.label_tenant_id = {}
        self.routers = {}
        self.metering_infos = {}
        super(MeteringAgent, self).__init__(host=host)

其中self.metering_loop是负责监控流量的定时任务,来看一下定时执行的任务_metering_loop。

# neutron/services/metering/agents/metering_agent.py
def _metering_loop(self):
    """Collect metering data and send it once the report interval elapsed."""
    # Collect the metering data for this round.
    self._add_metering_infos()

    now = timeutils.utcnow_ts()
    elapsed = now - self.last_report

    # Not yet time to report: keep accumulating.
    if elapsed < self.conf.report_interval:
        return

    # Send the metering data ...
    self._metering_notification()
    # ... then clear it and remember when this report was sent.
    self._purge_metering_info()
    self.last_report = now

MeteringAgent还有一个_sync_routers_task方法。它是一个周期性任务,并且设置了run_immediately=True,因此会在服务启动后立即执行一次,之后周期性地执行,用来同步router的信息。

# neutron/services/metering/agents/metering_agent.py
@periodic_task.periodic_task(run_immediately=True)
def _sync_routers_task(self, context):
    """Reconcile the locally known routers with the sync data.

    Routers known to the agent but absent from the fetched data are removed
    locally and from the driver; the fetched routers, if any, are then passed
    to _update_routers.
    """
    routers = self._get_sync_data_metering(self.context)

    local_ids = set(self.routers)
    server_ids = {router['id'] for router in routers} if routers else set()
    for stale_id in local_ids - server_ids:
        del self.routers[stale_id]
        self._invoke_driver(context, stale_id, 'remove_router')

    if routers:
        self._update_routers(context, routers)

收集监控数据

监控数据的收集主要由MeteringAgent中的_add_metering_infos方法负责。

# neutron/services/metering/agents/metering_agent.py
def _add_metering_infos(self):
    """Rebuild the label-to-tenant map and record the traffic counters."""
    # Start from an empty map and fill it from the routers' labels.
    self.label_tenant_id = {}
    for router_info in self.routers.values():
        owner = router_info['tenant_id']
        for metering_label in router_info.get(n_const.METERING_LABEL_KEY, []):
            self.label_tenant_id[metering_label['id']] = owner

    counters = self._get_traffic_counters(self.context,
                                          self.routers.values())

    # A falsy result means there is nothing to record.
    for label_id, counter in (counters or {}).items():
        self._add_metering_info(label_id, counter['pkts'], counter['bytes'])

其中_get_traffic_counters用来获取监控的数据。

# neutron/services/metering/agents/metering_agent.py
def _get_traffic_counters(self, context, routers):
    """Return the result of the driver's get_traffic_counters for routers."""
    LOG.debug("Get router traffic counters")
    # Delegate to the driver's get_traffic_counters method.
    counters = self._invoke_driver(context, routers, 'get_traffic_counters')
    return counters

以iptables驱动为例,get_traffic_counters的实现如下。

# neutron/services/metering/drivers/iptables/iptables_driver.py
@log_helpers.log_method_call
def get_traffic_counters(self, context, routers):
    """Sum the per-label packet and byte counters of the given routers.

    :param context: request context (not used in this method's body).
    :param routers: iterable of router dicts; only 'id' is read here.
    :returns: dict mapping label id to {'pkts': ..., 'bytes': ...}, summed
        over every router that carries the label.

    Routers whose counters could not be read (RuntimeError) are removed from
    self.routers before returning.
    """
    totals = {}
    failed_router_ids = set()
    for router in routers:
        managed = self.routers.get(router['id'])
        if not managed:
            # Router is not tracked by this driver.
            continue

        for label_id in managed.metering_labels:
            try:
                chain_name = iptables_manager.get_chain_name(
                    WRAP_NAME + LABEL + label_id, wrap=False)
                # NOTE(review): zero=True presumably resets the counters
                # after reading - confirm against iptables_manager.
                counters = managed.iptables_manager.get_traffic_counters(
                    chain_name, wrap=False, zero=True)
            except RuntimeError:
                LOG.exception('Failed to get traffic counters, '
                              'router: %s', router)
                failed_router_ids.add(router['id'])
                continue

            if not counters:
                continue

            # Accumulate into the running total for this label.
            label_total = totals.setdefault(label_id,
                                            {'pkts': 0, 'bytes': 0})
            label_total['pkts'] += counters['pkts']
            label_total['bytes'] += counters['bytes']

    # Forget the routers that failed - presumably so that a later sync
    # configures them again (TODO confirm).
    for failed_id in failed_router_ids:
        del self.routers[failed_id]

    return totals