Trove create 源码解析

trove create命令(python-troveclient仓库的一部分)封装了用户提供的所有命令行参数,并发送一个POST到Trove公共API定义的URI /v1.0/{tenant_id}/instances。

trove在api-paste.ini中定义程序的入口。

1
2
[app:troveapp]
paste.app_factory = trove.common.api:app_factory

可以在trove.common.api.py文件中,找到API类。其中它的_instance_router定义了实例的路由路径和被调用的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def app_factory(global_conf, **local_conf):
return API()

class API(wsgi.Router):
"""Defines the API routes."""
def __init__(self):
mapper = routes.Mapper()
super(API, self).__init__(mapper)
self._instance_router(mapper)
...

def _instance_router(self, mapper):
instance_resource = InstanceController().create_resource()
mapper.connect("/{tenant_id}/instances",
controller=instance_resource,
action="create",
conditions={'method': ['POST']})
...

InstanceController类在trove.instance.service.py文件中被定义。实例创建时将会调用create()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
class InstanceController(wsgi.Controller):
...
def create(self, req, body, tenant_id):
# TODO(hub-cap): turn this into middleware
LOG.info(_LI("Creating a database instance for tenant '%s'"),
tenant_id)
LOG.debug("req : '%s'\n\n", strutils.mask_password(req))
LOG.debug("body : '%s'\n\n", strutils.mask_password(body))
context = req.environ[wsgi.CONTEXT_KEY]
policy.authorize_on_tenant(context, 'instance:create')
# 消息
context.notification = notification.DBaaSInstanceCreate(context,
request=req)
# 数据库参数
datastore_args = body['instance'].get('datastore', {})
# 验证并获取数据库以及版本
datastore, datastore_version = (
datastore_models.get_datastore_version(**datastore_args))
# 镜像
image_id = datastore_version.image_id
# 实例名
name = body['instance']['name']
# flavor
flavor_ref = body['instance']['flavorRef']
flavor_id = utils.get_id_from_href(flavor_ref)

# 配置组
configuration = self._configuration_parse(context, body)
databases = populate_validated_databases(
body['instance'].get('databases', []))
database_names = [database.get('_name', '') for database in databases]
users = None
try:
users = populate_users(body['instance'].get('users', []),
database_names)
except ValueError as ve:
raise exception.BadRequest(msg=ve)

modules = body['instance'].get('modules')

# The following operations have their own API calls.
# We need to make sure the same policies are enforced when
# creating an instance.
# i.e. if attaching configuration group to an existing instance is not
# allowed, it should not be possible to create a new instance with the
# group attached either
if configuration:
policy.authorize_on_tenant(context, 'instance:update')
if modules:
policy.authorize_on_tenant(context, 'instance:module_apply')
if users:
policy.authorize_on_tenant(
context, 'instance:extension:user:create')
if databases:
policy.authorize_on_tenant(
context, 'instance:extension:database:create')

# 卷
if 'volume' in body['instance']:
volume_info = body['instance']['volume']
volume_size = int(volume_info['size'])
volume_type = volume_info.get('type')
else:
volume_size = None
volume_type = None

# 备份
if 'restorePoint' in body['instance']:
backupRef = body['instance']['restorePoint']['backupRef']
backup_id = utils.get_id_from_href(backupRef)
else:
backup_id = None

availability_zone = body['instance'].get('availability_zone')
nics = body['instance'].get('nics')

# 复制
slave_of_id = body['instance'].get('replica_of',
# also check for older name
body['instance'].get('slave_of'))
replica_count = body['instance'].get('replica_count')
locality = body['instance'].get('locality')
if locality:
locality_domain = ['affinity', 'anti-affinity']
locality_domain_msg = ("Invalid locality '%s'. "
"Must be one of ['%s']" %
(locality,
"', '".join(locality_domain)))
if locality not in locality_domain:
raise exception.BadRequest(msg=locality_domain_msg)
if slave_of_id:
dupe_locality_msg = (
'Cannot specify locality when adding replicas to existing '
'master.')
raise exception.BadRequest(msg=dupe_locality_msg)
region_name = body['instance'].get('region_name', CONF.os_region_name)

