diff options
author | Arthur Martella <arthur.martella.1@att.com> | 2019-03-15 12:18:35 -0400 |
---|---|---|
committer | Arthur Martella <arthur.martella.1@att.com> | 2019-03-15 12:18:35 -0400 |
commit | 2f66a722dc1a6d4b37218c554199c046719a6873 (patch) | |
tree | f1d8ae7191d3708148f6688bb9e4e95a4fbe466c /engine/src/valet | |
parent | f454f439016bdae1bb6616946a68224b393583da (diff) |
Initial upload of F-GPS seed code 5/21
Includes:
Engine app manager
Change-Id: I551b9338483d94f754d914773191752a19652d48
Issue-ID: OPTFRA-440
Signed-off-by: arthur.martella.1@att.com
Diffstat (limited to 'engine/src/valet')
-rw-r--r-- | engine/src/valet/engine/app_manager/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/engine/app_manager/app.py | 716 | ||||
-rw-r--r-- | engine/src/valet/engine/app_manager/app_handler.py | 307 | ||||
-rw-r--r-- | engine/src/valet/engine/app_manager/app_parser.py | 257 | ||||
-rw-r--r-- | engine/src/valet/engine/app_manager/group.py | 139 | ||||
-rw-r--r-- | engine/src/valet/engine/app_manager/server.py | 171 |
6 files changed, 1608 insertions, 0 deletions
diff --git a/engine/src/valet/engine/app_manager/__init__.py b/engine/src/valet/engine/app_manager/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/app_manager/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/app_manager/app.py b/engine/src/valet/engine/app_manager/app.py new file mode 100644 index 0000000..de6fb8f --- /dev/null +++ b/engine/src/valet/engine/app_manager/app.py @@ -0,0 +1,716 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy +import json +import re +import six + +from valet.engine.app_manager.app_parser import Parser +from valet.engine.app_manager.group import Group +from valet.engine.app_manager.server import Server + + +class App(object): + """Container to deliver the status of request.""" + + def __init__(self, _req_id, _use_dha, _logger): + + self.last_req_id = _req_id + + self.datacenter_id = None + + # stack_id given from the platform (e.g., OpenStack) + self.app_id = None + + # stack_name as datacenter_id + ":" + tenant_id + ":" + vf_module_name + self.app_name = None + + self.tenant_id = None + + # Info for group rule scope + self.service_instance_id = None + self.vnf_instance_id = None + self.vnf_instance_name = None + + # Stack resources. key = server's orch_id + self.stack = {} + + self.resource = None + + self.logger = _logger + + self.parser = Parser(self.logger) + + self.groups = {} # all valet groups used in this app + self.servers = {} # requested servers (e.g., VMs) + + # Store prior server info. + self.prior_servers = {} + + self.status = "ok" + + self.state = "plan" + + self.prior_state = "unknown" + + # Check if do not want rollback for create + self.suppress_rollback = False + + # For placement optimization + self.total_CPU = 0 + self.total_mem = 0 + self.total_local_vol = 0 + self.optimization_priority = None + + self.use_dha = _use_dha + + def set_resource(self, _resource): + self.resource = _resource + + def init_for_create(self, _req): + """Validate and init app info based on the given request.""" + + self.state = "create" + + if "datacenter" in _req.keys(): + if "id" in _req["datacenter"].keys(): + if _req["datacenter"]["id"] is not None: + self.datacenter_id = _req["datacenter"]["id"].strip() + else: + self.status = "datacenter_id is None" + return + else: + self.status = "no datacenter_id in request" + return + + if "url" in _req["datacenter"].keys(): + if _req["datacenter"]["url"] is not None: + if not self._match_url(_req["datacenter"]["url"].strip()): + self.status = "mal-formed url" + return + else: + self.status = "url is None" + return + else: + self.status = "no url in request" + return + else: + self.status = "no datacenter info in request" + return + + if "tenant_id" in _req.keys(): + if _req["tenant_id"] is not None: + self.tenant_id = _req["tenant_id"] + else: + self.status = "tenant_id is None" + return + else: + self.status = "no tenant_id in request" + return + + if "service_instance_id" in _req.keys(): + if _req["service_instance_id"] is not None: + self.service_instance_id = _req["service_instance_id"] + else: + self.status = "service_id is None" + return + else: + self.status = "no service_instance_id in request" + return + + if "vnf_instance_id" in _req.keys(): + if _req["vnf_instance_id"] is not None: + self.vnf_instance_id = _req["vnf_instance_id"] + else: + self.status = "vnf_id is None" + return + else: + self.status = "no vnf_instance_id in request" + return + + if "vnf_instance_name" in _req.keys(): + if _req["vnf_instance_name"] is not None: + self.vnf_instance_name = _req["vnf_instance_name"] + else: + self.status = "vnf_name is None" + return + else: + self.status = "no vnf_instance_name in request" + return + + if "stack_name" in _req.keys(): + if _req["stack_name"] is not None: + self.app_name = self.datacenter_id + ":" + self.tenant_id + ":" + _req["stack_name"].strip() + else: + self.status = "stack_name is None" + return + else: + self.status = "no stack_name in request" + return + + if "stack" in _req.keys(): + self.stack = self._validate_stack(_req["stack"]) + else: + self.status = "no stack in request" + return + + def init_for_delete(self, _req): + """Validate and init app info for marking delete.""" + + self.state = "delete" + + if "datacenter" in _req.keys(): + if "id" in _req["datacenter"].keys(): + if _req["datacenter"]["id"] is not None: + self.datacenter_id = _req["datacenter"]["id"].strip() + else: + self.status = "datacenter_id is None" + return + else: + self.status = "no datacenter_id in request" + return + else: + self.status = "no datacenter info in request" + return + + if "tenant_id" in _req.keys(): + if _req["tenant_id"] is not None: + self.tenant_id = _req["tenant_id"] + else: + self.status = "tenant_id is None" + return + else: + self.status = "no tenant_id in request" + return + + if "stack_name" in _req.keys(): + if _req["stack_name"] is not None: + self.app_name = self.datacenter_id + ":" + self.tenant_id + ":" + _req["stack_name"].strip() + else: + self.status = "stack_name is None" + return + else: + self.status = "no stack_name in request" + return + + def init_for_confirm(self, _req): + """Validate and init app info for confirm.""" + + # Tempoary state and will depends on prior state + self.state = "confirm" + + if "stack_id" in _req.keys(): + if _req["stack_id"] is not None: + stack_id = _req["stack_id"].strip() + + stack_id_elements = stack_id.split('/', 1) + if len(stack_id_elements) > 1: + self.app_id = stack_id_elements[1] + else: + self.app_id = stack_id + + self.logger.debug("stack_id = " + self.app_id) + + else: + self.status = "null stack_id in request" + return + else: + self.status = "no stack_id in request" + return + + def init_for_rollback(self, _req): + """Validate and init app info for rollback.""" + + # Tempoary state and will depends on prior state + self.state = "rollback" + + if "stack_id" in _req.keys(): + if _req["stack_id"] is not None: + stack_id = _req["stack_id"].strip() + + stack_id_elements = stack_id.split('/', 1) + if len(stack_id_elements) > 1: + self.app_id = stack_id_elements[1] + else: + self.app_id = stack_id + + self.logger.debug("stack_id = " + self.app_id) + else: + # If the stack fails, stack_id can be null. + self.app_id = "none" + + self.logger.debug("stack_id = None") + else: + self.status = "no stack_id in request" + return + + if "suppress_rollback" in _req.keys(): + self.suppress_rollback = _req["suppress_rollback"] + + if "error_message" in _req.keys(): + # TODO(Gueyoung): analyze the error message. + + if _req["error_message"] is None: + self.logger.warning("error message from platform: none") + else: + self.logger.warning("error message from platform:" + _req["error_message"]) + + def init_prior_app(self, _prior_app): + """Init with the prior app info.""" + + self.datacenter_id = _prior_app.get("datacenter") + + self.app_name = _prior_app.get("stack_name") + + if _prior_app["uuid"] != "none": + # Delete case. + self.app_id = _prior_app.get("uuid") + + self.tenant_id = _prior_app.get("tenant_id") + + metadata = json.loads(_prior_app.get("metadata")) + self.service_instance_id = metadata.get("service_instance_id") + self.vnf_instance_id = metadata.get("vnf_instance_id") + self.vnf_instance_name = metadata.get("vnf_instance_name") + + self.servers = json.loads(_prior_app.get("servers")) + + prior_state = _prior_app.get("state") + + if self.state == "confirm": + if prior_state == "create": + self.state = "created" + elif prior_state == "delete": + self.state = "deleted" + elif self.state == "rollback": + if prior_state == "create": + if self.suppress_rollback and self.app_id != "none": + self.state = "created" + else: + self.state = "deleted" + + if prior_state == "delete": + self.state = "created" + elif self.state == "delete": + self.prior_state = prior_state + self.prior_servers = copy.deepcopy(self.servers) + else: + self.status = "unknown state" + + def _validate_stack(self, _stack): + """Check if the stack is for Valet to make decision. + + And, check if the format is correct. + """ + + if len(_stack) == 0 or "resources" not in _stack.keys(): + self.status = "na: no resource in stack" + self.logger.warning("non-applicable to valet: no resource in stack") + return {} + + stack = {} + + for rk, r in _stack["resources"].iteritems(): + if "type" not in r.keys(): + self.status = "type key is missing in stack" + return None + + if r["type"] == "OS::Nova::Server": + if "properties" not in r.keys(): + self.status = "properties key is missing in stack" + return None + + if "name" not in r["properties"].keys(): + self.status = "name property is missing in stack" + return None + + if r["properties"]["name"] is None: + self.status = "name property is none" + return None + + if "flavor" not in r["properties"].keys(): + self.status = "flavor property is missing in stack" + return None + + if r["properties"]["flavor"] is None: + self.status = "flavor property is none" + return None + + stack[rk] = r + + if len(stack) == 0: + self.status = "na: no server resource in stack" + self.logger.warning("non-applicable to valet: no server resource in stack") + return {} + + first_resource = stack[stack.keys()[0]] + apply_valet = False + + # To apply Valet decision, availability_zone must exist. + # And its value contains host variable as a list element. + if "availability_zone" in first_resource["properties"].keys(): + az_value = first_resource["properties"]["availability_zone"] + if isinstance(az_value, list): + apply_valet = True + + for rk, r in stack.iteritems(): + if apply_valet: + if "availability_zone" not in r["properties"].keys(): + self.status = "az is missing in stack for valet" + return None + else: + az_value = r["properties"]["availability_zone"] + if not isinstance(az_value, list): + self.status = "host variable is missing in stack for valet" + return None + + if az_value[0] in ("none", "None") or az_value[1] in ("none", "None"): + self.status = "az value is missing in stack" + return None + else: + if "availability_zone" in r["properties"].keys(): + az_value = r["properties"]["availability_zone"] + if isinstance(az_value, list): + self.status = "host variable exists in stack for non-valet application" + return None + + if not apply_valet: + self.status = "na: pass valet" + self.logger.warning("non-applicable to valet") + return {} + else: + return stack + + def init_valet_groups(self): + """Create Valet groups from input request.""" + + for rk, r in self.stack.iteritems(): + properties = r.get("properties", {}) + metadata = properties.get("metadata", {}) + + if len(metadata) > 0: + valet_rules = metadata.get("valet_groups", None) + + if valet_rules is not None and valet_rules != "": + rule_list = [] + if isinstance(valet_rules, six.string_types): + rules = valet_rules.split(",") + for gr in rules: + rule_list.append(gr.strip()) + else: + self.status = "incorrect valet group metadata format" + self.logger.error(self.status) + return + + # Check rule validation of valet_groups. + self.status = self.resource.check_valid_rules(self.tenant_id, + rule_list, + use_ex=self.use_dha) + if self.status != "ok": + self.logger.error(self.status) + return + + self.status = self._make_valet_groups(properties.get("name"), + properties["availability_zone"][0], + rule_list) + if self.status != "ok": + self.logger.error(self.status) + return + + # Check and create server groups if they do not exist. + scheduler_hints = properties.get("scheduler_hints", {}) + if len(scheduler_hints) > 0: + for hint_key in scheduler_hints.keys(): + if hint_key == "group": + hint = scheduler_hints[hint_key] + self.status = self._make_group(properties.get("name"), hint) + if self.status != "ok": + self.logger.error(self.status) + return + + def _make_valet_groups(self, _rk, _az, _rule_list): + """Create Valet groups that each server is involved.""" + + for rn in _rule_list: + rule = self.resource.group_rules[rn] + + # Valet group naming convention. + # It contains datacenter id and availability_zone + # followed by service id and vnf id + # depending on scope. + # And concatenate rule name. + # Exception: quorum-diversity + + group_id = self.datacenter_id + ":" + + if rule.rule_type != "quorum-diversity": + group_id += _az + ":" + + if rule.app_scope == "lcp": + group_id += rn + elif rule.app_scope == "service": + group_id += self.service_instance_id + ":" + rn + elif rule.app_scope == "vnf": + group_id += self.service_instance_id + ":" + self.vnf_instance_id + ":" + rn + else: + return "unknown app_scope value" + + if group_id in self.groups.keys(): + group = self.groups[group_id] + else: + group = Group(group_id) + group.group_type = rule.rule_type + group.factory = "valet" + group.level = rule.level + + self.groups[group_id] = group + + group.server_list.append(self.app_name + ":" + _rk) + + return "ok" + + def _make_group(self, _rk, _group_hint): + """Create the group request based on scheduler hint.""" + + if isinstance(_group_hint, dict): + # _group_hint is a single key/value pair + g = _group_hint[_group_hint.keys()[0]] + + r_type = g.get("type", "none") + if r_type != "OS::Nova::ServerGroup": + return "support only ServerGroup resource" + + properties = g.get("properties", {}) + if len(properties) == 0: + return "no properties" + + group_name = properties.get("name", None) + if group_name is None: + return "no group name" + group_name = group_name.strip() + + policies = properties.get("policies", []) + if len(policies) == 0: + return "no policy of the group" + + if len(policies) > 1: + return "multiple policies" + + # TODO: exclude soft-affinity and soft-anti-affinity? + + if group_name in self.groups.keys(): + group = self.groups[group_name] + else: + group = Group(group_name) + + policy = policies[0].strip() + if policy == "anti-affinity": + group_type = "diversity" + else: + group_type = policy + + group.group_type = group_type + group.factory = "server-group" + group.level = "host" + + self.groups[group_name] = group + else: + # group hint is uuid string. + rg = self.resource.get_group_by_uuid(_group_hint) + if rg is None: + return "unknown group found while making group" + + # TODO: exclude soft-affinity and soft-anti-affinity? + + if rg.name in self.groups.keys(): + group = self.groups[rg.name] + else: + group = Group(rg.name) + + group.group_type = rg.group_type + group.factory = rg.factory + group.level = "host" + + self.groups[rg.name] = group + + if group is not None: + group.server_list.append(self.app_name + ":" + _rk) + + return "ok" + + def parse(self): + """Extract servers and merge groups from stack for search.""" + + (self.servers, self.groups) = self.parser.set_servers(self.app_name, + self.stack, + self.groups) + + if len(self.servers) == 0 and len(self.groups) == 0: + self.status = "parse error for " + self.app_name + ": " + self.parser.status + return False + + return True + + def set_weight(self): + """Set relative weight of each servers and groups.""" + + for _, s in self.servers.iteritems(): + self._set_server_weight(s) + + for _, g in self.groups.iteritems(): + self._set_server_weight(g) + + for _, g in self.groups.iteritems(): + self._set_group_resource(g) + + for _, g in self.groups.iteritems(): + self._set_group_weight(g) + + def _set_server_weight(self, _v): + """Set relative weight of each server against available resource amount.""" + + if isinstance(_v, Group): + for _, sg in _v.subgroups.iteritems(): + self._set_server_weight(sg) + else: + if self.resource.CPU_avail > 0: + _v.vCPU_weight = float(_v.vCPUs) / float(self.resource.CPU_avail) + else: + _v.vCPU_weight = 1.0 + self.total_CPU += _v.vCPUs + + if self.resource.mem_avail > 0: + _v.mem_weight = float(_v.mem) / float(self.resource.mem_avail) + else: + _v.mem_weight = 1.0 + self.total_mem += _v.mem + + if self.resource.local_disk_avail > 0: + _v.local_volume_weight = float(_v.local_volume_size) / float(self.resource.local_disk_avail) + else: + if _v.local_volume_size > 0: + _v.local_volume_weight = 1.0 + else: + _v.local_volume_weight = 0.0 + self.total_local_vol += _v.local_volume_size + + def _set_group_resource(self, _g): + """Sum up amount of resources of servers for each affinity group.""" + + if isinstance(_g, Server): + return + + for _, sg in _g.subgroups.iteritems(): + self._set_group_resource(sg) + _g.vCPUs += sg.vCPUs + _g.mem += sg.mem + _g.local_volume_size += sg.local_volume_size + + def _set_group_weight(self, _group): + """Set relative weight of each affinity group against available resource amount.""" + + if self.resource.CPU_avail > 0: + _group.vCPU_weight = float(_group.vCPUs) / float(self.resource.CPU_avail) + else: + if _group.vCPUs > 0: + _group.vCPU_weight = 1.0 + else: + _group.vCPU_weight = 0.0 + + if self.resource.mem_avail > 0: + _group.mem_weight = float(_group.mem) / float(self.resource.mem_avail) + else: + if _group.mem > 0: + _group.mem_weight = 1.0 + else: + _group.mem_weight = 0.0 + + if self.resource.local_disk_avail > 0: + _group.local_volume_weight = float(_group.local_volume_size) / float(self.resource.local_disk_avail) + else: + if _group.local_volume_size > 0: + _group.local_volume_weight = 1.0 + else: + _group.local_volume_weight = 0.0 + + for _, sg in _group.subgroups.iteritems(): + if isinstance(sg, Group): + self._set_group_weight(sg) + + def set_optimization_priority(self): + """Determine the optimization priority among different types of resources.""" + + if len(self.groups) == 0 and len(self.servers) == 0: + return + + if self.resource.CPU_avail > 0: + app_cpu_weight = float(self.total_CPU) / float(self.resource.CPU_avail) + else: + if self.total_CPU > 0: + app_cpu_weight = 1.0 + else: + app_cpu_weight = 0.0 + + if self.resource.mem_avail > 0: + app_mem_weight = float(self.total_mem) / float(self.resource.mem_avail) + else: + if self.total_mem > 0: + app_mem_weight = 1.0 + else: + app_mem_weight = 0.0 + + if self.resource.local_disk_avail > 0: + app_local_vol_weight = float(self.total_local_vol) / float(self.resource.local_disk_avail) + else: + if self.total_local_vol > 0: + app_local_vol_weight = 1.0 + else: + app_local_vol_weight = 0.0 + + opt = [("cpu", app_cpu_weight), + ("mem", app_mem_weight), + ("lvol", app_local_vol_weight)] + + self.optimization_priority = sorted(opt, key=lambda resource: resource[1], reverse=True) + + def reset_servers(self): + """Get servers back from containers (i.e., affinity groups)""" + + servers = [] + for _, g in self.groups.iteritems(): + g.get_servers(servers) + + for s in servers: + self.servers[s.vid] = s + + def _match_url(self, _url): + """Check if the URL is a correct form.""" + + regex = re.compile( + r'^(?:http|ftp)s?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain + r'localhost|' # localhost + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip + r'(?::\d+)?' # optional port + r'(?:/?|[/?]\S+)$', re.IGNORECASE) + + if re.match(regex, _url): + return True + else: + return False diff --git a/engine/src/valet/engine/app_manager/app_handler.py b/engine/src/valet/engine/app_manager/app_handler.py new file mode 100644 index 0000000..14ef35c --- /dev/null +++ b/engine/src/valet/engine/app_manager/app_handler.py @@ -0,0 +1,307 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import operator +import time + +from valet.engine.app_manager.app import App + + +class AppHistory(object): + """Data container for scheduling decisions.""" + + def __init__(self, _key): + self.decision_key = _key + self.status = None + self.result = None + self.timestamp = None + + +class AppHandler(object): + """Handler for all requested applications.""" + + def __init__(self, _dbh, _use_dha, _logger): + self.dbh = _dbh + + self.decision_history = {} + self.max_decision_history = 5000 + self.min_decision_history = 1000 + + self.use_dha = _use_dha + + self.logger = _logger + + def validate_for_create(self, _req_id, _req): + """Validate create request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_create(_req) + + if app.status != "ok" and not app.status.startswith("na:"): + self.logger.error(app.status) + else: + self.logger.info("got 'create' for app = " + app.app_name) + + return app + + def validate_for_update(self, _req_id, _req): + """Validate update request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + + # Use create validation module. + app.init_for_create(_req) + app.state = "update" + + if app.status != "ok" and not app.status.startswith("na:"): + self.logger.error(app.status) + else: + self.logger.info("got 'update' for app = " + app.app_name) + + return app + + def validate_for_delete(self, _req_id, _req): + """Validate delete request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_delete(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + prior_app = self.dbh.get_stack(app.app_name) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "na: no prior request via valet" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'delete' for app = " + app.app_name) + + return app + + def validate_for_confirm(self, _req_id, _req): + """Validate confirm request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_confirm(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + stack_id_map = self.dbh.get_stack_id_map(app.last_req_id) + if stack_id_map is None: + return None + + if len(stack_id_map) == 0: + app.status = "na: not handled request via valet" + return app + + prior_app = self.dbh.get_stack(stack_id_map.get("stack_id")) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "cannot find prior stack info" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'confirm' for app = " + app.app_name) + + return app + + def validate_for_rollback(self, _req_id, _req): + """Validate rollback request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_rollback(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + stack_id_map = self.dbh.get_stack_id_map(app.last_req_id) + if stack_id_map is None: + return None + + if len(stack_id_map) == 0: + app.status = "na: not handled request via valet" + return app + + prior_app = self.dbh.get_stack(stack_id_map.get("stack_id")) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "cannot find prior stack info" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'rollback' for app = " + app.app_name) + + return app + + def set_for_create(self, _app): + """Set for stack-creation request.""" + + # Set Valet groups. + _app.init_valet_groups() + if _app.status != "ok": + return + + # Set flavor properties for each server. + for rk, r in _app.stack.iteritems(): + if "vcpus" not in r["properties"].keys(): + flavor = _app.resource.get_flavor(r["properties"]["flavor"]) + + if flavor is None: + _app.status = "fail to get flavor details" + self.logger.error(_app.status) + return + + if flavor.status != "enabled": + # TODO(Gueyoung): what to do if flavor is disabled? + self.logger.warning("disabled flavor = " + flavor.name) + + r["properties"]["vcpus"] = flavor.vCPUs + r["properties"]["mem"] = flavor.mem_cap + r["properties"]["local_volume"] = flavor.disk_cap + + if len(flavor.extra_specs) > 0: + extra_specs = {} + for mk, mv in flavor.extra_specs.iteritems(): + extra_specs[mk] = mv + r["properties"]["extra_specs"] = extra_specs + + # Set servers. + # Once parsing app, valet deterimnes placements or error. + if not _app.parse(): + self.logger.error(_app.status) + return + + return + + def set_for_update(self, _app): + """Set for stack-update request.""" + + # Set servers. + # Once parsing app, valet deterimnes placements or error. + if not _app.parse(): + self.logger.error(_app.status) + return + + # Skip stack-update and rely on platform at this version. + _app.status = "na:update: pass stack-update" + + return + + def check_history(self, _req_id): + """Check if the request is determined already.""" + + if _req_id in self.decision_history.keys(): + status = self.decision_history[_req_id].status + result = self.decision_history[_req_id].result + return status, result + else: + return None, None + + def record_history(self, _req_id, _status, _result): + """Record an app placement decision.""" + + if _req_id not in self.decision_history.keys(): + if len(self.decision_history) > self.max_decision_history: + self._flush_decision_history() + + app_history = AppHistory(_req_id) + app_history.status = _status + app_history.result = _result + app_history.timestamp = time.time() + + self.decision_history[_req_id] = app_history + + def _flush_decision_history(self): + """Unload app placement decisions.""" + + count = 0 + num_of_removes = len(self.decision_history) - self.min_decision_history + + remove_item_list = [] + for decision in (sorted(self.decision_history.values(), key=operator.attrgetter('timestamp'))): # type: AppHistory + remove_item_list.append(decision.decision_key) + count += 1 + if count == num_of_removes: + break + + for dk in remove_item_list: + del self.decision_history[dk] + + def store_app(self, _app): + """Store new app or update existing app.""" + + if _app.state == "create": + metadata = {"service_instance_id": _app.service_instance_id, "vnf_instance_id": _app.vnf_instance_id, + "vnf_instance_name": _app.vnf_instance_name} + + servers = {} + for sk, s in _app.servers.iteritems(): + servers[sk] = s.get_json_info() + + if not self.dbh.create_stack(_app.app_name, + _app.status, + _app.datacenter_id, _app.app_name, "none", + _app.tenant_id, metadata, + servers, {}, + _app.state, "none"): + return False + elif _app.state in ("delete", "created"): + metadata = {"service_instance_id": _app.service_instance_id, "vnf_instance_id": _app.vnf_instance_id, + "vnf_instance_name": _app.vnf_instance_name} + + if not self.dbh.update_stack(_app.app_name, + _app.status, + _app.datacenter_id, _app.app_name, _app.app_id, + _app.tenant_id, metadata, + _app.servers, _app.prior_servers, + _app.state, _app.prior_state): + return False + elif _app.state == "deleted": + if not self.dbh.delete_stack(_app.app_name): + return False + else: + self.logger.error("unknown operaton") + return False + + # To manage the map between request_id and Heat stack requested + if _app.state in ("create", "delete"): + if not self.dbh.create_stack_id_map(_app.last_req_id, _app.app_name): + return False + elif _app.state in ("created", "deleted"): + if not self.dbh.delete_stack_id_map(_app.last_req_id): + return False + + return True diff --git a/engine/src/valet/engine/app_manager/app_parser.py b/engine/src/valet/engine/app_manager/app_parser.py new file mode 100644 index 0000000..9e6336b --- /dev/null +++ b/engine/src/valet/engine/app_manager/app_parser.py @@ -0,0 +1,257 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + +from valet.engine.app_manager.group import Group, LEVEL +from valet.engine.app_manager.server import Server + + +class Parser(object): + + def __init__(self, _logger): + self.logger = _logger + + self.status = "ok" + + def set_servers(self, _app_name, _stack, _groups): + """Parse stack resources to set servers (e.g., VMs) for search.""" + + servers = {} + + for rk, r in _stack.iteritems(): + properties = r.get("properties") + + server_name = properties["name"].strip() + server_id = _app_name + ":" + server_name + + server = Server(server_id, rk) + + server.name = server_name + + flavor_id = properties.get("flavor") + if isinstance(flavor_id, six.string_types): + server.flavor = flavor_id.strip() + else: + server.flavor = str(flavor_id) + + image_id = properties.get("image", None) + if image_id is not None: + if isinstance(image_id, six.string_types): + server.image = image_id.strip() + else: + server.image = str(image_id) + + if "vcpus" in properties.keys(): + server.vCPUs = int(properties.get("vcpus")) + server.mem = int(properties.get("mem")) + server.local_volume_size = int(properties.get("local_volume")) + + ess = properties.get("extra_specs", {}) + if len(ess) > 0: + extra_specs = {} + for mk, mv in ess.iteritems(): + extra_specs[mk] = mv + server.extra_specs_list.append(extra_specs) + + az = properties.get("availability_zone", None) + if az is not None: + server.availability_zone = az[0].strip() + server.host_assignment_variable = az[1].strip() + if len(az) == 3: + server.host_assignment_inx = az[2] + + servers[server_id] = server + + if self._merge_diversity_groups(_groups, servers) is False: + self.status = "fail while merging diversity groups" + return {}, {} + if self._merge_quorum_diversity_groups(_groups, servers) is False: + self.status = "fail while merging quorum-diversity groups" + return {}, {} + if self._merge_exclusivity_groups(_groups, servers) is False: + self.status = "fail while merging exclusivity groups" + return {}, {} + if self._merge_affinity_groups(_groups, servers) is False: + self.status = "fail while merging affinity groups" + return {}, {} + + # To delete all exclusivity and diversity groups after merging + groups = {gk: g for gk, g in _groups.iteritems() if g.group_type == "affinity"} + + return servers, groups + + def _merge_diversity_groups(self, _groups, _servers): + """Merge diversity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "diversity": + for sk in group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].diversity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_quorum_diversity_groups(self, _groups, _servers): + """Merge quorum-diversity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "quorum-diversity": + for sk in group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].quorum_diversity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_exclusivity_groups(self, _groups, _servers): + """Merge exclusivity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "exclusivity": + for sk in group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].exclusivity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_affinity_groups(self, _groups, _servers): + """Merge affinity subgroups.""" + + # To track each server's or group's parent group (i.e., affinity) + affinity_map = {} + + # To make cannonical order of groups + group_list = [gk for gk in _groups.keys()] + group_list.sort() + + for level in LEVEL: + for gk in group_list: + if gk in _groups.keys(): + if _groups[gk].level == level and _groups[gk].group_type == "affinity": + group = _groups[gk] + else: + continue + else: + continue + + group.server_list.sort() + + for sk in group.server_list: + if sk in _servers.keys(): + self._merge_server(group, sk, _servers, affinity_map) + else: + if sk not in affinity_map.keys(): + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + # If server belongs to the other group already, + # take the group as a subgroup of this group + if affinity_map[sk].vid != group.vid: + if group.is_parent_affinity(sk): + self._set_implicit_grouping(sk, group, affinity_map, _groups) + + return True + + def _merge_server(self, _group, _sk, _servers, _affinity_map): + """Merge a server into the group.""" + + _group.subgroups[_sk] = _servers[_sk] + _servers[_sk].surgroup = _group + _affinity_map[_sk] = _group + + self._add_implicit_diversity_groups(_group, _servers[_sk].diversity_groups) + self._add_implicit_quorum_diversity_groups(_group, _servers[_sk].quorum_diversity_groups) + self._add_implicit_exclusivity_groups(_group, _servers[_sk].exclusivity_groups) + self._add_memberships(_group, _servers[_sk]) + + del _servers[_sk] + + def _add_implicit_diversity_groups(self, _group, _diversity_groups): + """Add subgroup's diversity groups.""" + + for dk, div_group in _diversity_groups.iteritems(): + if LEVEL.index(div_group.level) >= LEVEL.index(_group.level): + _group.diversity_groups[dk] = div_group + + def _add_implicit_quorum_diversity_groups(self, _group, _quorum_diversity_groups): + """Add subgroup's quorum diversity groups.""" + + for dk, div_group in _quorum_diversity_groups.iteritems(): + if LEVEL.index(div_group.level) >= LEVEL.index(_group.level): + _group.quorum_diversity_groups[dk] = div_group + + def _add_implicit_exclusivity_groups(self, _group, _exclusivity_groups): + """Add subgroup's exclusivity groups.""" + + for ek, ex_group in _exclusivity_groups.iteritems(): + if LEVEL.index(ex_group.level) >= LEVEL.index(_group.level): + _group.exclusivity_groups[ek] = ex_group + + def _add_memberships(self, _group, _v): + """Add subgroups's host-aggregates and AZs.""" + + for extra_specs in _v.extra_specs_list: + _group.extra_specs_list.append(extra_specs) + + if isinstance(_v, Server): + if _v.availability_zone is not None: + if _v.availability_zone not in _group.availability_zone_list: + _group.availability_zone_list.append(_v.availability_zone) + + if isinstance(_v, Group): + for az in _v.availability_zone_list: + if az not in _group.availability_zone_list: + _group.availability_zone_list.append(az) + + def _set_implicit_grouping(self, _vk, _g, _affinity_map, _groups): + """Take server's most top parent as a child group of this group _g.""" + + tg = _affinity_map[_vk] # Where _vk currently belongs to + + if tg.vid in _affinity_map.keys(): # If the parent belongs to the other parent group + self._set_implicit_grouping(tg.vid, _g, _affinity_map, _groups) + else: + if LEVEL.index(tg.level) > LEVEL.index(_g.level): + tg.level = _g.level + + if _g.is_parent_affinty(tg.vid): + _g.subgroups[tg.vid] = tg + tg.surgroup = _g + _affinity_map[tg.vid] = _g + + self._add_implicit_diversity_groups(_g, tg.diversity_groups) + self._add_implicit_quorum_diversity_groups(_g, tg.quorum_diversity_groups) + self._add_implicit_exclusivity_groups(_g, tg.exclusivity_groups) + self._add_memberships(_g, tg) + + del _groups[tg.vid] diff --git a/engine/src/valet/engine/app_manager/group.py b/engine/src/valet/engine/app_manager/group.py new file mode 100644 index 0000000..69c9339 --- /dev/null +++ b/engine/src/valet/engine/app_manager/group.py @@ -0,0 +1,139 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + + +LEVEL = ["host", "rack", "cluster"] + +FLAVOR_TYPES = ["gv", "nv", "nd", "ns", "ss"] + + +class Group(object): + """Container to keep requested valet groups. + + TODO(Gueyoung): create parent class to make common functions + """ + + def __init__(self, _id): + """Define group info and parameters.""" + + # ID consists of + # datacenter_id + [service_instance_id] + [vnf_instance_id] + rule_name + self.vid = _id + + # Type is either affinity, diversity, quorum_diversity, or exclusivity + self.group_type = None + + self.factory = None + + # Level is host or rack + self.level = None + + # To build containment tree + self.surgroup = None # parent 'affinity' object + self.subgroups = {} # child affinity group (or server) objects + + # Group objects of this container + self.diversity_groups = {} + self.quorum_diversity_groups = {} + self.exclusivity_groups = {} + + self.availability_zone_list = [] + + self.extra_specs_list = [] + + self.vCPUs = 0 + self.mem = 0 # MB + self.local_volume_size = 0 # GB + + self.vCPU_weight = -1 + self.mem_weight = -1 + self.local_volume_weight = -1 + + # To record which servers (e.g., VMs) in given request are assigned + # to this group. + self.server_list = [] + + self.sort_base = -1 + + def get_exclusivities(self, _level): + """Return exclusivity group requested with a level (host or rack). + + Note: each affinity group must have a single exclusivity group of the level. + """ + + exclusivities = {} + + for exk, group in self.exclusivity_groups.iteritems(): + if group.level == _level: + exclusivities[exk] = group + + return exclusivities + + def need_numa_alignment(self): + """Check if this server requires NUMA alignment.""" + + if len(self.extra_specs_list) > 0: + for es in self.extra_specs_list: + for key, req in six.iteritems(es): + if key == "hw:numa_nodes" and req == 1: + return True + + return False + + def is_parent_affinity(self, _vk): + """Check recursively if _vk is located in the group.""" + + exist = False + + for sgk, sg in self.subgroups.iteritems(): + if sgk == _vk: + exist = True + break + + if isinstance(sg, Group): + if sg.is_parent_affinity(_vk): + exist = True + break + + return exist + + def get_servers(self, _servers): + """Get all child servers.""" + + for _, sg in self.subgroups.iteritems(): + if isinstance(sg, Group): + sg.get_servers(_servers) + else: + if sg not in _servers: + _servers.append(sg) + + def get_flavor_types(self): + flavor_type_list = [] + + for extra_specs in self.extra_specs_list: + for k, v in extra_specs.iteritems(): + k_elements = k.split(':') + if len(k_elements) > 1: + if k_elements[0] == "aggregate_instance_extra_specs": + if k_elements[1].lower() in FLAVOR_TYPES: + if v == "true": + flavor_type_list.append(k_elements[1]) + + return flavor_type_list diff --git a/engine/src/valet/engine/app_manager/server.py b/engine/src/valet/engine/app_manager/server.py new file mode 100644 index 0000000..9052285 --- /dev/null +++ b/engine/src/valet/engine/app_manager/server.py @@ -0,0 +1,171 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + +from valet.engine.app_manager.group import FLAVOR_TYPES + + +class Server(object): + """Container to keep requested server (e.g., VM).""" + + def __init__(self, _id, _orch_id): + """Define server info and parameters.""" + + # ID consists of stack_name + ":" + server_name, + # where stack_name = datacenter_id + ":" + tenant_id + ":" + vf_module_name + self.vid = _id + + # ID in stack + self.orch_id = _orch_id + + # one given in Heat stack + self.name = None + + # Affinity group object + self.surgroup = None + + self.diversity_groups = {} + self.quorum_diversity_groups = {} + self.exclusivity_groups = {} + + self.availability_zone = None + + self.flavor = None + self.image = None + + self.vCPUs = 0 + self.mem = 0 # MB + self.local_volume_size = 0 # GB + self.extra_specs_list = [] + + self.vCPU_weight = -1 + self.mem_weight = -1 + self.local_volume_weight = -1 + + # To return placement. + # If stack is nested, should point the index to distinguish + self.host_assignment_variable = None + self.host_assignment_inx = -1 + + # Placement result + self.host_group = None # e.g., rack + self.host = None + self.numa = None + + # Request state is 'create', 'migrate', 'rebuild', 'delete' + # 'created', 'migrated', 'rebuilt' + self.state = "plan" + + # To inform if the current placement violates rules and requirements + self.status = "valid" + + self.sort_base = -1 + + def get_exclusivities(self, _level): + """Return exclusivity group requested with a level (host or rack). + + Note: each server must have a single exclusivity group of the level. + """ + + exclusivities = {} + + for exk, group in self.exclusivity_groups.iteritems(): + if group.level == _level: + exclusivities[exk] = group + + return exclusivities + + def need_numa_alignment(self): + """Check if this server requires NUMA alignment.""" + + if len(self.extra_specs_list) > 0: + for es in self.extra_specs_list: + for key, req in six.iteritems(es): + if key == "hw:numa_nodes" and int(req) == 1: + return True + + return False + + def get_flavor_types(self): + flavor_type_list = [] + + for extra_specs in self.extra_specs_list: + for k, v in extra_specs.iteritems(): + k_elements = k.split(':') + if len(k_elements) > 1: + if k_elements[0] == "aggregate_instance_extra_specs": + if k_elements[1].lower() in FLAVOR_TYPES: + if v == "true": + flavor_type_list.append(k_elements[1]) + + return flavor_type_list + + def get_json_info(self): + """Format server info as JSON.""" + + if self.surgroup is None: + surgroup_id = "none" + else: + surgroup_id = self.surgroup.vid + + diversity_groups = [] + for divk in self.diversity_groups.keys(): + diversity_groups.append(divk) + + quorum_diversity_groups = [] + for divk in self.quorum_diversity_groups.keys(): + quorum_diversity_groups.append(divk) + + exclusivity_groups = [] + for exk in self.exclusivity_groups.keys(): + exclusivity_groups.append(exk) + + if self.availability_zone is None: + availability_zone = "none" + else: + availability_zone = self.availability_zone + + if self.host_group is None: + host_group = "none" + else: + host_group = self.host_group + + if self.numa is None: + numa = "none" + else: + numa = self.numa + + return {'name': self.name, + 'orch_id': self.orch_id, + 'surgroup': surgroup_id, + 'diversity_groups': diversity_groups, + 'quorum_diversity_groups': quorum_diversity_groups, + 'exclusivity_groups': exclusivity_groups, + 'availability_zones': availability_zone, + 'extra_specs_list': self.extra_specs_list, + 'flavor': self.flavor, + 'image': self.image, + 'cpus': self.vCPUs, + 'mem': self.mem, + 'local_volume': self.local_volume_size, + 'host_group': host_group, + 'host': self.host, + 'numa': numa, + 'state': self.state, + 'status': self.status} |