summaryrefslogtreecommitdiffstats
path: root/conductor
diff options
context:
space:
mode:
authorrl001m <ruilu@research.att.com>2017-12-17 09:22:15 -0500
committerrl001m <ruilu@research.att.com>2017-12-17 09:24:44 -0500
commit3212850cfc6d88d4d186b6805e7774cb70fbd137 (patch)
tree0b98d20f39661ec25bd7790b0f44f4795b738bef /conductor
parent22cff5d3b51d9aa2d4fd11f657264e41063add1c (diff)
Added controller directory to the repository
Added the HAS-Controller module in ONAP Change-Id: I8ef7fcee936f3a607069b8d5da65beb0210a784c Issue-ID: OPTFRA-13 Signed-off-by: rl001m <ruilu@research.att.com>
Diffstat (limited to 'conductor')
-rw-r--r--conductor/conductor/controller/__init__.py20
-rw-r--r--conductor/conductor/controller/rpc.py99
-rw-r--r--conductor/conductor/controller/service.py104
-rw-r--r--conductor/conductor/controller/translator.py822
-rw-r--r--conductor/conductor/controller/translator_svc.py162
5 files changed, 1207 insertions, 0 deletions
diff --git a/conductor/conductor/controller/__init__.py b/conductor/conductor/controller/__init__.py
new file mode 100644
index 0000000..013ad0a
--- /dev/null
+++ b/conductor/conductor/controller/__init__.py
@@ -0,0 +1,20 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+from .service import ControllerServiceLauncher # noqa: F401
diff --git a/conductor/conductor/controller/rpc.py b/conductor/conductor/controller/rpc.py
new file mode 100644
index 0000000..fb385ac
--- /dev/null
+++ b/conductor/conductor/controller/rpc.py
@@ -0,0 +1,99 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import uuid
+
+
+class ControllerRPCEndpoint(object):
+ """Controller Endpoint"""
+
+ def __init__(self, conf, plan_class):
+ self.conf = conf
+ self.Plan = plan_class
+
+ def plan_create(self, ctx, arg):
+ """Create a plan"""
+ name = arg.get('name', str(uuid.uuid4()))
+ timeout = arg.get('timeout', self.conf.controller.timeout)
+ recommend_max = arg.get('limit', self.conf.controller.limit)
+ template = arg.get('template', None)
+ status = self.Plan.TEMPLATE
+ new_plan = self.Plan(name, timeout, recommend_max, template,
+ status=status)
+
+ if new_plan:
+ plan_json = {
+ "plan": {
+ "name": new_plan.name,
+ "id": new_plan.id,
+ "status": status,
+ }
+ }
+ rtn = {
+ 'response': plan_json,
+ 'error': False}
+ else:
+ # TODO(jdandrea): Catch and show the error here
+ rtn = {
+ 'response': {},
+ 'error': True}
+ return rtn
+
+ def plans_delete(self, ctx, arg):
+ """Delete one or more plans"""
+ plan_id = arg.get('plan_id')
+ if plan_id:
+ plans = self.Plan.query.filter_by(id=plan_id)
+ else:
+ plans = self.Plan.query.all()
+ for the_plan in plans:
+ the_plan.delete()
+
+ rtn = {
+ 'response': {},
+ 'error': False}
+ return rtn
+
+ def plans_get(self, ctx, arg):
+ """Get one or more plans"""
+ plan_id = arg.get('plan_id')
+ if plan_id:
+ plans = self.Plan.query.filter_by(id=plan_id)
+ else:
+ plans = self.Plan.query.all()
+
+ plan_list = []
+ for the_plan in plans:
+ plan_json = {
+ "name": the_plan.name,
+ "id": the_plan.id,
+ "status": the_plan.status,
+ }
+ if the_plan.message:
+ plan_json["message"] = the_plan.message
+ if the_plan.solution:
+ recs = the_plan.solution.get('recommendations')
+ if recs:
+ plan_json["recommendations"] = recs
+ plan_list.append(plan_json)
+
+ rtn = {
+ 'response': {"plans": plan_list},
+ 'error': False}
+ return rtn
diff --git a/conductor/conductor/controller/service.py b/conductor/conductor/controller/service.py
new file mode 100644
index 0000000..d13518c
--- /dev/null
+++ b/conductor/conductor/controller/service.py
@@ -0,0 +1,104 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import cotyledon
+from oslo_config import cfg
+from oslo_log import log
+
+from conductor.common.models import plan
+from conductor.common.music import api
+from conductor.common.music import messaging as music_messaging
+from conductor.common.music.model import base
+from conductor.controller import rpc
+from conductor.controller import translator_svc
+from conductor import messaging
+from conductor import service
+
+LOG = log.getLogger(__name__)
+
+CONF = cfg.CONF
+
+CONTROLLER_OPTS = [
+ cfg.IntOpt('timeout',
+ default=10,
+ min=1,
+ help='Timeout for planning requests. '
+ 'Default value is 10.'),
+ cfg.IntOpt('limit',
+ default=1,
+ min=1,
+ help='Maximum number of result sets to return. '
+ 'Default value is 1.'),
+ cfg.IntOpt('workers',
+ default=1,
+ min=1,
+ help='Number of workers for controller service. '
+ 'Default value is 1.'),
+ cfg.BoolOpt('concurrent',
+ default=False,
+ help='Set to True when controller will run in active-active '
+ 'mode. When set to False, controller will flush any '
+ 'abandoned messages at startup. The controller always '
+ 'restarts abandoned template translations at startup.'),
+]
+
+CONF.register_opts(CONTROLLER_OPTS, group='controller')
+
+# Pull in service opts. We use them here.
+OPTS = service.OPTS
+CONF.register_opts(OPTS)
+
+
+class ControllerServiceLauncher(object):
+ """Launcher for the controller service."""
+ def __init__(self, conf):
+ self.conf = conf
+
+ # Set up Music access.
+ self.music = api.API()
+ self.music.keyspace_create(keyspace=conf.keyspace)
+
+ # Dynamically create a plan class for the specified keyspace
+ self.Plan = base.create_dynamic_model(
+ keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan")
+
+ if not self.Plan:
+ raise
+
+ def run(self):
+ transport = messaging.get_transport(self.conf)
+ if transport:
+ topic = "controller"
+ target = music_messaging.Target(topic=topic)
+ endpoints = [rpc.ControllerRPCEndpoint(self.conf, self.Plan), ]
+ flush = not self.conf.controller.concurrent
+ kwargs = {'transport': transport,
+ 'target': target,
+ 'endpoints': endpoints,
+ 'flush': flush, }
+ svcmgr = cotyledon.ServiceManager()
+ svcmgr.add(music_messaging.RPCService,
+ workers=self.conf.controller.workers,
+ args=(self.conf,), kwargs=kwargs)
+
+ kwargs = {'plan_class': self.Plan, }
+ svcmgr.add(translator_svc.TranslatorService,
+ workers=self.conf.controller.workers,
+ args=(self.conf,), kwargs=kwargs)
+ svcmgr.run()
diff --git a/conductor/conductor/controller/translator.py b/conductor/conductor/controller/translator.py
new file mode 100644
index 0000000..eb467fe
--- /dev/null
+++ b/conductor/conductor/controller/translator.py
@@ -0,0 +1,822 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import copy
+import datetime
+import json
+import os
+import uuid
+import yaml
+
+from oslo_config import cfg
+from oslo_log import log
+import six
+
+from conductor import __file__ as conductor_root
+from conductor.common.music import messaging as music_messaging
+from conductor.common import threshold
+from conductor import messaging
+from conductor import service
+
+LOG = log.getLogger(__name__)
+
+CONF = cfg.CONF
+
+VERSIONS = ["2016-11-01", "2017-10-10"]
+LOCATION_KEYS = ['latitude', 'longitude', 'host_name', 'clli_code']
+INVENTORY_PROVIDERS = ['aai']
+INVENTORY_TYPES = ['cloud', 'service']
+DEFAULT_INVENTORY_PROVIDER = INVENTORY_PROVIDERS[0]
+CANDIDATE_KEYS = ['inventory_type', 'candidate_id', 'location_id',
+ 'location_type', 'cost']
+DEMAND_KEYS = ['inventory_provider', 'inventory_type', 'service_type',
+ 'service_id', 'service_resource_id', 'customer_id',
+ 'default_cost', 'candidates', 'region', 'complex',
+ 'required_candidates', 'excluded_candidates',
+ 'subdivision', 'flavor']
+CONSTRAINT_KEYS = ['type', 'demands', 'properties']
+CONSTRAINTS = {
+ # constraint_type: {
+ # split: split into individual constraints, one per demand
+ # required: list of required property names,
+ # optional: list of optional property names,
+ # thresholds: dict of property/base-unit pairs for threshold parsing
+ # allowed: dict of keys and allowed values (if controlled vocab);
+ # only use this for Conductor-controlled values!
+ # }
+ 'attribute': {
+ 'split': True,
+ 'required': ['evaluate'],
+ },
+ 'distance_between_demands': {
+ 'required': ['distance'],
+ 'thresholds': {
+ 'distance': 'distance'
+ },
+ },
+ 'distance_to_location': {
+ 'split': True,
+ 'required': ['distance', 'location'],
+ 'thresholds': {
+ 'distance': 'distance'
+ },
+ },
+ 'instance_fit': {
+ 'split': True,
+ 'required': ['controller'],
+ 'optional': ['request'],
+ },
+ 'inventory_group': {},
+ 'region_fit': {
+ 'split': True,
+ 'required': ['controller'],
+ 'optional': ['request'],
+ },
+ 'zone': {
+ 'required': ['qualifier', 'category'],
+ 'allowed': {'qualifier': ['same', 'different'],
+ 'category': ['disaster', 'region', 'complex',
+ 'time', 'maintenance']},
+ },
+}
+
+
+class TranslatorException(Exception):
+ pass
+
+
+class Translator(object):
+ """Template translator.
+
+ Takes an input template and translates it into
+ something the solver can use. Calls the data service
+ as needed, giving it the inventory provider as context.
+ Presently the only inventory provider is A&AI. Others
+ may be added in the future.
+ """
+
+ def __init__(self, conf, plan_name, plan_id, template):
+ self.conf = conf
+ self._template = copy.deepcopy(template)
+ self._plan_name = plan_name
+ self._plan_id = plan_id
+ self._translation = None
+ self._valid = False
+ self._ok = False
+
+ # Set up the RPC service(s) we want to talk to.
+ self.data_service = self.setup_rpc(self.conf, "data")
+
+ def setup_rpc(self, conf, topic):
+ """Set up the RPC Client"""
+ # TODO(jdandrea): Put this pattern inside music_messaging?
+ transport = messaging.get_transport(conf=conf)
+ target = music_messaging.Target(topic=topic)
+ client = music_messaging.RPCClient(conf=conf,
+ transport=transport,
+ target=target)
+ return client
+
+ def create_components(self):
+ # TODO(jdandrea): Make deep copies so the template is untouched
+ self._version = self._template.get("homing_template_version")
+ self._parameters = self._template.get("parameters", {})
+ self._locations = self._template.get("locations", {})
+ self._demands = self._template.get("demands", {})
+ self._constraints = self._template.get("constraints", {})
+ self._optmization = self._template.get("optimization", {})
+ self._reservations = self._template.get("reservation", {})
+
+ if type(self._version) is datetime.date:
+ self._version = str(self._version)
+
+ def validate_components(self):
+ """Cursory validation of template components.
+
+ More detailed validation happens while parsing each component.
+ """
+ self._valid = False
+
+ # Check version
+ if self._version not in VERSIONS:
+ raise TranslatorException(
+ "conductor_template_version must be one "
+ "of: {}".format(', '.join(VERSIONS)))
+
+ # Check top level structure
+ components = {
+ "parameters": {
+ "name": "Parameter",
+ "content": self._parameters,
+ },
+ "locations": {
+ "name": "Location",
+ "keys": LOCATION_KEYS,
+ "content": self._locations,
+ },
+ "demands": {
+ "name": "Demand",
+ "content": self._demands,
+ },
+ "constraints": {
+ "name": "Constraint",
+ "keys": CONSTRAINT_KEYS,
+ "content": self._constraints,
+ },
+ "optimization": {
+ "name": "Optimization",
+ "content": self._optmization,
+ },
+ "reservations": {
+ "name": "Reservation",
+ "content": self._reservations,
+ }
+ }
+ for name, component in components.items():
+ name = component.get('name')
+ keys = component.get('keys', None)
+ content = component.get('content')
+
+ if type(content) is not dict:
+ raise TranslatorException(
+ "{} section must be a dictionary".format(name))
+ for content_name, content_def in content.items():
+ if not keys:
+ continue
+
+ for key in content_def:
+ if key not in keys:
+ raise TranslatorException(
+ "{} {} has an invalid key {}".format(
+ name, content_name, key))
+
+ demand_keys = self._demands.keys()
+ location_keys = self._locations.keys()
+ for constraint_name, constraint in self._constraints.items():
+
+ # Require a single demand (string), or a list of one or more.
+ demands = constraint.get('demands')
+ if isinstance(demands, six.string_types):
+ demands = [demands]
+ if not isinstance(demands, list) or len(demands) < 1:
+ raise TranslatorException(
+ "Demand list for Constraint {} must be "
+ "a list of names or a string with one name".format(
+ constraint_name))
+ if not set(demands).issubset(demand_keys):
+ raise TranslatorException(
+ "Undefined Demand(s) {} in Constraint '{}'".format(
+ list(set(demands).difference(demand_keys)),
+ constraint_name))
+
+ properties = constraint.get('properties', None)
+ if properties:
+ location = properties.get('location', None)
+ if location:
+ if location not in location_keys:
+ raise TranslatorException(
+ "Location {} in Constraint {} is undefined".format(
+ location, constraint_name))
+
+ self._valid = True
+
+ def _parse_parameters(self, obj, path=[]):
+ """Recursively parse all {get_param: X} occurrences
+
+ This modifies obj in-place. If you want to keep the original,
+ pass in a deep copy.
+ """
+ # Ok to start with a string ...
+ if isinstance(path, six.string_types):
+ # ... but the breadcrumb trail goes in an array.
+ path = [path]
+
+ # Traverse a list
+ if type(obj) is list:
+ for idx, val in enumerate(obj, start=0):
+ # Add path to the breadcrumb trail
+ new_path = list(path)
+ new_path[-1] += "[{}]".format(idx)
+
+ # Look at each element.
+ obj[idx] = self._parse_parameters(val, new_path)
+
+ # Traverse a dict
+ elif type(obj) is dict:
+ # Did we find a "{get_param: ...}" intrinsic?
+ if obj.keys() == ['get_param']:
+ param_name = obj['get_param']
+
+ # The parameter name must be a string.
+ if not isinstance(param_name, six.string_types):
+ path_str = ' > '.join(path)
+ raise TranslatorException(
+ "Parameter name '{}' not a string in path {}".format(
+ param_name, path_str))
+
+ # Parameter name must be defined.
+ if param_name not in self._parameters:
+ path_str = ' > '.join(path)
+ raise TranslatorException(
+ "Parameter '{}' undefined in path {}".format(
+ param_name, path_str))
+
+ # Return the value in place of the call.
+ return self._parameters.get(param_name)
+
+ # Not an intrinsic. Traverse as usual.
+ for key in obj.keys():
+ # Add path to the breadcrumb trail.
+ new_path = list(path)
+ new_path.append(key)
+
+ # Look at each key/value pair.
+ obj[key] = self._parse_parameters(obj[key], new_path)
+
+ # Return whatever we have after unwinding.
+ return obj
+
+ def parse_parameters(self):
+ """Resolve all parameters references."""
+ locations = copy.deepcopy(self._locations)
+ self._locations = self._parse_parameters(locations, 'locations')
+
+ demands = copy.deepcopy(self._demands)
+ self._demands = self._parse_parameters(demands, 'demands')
+
+ constraints = copy.deepcopy(self._constraints)
+ self._constraints = self._parse_parameters(constraints, 'constraints')
+
+ reservations = copy.deepcopy(self._reservations)
+ self._reservations = self._parse_parameters(reservations,
+ 'reservations')
+
+ def parse_locations(self, locations):
+ """Prepare the locations for use by the solver."""
+ parsed = {}
+ for location, args in locations.items():
+ ctxt = {}
+ response = self.data_service.call(
+ ctxt=ctxt,
+ method="resolve_location",
+ args=args)
+
+ resolved_location = \
+ response and response.get('resolved_location')
+ if not resolved_location:
+ raise TranslatorException(
+ "Unable to resolve location {}".format(location)
+ )
+ parsed[location] = resolved_location
+ return parsed
+
+ def parse_demands(self, demands):
+ """Validate/prepare demands for use by the solver."""
+ if type(demands) is not dict:
+ raise TranslatorException("Demands must be provided in "
+ "dictionary form")
+
+ # Look at each demand
+ demands_copy = copy.deepcopy(demands)
+ parsed = {}
+ for name, requirements in demands_copy.items():
+ inventory_candidates = []
+ for requirement in requirements:
+ for key in requirement:
+ if key not in DEMAND_KEYS:
+ raise TranslatorException(
+ "Demand {} has an invalid key {}".format(
+ requirement, key))
+
+ if 'candidates' in requirement:
+ # Candidates *must* specify an inventory provider
+ provider = requirement.get("inventory_provider")
+ if provider and provider not in INVENTORY_PROVIDERS:
+ raise TranslatorException(
+ "Unsupported inventory provider {} "
+ "in demand {}".format(provider, name))
+ else:
+ provider = DEFAULT_INVENTORY_PROVIDER
+
+ # Check each candidate
+ for candidate in requirement.get('candidates'):
+ # Must be a dictionary
+ if type(candidate) is not dict:
+ raise TranslatorException(
+ "Candidate found in demand {} that is "
+ "not a dictionary".format(name))
+
+ # Must have only supported keys
+ for key in candidate.keys():
+ if key not in CANDIDATE_KEYS:
+ raise TranslatorException(
+ "Candidate with invalid key {} found "
+ "in demand {}".format(key, name)
+ )
+
+ # TODO(jdandrea): Check required/optional keys
+
+ # Set the inventory provider if not already
+ candidate['inventory_provider'] = \
+ candidate.get('inventory_provider', provider)
+
+ # Set cost if not already (default cost is 0?)
+ candidate['cost'] = candidate.get('cost', 0)
+
+ # Add to our list of parsed candidates
+ inventory_candidates.append(candidate)
+
+ # candidates are specified through inventory providers
+ # Do the basic sanity checks for inputs
+ else:
+ # inventory provider MUST be specified
+ provider = requirement.get("inventory_provider")
+ if not provider:
+ raise TranslatorException(
+ "Inventory provider not specified "
+ "in demand {}".format(name)
+ )
+ elif provider and provider not in INVENTORY_PROVIDERS:
+ raise TranslatorException(
+ "Unsupported inventory provider {} "
+ "in demand {}".format(provider, name)
+ )
+ else:
+ provider = DEFAULT_INVENTORY_PROVIDER
+ requirement['provider'] = provider
+
+ # inventory type MUST be specified
+ inventory_type = requirement.get('inventory_type')
+ if not inventory_type or inventory_type == '':
+ raise TranslatorException(
+ "Inventory type not specified for "
+ "demand {}".format(name)
+ )
+ if inventory_type and \
+ inventory_type not in INVENTORY_TYPES:
+ raise TranslatorException(
+ "Unknown inventory type {} specified for "
+ "demand {}".format(inventory_type, name)
+ )
+
+ # For service inventories, customer_id and
+ # service_type MUST be specified
+ if inventory_type == 'service':
+ customer_id = requirement.get('customer_id')
+ if not customer_id:
+ raise TranslatorException(
+ "Customer ID not specified for "
+ "demand {}".format(name)
+ )
+ service_type = requirement.get('service_type')
+ if not service_type:
+ raise TranslatorException(
+ "Service Type not specified for "
+ "demand {}".format(name)
+ )
+
+ # TODO(jdandrea): Check required/optional keys for requirement
+ # elif 'inventory_type' in requirement:
+ # # For now this is just a stand-in candidate
+ # candidate = {
+ # 'inventory_provider':
+ # requirement.get('inventory_provider',
+ # DEFAULT_INVENTORY_PROVIDER),
+ # 'inventory_type':
+ # requirement.get('inventory_type', ''),
+ # 'candidate_id': '',
+ # 'location_id': '',
+ # 'location_type': '',
+ # 'cost': 0,
+ # }
+ #
+ # # Add to our list of parsed candidates
+ # inventory_candidates.append(candidate)
+
+ # Ask conductor-data for one or more candidates.
+ ctxt = {
+ "plan_id": self._plan_id,
+ "plan_name": self._plan_name,
+ }
+ args = {
+ "demands": {
+ name: requirements,
+ }
+ }
+
+ # Check if required_candidate and excluded candidate
+ # are mutually exclusive.
+ for requirement in requirements:
+ required_candidates = requirement.get("required_candidates")
+ excluded_candidates = requirement.get("excluded_candidates")
+ if (required_candidates and
+ excluded_candidates and
+ set(map(lambda entry: entry['candidate_id'],
+ required_candidates))
+ & set(map(lambda entry: entry['candidate_id'],
+ excluded_candidates))):
+ raise TranslatorException(
+ "Required candidate list and excluded candidate"
+ " list are not mutually exclusive for demand"
+ " {}".format(name)
+ )
+
+ response = self.data_service.call(
+ ctxt=ctxt,
+ method="resolve_demands",
+ args=args)
+
+ resolved_demands = \
+ response and response.get('resolved_demands')
+
+ required_candidates = resolved_demands\
+ .get('required_candidates')
+ if not resolved_demands:
+ raise TranslatorException(
+ "Unable to resolve inventory "
+ "candidates for demand {}"
+ .format(name)
+ )
+ resolved_candidates = resolved_demands.get(name)
+ for candidate in resolved_candidates:
+ inventory_candidates.append(candidate)
+ if len(inventory_candidates) < 1:
+ if not required_candidates:
+ raise TranslatorException(
+ "Unable to find any candidate for "
+ "demand {}".format(name)
+ )
+ else:
+ raise TranslatorException(
+ "Unable to find any required "
+ "candidate for demand {}"
+ .format(name)
+ )
+ parsed[name] = {
+ "candidates": inventory_candidates,
+ }
+
+ return parsed
+
+ def parse_constraints(self, constraints):
+ """Validate/prepare constraints for use by the solver."""
+ if type(constraints) is not dict:
+ raise TranslatorException("Constraints must be provided in "
+ "dictionary form")
+
+ # Look at each constraint. Properties must exist, even if empty.
+ constraints_copy = copy.deepcopy(constraints)
+
+ parsed = {}
+ for name, constraint in constraints_copy.items():
+
+ if not constraint.get('properties'):
+ constraint['properties'] = {}
+
+ constraint_type = constraint.get('type')
+ constraint_def = CONSTRAINTS.get(constraint_type)
+
+ # Is it a supported type?
+ if constraint_type not in CONSTRAINTS:
+ raise TranslatorException(
+ "Unsupported type '{}' found in constraint "
+ "named '{}'".format(constraint_type, name))
+
+ # Now walk through the constraint's content
+ for key, value in constraint.items():
+ # Must be a supported key
+ if key not in CONSTRAINT_KEYS:
+ raise TranslatorException(
+ "Invalid key '{}' found in constraint "
+ "named '{}'".format(key, name))
+
+ # For properties ...
+ if key == 'properties':
+ # Make sure all required properties are present
+ required = constraint_def.get('required', [])
+ for req_prop in required:
+ if req_prop not in value.keys():
+ raise TranslatorException(
+ "Required property '{}' not found in "
+ "constraint named '{}'".format(
+ req_prop, name))
+ if not value.get(req_prop) \
+ or value.get(req_prop) == '':
+ raise TranslatorException(
+ "No value specified for property '{}' in "
+ "constraint named '{}'".format(
+ req_prop, name))
+
+ # Make sure there are no unknown properties
+ optional = constraint_def.get('optional', [])
+ for prop_name in value.keys():
+ if prop_name not in required + optional:
+ raise TranslatorException(
+ "Unknown property '{}' in "
+ "constraint named '{}'".format(
+ prop_name, name))
+
+ # If a property has a controlled vocabulary, make
+ # sure its value is one of the allowed ones.
+ allowed = constraint_def.get('allowed', {})
+ for prop_name, allowed_values in allowed.items():
+ if prop_name in value.keys():
+ prop_value = value.get(prop_name, '')
+ if prop_value not in allowed_values:
+ raise TranslatorException(
+ "Property '{}' value '{}' unsupported in "
+ "constraint named '{}' (must be one of "
+ "{})".format(prop_name, prop_value,
+ name, allowed_values))
+
+ # Break all threshold-formatted values into parts
+ thresholds = constraint_def.get('thresholds', {})
+ for thr_prop, base_units in thresholds.items():
+ if thr_prop in value.keys():
+ expression = value.get(thr_prop)
+ thr = threshold.Threshold(expression, base_units)
+ value[thr_prop] = thr.parts
+
+ # We already know we have one or more demands due to
+ # validate_components(). We still need to coerce the demands
+ # into a list in case only one demand was provided.
+ constraint_demands = constraint.get('demands')
+ if isinstance(constraint_demands, six.string_types):
+ constraint['demands'] = [constraint_demands]
+
+ # Either split the constraint into parts, one per demand,
+ # or use it as-is
+ if constraint_def.get('split'):
+ for demand in constraint.get('demands', []):
+ constraint_demand = name + '_' + demand
+ parsed[constraint_demand] = copy.deepcopy(constraint)
+ parsed[constraint_demand]['name'] = name
+ parsed[constraint_demand]['demands'] = demand
+ else:
+ parsed[name] = copy.deepcopy(constraint)
+ parsed[name]['name'] = name
+
+ return parsed
+
+ def parse_optimization(self, optimization):
+ """Validate/prepare optimization for use by the solver."""
+
+ # WARNING: The template format for optimization is generalized,
+ # however the solver is very particular about the expected
+ # goal, functions, and operands. Therefore, for the time being,
+ # we are choosing to be highly conservative in what we accept
+ # at the template level. Once the solver can handle the more
+ # general form, we can make the translation pass using standard
+ # compiler techniques and tools like antlr (antlr4-python2-runtime).
+
+ if not optimization:
+ LOG.debug("No objective function or "
+ "optimzation provided in the template")
+ return
+
+ optimization_copy = copy.deepcopy(optimization)
+ parsed = {
+ "goal": "min",
+ "operation": "sum",
+ "operands": [],
+ }
+
+ if type(optimization_copy) is not dict:
+ raise TranslatorException("Optimization must be a dictionary.")
+
+ goals = optimization_copy.keys()
+ if goals != ['minimize']:
+ raise TranslatorException(
+ "Optimization must contain a single goal of 'minimize'.")
+
+ funcs = optimization_copy['minimize'].keys()
+ if funcs != ['sum']:
+ raise TranslatorException(
+ "Optimization goal 'minimize' must "
+ "contain a single function of 'sum'.")
+
+ operands = optimization_copy['minimize']['sum']
+ if type(operands) is not list:
+ # or len(operands) != 2:
+ raise TranslatorException(
+ "Optimization goal 'minimize', function 'sum' "
+ "must be a list of exactly two operands.")
+
+ def get_distance_between_args(operand):
+ args = operand.get('distance_between')
+ if type(args) is not list and len(args) != 2:
+ raise TranslatorException(
+ "Optimization 'distance_between' arguments must "
+ "be a list of length two.")
+
+ got_demand = False
+ got_location = False
+ for arg in args:
+ if not got_demand and arg in self._demands.keys():
+ got_demand = True
+ if not got_location and arg in self._locations.keys():
+ got_location = True
+ if not got_demand or not got_location:
+ raise TranslatorException(
+ "Optimization 'distance_between' arguments {} must "
+ "include one valid demand name and one valid "
+ "location name.".format(args))
+
+ return args
+
+ for operand in operands:
+ weight = 1.0
+ args = None
+
+ if operand.keys() == ['distance_between']:
+ # Value must be a list of length 2 with one
+ # location and one demand
+ args = get_distance_between_args(operand)
+
+ elif operand.keys() == ['product']:
+ for product_op in operand['product']:
+ if threshold.is_number(product_op):
+ weight = product_op
+ elif type(product_op) is dict:
+ if product_op.keys() == ['distance_between']:
+ function = 'distance_between'
+ args = get_distance_between_args(product_op)
+ elif product_op.keys() == ['cloud_version']:
+ function = 'cloud_version'
+ args = product_op.get('cloud_version')
+
+ if not args:
+ raise TranslatorException(
+ "Optimization products must include at least "
+ "one 'distance_between' function call and "
+ "one optional number to be used as a weight.")
+
+ # We now have our weight/function_param.
+ parsed['operands'].append(
+ {
+ "operation": "product",
+ "weight": weight,
+ "function": function,
+ "function_param": args,
+ }
+ )
+ return parsed
+
+ def parse_reservations(self, reservations):
+ demands = self._demands
+ if type(reservations) is not dict:
+ raise TranslatorException("Reservations must be provided in "
+ "dictionary form")
+
+ parsed = {}
+ if reservations:
+ parsed['counter'] = 0
+ for name, reservation in reservations.items():
+ if not reservation.get('properties'):
+ reservation['properties'] = {}
+ for demand in reservation.get('demands', []):
+ if demand in demands.keys():
+ constraint_demand = name + '_' + demand
+ parsed['demands'] = {}
+ parsed['demands'][constraint_demand] = \
+ copy.deepcopy(reservation)
+ parsed['demands'][constraint_demand]['name'] = name
+ parsed['demands'][constraint_demand]['demand'] = demand
+
+ return parsed
+
+ def do_translation(self):
+ """Perform the translation."""
+ if not self.valid:
+ raise TranslatorException("Can't translate an invalid template.")
+ self._translation = {
+ "conductor_solver": {
+ "version": self._version,
+ "plan_id": self._plan_id,
+ "locations": self.parse_locations(self._locations),
+ "demands": self.parse_demands(self._demands),
+ "constraints": self.parse_constraints(self._constraints),
+ "objective": self.parse_optimization(self._optmization),
+ "reservations": self.parse_reservations(self._reservations),
+ }
+ }
+
+ def translate(self):
+ """Translate the template for the solver."""
+ self._ok = False
+ try:
+ self.create_components()
+ self.validate_components()
+ self.parse_parameters()
+ self.do_translation()
+ self._ok = True
+ except Exception as exc:
+ self._error_message = exc.message
+
+ @property
+ def valid(self):
+ """Returns True if the template has been validated."""
+ return self._valid
+
+ @property
+ def ok(self):
+ """Returns True if the translation was successful."""
+ return self._ok
+
+ @property
+ def translation(self):
+ """Returns the translation if it was successful."""
+ return self._translation
+
+ @property
+ def error_message(self):
+ """Returns the last known error message."""
+ return self._error_message
+
+
+def main():
+ template_name = 'some_template'
+
+ path = os.path.abspath(conductor_root)
+ dir_path = os.path.dirname(path)
+
+ # Prepare service-wide components (e.g., config)
+ conf = service.prepare_service(
+ [], config_files=[dir_path + '/../etc/conductor/conductor.conf'])
+ # conf.set_override('mock', True, 'music_api')
+
+ t1 = threshold.Threshold("< 500 ms", "time")
+ t2 = threshold.Threshold("= 120 mi", "distance")
+ t3 = threshold.Threshold("160", "currency")
+ t4 = threshold.Threshold("60-80 Gbps", "throughput")
+ print('t1: {}\nt2: {}\nt3: {}\nt4: {}\n'.format(t1, t2, t3, t4))
+
+ template_file = dir_path + '/tests/data/' + template_name + '.yaml'
+ fd = open(template_file, "r")
+ template = yaml.load(fd)
+
+ trns = Translator(conf, template_name, str(uuid.uuid4()), template)
+ trns.translate()
+ if trns.ok:
+ print(json.dumps(trns.translation, indent=2))
+ else:
+ print("TESTING - Translator Error: {}".format(trns.error_message))
+
+if __name__ == '__main__':
+ main()
diff --git a/conductor/conductor/controller/translator_svc.py b/conductor/conductor/controller/translator_svc.py
new file mode 100644
index 0000000..425ff36
--- /dev/null
+++ b/conductor/conductor/controller/translator_svc.py
@@ -0,0 +1,162 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import time
+
+import cotyledon
+import futurist
+from oslo_config import cfg
+from oslo_log import log
+
+from conductor.common.music import api
+from conductor.common.music import messaging as music_messaging
+from conductor.controller import translator
+from conductor.i18n import _LE, _LI
+from conductor import messaging
+
+LOG = log.getLogger(__name__)
+
+CONF = cfg.CONF
+
+CONTROLLER_OPTS = [
+ cfg.IntOpt('polling_interval',
+ default=1,
+ min=1,
+ help='Time between checking for new plans. '
+ 'Default value is 1.'),
+]
+
+CONF.register_opts(CONTROLLER_OPTS, group='controller')
+
+
+class TranslatorService(cotyledon.Service):
+ """Template Translator service.
+
+ This service looks for untranslated templates and
+ preps them for solving by the Solver service.
+ """
+
+ # This will appear in 'ps xaf'
+ name = "Template Translator"
+
+ def __init__(self, worker_id, conf, **kwargs):
+ """Initializer"""
+ LOG.debug("%s" % self.__class__.__name__)
+ super(TranslatorService, self).__init__(worker_id)
+ self._init(conf, **kwargs)
+ self.running = True
+
+ def _init(self, conf, **kwargs):
+ self.conf = conf
+ self.Plan = kwargs.get('plan_class')
+ self.kwargs = kwargs
+
+ # Set up the RPC service(s) we want to talk to.
+ self.data_service = self.setup_rpc(conf, "data")
+
+ # Set up Music access.
+ self.music = api.API()
+
+ def _gracefully_stop(self):
+ """Gracefully stop working on things"""
+ pass
+
+ def _restart(self):
+ """Prepare to restart the service"""
+ pass
+
+ def setup_rpc(self, conf, topic):
+ """Set up the RPC Client"""
+ # TODO(jdandrea): Put this pattern inside music_messaging?
+ transport = messaging.get_transport(conf=conf)
+ target = music_messaging.Target(topic=topic)
+ client = music_messaging.RPCClient(conf=conf,
+ transport=transport,
+ target=target)
+ return client
+
+ def translate(self, plan):
+ """Translate the plan to a format the solver can use"""
+ # Update the translation field and set status to TRANSLATED.
+ try:
+ LOG.info(_LI("Requesting plan {} translation").format(
+ plan.id))
+ trns = translator.Translator(
+ self.conf, plan.name, plan.id, plan.template)
+ trns.translate()
+ if trns.ok:
+ plan.translation = trns.translation
+ plan.status = self.Plan.TRANSLATED
+ LOG.info(_LI(
+ "Plan {} translated. Ready for solving").format(
+ plan.id))
+ else:
+ plan.message = trns.error_message
+ plan.status = self.Plan.ERROR
+ LOG.error(_LE(
+ "Plan {} translation error encountered").format(
+ plan.id))
+ except Exception as ex:
+ template = "An exception of type {0} occurred, arguments:\n{1!r}"
+ plan.message = template.format(type(ex).__name__, ex.args)
+ plan.status = self.Plan.ERROR
+
+ plan.update()
+
+ def __check_for_templates(self):
+ """Wait for the polling interval, then do the real template check."""
+
+ # Wait for at least poll_interval sec
+ polling_interval = self.conf.controller.polling_interval
+ time.sleep(polling_interval)
+
+ # Look for plans with the status set to TEMPLATE
+ plans = self.Plan.query.all()
+ for plan in plans:
+ # If there's a template to be translated, do it!
+ if plan.status == self.Plan.TEMPLATE:
+ self.translate(plan)
+ break
+ elif plan.timedout:
+ # Move plan to error status? Create a new timed-out status?
+ # todo(snarayanan)
+ continue
+
+ def run(self):
+ """Run"""
+ LOG.debug("%s" % self.__class__.__name__)
+
+ # Look for templates to translate from within a thread
+ executor = futurist.ThreadPoolExecutor()
+ while self.running:
+ fut = executor.submit(self.__check_for_templates)
+ fut.result()
+ executor.shutdown()
+
+ def terminate(self):
+ """Terminate"""
+ LOG.debug("%s" % self.__class__.__name__)
+ self.running = False
+ self._gracefully_stop()
+ super(TranslatorService, self).terminate()
+
+ def reload(self):
+ """Reload"""
+ LOG.debug("%s" % self.__class__.__name__)
+ self._restart()