instance = models.Instance.create(context, name, flavor_id,
image_id, databases, users,
datastore, datastore_version,
volume_size, backup_id,
availability_zone, nics,
configuration, slave_of_id,
replica_count=replica_count,
volume_type=volume_type,
modules=modules,
locality=locality,
region_name=region_name)

view = views.InstanceDetailView(instance, req=req)
return wsgi.Result(view.data(), 200)
...

在拆包并理解了接收到的req和body中的参数的意义后,信息被传递到trove.instances.models.py中(见第98行代码)的create() 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
@classmethod
def create(cls, context, name, flavor_id, image_id, databases, users,
datastore, datastore_version, volume_size, backup_id,
availability_zone=None, nics=None,
configuration_id=None, slave_of_id=None, cluster_config=None,
replica_count=None, volume_type=None, modules=None,
locality=None, region_name=None):

region_name = region_name or CONF.os_region_name

call_args = {
'name': name,
'flavor_id': flavor_id,
'datastore': datastore.name if datastore else None,
'datastore_version': datastore_version.name,
'image_id': image_id,
'availability_zone': availability_zone,
'region_name': region_name,
}

# All nova flavors are permitted for a datastore-version unless one
# or more entries are found in datastore_version_metadata,
# in which case only those are permitted.
bound_flavors = DBDatastoreVersionMetadata.find_all(
datastore_version_id=datastore_version.id,
key='flavor', deleted=False
)
if bound_flavors.count() > 0:
valid_flavors = tuple(f.value for f in bound_flavors)
if flavor_id not in valid_flavors:
raise exception.DatastoreFlavorAssociationNotFound(
datastore=datastore.name,
datastore_version=datastore_version.name,
flavor_id=flavor_id)

datastore_cfg = CONF.get(datastore_version.manager)

# 调用Nova检查flavor是否有效。
client = create_nova_client(context)
try:
flavor = client.flavors.get(flavor_id)
except nova_exceptions.NotFound:
raise exception.FlavorNotFound(uuid=flavor_id)

# If a different region is specified for the instance, ensure
# that the flavor and image are the same in both regions
if region_name and region_name != CONF.os_region_name:
cls._validate_remote_datastore(context, region_name, flavor,
datastore, datastore_version)

deltas = {'instances': 1}
volume_support = datastore_cfg.volume_support
if volume_support:
call_args['volume_type'] = volume_type
dvm.validate_volume_type(context, volume_type,
datastore.name, datastore_version.name)
call_args['volume_size'] = volume_size
validate_volume_size(volume_size)
deltas['volumes'] = volume_size
# Instance volume should have enough space for the backup
# Backup, and volume sizes are in GBs
target_size = volume_size
else:
target_size = flavor.disk # local_storage
if volume_size is not None:
raise exception.VolumeNotSupported()
if datastore_cfg.device_path:
if flavor.ephemeral == 0:
raise exception.LocalStorageNotSpecified(flavor=flavor_id)
target_size = flavor.ephemeral # ephemeral_Storage

# 从现有的备份启动一个实例。(用户指定一个备份作为实例的源。)
if backup_id:
call_args['backup_id'] = backup_id
backup_info = Backup.get_by_id(context, backup_id)
if not backup_info.is_done_successfuly:
raise exception.BackupNotCompleteError(
backup_id=backup_id, state=backup_info.state)

if backup_info.size > target_size:
raise exception.BackupTooLarge(
backup_size=backup_info.size, disk_size=target_size)

if not backup_info.check_swift_object_exist(
context,
verify_checksum=CONF.verify_swift_checksum_on_restore):
raise exception.BackupFileNotFound(
location=backup_info.location)

if (backup_info.datastore_version_id
and backup_info.datastore.name != datastore.name):
raise exception.BackupDatastoreMismatchError(
datastore1=backup_info.datastore.name,
datastore2=datastore.name)

