author     Arthur Martella <arthur.martella.1@att.com>   2019-03-15 12:19:34 -0400
committer  Arthur Martella <arthur.martella.1@att.com>   2019-03-15 12:19:34 -0400
commit     f0a9edb94c527c74f45416b9f20c5c90a11bb5de (patch)
tree       3af724220ce338b6450b4a0e1d2469927afddbe3 /engine
parent     2f66a722dc1a6d4b37218c554199c046719a6873 (diff)
Initial upload of F-GPS seed code 6/21
Includes: Engine resource manager code

Change-Id: I1562676793020164686bde53adea33fc3a1e603e
Issue-ID: OPTFRA-440
Signed-off-by: arthur.martella.1@att.com
Diffstat (limited to 'engine')
-rw-r--r--  engine/src/valet/engine/resource_manager/__init__.py          |   18
-rw-r--r--  engine/src/valet/engine/resource_manager/compute_manager.py   |  201
-rw-r--r--  engine/src/valet/engine/resource_manager/metadata_manager.py  |  424
-rw-r--r--  engine/src/valet/engine/resource_manager/naming.py            |  146
-rw-r--r--  engine/src/valet/engine/resource_manager/nova_compute.py      |  544
-rw-r--r--  engine/src/valet/engine/resource_manager/resource.py          | 1589
-rw-r--r--  engine/src/valet/engine/resource_manager/resource_handler.py  |  299
-rw-r--r--  engine/src/valet/engine/resource_manager/topology_manager.py  |  237
8 files changed, 3458 insertions, 0 deletions
diff --git a/engine/src/valet/engine/resource_manager/__init__.py b/engine/src/valet/engine/resource_manager/__init__.py
new file mode 100644
index 0000000..bd50995
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/__init__.py
@@ -0,0 +1,18 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
diff --git a/engine/src/valet/engine/resource_manager/compute_manager.py b/engine/src/valet/engine/resource_manager/compute_manager.py
new file mode 100644
index 0000000..81a95ee
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/compute_manager.py
@@ -0,0 +1,201 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+from valet.engine.resource_manager.resources.host import Host
+
+
+class ComputeManager(object):
+ """Resource Manager to maintain compute host resources."""
+
+ def __init__(self, _source, _logger):
+ """Define compute hosts and server allocations."""
+
+ self.source = _source
+
+ self.hosts = {}
+
+ self.logger = _logger
+
+ def get_hosts(self, _resource):
+        """Set compute hosts and their servers, and check any inconsistency."""
+
+ self.logger.info("set compute hosts...")
+
+ # Init first
+ self.hosts.clear()
+
+ # Get available hosts only
+ if self.source.get_hosts(self.hosts) != "ok":
+ self.logger.warning("fail to set hosts from source (e.g., nova)")
+ return False
+
+ # Get servers
+ if self.source.get_servers_in_hosts(self.hosts) != "ok":
+ self.logger.warning("fail to set servers from source (e.g., nova)")
+ return False
+
+ self._check_host_updated(_resource)
+
+ self._check_server_placements(_resource)
+
+ return True
+
+ def _check_host_updated(self, _resource):
+ """Check if hosts and their properties are changed."""
+
+ for hk in self.hosts.keys():
+ if hk not in _resource.hosts.keys():
+ _resource.hosts[hk] = Host(hk)
+ _resource.mark_host_updated(hk)
+
+ self.logger.info("new host (" + hk + ") added")
+
+ for rhk in _resource.hosts.keys():
+ if rhk not in self.hosts.keys():
+ _resource.hosts[rhk].status = "disabled"
+ _resource.mark_host_updated(rhk)
+
+ self.logger.info("host (" + rhk + ") disabled")
+
+ for hk in self.hosts.keys():
+ host = self.hosts[hk]
+ rhost = _resource.hosts[hk]
+
+ if self._is_host_resources_updated(host, rhost):
+ _resource.mark_host_updated(hk)
+
+ def _is_host_resources_updated(self, _host, _rhost):
+ """Check the resource amount consistency."""
+
+ resource_updated = False
+
+ if _host.original_vCPUs != _rhost.original_vCPUs:
+ _rhost.original_vCPUs = _host.original_vCPUs
+
+ self.logger.info("host (" + _rhost.name + ") updated (origin CPU updated)")
+ resource_updated = True
+
+ if _host.vCPUs_used != _rhost.vCPUs_used:
+ _rhost.vCPUs_used = _host.vCPUs_used
+
+ self.logger.info("host (" + _rhost.name + ") updated (CPU updated)")
+ resource_updated = True
+
+ if _host.original_mem_cap != _rhost.original_mem_cap:
+ _rhost.original_mem_cap = _host.original_mem_cap
+
+ self.logger.info("host (" + _rhost.name + ") updated (origin mem updated)")
+ resource_updated = True
+
+ if _host.free_mem_mb != _rhost.free_mem_mb:
+ _rhost.free_mem_mb = _host.free_mem_mb
+
+ self.logger.info("host (" + _rhost.name + ") updated (mem updated)")
+ resource_updated = True
+
+ if _host.original_local_disk_cap != _rhost.original_local_disk_cap:
+ _rhost.original_local_disk_cap = _host.original_local_disk_cap
+
+ self.logger.info("host (" + _rhost.name + ") updated (origin disk updated)")
+ resource_updated = True
+
+ if _host.free_disk_gb != _rhost.free_disk_gb:
+ _rhost.free_disk_gb = _host.free_disk_gb
+
+ self.logger.info("host (" + _rhost.name + ") updated (local disk space updated)")
+ resource_updated = True
+
+ if _host.disk_available_least != _rhost.disk_available_least:
+ _rhost.disk_available_least = _host.disk_available_least
+
+ self.logger.info("host (" + _rhost.name + ") updated (least disk space updated)")
+ resource_updated = True
+
+ return resource_updated
+
+ def _check_server_placements(self, _resource):
+ """Check the consistency of server placements with nova."""
+
+        # To keep track of how server placements changed.
+        # key =
+        #     uuid, if the uuid is available
+        #     stack_id:name, otherwise
+        # value = {new_host, old_host, server_info}
+        # The server's state must be either 'created', 'migrated', or 'rebuilt'.
+        # That is, only deal with servers whose placement decision is confirmed.
+        # If new_host (from nova) exists but old_host (valet) does not,
+        #     the server is unknown to valet.
+        # If new_host does not exist but old_host does,
+        #     the server was deleted by nova.
+        # If both new_host and old_host exist,
+        #     the server was moved from the old host to the new host.
+        # If neither exists,
+        #     the server is placed as planned.
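+        # For example, after the two loops below the dict could look like this
+        # (illustrative host names and keys, not taken from a real deployment):
+        #     {"stack1:web-0": {"info": {...}, "new_host": "host-01"},
+        #      "uuid-abcd":    {"info": {...}, "old_host": "host-02"},
+        #      "stack2:db-0":  {"info": {...}, "old_host": "host-03",
+        #                       "new_host": "host-04"}}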
+ change_of_placements = {}
+
+ for hk, host in self.hosts.iteritems():
+ rhost = _resource.hosts[hk]
+
+ for s_info in host.server_list:
+ if s_info["stack_id"] != "none":
+ sid = s_info["stack_id"] + ":" + s_info["name"]
+ else:
+ sid = s_info["uuid"]
+
+ change_of_placements[sid] = {}
+ change_of_placements[sid]["info"] = s_info
+
+ if not rhost.has_server(s_info):
+ change_of_placements[sid]["new_host"] = hk
+
+ self.logger.info("host (" + hk + ") updated (server added)")
+ else:
+ change_of_placements[sid]["host"] = hk
+
+ for rhk, rhost in _resource.hosts.iteritems():
+ if not rhost.is_available():
+ continue
+
+ host = self.hosts[rhk]
+
+ for s_info in rhost.server_list:
+ # Deal with confirmed placements only.
+ if s_info["state"] not in ("created", "migrated", "rebuilt"):
+ continue
+
+ if s_info["stack_id"] != "none":
+ sid = s_info["stack_id"] + ":" + s_info["name"]
+ else:
+ sid = s_info["uuid"]
+
+ if not host.has_server(s_info):
+ if sid in change_of_placements.keys():
+ change_of_placements[sid]["old_host"] = rhk
+
+                    self.logger.info("server (" + sid + ") is migrated")
+ else:
+ change_of_placements[sid] = {}
+ change_of_placements[sid]["old_host"] = rhk
+ change_of_placements[sid]["info"] = s_info
+
+ self.logger.info("server (" + sid + ") is deleted")
+
+ _resource.change_of_placements = change_of_placements
diff --git a/engine/src/valet/engine/resource_manager/metadata_manager.py b/engine/src/valet/engine/resource_manager/metadata_manager.py
new file mode 100644
index 0000000..f34e3f0
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/metadata_manager.py
@@ -0,0 +1,424 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+import json
+
+from copy import deepcopy
+
+
+class MetadataManager(object):
+ """Metadata Manager to maintain flavors and groups."""
+
+ def __init__(self, _source, _logger):
+ self.source = _source
+
+ self.groups = {}
+
+ self.flavors = {}
+
+ self.logger = _logger
+
+ def get_groups(self, _resource):
+        """Set groups (availability-zones, host-aggregates, server groups).
+
+        Collected from the platform (e.g., nova).
+        """
+
+ self.logger.info("set metadata (groups)...")
+
+ # Init first
+ self.groups.clear()
+
+ # Get enabled groups only
+ if self.source.get_groups(self.groups) != "ok":
+ self.logger.warning("fail to set groups from source (e.g., nova)")
+ return False
+
+ self._check_group_updated(_resource)
+
+ self._check_host_memberships_updated(_resource)
+
+ return True
+
+ def _check_group_updated(self, _resource):
+ """Check any inconsistency for groups."""
+
+ for gk in self.groups.keys():
+ if gk not in _resource.groups.keys():
+ _resource.groups[gk] = deepcopy(self.groups[gk])
+ _resource.groups[gk].updated = True
+
+ self.logger.info("new group (" + gk + ") added")
+
+ for rgk in _resource.groups.keys():
+ rg = _resource.groups[rgk]
+
+ if rg.factory != "valet":
+ if rgk not in self.groups.keys():
+ rg.status = "disabled"
+ rg.updated = True
+
+ self.logger.info("group (" + rgk + ") disabled")
+
+ for gk in self.groups.keys():
+ g = self.groups[gk]
+ rg = _resource.groups[gk]
+
+ if rg.uuid is None and g.uuid is not None:
+ rg.uuid = g.uuid
+ rg.updated = True
+
+ self.logger.info("group (" + gk + ") uuid updated")
+
+ # TODO: Clean up resource.hosts if each is not in any AZ members.
+
+ if g.group_type == "aggr":
+ if not gk.startswith("valet:"):
+ if self._is_group_metadata_updated(g, rg):
+ rg.updated = True
+
+ self.logger.info("group (" + gk + ") metadata updated")
+
+ if g.group_type == "az" or g.group_type == "aggr":
+ if self._is_member_hosts_updated(g, _resource):
+ rg.updated = True
+
+ self.logger.info("group (" + gk + ") member hosts updated")
+
+ if g.factory == "server-group":
+ if self._is_new_servers(g, rg):
+ rg.updated = True
+
+ self.logger.info("group (" + gk + ") server_list updated")
+
+ def _is_group_metadata_updated(self, _g, _rg):
+ """Check any change in metadata of group."""
+
+ updated = False
+
+ for mdk in _g.metadata.keys():
+ if mdk not in _rg.metadata.keys():
+ _rg.metadata[mdk] = _g.metadata[mdk]
+ updated = True
+
+ for rmdk in _rg.metadata.keys():
+ if rmdk not in _g.metadata.keys():
+ del _rg.metadata[rmdk]
+ updated = True
+
+ for mdk in _g.metadata.keys():
+ mdv = _g.metadata[mdk]
+ rmdv = _rg.metadata[mdk]
+ if mdv != rmdv:
+ _rg.metadata[mdk] = mdv
+ updated = True
+
+ return updated
+
+ def _is_member_hosts_updated(self, _g, _resource):
+ """Check any change in member hosts of group."""
+
+ updated = False
+
+ _rg = _resource.groups[_g.name]
+
+ for hk in _g.member_hosts.keys():
+ if hk not in _rg.member_hosts.keys():
+ if hk in _resource.hosts.keys():
+ if _resource.hosts[hk].is_available():
+ _rg.member_hosts[hk] = deepcopy(_g.member_hosts[hk])
+ updated = True
+ # else not needed
+
+ for rhk in _rg.member_hosts.keys():
+ if rhk not in _resource.hosts.keys() or \
+ not _resource.hosts[rhk].is_available() or \
+ rhk not in _g.member_hosts.keys():
+ del _rg.member_hosts[rhk]
+ updated = True
+
+ return updated
+
+ def _is_new_servers(self, _g, _rg):
+ """Check if there is any new server."""
+
+ updated = False
+
+ for s_info in _g.server_list:
+ exist = False
+ for rs_info in _rg.server_list:
+ if rs_info.get("uuid") == s_info.get("uuid"):
+ exist = True
+ break
+
+ if not exist:
+ _rg.server_list.append(s_info)
+ updated = True
+
+ return updated
+
+ def _check_host_memberships_updated(self, _resource):
+ """Check host memberships consistency."""
+
+ for gk, g in _resource.groups.iteritems():
+ # Other group types will be handled later
+ if g.factory != "valet" and g.status == "enabled":
+ for hk in g.member_hosts.keys():
+ host = _resource.hosts[hk]
+ if gk not in host.memberships.keys() or g.updated:
+ host.memberships[gk] = g
+ _resource.mark_host_updated(hk)
+
+ self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")")
+
+ for hk, host in _resource.hosts.iteritems():
+ if host.is_available():
+ for gk in host.memberships.keys():
+ if gk in _resource.groups.keys():
+ g = _resource.groups[gk]
+ if g.factory != "valet":
+ if g.status == "enabled":
+ if g.updated:
+ host.memberships[gk] = g
+ _resource.mark_host_updated(hk)
+
+ self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")")
+ else:
+ del host.memberships[gk]
+ _resource.mark_host_updated(hk)
+
+ self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")")
+ else:
+ del host.memberships[gk]
+ _resource.mark_host_updated(hk)
+
+ self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")")
+
+ def create_exclusive_aggregate(self, _group, _hosts):
+ """Set Host-Aggregate to apply Exclusivity."""
+
+ az = _hosts[0].get_availability_zone()
+
+ # To remove 'az:' header from name
+ az_name_elements = az.name.split(':', 1)
+ if len(az_name_elements) > 1:
+ az_name = az_name_elements[1]
+ else:
+ az_name = az.name
+
+ status = self.source.set_aggregate(_group.name, az_name)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + _group.name + ") created")
+
+ aggregates = {}
+ status = self.source.get_aggregates(aggregates)
+ if status != "ok":
+ return status
+
+ if _group.name in aggregates.keys():
+ _group.uuid = aggregates[_group.name].uuid
+
+ if len(_group.metadata) > 0:
+ metadata = {}
+ for mk, mv in _group.metadata.iteritems():
+ if mk == "prior_metadata":
+ metadata[mk] = json.dumps(mv)
+ else:
+ metadata[mk] = mv
+
+ status = self.source.set_metadata_of_aggregate(_group.uuid, metadata)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + _group.name + ") metadata created")
+
+ for host in _hosts:
+ if host.name in _group.metadata.keys():
+ aggr_uuids = _group.metadata[host.name].split(',')
+
+ for uuid in aggr_uuids:
+ status = self.source.remove_host_from_aggregate(int(uuid), host.name)
+ if status != "ok":
+ return status
+
+ self.logger.debug("host-aggregate(" + uuid + ") host(" + host.name + ") removed")
+
+ status = self.source.add_host_to_aggregate(_group.uuid, host.name)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + _group.name + ") host(" + host.name + ") added")
+ else:
+ status = "dynamic host-aggregate not found"
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ def update_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates):
+ """Update Host-Aggregate to apply Exclusivity."""
+
+ if len(_metadata) > 0:
+ metadata = {}
+ for mk, mv in _metadata.iteritems():
+ if mk == "prior_metadata":
+ metadata[mk] = json.dumps(mv)
+ else:
+ metadata[mk] = mv
+
+ status = self.source.set_metadata_of_aggregate(_id, metadata)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated")
+
+ for oa in _old_aggregates:
+ status = self.source.remove_host_from_aggregate(oa.uuid, _host)
+ if status != "ok":
+ return status
+
+ self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") removed")
+
+ status = self.source.add_host_to_aggregate(_id, _host)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") added")
+
+ return "ok"
+
+ def remove_host_from_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates):
+ """Remove host from Host-Aggregate to apply Exclusivity."""
+
+ if len(_metadata) > 0:
+ metadata = {}
+ for mk, mv in _metadata.iteritems():
+ if mk == "prior_metadata":
+ metadata[mk] = json.dumps(mv)
+ else:
+ metadata[mk] = mv
+
+ status = self.source.set_metadata_of_aggregate(_id, metadata)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated")
+
+ status = self.source.remove_host_from_aggregate(_id, _host)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") removed")
+
+ for oa in _old_aggregates:
+ status = self.source.add_host_to_aggregate(oa.uuid, _host)
+ if status != "ok":
+ return status
+
+ self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") added")
+
+ return "ok"
+
+ def remove_exclusive_aggregate(self, _id):
+ """Remove Host-Aggregate."""
+
+ status = self.source.delete_aggregate(_id)
+ if status != "ok":
+ return status
+
+ self.logger.debug("dynamic host-aggregate(" + str(_id) + ") removed")
+
+ return "ok"
+
+ def get_flavors(self, _resource):
+ """Set flavors from nova."""
+
+ self.logger.info("set metadata (flavors)...")
+
+ # Init first
+ self.flavors.clear()
+
+ # Get enabled flavors only
+ if self.source.get_flavors(self.flavors, detailed=False) != "ok":
+ return False
+
+ self._check_flavor_update(_resource, False)
+
+ return True
+
+ def _check_flavor_update(self, _resource, _detailed):
+ """Check flavor info consistency."""
+
+ for fk in self.flavors.keys():
+ if fk not in _resource.flavors.keys():
+ _resource.flavors[fk] = deepcopy(self.flavors[fk])
+ _resource.flavors[fk].updated = True
+
+ self.logger.info("new flavor (" + fk + ":" + self.flavors[fk].flavor_id + ") added")
+
+ for rfk in _resource.flavors.keys():
+ rf = _resource.flavors[rfk]
+ if rfk not in self.flavors.keys():
+ rf.status = "disabled"
+ rf.updated = True
+
+ self.logger.info("flavor (" + rfk + ":" + rf.flavor_id + ") removed")
+
+ if _detailed:
+ for fk in self.flavors.keys():
+ f = self.flavors[fk]
+ rf = _resource.flavors[fk]
+ if self._is_flavor_spec_updated(f, rf):
+ rf.updated = True
+
+ self.logger.info("flavor (" + fk + ":" + rf.flavor_id + ") spec updated")
+
+ def _is_flavor_spec_updated(self, _f, _rf):
+ """Check flavor's spec consistency."""
+
+ spec_updated = False
+
+ if _f.vCPUs != _rf.vCPUs or _f.mem_cap != _rf.mem_cap or _f.disk_cap != _rf.disk_cap:
+ _rf.vCPUs = _f.vCPUs
+ _rf.mem_cap = _f.mem_cap
+ _rf.disk_cap = _f.disk_cap
+ spec_updated = True
+
+ for sk in _f.extra_specs.keys():
+ if sk not in _rf.extra_specs.keys():
+ _rf.extra_specs[sk] = _f.extra_specs[sk]
+ spec_updated = True
+
+ for rsk in _rf.extra_specs.keys():
+ if rsk not in _f.extra_specs.keys():
+ del _rf.extra_specs[rsk]
+ spec_updated = True
+
+ for sk in _f.extra_specs.keys():
+ sv = _f.extra_specs[sk]
+ rsv = _rf.extra_specs[sk]
+ if sv != rsv:
+ _rf.extra_specs[sk] = sv
+ spec_updated = True
+
+ return spec_updated
diff --git a/engine/src/valet/engine/resource_manager/naming.py b/engine/src/valet/engine/resource_manager/naming.py
new file mode 100644
index 0000000..bdf5211
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/naming.py
@@ -0,0 +1,146 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+import copy
+import re
+
+from sre_parse import isdigit
+from valet.engine.resource_manager.resources.host_group import HostGroup
+
+
+class Naming(object):
+    """Use the canonical naming convention to capture the datacenter layout."""
+
+ def __init__(self, _config, _logger):
+ self.logger = _logger
+
+ self.rack_code_list = _config.get("rack_codes")
+ self.host_code_list = _config.get("host_codes")
+
+ def get_topology(self, _datacenter, _host_groups, _hosts, _rhosts):
+ """Set datacenter resource structure (racks, hosts)."""
+
+ status = "ok"
+
+ for rhk, rhost in _rhosts.iteritems():
+ h = copy.deepcopy(rhost)
+
+ (rack_name, parsing_status) = self._set_layout_by_name(rhk)
+ if parsing_status != "ok":
+ self.logger.warning(parsing_status + " in host_name (" + rhk + ")")
+
+ if rack_name == "none":
+ h.host_group = _datacenter
+ _datacenter.resources[rhk] = h
+ else:
+ if rack_name not in _host_groups.keys():
+ host_group = HostGroup(rack_name)
+ host_group.host_type = "rack"
+ _host_groups[host_group.name] = host_group
+ else:
+ host_group = _host_groups[rack_name]
+
+ h.host_group = host_group
+ host_group.child_resources[rhk] = h
+
+ _hosts[h.name] = h
+
+ for hgk, hg in _host_groups.iteritems():
+ hg.parent_resource = _datacenter
+ _datacenter.resources[hgk] = hg
+
+ if "none" in _host_groups.keys():
+            self.logger.warning("some hosts are in an unknown rack")
+
+ return status
+
+ def _set_layout_by_name(self, _host_name):
+        """Set the rack-host layout using the host naming convention.
+
+        The naming convention is:
+        zone name is any word followed by at least one of [0-9]
+        rack name is a rack_code followed by at least one of [0-9]
+        host name is a host_code followed by at least one of [0-9]
+        An example is
+        'abcd_001A' (as a zone_name) +
+        'r' (as a rack_code) + '01A' +
+        'c' (as a host_code) + '001A'
+ """
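+        # For instance, with 'r' configured as a rack_code and 'c' as a
+        # host_code, a hypothetical host name 'abcd_001Ar01Ac001A' parses to
+        # zone 'abcd_001A' and rack 'r01A', so this returns ('abcd_001Ar01A', 'ok').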
+
+ zone_name = None
+ rack_name = None
+ host_name = None
+
+ # To check if zone name follows the rule
+ index = 0
+ for c in _host_name:
+ if isdigit(c):
+ break
+ index += 1
+ zone_indicator = _host_name[index:]
+ if len(zone_indicator) == 0:
+            return 'none', "no numerical digit in name"
+
+ # To extract rack indicator
+ for rack_code in self.rack_code_list:
+ rack_index_list = [rc.start() for rc in re.finditer(rack_code, zone_indicator)]
+
+ start_of_rack_index = -1
+ for rack_index in rack_index_list:
+ rack_prefix = rack_index + len(rack_code)
+ if rack_prefix > len(zone_indicator):
+ continue
+
+ # Once rack name follows the rule
+ if isdigit(zone_indicator[rack_prefix]):
+ rack_indicator = zone_indicator[rack_prefix:]
+
+ # To extract host indicator
+ for host_code in self.host_code_list:
+ host_index_list = [hc.start() for hc in re.finditer(host_code, rack_indicator)]
+
+ start_of_host_index = -1
+ for host_index in host_index_list:
+ host_prefix = host_index + len(host_code)
+ if host_prefix > len(rack_indicator):
+ continue
+
+ if isdigit(rack_indicator[host_prefix]):
+ host_name = rack_indicator[host_index:]
+ start_of_host_index = rack_index + host_index + 1
+ break
+
+ if host_name is not None:
+ rack_name = zone_indicator[rack_index:start_of_host_index]
+ break
+
+ if rack_name is not None:
+ start_of_rack_index = index + rack_index
+ break
+
+ if rack_name is not None:
+ zone_name = _host_name[:start_of_rack_index]
+ break
+
+ if rack_name is None:
+ return 'none', "no host or rack name found in " + _host_name
+ else:
+ return zone_name + rack_name, "ok"
diff --git a/engine/src/valet/engine/resource_manager/nova_compute.py b/engine/src/valet/engine/resource_manager/nova_compute.py
new file mode 100644
index 0000000..6887eb8
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/nova_compute.py
@@ -0,0 +1,544 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import json
+import time
+import traceback
+
+from novaclient import client as nova_client
+
+from valet.engine.resource_manager.resources.flavor import Flavor
+from valet.engine.resource_manager.resources.group import Group
+from valet.engine.resource_manager.resources.host import Host
+from valet.utils.decryption import decrypt
+
+
+# Nova API version
+VERSION = 2
+
+
+# noinspection PyBroadException
+class NovaCompute(object):
+ """Source to collect resource status (i.e., OpenStack Nova).
+
+    Manipulate Host-Aggregates with Valet placement decisions.
+ """
+
+ def __init__(self, _config, _logger):
+ self.logger = _logger
+
+ self.nova = None
+
+ self.novas = {}
+ self.last_activate_urls = {}
+ self.life_time = 43200 # 12 hours
+
+ # TODO(Gueyoung): handle both admin and admin_view accounts.
+
+ pw = decrypt(_config["engine"]["ek"],
+ _config["logging"]["lk"],
+ _config["db"]["dk"],
+ _config["nova"]["admin_view_password"])
+
+ self.admin_username = _config["nova"]["admin_view_username"]
+ self.admin_password = pw
+ self.project = _config["nova"]["project_name"]
+
+ def set_client(self, _auth_url):
+ """Set nova client."""
+
+ try:
+ # TODO: add timeout=_timeout?
+ self.novas[_auth_url] = nova_client.Client(VERSION,
+ self.admin_username,
+ self.admin_password,
+ self.project,
+ _auth_url)
+
+ self.last_activate_urls[_auth_url] = time.time()
+
+ self.nova = self.novas[_auth_url]
+ return True
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return False
+
+ def valid_client(self, _auth_url):
+ """Check if nova connection is valid."""
+
+ if _auth_url not in self.novas.keys():
+ return False
+
+ if _auth_url not in self.last_activate_urls.keys():
+ return False
+
+ elapsed_time = time.time() - self.last_activate_urls[_auth_url]
+
+ if elapsed_time > self.life_time:
+ return False
+
+ self.nova = self.novas[_auth_url]
+
+ return True
+
+ def get_groups(self, _groups):
+        """Get server-groups, availability-zones, and host-aggregates.
+
+        Collected from OpenStack Nova.
+        """
+
+ status = self._get_availability_zones(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self.get_aggregates(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_server_groups(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ def _get_availability_zones(self, _groups):
+ """Set AZs."""
+
+ try:
+ # TODO: try hosts_list = self.nova.hosts.list()?
+
+ az_list = self.nova.availability_zones.list(detailed=True)
+
+ for a in az_list:
+ if a.zoneState["available"]:
+ # NOTE(Gueyoung): add 'az:' to avoid conflict with
+ # Host-Aggregate name.
+ az_id = "az:" + a.zoneName
+
+ az = Group(az_id)
+
+ az.group_type = "az"
+ az.factory = "nova"
+ az.level = "host"
+
+ # TODO: Get AZ first with init Compute Hosts?
+
+ for hk, h_info in a.hosts.iteritems():
+ if "nova-compute" in h_info.keys():
+ if h_info["nova-compute"]["active"] and \
+ h_info["nova-compute"]["available"]:
+ az.member_hosts[hk] = []
+
+ _groups[az_id] = az
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting availability-zones from Nova"
+
+ return "ok"
+
+ def get_aggregates(self, _groups):
+ """Set host-aggregates and corresponding hosts."""
+
+ try:
+ aggregate_list = self.nova.aggregates.list()
+
+ for a in aggregate_list:
+ if not a.deleted:
+ aggregate = Group(a.name)
+
+ aggregate.uuid = a.id
+
+ aggregate.group_type = "aggr"
+ aggregate.factory = "nova"
+ aggregate.level = "host"
+
+ metadata = {}
+ for mk in a.metadata.keys():
+ if mk == "prior_metadata":
+ metadata[mk] = json.loads(a.metadata.get(mk))
+ else:
+ metadata[mk] = a.metadata.get(mk)
+ aggregate.metadata = metadata
+
+ for hn in a.hosts:
+ aggregate.member_hosts[hn] = []
+
+ _groups[aggregate.name] = aggregate
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host-aggregates from Nova"
+
+ return "ok"
+
+ def set_aggregate(self, _name, _az):
+ """Create a Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.create(_name, _az)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting a host-aggregate in Nova"
+
+ return "ok"
+
+ def add_host_to_aggregate(self, _aggr, _host):
+ """Add a Host into the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.add_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while adding a host into host-aggregate in Nova"
+
+ return "ok"
+
+ def delete_aggregate(self, _aggr):
+ """Delete the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.delete(_aggr)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while deleting host-aggregate from Nova"
+
+ return "ok"
+
+ def remove_host_from_aggregate(self, _aggr, _host):
+ """Remove the Host from the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.remove_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while removing host from host-aggregate in Nova"
+
+ return "ok"
+
+ def set_metadata_of_aggregate(self, _aggr, _metadata):
+ """Set metadata.
+
+        Note that Nova merges the given key/value pairs into the existing metadata instead of replacing it.
+ """
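+        # e.g., if the aggregate currently has {'foo': '1'} and _metadata is
+        # {'bar': '2'}, the result is {'foo': '1', 'bar': '2'} (illustrative keys).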
+
+ try:
+ self.nova.aggregates.set_metadata(_aggr, _metadata)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting metadata of host-aggregate in Nova"
+
+ return "ok"
+
+ def _get_server_groups(self, _groups):
+        """Set server-groups and their member servers."""
+
+ try:
+ # NOTE(Gueyoung): novaclient v2.18.0 does not have 'all_projects=True' param.
+ server_group_list = self.nova.server_groups.list()
+
+ for g in server_group_list:
+ server_group = Group(g.name)
+
+ server_group.uuid = g.id
+
+ # TODO: Check len(g.policies) == 1
+ # policy is either 'affinity', 'anti-affinity', 'soft-affinity',
+ # or 'soft-anti-affinity'
+ if g.policies[0] == "anti-affinity":
+ server_group.group_type = "diversity"
+ else:
+ server_group.group_type = g.policies[0]
+ server_group.factory = "server-group"
+ server_group.level = "host"
+
+ # Members attribute is a list of server uuids
+ for s_uuid in g.members:
+ s_info = {}
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+ s_info["uuid"] = s_uuid
+ s_info["orch_id"] = "none"
+ s_info["name"] = "none"
+ s_info["flavor_id"] = "none"
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+ s_info["numa"] = "none"
+ s_info["image_id"] = "none"
+ s_info["tenant_id"] = "none"
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ server_group.server_list.append(s_info)
+
+ # TODO: Check duplicated name as group identifier
+ _groups[server_group.name] = server_group
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting server-groups from Nova"
+
+ return "ok"
+
+ def get_hosts(self, _hosts):
+ """Set host resources info."""
+
+ # TODO: Deprecated as of version 2.43
+ status = self._get_hosts(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_host_details(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ # TODO: Deprecated as of version 2.43
+ def _get_hosts(self, _hosts):
+ """Init hosts."""
+
+ try:
+ host_list = self.nova.hosts.list()
+
+ for h in host_list:
+ if h.service == "compute":
+ host = Host(h.host_name)
+ _hosts[host.name] = host
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting hosts from Nova"
+
+ return "ok"
+
+ def _get_host_details(self, _hosts):
+ """Get each host's resource status."""
+
+ try:
+ # TODO: marker: the last UUID of return, limit: the number of hosts returned.
+ # with_servers=True
+ host_list = self.nova.hypervisors.list(detailed=True)
+
+ for hv in host_list:
+ if hv.service['host'] in _hosts.keys():
+ if hv.status != "enabled" or hv.state != "up":
+ del _hosts[hv.service['host']]
+ else:
+ host = _hosts[hv.service['host']]
+
+ host.uuid = hv.id
+
+ host.status = hv.status
+ host.state = hv.state
+ host.original_vCPUs = float(hv.vcpus)
+ host.vCPUs_used = float(hv.vcpus_used)
+ host.original_mem_cap = float(hv.memory_mb)
+ host.free_mem_mb = float(hv.free_ram_mb)
+ host.original_local_disk_cap = float(hv.local_gb)
+ host.free_disk_gb = float(hv.free_disk_gb)
+ host.disk_available_least = float(hv.disk_available_least)
+
+ # TODO: cpu_info:topology:sockets
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host resources from Nova"
+
+ return "ok"
+
+ def get_servers_in_hosts(self, _hosts):
+ """Set servers in hosts."""
+
+ (status, server_list) = self.get_server_detail()
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ for s in server_list:
+ s_info = {}
+
+ if "stack-id" in s.metadata.keys():
+ s_info["stack_id"] = s.metadata["stack-id"]
+ else:
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+
+ s_info["uuid"] = s.id
+
+ s_info["orch_id"] = "none"
+ s_info["name"] = s.name
+
+ s_info["flavor_id"] = s.flavor["id"]
+
+ if "vcpus" in s.flavor.keys():
+ s_info["vcpus"] = s.flavor["vcpus"]
+ s_info["mem"] = s.flavor["ram"]
+ s_info["disk"] = s.flavor["disk"]
+ s_info["disk"] += s.flavor["ephemeral"]
+ s_info["disk"] += s.flavor["swap"] / float(1024)
+ else:
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+
+ s_info["numa"] = "none"
+
+ try:
+ s_info["image_id"] = s.image["id"]
+ except TypeError:
+ self.logger.warning("In get_servers_in_hosts, expected s.image to have id tag, but it's actually " + s.image)
+ s_info["image_id"] = s.image
+
+ s_info["tenant_id"] = s.tenant_id
+
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ s_info["host"] = s.__getattr__("OS-EXT-SRV-ATTR:host")
+
+ # s_info["power_state"] = s.__getattr__("OS-EXT-STS:power_state")
+ # s_info["vm_state"] = s.__getattr__("OS-EXT-STS:vm_state")
+ # s_info["task_state"] = s.__getattr__("OS-EXT-STS:task_state")
+
+ if s_info["host"] in _hosts.keys():
+ host = _hosts[s_info["host"]]
+ host.server_list.append(s_info)
+
+ return "ok"
+
+ def get_server_detail(self, project_id=None, host_name=None, server_name=None, uuid=None):
+        """Get server details, optionally filtered by search options."""
+
+ # TODO: Get servers' info in each host
+ # Minimum requirement for server info: s["metadata"]["stack-id"],
+ # More: s["flavor"]["id"], s["tenant_id"]
+ # Maybe: s["image"], server.__getattr__("OS-EXT-AZ:availability_zone"), s["status"]
+ # and scheduler_hints?
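+        # A typical call (with a hypothetical host name) is
+        #     get_server_detail(host_name="compute-001"),
+        # which passes search_opts={"all_tenants": 1, "host": "compute-001"} to nova.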
+ try:
+ options = {"all_tenants": 1}
+ if project_id is not None:
+ options["project_id"] = project_id
+ if host_name is not None:
+ options["host"] = host_name
+ if server_name is not None:
+ options["name"] = server_name
+ if uuid is not None:
+ options["uuid"] = uuid
+
+ # TODO: search by vm_state?
+
+ if len(options) > 0:
+ server_list = self.nova.servers.list(detailed=True, search_opts=options)
+ else:
+ server_list = self.nova.servers.list(detailed=True)
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting server detail from nova", None
+
+ return "ok", server_list
+
+ def get_flavors(self, _flavors, detailed=True):
+ """Get flavors."""
+
+ if detailed:
+ result_status = self._get_flavors(_flavors, True)
+ else:
+ result_status = self._get_flavors(_flavors, False)
+
+ if result_status != "ok":
+ self.logger.error(result_status)
+
+ return result_status
+
+ def _get_flavors(self, _flavors, _detailed):
+ """Get a list of all flavors."""
+
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed)
+
+ for f in flavor_list:
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ # To get non-public flavors.
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed, is_public=False)
+
+ for f in flavor_list:
+ if f.name not in _flavors.keys():
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ return "ok"
+
+ def get_flavor(self, _flavor_id):
+ """Get the flavor."""
+
+ try:
+ f = self.nova.flavors.get(_flavor_id)
+ flavor = self._set_flavor(f, True)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return None
+
+ return flavor
+
+ def _set_flavor(self, _f, _detailed):
+        """Set flavor with detailed information."""
+
+ flavor = Flavor(_f.name)
+
+ flavor.flavor_id = _f.id
+
+ if _detailed:
+ # NOTE(Gueyoung): This is not allowed with current credential.
+ # if getattr(_f, "OS-FLV-DISABLED:disabled"):
+ # flavor.status = "disabled"
+
+ flavor.vCPUs = float(_f.vcpus)
+ flavor.mem_cap = float(_f.ram)
+
+ root_gb = float(_f.disk)
+ ephemeral_gb = 0.0
+ if hasattr(_f, "OS-FLV-EXT-DATA:ephemeral"):
+ ephemeral_gb = float(getattr(_f, "OS-FLV-EXT-DATA:ephemeral"))
+ swap_mb = 0.0
+ if hasattr(_f, "swap"):
+ sw = getattr(_f, "swap")
+ if sw != '':
+ swap_mb = float(sw)
+ flavor.disk_cap = root_gb + ephemeral_gb + swap_mb / float(1024)
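+            # e.g., disk=40 GB, ephemeral=10 GB, swap=2048 MB gives
+            # disk_cap = 40 + 10 + 2048/1024 = 52.0 (illustrative numbers)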
+
+ extra_specs = _f.get_keys()
+ for sk, sv in extra_specs.iteritems():
+ flavor.extra_specs[sk] = sv
+
+ return flavor
diff --git a/engine/src/valet/engine/resource_manager/resource.py b/engine/src/valet/engine/resource_manager/resource.py
new file mode 100644
index 0000000..0f2b550
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/resource.py
@@ -0,0 +1,1589 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import json
+import six
+import time
+
+from valet.engine.app_manager.group import LEVEL
+from valet.engine.resource_manager.resources.datacenter import Datacenter
+from valet.engine.resource_manager.resources.flavor import Flavor
+from valet.engine.resource_manager.resources.group import Group
+from valet.engine.resource_manager.resources.host import Host
+from valet.engine.resource_manager.resources.host_group import HostGroup
+from valet.engine.resource_manager.resources.numa import NUMA
+
+
+class Resource(object):
+ """Container for resource status of a datacenter and all metadata."""
+
+ def __init__(self, _datacenter, _dbh, _compute, _metadata, _topology, _logger):
+ self.dbh = _dbh
+
+ self.compute = _compute
+ self.metadata = _metadata
+ self.topology = _topology
+
+ self.group_rules = {}
+
+ self.datacenter = None
+ self.datacenter_id = _datacenter.get("id")
+ self.datacenter_url = _datacenter.get("url", "none")
+
+ self.host_groups = {}
+ self.hosts = {}
+
+ self.change_of_placements = {}
+
+ self.groups = {}
+ self.flavors = {}
+
+ self.CPU_avail = 0
+ self.mem_avail = 0
+ self.local_disk_avail = 0
+
+ self.default_cpu_allocation_ratio = 1.0
+ self.default_ram_allocation_ratio = 1.0
+ self.default_disk_allocation_ratio = 1.0
+
+ self.new = False
+
+ # To keep unconfirmed requests.
+ # If exist, do NOT sync with platform for the next request.
+ self.pending_requests = []
+
+ self.logger = _logger
+
+ def set_config(self, _cpu_ratio, _ram_ratio, _disk_ratio):
+ self.default_cpu_allocation_ratio = _cpu_ratio
+ self.default_ram_allocation_ratio = _ram_ratio
+ self.default_disk_allocation_ratio = _disk_ratio
+
+ def set_group_rules(self, _rules):
+ self.group_rules = _rules
+
+ def load_resource_from_db(self):
+ """Load datacenter's resource info from DB.
+
+ Note: all resources in DB are enabled ones.
+ """
+
+ self.logger.info("load datacenter resource info from DB")
+
+ # Load Valet groups first.
+ valet_group_list = self.dbh.get_valet_groups()
+ if valet_group_list is None:
+ return None
+
+ valet_groups = {}
+ for vg in valet_group_list:
+ vgk = vg.get("id")
+ dc_id = vgk.split(':', 1)
+
+ if dc_id[0] == self.datacenter_id:
+ if vg["rule_id"] in self.group_rules.keys():
+ vg["metadata"] = json.loads(vg["metadata"])
+ vg["server_list"] = json.loads(vg["server_list"])
+ vg["member_hosts"] = json.loads(vg["member_hosts"])
+ vg["group_type"] = vg["type"]
+
+ valet_groups[vgk] = vg
+
+ self._load_groups(valet_groups)
+
+ dcr = self.dbh.get_resource(self.datacenter_id)
+ if dcr is None:
+ return None
+
+ if len(dcr) == 0:
+ return "no resource found for datacenter = " + self.datacenter_id
+
+ if self.datacenter_url == "none":
+ self.datacenter_url = dcr["url"]
+
+ pending_requests = json.loads(dcr["requests"])
+ for req in pending_requests:
+ self.pending_requests.append(req)
+
+ resource = json.loads(dcr["resource"])
+
+ groups = resource.get("groups")
+ if groups:
+ self._load_groups(groups)
+
+ flavors = resource.get("flavors")
+ if flavors:
+ self._load_flavors(flavors)
+
+ if len(self.flavors) == 0:
+ self.logger.warning("no flavors in db record")
+
+ hosts = resource.get("hosts")
+ if hosts:
+ self._load_hosts(hosts)
+
+ if len(self.hosts) == 0:
+ self.logger.warning("no hosts in db record")
+
+ host_groups = resource.get("host_groups")
+ if host_groups:
+ self._load_host_groups(host_groups)
+
+ if len(self.host_groups) == 0:
+ self.logger.warning("no host_groups (rack)")
+
+ dc = resource.get("datacenter")
+ self._load_datacenter(dc)
+
+ for ck in dc.get("children"):
+ if ck in self.host_groups.keys():
+ self.datacenter.resources[ck] = self.host_groups[ck]
+ elif ck in self.hosts.keys():
+ self.datacenter.resources[ck] = self.hosts[ck]
+
+ hgs = resource.get("host_groups")
+ if hgs:
+ for hgk, hg in hgs.iteritems():
+ host_group = self.host_groups[hgk]
+
+ pk = hg.get("parent")
+ if pk == self.datacenter.name:
+ host_group.parent_resource = self.datacenter
+ elif pk in self.host_groups.keys():
+ host_group.parent_resource = self.host_groups[pk]
+
+ for ck in hg.get("children"):
+ if ck in self.hosts.keys():
+ host_group.child_resources[ck] = self.hosts[ck]
+ elif ck in self.host_groups.keys():
+ host_group.child_resources[ck] = self.host_groups[ck]
+
+ hs = resource.get("hosts")
+ if hs:
+ for hk, h in hs.iteritems():
+ host = self.hosts[hk]
+
+ pk = h.get("parent")
+ if pk == self.datacenter.name:
+ host.host_group = self.datacenter
+ elif pk in self.host_groups.keys():
+ host.host_group = self.host_groups[pk]
+
+ for _, g in self.groups.iteritems():
+ for hk in g.member_hosts.keys():
+ if hk not in self.hosts.keys() and \
+ hk not in self.host_groups.keys():
+ del g.member_hosts[hk]
+
+ self._update_compute_avail()
+
+ return "ok"
+
+ def _load_groups(self, _groups):
+ """Take JSON group data as defined in /resources/group and
+        create a Group instance.
+ """
+
+ for gk, g in _groups.iteritems():
+ group = Group(gk)
+
+ group.status = g.get("status")
+
+ group.uuid = g.get("uuid")
+
+ group.group_type = g.get("group_type")
+ group.level = g.get("level")
+ group.factory = g.get("factory")
+
+ rule_id = g.get("rule_id")
+ if rule_id != "none" and rule_id in self.group_rules.keys():
+ group.rule = self.group_rules[rule_id]
+
+ for mk, mv in g["metadata"].iteritems():
+ group.metadata[mk] = mv
+
+ for s_info in g["server_list"]:
+ group.server_list.append(s_info)
+
+ for hk, server_list in g["member_hosts"].iteritems():
+ group.member_hosts[hk] = []
+ for s_info in server_list:
+ group.member_hosts[hk].append(s_info)
+
+ self.groups[gk] = group
+
+ def _load_flavors(self, _flavors):
+ """Take JSON flavor data as defined in /resources/flavor and
+        create a Flavor instance.
+ """
+
+ for fk, f in _flavors.iteritems():
+ flavor = Flavor(fk)
+
+ flavor.status = f.get("status")
+
+ flavor.flavor_id = f.get("flavor_id")
+ flavor.vCPUs = f.get("vCPUs")
+ flavor.mem_cap = f.get("mem")
+ flavor.disk_cap = f.get("disk")
+ for k, v in f["extra_specs"].iteritems():
+ flavor.extra_specs[k] = v
+
+ self.flavors[fk] = flavor
+
+ def _load_hosts(self, _hosts):
+ """Take JSON host data as defined in /resources/host and
+        create a Host instance.
+ """
+
+ for hk, h in _hosts.iteritems():
+ host = Host(hk)
+
+ host.status = h.get("status")
+ host.state = h.get("state")
+
+ host.uuid = h.get("uuid")
+
+ host.vCPUs = h.get("vCPUs")
+ host.original_vCPUs = h.get("original_vCPUs")
+ host.vCPUs_used = h.get("vCPUs_used")
+ host.avail_vCPUs = h.get("avail_vCPUs")
+
+ host.mem_cap = h.get("mem")
+ host.original_mem_cap = h.get("original_mem")
+ host.free_mem_mb = h.get("free_mem_mb")
+ host.avail_mem_cap = h.get("avail_mem")
+
+ host.local_disk_cap = h.get("local_disk")
+ host.original_local_disk_cap = h.get("original_local_disk")
+ host.free_disk_gb = h.get("free_disk_gb")
+ host.disk_available_least = h.get("disk_available_least")
+ host.avail_local_disk_cap = h.get("avail_local_disk")
+
+ host.NUMA = NUMA(numa=h.get("NUMA"))
+
+ for s_info in h["server_list"]:
+ host.server_list.append(s_info)
+
+ for gk in h["membership_list"]:
+ if gk in self.groups.keys():
+ host.memberships[gk] = self.groups[gk]
+
+ # Not used by Valet currently, only capacity planning module
+ if "candidate_host_types" in h.keys():
+ for htk, ht in h["candidate_host_types"].iteritems():
+ host.candidate_host_types[htk] = ht
+ else:
+ host.candidate_host_types = {}
+
+ self.hosts[hk] = host
+
+ def _load_host_groups(self, _host_groups):
+ for hgk, hg in _host_groups.iteritems():
+ host_group = HostGroup(hgk)
+
+ host_group.status = hg.get("status")
+
+ host_group.host_type = hg.get("host_type")
+
+ host_group.vCPUs = hg.get("vCPUs")
+ host_group.avail_vCPUs = hg.get("avail_vCPUs")
+
+ host_group.mem_cap = hg.get("mem")
+ host_group.avail_mem_cap = hg.get("avail_mem")
+
+ host_group.local_disk_cap = hg.get("local_disk")
+ host_group.avail_local_disk_cap = hg.get("avail_local_disk")
+
+ for s_info in hg["server_list"]:
+ host_group.server_list.append(s_info)
+
+ for gk in hg.get("membership_list"):
+ if gk in self.groups.keys():
+ host_group.memberships[gk] = self.groups[gk]
+
+ self.host_groups[hgk] = host_group
+
+ def _load_datacenter(self, _dc):
+ self.datacenter = Datacenter(_dc.get("name"))
+
+ self.datacenter.status = _dc.get("status")
+
+ self.datacenter.vCPUs = _dc.get("vCPUs")
+ self.datacenter.avail_vCPUs = _dc.get("avail_vCPUs")
+
+ self.datacenter.mem_cap = _dc.get("mem")
+ self.datacenter.avail_mem_cap = _dc.get("avail_mem")
+
+ self.datacenter.local_disk_cap = _dc.get("local_disk")
+ self.datacenter.avail_local_disk_cap = _dc.get("avail_local_disk")
+
+ for s_info in _dc["server_list"]:
+ self.datacenter.server_list.append(s_info)
+
+ for gk in _dc.get("membership_list"):
+ if gk in self.groups.keys():
+ self.datacenter.memberships[gk] = self.groups[gk]
+
+ def _update_compute_avail(self):
+ """Update amount of total available resources."""
+
+ self.CPU_avail = self.datacenter.avail_vCPUs
+ self.mem_avail = self.datacenter.avail_mem_cap
+ self.local_disk_avail = self.datacenter.avail_local_disk_cap
+
+ def update_resource(self):
+ """Update resource status triggered by placements, events, and batch."""
+
+ for level in LEVEL:
+ for _, host_group in self.host_groups.iteritems():
+ if host_group.host_type == level:
+ if host_group.is_available() and host_group.updated:
+ self._update_host_group(host_group)
+
+ if self.datacenter.updated:
+ self._update_datacenter()
+
+ self._update_compute_avail()
+
+ def _update_host_group(self, _host_group):
+ """Update host group (rack) status."""
+
+ _host_group.init_resources()
+ del _host_group.server_list[:]
+ _host_group.init_memberships()
+
+ for _, host in _host_group.child_resources.iteritems():
+ if host.is_available():
+ _host_group.vCPUs += host.vCPUs
+ _host_group.avail_vCPUs += host.avail_vCPUs
+ _host_group.mem_cap += host.mem_cap
+ _host_group.avail_mem_cap += host.avail_mem_cap
+ _host_group.local_disk_cap += host.local_disk_cap
+ _host_group.avail_local_disk_cap += host.avail_local_disk_cap
+
+ for server_info in host.server_list:
+ _host_group.server_list.append(server_info)
+
+ for gk in host.memberships.keys():
+ _host_group.memberships[gk] = host.memberships[gk]
+
+ def _update_datacenter(self):
+ """Update datacenter status."""
+
+ self.datacenter.init_resources()
+ del self.datacenter.server_list[:]
+ self.datacenter.memberships.clear()
+
+ for _, resource in self.datacenter.resources.iteritems():
+ if resource.is_available():
+ self.datacenter.vCPUs += resource.vCPUs
+ self.datacenter.avail_vCPUs += resource.avail_vCPUs
+ self.datacenter.mem_cap += resource.mem_cap
+ self.datacenter.avail_mem_cap += resource.avail_mem_cap
+ self.datacenter.local_disk_cap += resource.local_disk_cap
+ self.datacenter.avail_local_disk_cap += resource.avail_local_disk_cap
+
+ for s in resource.server_list:
+ self.datacenter.server_list.append(s)
+
+ for gk in resource.memberships.keys():
+ self.datacenter.memberships[gk] = resource.memberships[gk]
+
+ def compute_resources(self, host):
+        """Compute the amount of resources with oversubscription ratios."""
+
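+        # For example, if this host's aggregates carry ram_allocation_ratio
+        # values 1.5 and 1.0, the effective ratio is min(1.5, 1.0) = 1.0;
+        # if no aggregate provides one, the configured default is used when
+        # positive, otherwise 1.0 (illustrative values).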
+ ram_allocation_ratio_list = []
+ cpu_allocation_ratio_list = []
+ disk_allocation_ratio_list = []
+
+ for _, g in host.memberships.iteritems():
+ if g.group_type == "aggr":
+ if g.name.startswith("valet:"):
+ metadata = g.metadata["prior_metadata"]
+ else:
+ metadata = g.metadata
+
+ if "ram_allocation_ratio" in metadata.keys():
+ if isinstance(metadata["ram_allocation_ratio"], list):
+ for r in metadata["ram_allocation_ratio"]:
+ ram_allocation_ratio_list.append(float(r))
+ else:
+ ram_allocation_ratio_list.append(float(metadata["ram_allocation_ratio"]))
+ if "cpu_allocation_ratio" in metadata.keys():
+ if isinstance(metadata["cpu_allocation_ratio"], list):
+ for r in metadata["cpu_allocation_ratio"]:
+ cpu_allocation_ratio_list.append(float(r))
+ else:
+ cpu_allocation_ratio_list.append(float(metadata["cpu_allocation_ratio"]))
+ if "disk_allocation_ratio" in metadata.keys():
+ if isinstance(metadata["disk_allocation_ratio"], list):
+ for r in metadata["disk_allocation_ratio"]:
+ disk_allocation_ratio_list.append(float(r))
+ else:
+ disk_allocation_ratio_list.append(float(metadata["disk_allocation_ratio"]))
+
+ ram_allocation_ratio = 1.0
+ if len(ram_allocation_ratio_list) > 0:
+ ram_allocation_ratio = min(ram_allocation_ratio_list)
+ else:
+ if self.default_ram_allocation_ratio > 0:
+ ram_allocation_ratio = self.default_ram_allocation_ratio
+
+ host.compute_mem(ram_allocation_ratio)
+
+ cpu_allocation_ratio = 1.0
+ if len(cpu_allocation_ratio_list) > 0:
+ cpu_allocation_ratio = min(cpu_allocation_ratio_list)
+ else:
+ if self.default_cpu_allocation_ratio > 0:
+ cpu_allocation_ratio = self.default_cpu_allocation_ratio
+
+ host.compute_cpus(cpu_allocation_ratio)
+
+ disk_allocation_ratio = 1.0
+ if len(disk_allocation_ratio_list) > 0:
+ disk_allocation_ratio = min(disk_allocation_ratio_list)
+ else:
+ if self.default_disk_allocation_ratio > 0:
+ disk_allocation_ratio = self.default_disk_allocation_ratio
+
+ host.compute_disk(disk_allocation_ratio)
+
+ def compute_avail_resources(self, host):
+ """Compute available amount of resources after placements."""
+
+ status = host.compute_avail_mem()
+ if status != "ok":
+ self.logger.warning(status)
+
+ status = host.compute_avail_cpus()
+ if status != "ok":
+ self.logger.warning(status)
+
+ status = host.compute_avail_disk()
+ if status != "ok":
+ self.logger.warning(status)
+
+ def mark_host_updated(self, _host_name):
+ """Mark the host updated."""
+
+ host = self.hosts[_host_name]
+ host.updated = True
+
+ if host.host_group is not None:
+ if isinstance(host.host_group, HostGroup):
+ self.mark_host_group_updated(host.host_group.name)
+ else:
+ self.mark_datacenter_updated()
+
+ def mark_host_group_updated(self, _name):
+ """Mark the host_group updated."""
+
+ host_group = self.host_groups[_name]
+ host_group.updated = True
+
+ if host_group.parent_resource is not None:
+ if isinstance(host_group.parent_resource, HostGroup):
+ self.mark_host_group_updated(host_group.parent_resource.name)
+ else:
+ self.mark_datacenter_updated()
+
+ def mark_datacenter_updated(self):
+ """Mark the datacenter updated."""
+
+ if self.datacenter is not None:
+ self.datacenter.updated = True
+
+ def get_host_of_server(self, _s_info):
+ """Check and return host that hosts this server."""
+
+ host = None
+
+ if len(self.change_of_placements) > 0:
+ if _s_info["stack_id"] != "none":
+ sid = _s_info["stack_id"] + ":" + _s_info["name"]
+ else:
+ sid = _s_info["uuid"]
+
+ if sid in self.change_of_placements.keys():
+ host_name = None
+ if "host" in self.change_of_placements[sid].keys():
+ host_name = self.change_of_placements[sid]["host"]
+ elif "new_host" in self.change_of_placements[sid].keys():
+ host_name = self.change_of_placements[sid]["new_host"]
+
+ if host_name is not None:
+ host = self.hosts[host_name]
+ else:
+ for _, h in self.hosts.iteritems():
+ if h.has_server(_s_info):
+ host = h
+ break
+
+ return host
+
+ def update_server_placements(self, change_of_placements=None, sync=False):
+ """Update hosts with the change of server placements.
+
+ Update the available resources of host and NUMA if sync is True.
+ """
+
+ if change_of_placements is None:
+ change_of_placements = self.change_of_placements
+
+ for _, change in change_of_placements.iteritems():
+ if "new_host" in change and "old_host" in change:
+ # Migration case
+
+ old_host = self.hosts[change.get("old_host")]
+ new_host = self.hosts[change.get("new_host")]
+
+ s_info = change.get("info")
+ old_info = old_host.get_server_info(s_info)
+
+ if sync:
+ # Adjust available remaining amount.
+
+ old_flavor = self.get_flavor(old_info.get("flavor_id"))
+ new_flavor = self.get_flavor(s_info.get("flavor_id"))
+
+ if new_flavor is None or old_flavor is None:
+ # NOTE(Gueyoung): ignore at this time.
+ # return False
+ pass
+ else:
+ s_info["vcpus"] = new_flavor.vCPUs
+ s_info["mem"] = new_flavor.mem_cap
+ s_info["disk"] = new_flavor.disk_cap
+
+ new_host.deduct_avail_resources(s_info)
+
+ if new_flavor.need_numa_alignment():
+ cell = new_host.NUMA.deduct_server_resources(s_info)
+ s_info["numa"] = cell
+
+ old_info["vcpus"] = old_flavor.vCPUs
+ old_info["mem"] = old_flavor.mem_cap
+ old_info["disk"] = old_flavor.disk_cap
+
+ old_host.rollback_avail_resources(old_info)
+
+ if old_flavor.need_numa_alignment():
+ old_host.NUMA.rollback_server_resources(old_info)
+
+ old_host.remove_server(old_info)
+
+ new_host.add_server(old_info)
+ new_host.update_server(s_info)
+
+ self.mark_host_updated(change.get("new_host"))
+ self.mark_host_updated(change.get("old_host"))
+
+ elif "new_host" in change and "old_host" not in change:
+ # New server case
+
+ host = self.hosts[change.get("new_host")]
+ s_info = change.get("info")
+
+ flavor = self.get_flavor(s_info.get("flavor_id"))
+
+ if flavor is None:
+ # NOTE(Gueyoung): ignore at this time.
+ # return False
+ pass
+ else:
+ s_info["vcpus"] = flavor.vCPUs
+ s_info["mem"] = flavor.mem_cap
+ s_info["disk"] = flavor.disk_cap
+
+ host.deduct_avail_resources(s_info)
+
+ host.add_server(s_info)
+
+ if sync:
+ if flavor is not None:
+ # Adjust available remaining amount.
+ if flavor.need_numa_alignment():
+ host.NUMA.deduct_server_resources(s_info)
+ else:
+ if s_info.get("numa") != "none":
+ host.NUMA.add_server(s_info)
+
+ self.mark_host_updated(change.get("new_host"))
+
+ elif "new_host" not in change and "old_host" in change:
+ # Deletion case
+
+ host = self.hosts[change.get("old_host")]
+ s_info = change.get("info")
+
+ flavor = self.get_flavor(s_info.get("flavor_id"))
+
+ if flavor is None:
+ # NOTE(Gueyoung): ignore at this time.
+ # return False
+ pass
+ else:
+ s_info["vcpus"] = flavor.vCPUs
+ s_info["mem"] = flavor.mem_cap
+ s_info["disk"] = flavor.disk_cap
+
+ host.rollback_avail_resources(s_info)
+
+ if flavor.need_numa_alignment():
+ host.NUMA.rollback_server_resources(s_info)
+
+ host.remove_server(s_info)
+
+ self.mark_host_updated(change.get("old_host"))
+
+ else:
+ # Update case
+
+ host = self.hosts[change.get("host")]
+ s_info = change.get("info")
+
+ if sync:
+ # Adjust available remaining amount.
+
+ old_info = host.get_server_info(s_info)
+
+ if s_info["flavor_id"] != old_info["flavor_id"]:
+ old_flavor = self.get_flavor(old_info.get("flavor_id"))
+ new_flavor = self.get_flavor(s_info.get("flavor_id"))
+
+ if old_flavor is None or new_flavor is None:
+ # NOTE(Gueyoung): ignore at this time.
+ # return False
+ pass
+ else:
+ host.rollback_avail_resources(old_info)
+
+ if old_flavor.need_numa_alignment():
+ host.NUMA.rollback_server_resources(old_info)
+
+ s_info["vcpus"] = new_flavor.vCPUs
+ s_info["mem"] = new_flavor.mem_cap
+ s_info["disk"] = new_flavor.disk_cap
+
+ host.deduct_avail_resources(s_info)
+
+ if new_flavor.need_numa_alignment():
+ cell = host.NUMA.deduct_server_resources(s_info)
+ s_info["numa"] = cell
+
+ new_info = host.update_server(s_info)
+
+ if new_info is not None:
+ self.mark_host_updated(change.get("host"))
+
+ return True
+
+ def update_server_grouping(self, change_of_placements=None, new_groups=None):
+ """Update group member_hosts and hosts' memberships
+
+ Caused by server addition, deletion, and migration.
+ """
+
+ if change_of_placements is None:
+ change_of_placements = self.change_of_placements
+
+ if new_groups is None:
+ new_groups = self._get_new_grouping()
+
+ for _, placement in change_of_placements.iteritems():
+ if "new_host" in placement.keys() and "old_host" in placement.keys():
+            # Migrated server. This server may have been unknown previously.
+
+ old_host = self.hosts[placement.get("old_host")]
+ new_host = self.hosts[placement.get("new_host")]
+ s_info = placement.get("info")
+ new_info = new_host.get_server_info(s_info)
+
+ # A list of Valet groups
+ group_list = []
+ self.get_groups_of_server(old_host, new_info, group_list)
+
+ _group_list = self._get_groups_of_server(new_info, new_groups)
+ for gk in _group_list:
+ if gk not in group_list:
+ group_list.append(gk)
+
+ self._remove_server_from_groups(old_host, new_info)
+
+ self._add_server_to_groups(new_host, new_info, group_list)
+
+ elif "new_host" in placement.keys() and "old_host" not in placement.keys():
+ # New server
+
+ new_host = self.hosts[placement.get("new_host")]
+ s_info = placement.get("info")
+ new_s_info = new_host.get_server_info(s_info)
+
+ group_list = self._get_groups_of_server(new_s_info, new_groups)
+
+ self._add_server_to_groups(new_host, new_s_info, group_list)
+
+ elif "new_host" not in placement.keys() and "old_host" in placement.keys():
+            # Deleted server. This server may have been unknown previously.
+
+ # Enabled host
+ host = self.hosts[placement["old_host"]]
+
+ self._remove_server_from_groups(host, placement.get("info"))
+
+ else:
+ host_name = placement.get("host")
+ s_info = placement.get("info")
+
+ if host_name in self.hosts.keys():
+ host = self.hosts[host_name]
+ new_info = host.get_server_info(s_info)
+
+ if new_info is not None:
+ self._update_server_in_groups(host, new_info)
+
+ # To create, delete, and update dynamic Host-Aggregates.
+ # TODO(Gueyoung): return error if fail to connect to Nova.
+ self._manage_dynamic_host_aggregates()
+
+ def _get_new_grouping(self, change_of_placements=None):
+ """Verify and get new hosts' memberships."""
+
+ if change_of_placements is None:
+ change_of_placements = self.change_of_placements
+
+ new_groups = {}
+
+        # TODO: verify grouping for 'new' servers by calling
+        #       verify_pre_valet_placements(), and add each host's
+        #       new memberships.
+
+ # Add host's memberships for server-group.
+ # Do not need to verify.
+ for _, placement in change_of_placements.iteritems():
+ if "new_host" in placement.keys():
+ host = self.hosts[placement.get("new_host")]
+ s_info = placement.get("info")
+ new_info = host.get_server_info(s_info)
+
+ for gk, g in self.groups.iteritems():
+ if g.factory == "server-group" and g.status == "enabled":
+ if g.has_server_uuid(new_info.get("uuid")):
+ if gk not in host.memberships.keys():
+ host.memberships[gk] = g
+ self.mark_host_updated(host.name)
+
+ if gk not in new_groups.keys():
+ new_groups[gk] = []
+ new_groups[gk].append(new_info)
+
+ return new_groups
+
+ def _get_groups_of_server(self, _s_info, new_groups):
+ """Check and return group list where server belongs to."""
+
+ group_list = []
+
+ _stack_id = _s_info.get("stack_id")
+ _stack_name = _s_info.get("stack_name")
+ _uuid = _s_info.get("uuid")
+ _name = _s_info.get("name")
+
+ for gk, server_list in new_groups.iteritems():
+ for s_info in server_list:
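+                # A server is matched either by uuid, or by its (stack id or
+                # stack name, server name) pair when the uuid is not yet known.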
+ if s_info["uuid"] != "none":
+ if s_info["uuid"] == _uuid:
+ if gk not in group_list:
+ group_list.append(gk)
+ break
+
+ if s_info["name"] != "none":
+ if s_info["stack_id"] != "none":
+ if s_info["stack_id"] == _stack_id and \
+ s_info["name"] == _name:
+ if gk not in group_list:
+ group_list.append(gk)
+ break
+
+ if s_info["stack_name"] != "none":
+ if s_info["stack_name"] == _stack_name and \
+ s_info["name"] == _name:
+ if gk not in group_list:
+ group_list.append(gk)
+ break
+
+ return group_list
+
+ def get_groups_of_server(self, _host, _s_info, _group_list):
+ """Get groups where the server is assigned."""
+
+ for gk in _host.memberships.keys():
+ if gk not in self.groups.keys() or self.groups[gk].status != "enabled":
+ del _host.memberships[gk]
+ if isinstance(_host, Host):
+ self.mark_host_updated(_host.name)
+ elif isinstance(_host, HostGroup):
+ self.mark_host_group_updated(_host.name)
+ else:
+ self.mark_datacenter_updated()
+ continue
+
+ g = self.groups[gk]
+
+ if g.factory not in ("valet", "server-group"):
+ continue
+
+ if isinstance(_host, HostGroup):
+ if g.level != _host.host_type:
+ continue
+
+ if g.has_server_in_host(_host.name, _s_info):
+ if gk not in _group_list:
+ _group_list.append(gk)
+
+ if isinstance(_host, Host) and _host.host_group is not None:
+ if _host.host_group.is_available():
+ self.get_groups_of_server(_host.host_group, _s_info, _group_list)
+ elif isinstance(_host, HostGroup) and _host.parent_resource is not None:
+ if _host.parent_resource.is_available():
+ if isinstance(_host.parent_resource, HostGroup):
+ self.get_groups_of_server(_host.parent_resource, _s_info, _group_list)
+
+ def _add_server_to_groups(self, _host, _s_info, _groups):
+ """Add new server into groups."""
+
+ for gk in _groups:
+ # The group must be verified for host membership
+ if gk not in _host.memberships.keys():
+ continue
+
+ if gk not in self.groups.keys() or self.groups[gk].status != "enabled":
+ del _host.memberships[gk]
+ if isinstance(_host, Host):
+ self.mark_host_updated(_host.name)
+ elif isinstance(_host, HostGroup):
+ self.mark_host_group_updated(_host.name)
+ else:
+ self.mark_datacenter_updated()
+ continue
+
+ g = self.groups[gk]
+
+ if g.factory not in ("valet", "server-group"):
+ continue
+
+ if isinstance(_host, HostGroup):
+ if g.level != _host.host_type:
+ continue
+
+ if g.factory == "server-group":
+ g.clean_server(_s_info["uuid"], _host.name)
+
+ if g.add_server(_s_info, _host.name):
+ g.updated = True
+ else:
+ self.logger.warning("server already exists in group")
+
+ if isinstance(_host, Host) and _host.host_group is not None:
+ if _host.host_group.is_available():
+ self._add_server_to_groups(_host.host_group, _s_info, _groups)
+ elif isinstance(_host, HostGroup) and _host.parent_resource is not None:
+ if _host.parent_resource.is_available():
+ if isinstance(_host.parent_resource, HostGroup):
+ self._add_server_to_groups(_host.parent_resource, _s_info, _groups)
+
+ def _remove_server_from_groups(self, _host, _s_info):
+ """Remove server from related groups."""
+
+ for gk in _host.memberships.keys():
+ if gk not in self.groups.keys() or self.groups[gk].status != "enabled":
+ del _host.memberships[gk]
+
+ if isinstance(_host, Host):
+ self.mark_host_updated(_host.name)
+ elif isinstance(_host, HostGroup):
+ self.mark_host_group_updated(_host.name)
+ else:
+ self.mark_datacenter_updated()
+ continue
+
+ g = self.groups[gk]
+
+ if g.factory not in ("valet", "server-group"):
+ continue
+
+ if isinstance(_host, HostGroup):
+ if g.level != _host.host_type:
+ continue
+
+ if g.remove_server(_s_info):
+ g.updated = True
+
+ if g.remove_server_from_host(_host.name, _s_info):
+ g.updated = True
+
+ # Remove host from group's membership if the host has no servers of the group.
+ if g.remove_member(_host.name):
+ g.updated = True
+
+                # Remove the group from the host's membership if the group no longer includes the host.
+                # Groups at the datacenter level are not considered here.
+ if isinstance(_host, Host) or isinstance(_host, HostGroup):
+ if _host.remove_membership(g):
+ if isinstance(_host, Host):
+ self.mark_host_updated(_host.name)
+ elif isinstance(_host, HostGroup):
+ self.mark_host_group_updated(_host.name)
+ else:
+ self.mark_datacenter_updated()
+
+ if len(g.server_list) == 0:
+ g.status = "disabled"
+ g.updated = True
+
+ if isinstance(_host, Host) and _host.host_group is not None:
+ if _host.host_group.is_available():
+ self._remove_server_from_groups(_host.host_group, _s_info)
+ elif isinstance(_host, HostGroup) and _host.parent_resource is not None:
+ if _host.parent_resource.is_available():
+ if isinstance(_host.parent_resource, HostGroup):
+ self._remove_server_from_groups(_host.parent_resource, _s_info)
+
+ def _update_server_in_groups(self, _host, _s_info):
+ """Update server info in groups."""
+
+ for gk in _host.memberships.keys():
+ if gk not in self.groups.keys() or self.groups[gk].status != "enabled":
+ del _host.memberships[gk]
+ if isinstance(_host, Host):
+ self.mark_host_updated(_host.name)
+ elif isinstance(_host, HostGroup):
+ self.mark_host_group_updated(_host.name)
+ else:
+ self.mark_datacenter_updated()
+ continue
+
+ g = self.groups[gk]
+
+ if g.factory not in ("valet", "server-group"):
+ continue
+
+ if isinstance(_host, HostGroup):
+ if g.level != _host.host_type:
+ continue
+
+ if g.update_server(_s_info):
+ g.update_server_in_host(_host.name, _s_info)
+ g.updated = True
+
+ if isinstance(_host, Host) and _host.host_group is not None:
+ if _host.host_group.is_available():
+ self._update_server_in_groups(_host.host_group, _s_info)
+ elif isinstance(_host, HostGroup) and _host.parent_resource is not None:
+ if _host.parent_resource.is_available():
+ if isinstance(_host.parent_resource, HostGroup):
+ self._update_server_in_groups(_host.parent_resource, _s_info)
+
+ def add_group(self, _g_name, _g_type, _level, _factory, _host_name):
+ """Add/Enable group unless the group exists or disabled."""
+
+ if _g_name not in self.groups.keys():
+ group = Group(_g_name)
+ group.group_type = _g_type
+ group.factory = _factory
+ group.level = _level
+ group.rule = self._get_rule_of_group(_g_name)
+ group.new = True
+ group.updated = True
+ self.groups[_g_name] = group
+ elif self.groups[_g_name].status != "enabled":
+ self.groups[_g_name].status = "enabled"
+ self.groups[_g_name].updated = True
+
+ if _host_name in self.hosts.keys():
+ host = self.hosts[_host_name]
+ else:
+ host = self.host_groups[_host_name]
+
+ # Update host memberships.
+ if host is not None:
+ if _g_name not in host.memberships.keys():
+ host.memberships[_g_name] = self.groups[_g_name]
+
+ if isinstance(host, Host):
+ self.mark_host_updated(_host_name)
+ elif isinstance(host, HostGroup):
+ self.mark_host_group_updated(_host_name)
+
+ return True
+
+ def _get_rule_of_group(self, _gk):
+ """Get valet group rule of the given group."""
+
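+        # The rule name is the last ':'-separated element of the group key.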
+ rule_name_elements = _gk.split(':')
+        rule_name = rule_name_elements[-1]
+
+ if rule_name in self.group_rules.keys():
+ return self.group_rules[rule_name]
+
+ return None
+
+ def get_group_by_uuid(self, _uuid):
+ """Check and get the group with its uuid."""
+
+ for _, g in self.groups.iteritems():
+ if g.uuid == _uuid:
+ return g
+
+ return None
+
+ def check_valid_rules(self, _tenant_id, _rule_list, use_ex=True):
+ """Check if given rules are valid to be used."""
+
+ for rk in _rule_list:
+ if rk not in self.group_rules.keys():
+ return "not exist rule (" + rk + ")"
+
+ # TODO(Gueyoung): if disabled,
+ # what to do with placed servers under this rule?
+ if self.group_rules[rk].status != "enabled":
+ return "rule (" + rk + ") is not enabled"
+
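+            # Exclusivity rules are rejected when use_ex is False.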
+ if not use_ex:
+ if self.group_rules[rk].rule_type == "exclusivity":
+ return "exclusivity not supported"
+
+ rule = self.group_rules[rk]
+ if len(rule.members) > 0 and _tenant_id not in rule.members:
+ return "no valid tenant to use rule (" + rk + ")"
+
+ return "ok"
+
+ def _manage_dynamic_host_aggregates(self):
+ """Create, delete, or update Host-Aggregates after placement decisions."""
+
+ for gk in self.groups.keys():
+ g = self.groups[gk]
+ if g.group_type == "exclusivity" and g.status == "enabled":
+ aggr_name = "valet:" + g.name
+ if aggr_name not in self.groups.keys():
+ # Create Host-Aggregate.
+ status = self._add_exclusivity_aggregate(aggr_name, g)
+ # TODO(Gueyoung): return error
+ if status != "ok":
+ self.logger.warning("error while adding dynamic host-aggregate")
+ else:
+ dha = self.groups[aggr_name]
+ for hk in g.member_hosts.keys():
+ if hk not in dha.member_hosts.keys():
+ # Add new host into Host-Aggregate.
+ status = self._update_exclusivity_aggregate(dha,
+ self.hosts[hk])
+ # TODO(Gueyoung): return error
+ if status != "ok":
+ self.logger.warning("error while updating dynamic host-aggregate")
+
+ for gk in self.groups.keys():
+ g = self.groups[gk]
+ if g.group_type == "aggr" and g.status == "enabled":
+ if g.name.startswith("valet:"):
+ if g.metadata["valet_type"] == "exclusivity":
+ name_elements = g.name.split(':', 1)
+ ex_group_name = name_elements[1]
+ if ex_group_name not in self.groups.keys() or \
+ self.groups[ex_group_name].status != "enabled":
+ # Delete Host-Aggregate
+ status = self._remove_exclusivity_aggregate(g)
+ # TODO(Gueyoung): return error
+ if status != "ok":
+ self.logger.warning("error while removing dynamic host-aggregate")
+ else:
+ ex_group = self.groups[ex_group_name]
+ for hk in g.member_hosts.keys():
+ if hk not in ex_group.member_hosts.keys():
+ # Remove host from Host-Aggregate.
+ status = self._remove_host_from_exclusivity_aggregate(g,
+ self.hosts[hk])
+
+ # TODO(Gueyoung): return error
+ if status != "ok":
+ self.logger.warning("error while removing host from dynamic host-aggregate")
+
+ def _add_exclusivity_aggregate(self, _name, _group):
+ """Create platform Host-Aggregate for Valet rules.
+
+ Exclusivity: create Host-Aggregate, and lock.
+ """
+
+ group = Group(_name)
+ group.group_type = "aggr"
+ group.level = "host"
+ group.factory = "nova"
+
+ metadata = {"valet_type": "exclusivity"}
+
+ new_host_list = []
+ ex_metadata = {}
+
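+        # Detach each member host from its existing (non-valet) aggregates and
+        # record their uuids and metadata, so the prior memberships can be
+        # restored when the exclusivity group is removed.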
+ for hk in _group.member_hosts.keys():
+ host = self.hosts[hk]
+ aggregates = host.get_aggregates()
+
+ old_aggregates = []
+ for a in aggregates:
+ if a.name.startswith("valet:"):
+ continue
+
+ for mk, mv in a.metadata.iteritems():
+ if mk not in ex_metadata.keys():
+ ex_metadata[mk] = mv
+ else:
+ if isinstance(ex_metadata[mk], list):
+ if mv not in ex_metadata[mk]:
+ ex_metadata[mk].append(mv)
+ self.logger.warning("multiple values of metadata key")
+ else:
+ if mv != ex_metadata[mk]:
+ value_list = [ex_metadata[mk], mv]
+ ex_metadata[mk] = value_list
+ self.logger.warning("multiple values of metadata key")
+
+ old_aggregates.append(a)
+
+ if hk in a.member_hosts.keys():
+ del a.member_hosts[hk]
+ a.updated = True
+
+ if a.name in host.memberships.keys():
+ del host.memberships[a.name]
+
+ if len(old_aggregates) > 0:
+ metadata[hk] = str(old_aggregates[0].uuid)
+ for i in range(1, len(old_aggregates)):
+ metadata[hk] += ("," + str(old_aggregates[i].uuid))
+
+ new_host_list.append(host)
+
+ metadata["prior_metadata"] = ex_metadata
+
+ group.metadata = metadata
+
+ for host in new_host_list:
+ group.member_hosts[host.name] = []
+
+ host.memberships[_name] = group
+ self.mark_host_updated(host.name)
+
+ group.updated = True
+
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ self.metadata.source.set_client(self.datacenter_url)
+
+ status = self.metadata.create_exclusive_aggregate(group,
+ new_host_list)
+
+ self.groups[_name] = group
+
+ return status
+
+ def _update_exclusivity_aggregate(self, _group, _host):
+ """Update platform Host-Aggregate for Valet rules.
+
+ Exclusivity: update Host-Aggregate, and lock.
+ """
+
+ status = "ok"
+
+ aggregates = _host.get_aggregates()
+
+ if _group.group_type == "aggr":
+ if _host.name not in _group.member_hosts.keys():
+ old_aggregates = []
+ ex_metadata = _group.metadata["prior_metadata"]
+
+ for a in aggregates:
+ if a.name.startswith("valet:"):
+ continue
+
+ for mk, mv in a.metadata.iteritems():
+ if mk not in ex_metadata.keys():
+ ex_metadata[mk] = mv
+ else:
+ if isinstance(ex_metadata[mk], list):
+ if mv not in ex_metadata[mk]:
+ ex_metadata[mk].append(mv)
+ self.logger.warning("multiple values of metadata key")
+ else:
+ if mv != ex_metadata[mk]:
+ value_list = [ex_metadata[mk], mv]
+ ex_metadata[mk] = value_list
+ self.logger.warning("multiple values of metadata key")
+
+ old_aggregates.append(a)
+
+ if _host.name in a.member_hosts.keys():
+ del a.member_hosts[_host.name]
+ a.updated = True
+
+ if a.name in _host.memberships.keys():
+ del _host.memberships[a.name]
+
+ if len(old_aggregates) > 0:
+ _group.metadata[_host.name] = str(old_aggregates[0].uuid)
+ for i in range(1, len(old_aggregates)):
+ _group.metadata[_host.name] += ("," + str(old_aggregates[i].uuid))
+
+ _group.metadata["prior_metadata"] = ex_metadata
+
+ _group.member_hosts[_host.name] = []
+ _group.updated = True
+
+ _host.memberships[_group.name] = _group
+ self.mark_host_updated(_host.name)
+
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ self.metadata.source.set_client(self.datacenter_url)
+
+ status = self.metadata.update_exclusive_aggregate(_group.uuid,
+ _group.metadata,
+ _host.name,
+ old_aggregates)
+
+ return status
+
+ def _remove_exclusivity_aggregate(self, _group):
+ """Remove dynamic Host-Aggregate."""
+
+ for hk in _group.member_hosts.keys():
+ host = self.hosts[hk]
+
+ status = self._remove_host_from_exclusivity_aggregate(_group, host)
+ if status != "ok":
+ self.logger.warning("error while removing host from dynamic host-aggregate")
+
+ del self.groups[_group.name]
+
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ self.metadata.source.set_client(self.datacenter_url)
+
+ return self.metadata.remove_exclusive_aggregate(_group.uuid)
+
+ def _remove_host_from_exclusivity_aggregate(self, _group, _host):
+ """Update platform Host-Aggregate for Valet rules.
+
+ Exclusivity: delete host from dynamic Host-Aggregate.
+ """
+
+ status = "ok"
+
+ if _group.group_type == "aggr":
+ if _host.name in _group.member_hosts.keys():
+ old_aggregates = []
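+                # Restore the aggregates the host belonged to before the
+                # exclusivity group was created; they are recorded as a
+                # comma-separated list of aggregate uuids in the group metadata.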
+ if _host.name in _group.metadata.keys():
+ aggr_ids = _group.metadata[_host.name].split(',')
+
+ for aid in aggr_ids:
+ aggr = self.get_group_by_uuid(int(aid))
+ if aggr is not None:
+ aggr.member_hosts[_host.name] = []
+ aggr.updated = True
+ old_aggregates.append(aggr)
+
+ if aggr.name not in _host.memberships.keys():
+ _host.memberships[aggr.name] = aggr
+
+ _group.metadata[_host.name] = ""
+
+ del _group.member_hosts[_host.name]
+ _group.updated = True
+
+ del _host.memberships[_group.name]
+ self.mark_host_updated(_host.name)
+
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ self.metadata.source.set_client(self.datacenter_url)
+
+ status = self.metadata.remove_host_from_exclusive_aggregate(_group.uuid,
+ _group.metadata,
+ _host.name,
+ old_aggregates)
+
+ return status
+
+ def sync_with_platform(self, store=False):
+ """Communicate with platform (e.g., nova) to get resource status.
+
+ Due to dependencies between resource types,
+        keep the following order of processing.
+ """
+
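+        # Do not sync while requests are pending (i.e., not yet confirmed or
+        # rolled back), so that in-flight placement state is not overwritten.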
+ if len(self.pending_requests) > 0:
+ return True
+
+ self.logger.info("load data from platform (e.g., nova)")
+
+        # Set the platform client lib (e.g., novaclient).
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ count = 0
+ while count < 3:
+ if not self.metadata.source.set_client(self.datacenter_url):
+ self.logger.warning("fail to set novaclient: try again")
+ count += 1
+ time.sleep(1)
+ else:
+ break
+ if count == 3:
+ self.logger.error("fail to set novaclient")
+ return False
+
+ count = 0
+ while count < 3:
+ # Set each flavor and its metadata.
+ if not self.metadata.get_flavors(self):
+ self.logger.warning("fail to get flavors: try again")
+ count += 1
+ time.sleep(1)
+ else:
+ break
+ if count == 3:
+ self.logger.error("fail to get flavors")
+ return False
+
+ count = 0
+ while count < 3:
+ # Set each compute host and servers information.
+ if not self.compute.get_hosts(self):
+ self.logger.warning("fail to get hosts: try again")
+ count += 1
+ time.sleep(1)
+ else:
+ break
+ if count == 3:
+ self.logger.error("fail to get hosts")
+ return False
+
+        # TODO(Gueyoung): need to do this every time?
+ # Set the layout between each compute host and rack.
+ if not self.topology.get_topology(self):
+ return False
+
+ count = 0
+ while count < 3:
+ # Set the availability-zone, host-aggregate, and server-group
+ # of each compute host.
+ if not self.metadata.get_groups(self):
+ self.logger.warning("fail to get groups: try again")
+ count += 1
+ time.sleep(1)
+ else:
+ break
+ if count == 3:
+ self.logger.error("fail to get groups")
+ return False
+
+ # Update total capacities of each host.
+ # Triggered by overcommit ratio update or newly added.
+ for _, host in self.hosts.iteritems():
+ if host.is_available() and host.updated:
+ self.compute_resources(host)
+
+ # Update server placements in hosts
+ # If sync is True, update the available capacities.
+ if not self.update_server_placements(sync=True):
+ return False
+
+ # Update the available capacities of each NUMA and host.
+ # Triggered by unknown server additions and deletions.
+ for _, host in self.hosts.iteritems():
+ if host.is_available() and host.updated:
+ self.compute_avail_resources(host)
+
+ # Update server grouping changed by deletion and migration of servers.
+ # TODO(Gueyoung): return False if fail to connect to Nova.
+ self.update_server_grouping()
+
+ # Update racks (and clusters) and datacenter based on host change.
+ self.update_resource()
+
+        # TODO: If periodic batches to collect data from the platform are activated,
+        #   check if there is any update before storing data into the DB.
+ if store:
+ self.store_resource()
+
+ return True
+
+ def get_flavor(self, _id):
+ """Get a flavor info."""
+
+ if isinstance(_id, six.string_types):
+ flavor_id = _id
+ else:
+ flavor_id = str(_id)
+
+ self.logger.debug("fetching flavor = " + flavor_id)
+
+ flavor = None
+ if flavor_id in self.flavors.keys():
+ flavor = self.flavors[flavor_id]
+ else:
+ for _, f in self.flavors.iteritems():
+ if f.flavor_id == flavor_id:
+ flavor = f
+ break
+
+ if flavor is not None:
+            # Check if the detailed flavor information has been loaded.
+ # TODO(Gueyoung): what if flavor specs changed from platform?
+ if flavor.vCPUs == 0:
+ if not self.metadata.source.valid_client(self.datacenter_url):
+ count = 0
+ while count < 3:
+ if not self.metadata.source.set_client(self.datacenter_url):
+ self.logger.warning("fail to set novaclient: try again")
+ count += 1
+ time.sleep(1)
+ else:
+ break
+ if count == 3:
+ self.logger.error("fail to set novaclient")
+ return None
+
+ f = self.metadata.source.get_flavor(flavor.flavor_id)
+ if f is None:
+ flavor = None
+ else:
+ flavor.set_info(f)
+ flavor.updated = True
+
+ self.logger.debug("flavor (" + flavor.name + ") fetched")
+ else:
+ self.logger.warning("unknown flavor = " + flavor_id)
+
+ return flavor
+
+ def store_resource(self, opt=None, req_id=None):
+ """Store resource status into DB."""
+
+ flavor_updates = {}
+ group_updates = {}
+ host_updates = {}
+ host_group_updates = {}
+
+        # Do not store disabled resources.
+
+ for fk, flavor in self.flavors.iteritems():
+ # TODO(Gueyoung): store disabled flavor?
+ flavor_updates[fk] = flavor.get_json_info()
+
+ for gk, group in self.groups.iteritems():
+ if group.status == "enabled":
+ if group.factory != "valet":
+ group_updates[gk] = group.get_json_info()
+
+ for hk, host in self.hosts.iteritems():
+ if host.is_available():
+ host_updates[hk] = host.get_json_info()
+
+ for hgk, host_group in self.host_groups.iteritems():
+ if host_group.is_available():
+ host_group_updates[hgk] = host_group.get_json_info()
+
+ datacenter_update = self.datacenter.get_json_info()
+
+        # If there are pending requests (i.e., not yet confirmed or rolled back),
+ # do NOT sync with platform when dealing with new request.
+ # Here, add/remove request from/to pending list
+ # to track the list of pending requests.
+ if opt is not None and req_id is not None:
+ if opt in ("create", "delete", "update"):
+ self.pending_requests.append(req_id)
+ elif opt in ("confirm", "rollback"):
+ for rid in self.pending_requests:
+ if rid == req_id:
+ self.pending_requests.remove(rid)
+ break
+
+ json_update = {'flavors': flavor_updates, 'groups': group_updates, 'hosts': host_updates,
+ 'host_groups': host_group_updates, 'datacenter': datacenter_update}
+
+ if self.new:
+ if not self.dbh.create_resource(self.datacenter_id,
+ self.datacenter_url,
+ self.pending_requests,
+ json_update):
+ return False
+ else:
+ if not self.dbh.update_resource(self.datacenter_id,
+ self.datacenter_url,
+ self.pending_requests,
+ json_update):
+ return False
+
+ if self.new:
+ self.logger.debug("new datacenter = " + self.datacenter_id)
+ self.logger.debug(" url = " + self.datacenter_url)
+ else:
+ self.logger.debug("updated datacenter = " + self.datacenter_id)
+ self.logger.debug(" url = " + self.datacenter_url)
+ self.logger.debug("region = " + json.dumps(json_update['datacenter'], indent=4))
+ self.logger.debug("racks = " + json.dumps(json_update['host_groups'], indent=4))
+ self.logger.debug("hosts = " + json.dumps(json_update['hosts'], indent=4))
+ self.logger.debug("groups = " + json.dumps(json_update['groups'], indent=4))
+ self.logger.debug("flavors = ")
+ for fk, f_info in json_update['flavors'].iteritems():
+ if f_info["vCPUs"] > 0:
+ self.logger.debug(json.dumps(f_info, indent=4))
+
+ updated_valet_groups = {}
+ new_valet_groups = {}
+ deleted_valet_groups = {}
+ for gk, group in self.groups.iteritems():
+ if group.status == "enabled":
+ if group.factory == "valet":
+ if group.new:
+ new_valet_groups[gk] = group.get_json_info()
+ elif group.updated:
+ updated_valet_groups[gk] = group.get_json_info()
+ else:
+ if group.factory == "valet":
+ deleted_valet_groups[gk] = group.get_json_info()
+
+ for gk, g_info in new_valet_groups.iteritems():
+ if not self.dbh.create_valet_group(gk, g_info):
+ return False
+
+ self.logger.debug("new valet group = " + gk)
+ self.logger.debug("info = " + json.dumps(g_info, indent=4))
+
+ for gk, g_info in updated_valet_groups.iteritems():
+ if not self.dbh.update_valet_group(gk, g_info):
+ return False
+
+ self.logger.debug("updated valet group = " + gk)
+ self.logger.debug("info = " + json.dumps(g_info, indent=4))
+
+ for gk, g_info in deleted_valet_groups.iteritems():
+ if not self.dbh.delete_valet_group(gk):
+ return False
+
+ self.logger.debug("deleted valet group = " + gk)
+ self.logger.debug("info = " + json.dumps(g_info, indent=4))
+
+ return True
diff --git a/engine/src/valet/engine/resource_manager/resource_handler.py b/engine/src/valet/engine/resource_manager/resource_handler.py
new file mode 100644
index 0000000..38868c7
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/resource_handler.py
@@ -0,0 +1,299 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import json
+
+from valet.engine.resource_manager.resource import Resource
+from valet.engine.resource_manager.resources.group_rule import GroupRule
+from valet.engine.resource_manager.resources.host_group import HostGroup
+
+
+class ResourceHandler(object):
+ """Handler for dealing with all existing datacenters and their resources."""
+
+ def __init__(self, _tid, _dbh, _compute, _metadata, _topology,
+ _config, _logger):
+ self.end_of_process = False
+
+ self.dbh = _dbh
+
+ self.compute = _compute
+ self.metadata = _metadata
+ self.topology = _topology
+
+ self.default_cpu_allocation_ratio = _config.get("default_cpu_allocation_ratio")
+ self.default_ram_allocation_ratio = _config.get("default_ram_allocation_ratio")
+ self.default_disk_allocation_ratio = _config.get("default_disk_allocation_ratio")
+ self.batch_sync_interval = _config.get("batch_sync_interval")
+
+ self.group_rules = {}
+ self.resource_list = []
+
+ self.logger = _logger
+
+ def load_group_rules_from_db(self):
+ """Get all defined valet group rules from DB.
+
+ Note that rules are applied to all datacenters.
+ """
+
+ # Init first
+ self.group_rules = {}
+
+ rule_list = self.dbh.get_group_rules()
+ if rule_list is None:
+ return None
+
+ for r in rule_list:
+ rule = GroupRule(r.get("id"))
+
+ rule.status = r.get("status")
+
+ rule.app_scope = r.get("app_scope")
+ rule.rule_type = r.get("type")
+ rule.level = r.get("level")
+ rule.members = json.loads(r.get("members"))
+ rule.desc = r.get("description")
+
+ self.group_rules[rule.rule_id] = rule
+
+ return "ok"
+
+ def load_group_rule_from_db(self, _id):
+ """Get valet group rule from DB."""
+
+ # Init first
+ self.group_rules = {}
+
+ r = self.dbh.get_group_rule(_id)
+ if r is None:
+ return None
+ elif len(r) == 0:
+ return "rule not found"
+
+ rule = GroupRule(r.get("id"))
+
+ rule.status = r.get("status")
+
+ rule.app_scope = r.get("app_scope")
+ rule.rule_type = r.get("type")
+ rule.level = r.get("level")
+ rule.members = json.loads(r.get("members"))
+ rule.desc = r.get("description")
+
+ self.group_rules[rule.rule_id] = rule
+
+ return "ok"
+
+ def create_group_rule(self, _name, _scope, _type, _level, _members, _desc):
+ """Create a new group rule in DB."""
+
+ r = self.dbh.get_group_rule(_name)
+ if r is None:
+ return None
+ elif len(r) > 0:
+ return "rule already exists"
+
+ if not self.dbh.create_group_rule(_name, _scope, _type, _level,
+ _members, _desc):
+ return None
+
+ return "ok"
+
+ def get_rules(self):
+ """Return basic info of valet rules."""
+
+ rule_list = []
+
+ valet_group_list = self.dbh.get_valet_groups()
+ if valet_group_list is None:
+ return None
+
+ for rk, rule in self.group_rules.iteritems():
+ rule_info = self._get_rule(rule)
+
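+            # Valet group ids are prefixed with "<datacenter_id>:", so collect
+            # the datacenters (regions) where the rule has placements.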
+ for vg in valet_group_list:
+ if vg["rule_id"] == rk:
+ gk = vg.get("id")
+ gk_elements = gk.split(":")
+ dc_id = gk_elements[0]
+
+ if dc_id not in rule_info["regions"]:
+ rule_info["regions"].append(dc_id)
+
+ rule_list.append(rule_info)
+
+ return rule_list
+
+ def _get_rule(self, _rule):
+ """Return rule info."""
+
+ rule_info = {}
+
+ rule_info["id"] = _rule.rule_id
+ rule_info["type"] = _rule.rule_type
+ rule_info["app_scope"] = _rule.app_scope
+ rule_info["level"] = _rule.level
+ rule_info["members"] = _rule.members
+ rule_info["description"] = _rule.desc
+ rule_info["status"] = _rule.status
+ rule_info["regions"] = []
+
+ return rule_info
+
+ def get_placements_under_rule(self, _rule_name, _resource):
+ """Get server placements info under given rule in datacenter."""
+
+ placements = {}
+
+ rule = self.group_rules[_rule_name]
+
+ for gk, g in _resource.groups.iteritems():
+ if g.factory == "valet":
+ if g.rule.rule_id == _rule_name:
+ placements[gk] = self._get_placements(g, _resource)
+
+ result = {}
+ result["id"] = rule.rule_id
+ result["type"] = rule.rule_type
+ result["app_scope"] = rule.app_scope
+ result["level"] = rule.level
+ result["members"] = rule.members
+ result["description"] = rule.desc
+ result["status"] = rule.status
+ result["placements"] = placements
+
+ return result
+
+ def _get_placements(self, _g, _resource):
+ """Get placement info of servers in group."""
+
+ placements = {}
+
+ for hk, server_list in _g.member_hosts.iteritems():
+ for s_info in server_list:
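+                # Placements are keyed by "<stack_name>:<server_name>".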
+ sid = s_info.get("stack_name") + ":" + s_info.get("name")
+ placements[sid] = {}
+ placements[sid]["region"] = _resource.datacenter_id
+
+ if hk in _resource.hosts.keys():
+ host = _resource.hosts[hk]
+
+ placements[sid]["host"] = host.name
+
+ hg = host.host_group
+ if isinstance(hg, HostGroup) and hg.host_type == "rack":
+ placements[sid]["rack"] = hg.name
+ else:
+ placements[sid]["rack"] = "na"
+
+ az = host.get_availability_zone()
+ az_name_elements = az.name.split(':', 1)
+ if len(az_name_elements) > 1:
+ az_name = az_name_elements[1]
+ else:
+ az_name = az.name
+ placements[sid]["availability-zone"] = az_name
+
+ elif hk in _resource.host_groups.keys():
+ hg = _resource.host_groups[hk]
+
+ if hg.host_type == "rack":
+ placements[sid]["rack"] = hg.name
+
+ for hhk, host in hg.child_resources.iteritems():
+ if host.has_server(s_info):
+ placements[sid]["host"] = host.name
+
+ az = host.get_availability_zone()
+ az_name_elements = az.name.split(':', 1)
+ if len(az_name_elements) > 1:
+ az_name = az_name_elements[1]
+ else:
+ az_name = az.name
+ placements[sid]["availability-zone"] = az_name
+
+ break
+ else:
+ # TODO(Gueyoung): Look for az, rack and host
+ placements[sid]["availability-zone"] = "na"
+ placements[sid]["rack"] = "na"
+ placements[sid]["host"] = "na"
+
+ else:
+ placements[sid]["availability-zone"] = "na"
+ placements[sid]["rack"] = "na"
+ placements[sid]["host"] = "na"
+
+ return placements
+
+ def load_resource(self, _datacenter):
+ """Create a resource for placement decisions
+
+ in a given target datacenter.
+ """
+
+ # Init first
+ del self.resource_list[:]
+
+ resource = Resource(_datacenter, self.dbh,
+ self.compute, self.metadata, self.topology,
+ self.logger)
+
+ resource.set_config(self.default_cpu_allocation_ratio,
+ self.default_ram_allocation_ratio,
+ self.default_disk_allocation_ratio)
+
+ resource.set_group_rules(self.group_rules)
+
+ status = resource.load_resource_from_db()
+ if status is None:
+ return False
+ elif status != "ok":
+ self.logger.warning(status)
+ resource.new = True
+
+ self.resource_list.append(resource)
+
+ return True
+
+ def load_resource_with_rule(self, _datacenter):
+ """Create and return a resource with valet group rule."""
+
+ # Init first
+ del self.resource_list[:]
+
+ resource = Resource(_datacenter, self.dbh,
+ self.compute, self.metadata, self.topology,
+ self.logger)
+
+ resource.set_config(self.default_cpu_allocation_ratio,
+ self.default_ram_allocation_ratio,
+ self.default_disk_allocation_ratio)
+
+ resource.set_group_rules(self.group_rules)
+
+ status = resource.load_resource_from_db()
+ if status is None:
+ return None
+ elif status != "ok":
+ return status
+
+ self.resource_list.append(resource)
+
+ return "ok"
diff --git a/engine/src/valet/engine/resource_manager/topology_manager.py b/engine/src/valet/engine/resource_manager/topology_manager.py
new file mode 100644
index 0000000..f8422d3
--- /dev/null
+++ b/engine/src/valet/engine/resource_manager/topology_manager.py
@@ -0,0 +1,237 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+from valet.engine.resource_manager.resources.datacenter import Datacenter
+from valet.engine.resource_manager.resources.host_group import HostGroup
+
+
+class TopologyManager(object):
+ """Manager to maintain the layout of datacenter."""
+
+ def __init__(self, _source, _logger):
+ self.source = _source
+
+ self.datacenter = None
+ self.host_groups = {}
+ self.hosts = {}
+
+ self.logger = _logger
+
+ def get_topology(self, _resource):
+ """Set datacenter layout into resource."""
+
+ self.logger.info("set datacenter layout...")
+
+ # Init first
+ self.datacenter = Datacenter(_resource.datacenter_id)
+ self.host_groups.clear()
+ self.hosts.clear()
+
+ if self.source.get_topology(self.datacenter, self.host_groups, self.hosts,
+ _resource.hosts) != "ok":
+ return False
+
+ self._check_updated(_resource)
+
+ return True
+
+ def _check_updated(self, _resource):
+ """Check if the layout is changed."""
+
+ if _resource.datacenter is None:
+ _resource.datacenter = Datacenter(_resource.datacenter_id)
+ _resource.datacenter.updated = True
+
+ self.logger.info("new datacenter (" + _resource.datacenter_id + ") added")
+
+ for hgk in self.host_groups.keys():
+ if hgk not in _resource.host_groups.keys():
+ new_host_group = HostGroup(hgk)
+ new_host_group.host_type = self.host_groups[hgk].host_type
+
+ _resource.host_groups[new_host_group.name] = new_host_group
+ _resource.mark_host_group_updated(hgk)
+
+ self.logger.info("new host_group (" + hgk + ") added")
+
+ for rhgk in _resource.host_groups.keys():
+ if rhgk not in self.host_groups.keys():
+ host_group = _resource.host_groups[rhgk]
+ host_group.status = "disabled"
+                _resource.mark_host_group_updated(rhgk)
+
+ self.logger.info("host_group (" + rhgk + ") disabled")
+
+        # TODO(Gueyoung): what if a host exists in the topology,
+        # but not in the resource (DB or platform)?
+
+ for rhk in _resource.hosts.keys():
+ if not _resource.hosts[rhk].is_available():
+ continue
+
+ if rhk not in self.hosts.keys():
+ _resource.hosts[rhk].status = "disabled"
+ _resource.mark_host_updated(rhk)
+
+ self.logger.info("host (" + rhk + ") removed from topology")
+
+ if self._is_datacenter_updated(_resource):
+ _resource.datacenter.updated = True
+
+ for hgk in self.host_groups.keys():
+ hg = self.host_groups[hgk]
+
+ if self._is_host_group_updated(hg, _resource):
+ _resource.mark_host_group_updated(hgk)
+
+ for hk in self.hosts.keys():
+ if hk in _resource.hosts.keys():
+ if not _resource.hosts[hk].is_available():
+ continue
+
+ host = self.hosts[hk]
+
+ if self._is_host_updated(host, _resource):
+ _resource.mark_host_updated(hk)
+
+        # TODO(Gueyoung): Hierarchical failure propagation
+
+ def _is_datacenter_updated(self, _resource):
+ """Check if datacenter's resources are changed."""
+
+ updated = False
+
+ _rdatacenter = _resource.datacenter
+
+ for rk in self.datacenter.resources.keys():
+
+ h = None
+ if rk in _resource.host_groups.keys():
+ h = _resource.host_groups[rk]
+ elif rk in _resource.hosts.keys():
+ h = _resource.hosts[rk]
+
+ if h is not None and h.is_available():
+ if rk not in _rdatacenter.resources.keys() or h.updated:
+ _rdatacenter.resources[rk] = h
+ updated = True
+
+ self.logger.info("datacenter updated (new resource)")
+
+ for rk in _rdatacenter.resources.keys():
+
+ h = None
+ if rk in _resource.host_groups.keys():
+ h = _resource.host_groups[rk]
+ elif rk in _resource.hosts.keys():
+ h = _resource.hosts[rk]
+
+ if h is None or \
+ not h.is_available() or \
+ rk not in self.datacenter.resources.keys():
+ del _rdatacenter.resources[rk]
+ updated = True
+
+ self.logger.info("datacenter updated (resource removed)")
+
+ return updated
+
+ def _is_host_group_updated(self, _hg, _resource):
+ """Check if host_group's parent or children are changed."""
+
+ updated = False
+
+ _rhg = _resource.host_groups[_hg.name]
+
+ if _hg.host_type != _rhg.host_type:
+ _rhg.host_type = _hg.host_type
+ updated = True
+ self.logger.info("host_group (" + _rhg.name + ") updated (hosting type)")
+
+ if _rhg.parent_resource is None or \
+ _rhg.parent_resource.name != _hg.parent_resource.name:
+ if _hg.parent_resource.name in _resource.host_groups.keys():
+ hg = _resource.host_groups[_hg.parent_resource.name]
+ if hg.is_available():
+ _rhg.parent_resource = hg
+ updated = True
+ elif _hg.parent_resource.name == _resource.datacenter.name:
+ _rhg.parent_resource = _resource.datacenter
+ updated = True
+
+ if updated:
+ self.logger.info("host_group (" + _rhg.name + ") updated (parent host_group)")
+
+ for rk in _hg.child_resources.keys():
+
+ h = None
+ if rk in _resource.hosts.keys():
+ h = _resource.hosts[rk]
+ elif rk in _resource.host_groups.keys():
+ h = _resource.host_groups[rk]
+
+ if h is not None and h.is_available():
+ if rk not in _rhg.child_resources.keys() or h.updated:
+ _rhg.child_resources[rk] = h
+ updated = True
+
+ self.logger.info("host_group (" + _rhg.name + ") updated (new child host)")
+
+ for rk in _rhg.child_resources.keys():
+
+ h = None
+ if rk in _resource.hosts.keys():
+ h = _resource.hosts[rk]
+ elif rk in _resource.host_groups.keys():
+ h = _resource.host_groups[rk]
+
+ if h is None or \
+ not h.is_available() or \
+ rk not in _hg.child_resources.keys():
+ del _rhg.child_resources[rk]
+ updated = True
+
+ self.logger.info("host_group (" + _rhg.name + ") updated (child host removed)")
+
+ return updated
+
+ def _is_host_updated(self, _host, _resource):
+ """Check if host's parent (e.g., rack) is changed."""
+
+ updated = False
+
+ _rhost = _resource.hosts[_host.name]
+
+ if _rhost.host_group is None or \
+ _rhost.host_group.name != _host.host_group.name:
+ if _host.host_group.name in _resource.host_groups.keys():
+ rhost_group = _resource.host_groups[_host.host_group.name]
+ if rhost_group.is_available():
+ _rhost.host_group = rhost_group
+ updated = True
+ elif _host.host_group.name == _resource.datacenter.name:
+ _rhost.host_group = _resource.datacenter
+ updated = True
+
+ if updated:
+ self.logger.info("host (" + _rhost.name + ") updated (host_group)")
+
+        return updated