# 启动一个实例作为另一个实例的副本。(用户启动复制。)
if slave_of_id:
call_args['replica_of'] = slave_of_id
call_args['replica_count'] = replica_count
replication_support = datastore_cfg.replication_strategy
if not replication_support:
raise exception.ReplicationNotSupported(
datastore=datastore.name)
try:
# looking for replica source
replica_source = DBInstance.find_by(
context,
id=slave_of_id,
deleted=False)
if replica_source.slave_of_id:
raise exception.Forbidden(
_("Cannot create a replica of a replica %(id)s.")
% {'id': slave_of_id})
# load the replica source status to check if
# source is available
load_simple_instance_server_status(
context,
replica_source)
replica_source_instance = Instance(
context, replica_source,
None,
InstanceServiceStatus.find_by(
context,
instance_id=slave_of_id))
replica_source_instance.validate_can_perform_action()
except exception.ModelNotFoundError:
LOG.exception(
_("Cannot create a replica of %(id)s "
"as that instance could not be found."),
{'id': slave_of_id})
raise exception.NotFound(uuid=slave_of_id)
elif replica_count and replica_count != 1:
raise exception.Forbidden(_(
"Replica count only valid when creating replicas. Cannot "
"create %(count)d instances.") % {'count': replica_count})
multi_replica = slave_of_id and replica_count and replica_count > 1
instance_count = replica_count if multi_replica else 1
if locality:
call_args['locality'] = locality

if not nics:
nics = []
if CONF.default_neutron_networks:
nics = [{"net-id": net_id}
for net_id in CONF.default_neutron_networks] + nics
if nics:
call_args['nics'] = nics
if cluster_config:
call_args['cluster_id'] = cluster_config.get("id", None)

if not modules:
modules = []
module_ids = [mod['id'] for mod in modules]
modules = module_models.Modules.load_by_ids(context, module_ids)
auto_apply_modules = module_models.Modules.load_auto_apply(
context, datastore.id, datastore_version.id)
for aa_module in auto_apply_modules:
if aa_module.id not in module_ids:
modules.append(aa_module)
module_models.Modules.validate(
modules, datastore.id, datastore_version.id)
module_list = module_views.convert_modules_to_list(modules)

def _create_resources():

if cluster_config:
cluster_id = cluster_config.get("id", None)
shard_id = cluster_config.get("shard_id", None)
instance_type = cluster_config.get("instance_type", None)
else:
cluster_id = shard_id = instance_type = None

ids = []
names = []
root_passwords = []
root_password = None
for instance_index in range(0, instance_count):
# 实例的创建被记录在基础设施数据库中,
# 并且该实例被标记为InstanceTasks.BUILDING状态
db_info = DBInstance.create(
name=name, flavor_id=flavor_id, tenant_id=context.tenant,
volume_size=volume_size,
datastore_version_id=datastore_version.id,
task_status=InstanceTasks.BUILDING,
configuration_id=configuration_id,
slave_of_id=slave_of_id, cluster_id=cluster_id,
shard_id=shard_id, type=instance_type,
region_id=region_name)
LOG.debug("Tenant %(tenant)s created new Trove instance "
"%(db)s in region %(region)s.",
{'tenant': context.tenant, 'db': db_info.id,
'region': region_name})

instance_id = db_info.id
cls.add_instance_modules(context, instance_id, modules)
instance_name = name
ids.append(instance_id)
names.append(instance_name)
root_passwords.append(None)
# change the name to be name + replica_number if more than one
if multi_replica:
replica_number = instance_index + 1
names[instance_index] += '-' + str(replica_number)
setattr(db_info, 'name', names[instance_index])
db_info.save()

# if a configuration group is associated with an instance,
# generate an overrides dict to pass into the instance creation
# method

config = Configuration(context, configuration_id)
overrides = config.get_configuration_overrides()
service_status = InstanceServiceStatus.create(
instance_id=instance_id,
status=tr_instance.ServiceStatuses.NEW)

if CONF.trove_dns_support:
dns_client = create_dns_client(context)
hostname = dns_client.determine_hostname(instance_id)
db_info.hostname = hostname
db_info.save()
# 如果用户需要指定一个root密码,则必须在启动时指定。
# root密码在这时生成。
if cls.get_root_on_create(
datastore_version.manager) and not backup_id:
root_password = utils.generate_random_password()
root_passwords[instance_index] = root_password

if instance_count > 1:
instance_id = ids
instance_name = names
root_password = root_passwords

# Trove API发送一个请求给Trove task manager来做其余的工作。
# 将用户请求的实例的上下文和用户端提供的所有参数都传递给task manager。
task_api.API(context).create_instance(
instance_id, instance_name, flavor, image_id, databases, users,
datastore_version.manager, datastore_version.packages,
volume_size, backup_id, availability_zone, root_password,
nics, overrides, slave_of_id, cluster_config,
volume_type=volume_type, modules=module_list,
locality=locality)

return SimpleInstance(context, db_info, service_status,
root_password, locality=locality)

with StartNotification(context, **call_args):
return run_with_quotas(context.tenant, deltas, _create_resources)

Trove API发送请求到task manager。在trove.taskmanager.api.py文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def create_instance(self, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id=None,
availability_zone=None, root_password=None,
nics=None, overrides=None, slave_of_id=None,
cluster_config=None, volume_type=None,
modules=None, locality=None):

LOG.debug("Making async call to create instance %s ", instance_id)
version = self.API_BASE_VERSION
self._cast("create_instance", version=version,
instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor),
image_id=image_id,
databases=databases,
users=users,
datastore_manager=datastore_manager,
packages=packages,
volume_size=volume_size,
backup_id=backup_id,
availability_zone=availability_zone,
root_password=root_password,
nics=nics,
overrides=overrides,
slave_of_id=slave_of_id,
cluster_config=cluster_config,
volume_type=volume_type,
modules=modules, locality=locality)

该api得到了被提供的所有信息,并发送一个异步_cast()调用到task manager。

_cast()是异步请求。
call()是同步请求。

程序进入到trove.taskmanager.manager.py文件下,并执行下面的create_instance方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def _create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules, locality):

# 创建现有实例的副本
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
datastore_manager, packages,
volume_size,
availability_zone, root_password,
nics, overrides, slave_of_id,
backup_id, volume_type, modules)
else:
if type(instance_id) in [list]:
raise AttributeError(_(
"Cannot create multiple non-replica instances."))
instance_tasks = FreshInstanceTasks.load(context, instance_id)

scheduler_hints = srv_grp.ServerGroup.build_scheduler_hint(
context, locality, instance_id)
instance_tasks.create_instance(flavor, image_id, databases, users,
datastore_manager, packages,
volume_size, backup_id,
availability_zone, root_password,
nics, overrides, cluster_config,
None, volume_type, modules,
scheduler_hints)
timeout = (CONF.restore_usage_timeout if backup_id
else CONF.usage_timeout)
instance_tasks.wait_for_instance(timeout, flavor)

def create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules, locality):
with EndNotification(context,
instance_id=(instance_id[0]
if isinstance(instance_id, list)
else instance_id)):
self._create_instance(context, instance_id, name, flavor,
image_id, databases, users,
datastore_manager, packages, volume_size,
backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules,
locality)

在第24行代码中,instance_tasks在trove.taskmanager.models.py中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):

def create_instance(self, flavor, image_id, databases, users,
datastore_manager, packages, volume_size,
backup_id, availability_zone, root_password, nics,
overrides, cluster_config, snapshot, volume_type,
modules, scheduler_hints):
# It is the caller's responsibility to ensure that
# FreshInstanceTasks.wait_for_instance is called after
# create_instance to ensure that the proper usage event gets sent

LOG.info(_("Creating instance %s."), self.id)
security_groups = None

if CONF.trove_security_groups_support:
try:
security_groups = self._create_secgroup(datastore_manager)
except Exception as e:
msg = (_("Error creating security group for instance: %s") %
self.id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SEC_GROUP
self._log_and_raise(e, msg, err)
else:
LOG.debug("Successfully created security group for "
"instance: %s", self.id)

# 获取发送到guest的文件。(下面会讲到)
files = self.get_injected_files(datastore_manager)

cinder_volume_type = volume_type or CONF.cinder_volume_type

# 根据系统的配置,可以单独调用两种种方法来创建服务器和卷。
# 分别是使用Nova创建实例和卷、单独创建Nova实例和卷。
# 两种方法,配置文件都被一起传入(Nova调用注入)。
if use_nova_server_volume:
volume_info = self._create_server_volume(
flavor['id'],
image_id,
security_groups,
datastore_manager,
volume_size,
availability_zone,
nics,
files,
scheduler_hints)
else:
volume_info = self._create_server_volume_individually(
flavor['id'],
image_id,
security_groups,
datastore_manager,
volume_size,
availability_zone,
nics,
files,
cinder_volume_type,
scheduler_hints)

config = self._render_config(flavor)

backup_info = None
if backup_id is not None:
backup = bkup_models.Backup.get_by_id(self.context, backup_id)
backup_info = {'id': backup_id,
'instance_id': backup.instance_id,
'location': backup.location,
'type': backup.backup_type,
'checksum': backup.checksum,
}

# 最终转换为调用guest agent的prepare()方法。
self._guest_prepare(flavor['ram'], volume_info,
packages, databases, users, backup_info,
config.config_contents, root_password,
overrides,
cluster_config, snapshot, modules)

if root_password:
self.report_root_enabled()

if not self.db_info.task_status.is_error:
self.reset_task_status()

# when DNS is supported, we attempt to add this after the
# instance is prepared. Otherwise, if DNS fails, instances
# end up in a poorer state and there's no tooling around
# re-sending the prepare call; retrying DNS is much easier.
try:
self._create_dns_entry()
except Exception as e:
msg = _("Error creating DNS entry for instance: %s") % self.id
err = inst_models.InstanceTasks.BUILDING_ERROR_DNS
self._log_and_raise(e, msg, err)

get_injected_files()在第28行代码中被调用。该方法是执行创建工作流程中最为关键的部分。其生成的配置文件将在启动时被发送到guest实例,并提供给guest agent后续操作的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 构建files集合。
# 包含了guest_info和trove-guestagent.conf这两个文件。
def get_injected_files(self, datastore_manager):
injected_config_location = CONF.get('injected_config_location')
guest_info = CONF.get('guest_info')

if ('/' in guest_info):
# Set guest_info_file to exactly guest_info from the conf file.
# This should be /etc/guest_info for pre-Kilo compatibility.
guest_info_file = guest_info
else:
guest_info_file = os.path.join(injected_config_location,
guest_info)

files = {guest_info_file: (
"[DEFAULT]\n"
"guest_id=%s\n"
"datastore_manager=%s\n"
"tenant_id=%s\n"
% (self.id, datastore_manager, self.tenant_id))}

instance_key = get_instance_encryption_key(self.id)
if instance_key:
files = {guest_info_file: (
"%s"
"instance_rpc_encr_key=%s\n" % (
files.get(guest_info_file),
instance_key))}

if os.path.isfile(CONF.get('guest_config')):
with open(CONF.get('guest_config'), "r") as f:
files[os.path.join(injected_config_location,
"trove-guestagent.conf")] = f.read()

return files

_create_server_volume在trove.taskmanager.manager.py文件下的_create_instance中被调用(第36行代码)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def _create_server_volume(self, flavor_id, image_id, security_groups,
datastore_manager, volume_size,
availability_zone, nics, files,
scheduler_hints):
LOG.debug("Begin _create_server_volume for id: %s", self.id)
try:

# 准备用户的数据。
userdata = self._prepare_userdata(datastore_manager)
name = self.hostname or self.name
volume_desc = ("datastore volume for %s" % self.id)
volume_name = ("datastore-%s" % self.id)
volume_ref = {'size': volume_size, 'name': volume_name,
'description': volume_desc}
config_drive = CONF.use_nova_server_config_drive

# 调用Nova(公共API)创建一个实例。
server = self.nova_client.servers.create(
name, image_id, flavor_id,
files=files, volume=volume_ref,
security_groups=security_groups,
availability_zone=availability_zone,
nics=nics, config_drive=config_drive,
userdata=userdata, scheduler_hints=scheduler_hints)
server_dict = server._info
LOG.debug("Created new compute instance %(server_id)s "
"for id: %(id)s\nServer response: %(response)s",
{'server_id': server.id, 'id': self.id,
'response': server_dict})

volume_id = None
for volume in server_dict.get('os:volumes', []):
volume_id = volume.get('id')

# Record the server ID and volume ID in case something goes wrong.
self.update_db(compute_instance_id=server.id, volume_id=volume_id)
except Exception as e:
msg = _("Error creating server and volume for "
"instance %s") % self.id
LOG.debug("End _create_server_volume for id: %s", self.id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SERVER
self._log_and_raise(e, msg, err)

device_path = self.device_path
mount_point = CONF.get(datastore_manager).mount_point
volume_info = {'device_path': device_path, 'mount_point': mount_point}
LOG.debug("End _create_server_volume for id: %s", self.id)
return volume_info

_prepare_userdata在上面的第9行被调用。其目的是生成不许传递给Nova的用户数据。

1
2
3
4
5
6
7
8
9
def _prepare_userdata(self, datastore_manager):
userdata = None
cloudinit = os.path.join(CONF.get('cloudinit_location'),
"%s.cloudinit" % datastore_manager)
if os.path.isfile(cloudinit):
with open(cloudinit, "r") as f:
userdata = f.read()
return userdata

由get_injected_files()生成配置文件,根据设定的方式传递到guest agent。

最后,guest agent调用prepare()方法,在trove.guestagent.api.py文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def prepare(self, memory_mb, packages, databases, users,
device_path='/dev/vdb', mount_point='/mnt/volume',
backup_info=None, config_contents=None, root_password=None,
overrides=None, cluster_config=None, snapshot=None,
modules=None):
"""Make an asynchronous call to prepare the guest
as a database container optionally includes a backup id for restores
"""
LOG.debug("Sending the call to prepare the Guest.")

version = self.API_BASE_VERSION

# Taskmanager is a publisher, guestagent is a consumer. Usually
# consumer creates a queue, but in this case we have to make sure
# "prepare" doesn't get lost if for some reason guest was delayed and
# didn't create a queue on time.
# 初始化与guest沟通的进程间的通信(IPC)。
self._create_guest_queue()

packages = packages.split()

# 发送prepare()到队列中。
# 一旦实例启动并连接到这个消息队列,则这个prepare()调用将被
# guest实例上的guest agent获取。
self._cast(
"prepare", version=version, packages=packages,
databases=databases, memory_mb=memory_mb, users=users,
device_path=device_path, mount_point=mount_point,
backup_info=backup_info, config_contents=config_contents,
root_password=root_password, overrides=overrides,
cluster_config=cluster_config, snapshot=snapshot, modules=modules)

prepare()方法在每个guest agent都可以找到实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def do_prepare(self, context, packages, databases, memory_mb, users,
device_path, mount_point, backup_info,
config_contents, root_password, overrides,
cluster_config, snapshot):
"""This is called from prepare in the base class."""
app = self.mysql_app(self.mysql_app_status.get())
app.install_if_needed(packages)
if device_path:
# stop and do not update database
app.stop_db(
do_not_start_on_reboot=self.volume_do_not_start_on_reboot)
device = volume.VolumeDevice(device_path)
# unmount if device is already mounted
device.unmount_device(device_path)
device.format()
if os.path.exists(mount_point):
# rsync existing data to a "data" sub-directory
# on the new volume
device.migrate_data(mount_point, target_subdir="data")
# mount the volume
device.mount(mount_point)
operating_system.chown(mount_point, service.MYSQL_OWNER,
service.MYSQL_OWNER,
recursive=False, as_root=True)

LOG.debug("Mounted the volume at %s.", mount_point)
# We need to temporarily update the default my.cnf so that
# mysql will start after the volume is mounted. Later on it
# will be changed based on the config template
# (see MySqlApp.secure()) and restart.
app.set_data_dir(mount_point + '/data')
app.start_mysql()
if backup_info:
self._perform_restore(backup_info, context,
mount_point + "/data", app)
app.secure(config_contents)
enable_root_on_restore = (backup_info and
self.mysql_admin().is_root_enabled())
if enable_root_on_restore:
app.secure_root(secure_remote_root=False)
self.mysql_app_status.get().report_root(context, 'root')
else:
app.secure_root(secure_remote_root=True)

if snapshot:
self.attach_replica(context, snapshot, snapshot['config'])

在prepare()调用完成后,guest数据库就可以使用了。