From 3212850cfc6d88d4d186b6805e7774cb70fbd137 Mon Sep 17 00:00:00 2001 From: rl001m Date: Sun, 17 Dec 2017 09:22:15 -0500 Subject: Added controller directory to the repository Added the HAS-Controller module in ONAP Change-Id: I8ef7fcee936f3a607069b8d5da65beb0210a784c Issue-ID: OPTFRA-13 Signed-off-by: rl001m --- conductor/conductor/controller/__init__.py | 20 + conductor/conductor/controller/rpc.py | 99 +++ conductor/conductor/controller/service.py | 104 +++ conductor/conductor/controller/translator.py | 822 +++++++++++++++++++++++ conductor/conductor/controller/translator_svc.py | 162 +++++ 5 files changed, 1207 insertions(+) create mode 100644 conductor/conductor/controller/__init__.py create mode 100644 conductor/conductor/controller/rpc.py create mode 100644 conductor/conductor/controller/service.py create mode 100644 conductor/conductor/controller/translator.py create mode 100644 conductor/conductor/controller/translator_svc.py (limited to 'conductor') diff --git a/conductor/conductor/controller/__init__.py b/conductor/conductor/controller/__init__.py new file mode 100644 index 0000000..013ad0a --- /dev/null +++ b/conductor/conductor/controller/__init__.py @@ -0,0 +1,20 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from .service import ControllerServiceLauncher # noqa: F401 diff --git a/conductor/conductor/controller/rpc.py b/conductor/conductor/controller/rpc.py new file mode 100644 index 0000000..fb385ac --- /dev/null +++ b/conductor/conductor/controller/rpc.py @@ -0,0 +1,99 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import uuid + + +class ControllerRPCEndpoint(object): + """Controller Endpoint""" + + def __init__(self, conf, plan_class): + self.conf = conf + self.Plan = plan_class + + def plan_create(self, ctx, arg): + """Create a plan""" + name = arg.get('name', str(uuid.uuid4())) + timeout = arg.get('timeout', self.conf.controller.timeout) + recommend_max = arg.get('limit', self.conf.controller.limit) + template = arg.get('template', None) + status = self.Plan.TEMPLATE + new_plan = self.Plan(name, timeout, recommend_max, template, + status=status) + + if new_plan: + plan_json = { + "plan": { + "name": new_plan.name, + "id": new_plan.id, + "status": status, + } + } + rtn = { + 'response': plan_json, + 'error': False} + else: + # TODO(jdandrea): Catch and show the error here + rtn = { + 'response': {}, + 'error': True} + return rtn + + def plans_delete(self, ctx, arg): + """Delete one or more plans""" + plan_id = arg.get('plan_id') + if plan_id: + plans = self.Plan.query.filter_by(id=plan_id) + else: + plans = self.Plan.query.all() + for the_plan in plans: + the_plan.delete() + + rtn = { + 'response': {}, + 'error': False} + return rtn + + def plans_get(self, ctx, arg): + """Get one or more plans""" + plan_id = arg.get('plan_id') + if plan_id: + plans = self.Plan.query.filter_by(id=plan_id) + else: + plans = self.Plan.query.all() + + plan_list = [] + for the_plan in plans: + plan_json = { + "name": the_plan.name, + "id": the_plan.id, + "status": the_plan.status, + } + if the_plan.message: + plan_json["message"] = the_plan.message + if the_plan.solution: + recs = the_plan.solution.get('recommendations') + if recs: + plan_json["recommendations"] = recs + plan_list.append(plan_json) + + rtn = { + 'response': {"plans": plan_list}, + 'error': False} + return rtn diff --git a/conductor/conductor/controller/service.py b/conductor/conductor/controller/service.py new file mode 100644 index 0000000..d13518c --- /dev/null +++ b/conductor/conductor/controller/service.py @@ -0,0 +1,104 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import cotyledon +from oslo_config import cfg +from oslo_log import log + +from conductor.common.models import plan +from conductor.common.music import api +from conductor.common.music import messaging as music_messaging +from conductor.common.music.model import base +from conductor.controller import rpc +from conductor.controller import translator_svc +from conductor import messaging +from conductor import service + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +CONTROLLER_OPTS = [ + cfg.IntOpt('timeout', + default=10, + min=1, + help='Timeout for planning requests. ' + 'Default value is 10.'), + cfg.IntOpt('limit', + default=1, + min=1, + help='Maximum number of result sets to return. ' + 'Default value is 1.'), + cfg.IntOpt('workers', + default=1, + min=1, + help='Number of workers for controller service. ' + 'Default value is 1.'), + cfg.BoolOpt('concurrent', + default=False, + help='Set to True when controller will run in active-active ' + 'mode. When set to False, controller will flush any ' + 'abandoned messages at startup. The controller always ' + 'restarts abandoned template translations at startup.'), +] + +CONF.register_opts(CONTROLLER_OPTS, group='controller') + +# Pull in service opts. We use them here. +OPTS = service.OPTS +CONF.register_opts(OPTS) + + +class ControllerServiceLauncher(object): + """Launcher for the controller service.""" + def __init__(self, conf): + self.conf = conf + + # Set up Music access. + self.music = api.API() + self.music.keyspace_create(keyspace=conf.keyspace) + + # Dynamically create a plan class for the specified keyspace + self.Plan = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") + + if not self.Plan: + raise + + def run(self): + transport = messaging.get_transport(self.conf) + if transport: + topic = "controller" + target = music_messaging.Target(topic=topic) + endpoints = [rpc.ControllerRPCEndpoint(self.conf, self.Plan), ] + flush = not self.conf.controller.concurrent + kwargs = {'transport': transport, + 'target': target, + 'endpoints': endpoints, + 'flush': flush, } + svcmgr = cotyledon.ServiceManager() + svcmgr.add(music_messaging.RPCService, + workers=self.conf.controller.workers, + args=(self.conf,), kwargs=kwargs) + + kwargs = {'plan_class': self.Plan, } + svcmgr.add(translator_svc.TranslatorService, + workers=self.conf.controller.workers, + args=(self.conf,), kwargs=kwargs) + svcmgr.run() diff --git a/conductor/conductor/controller/translator.py b/conductor/conductor/controller/translator.py new file mode 100644 index 0000000..eb467fe --- /dev/null +++ b/conductor/conductor/controller/translator.py @@ -0,0 +1,822 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import datetime +import json +import os +import uuid +import yaml + +from oslo_config import cfg +from oslo_log import log +import six + +from conductor import __file__ as conductor_root +from conductor.common.music import messaging as music_messaging +from conductor.common import threshold +from conductor import messaging +from conductor import service + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +VERSIONS = ["2016-11-01", "2017-10-10"] +LOCATION_KEYS = ['latitude', 'longitude', 'host_name', 'clli_code'] +INVENTORY_PROVIDERS = ['aai'] +INVENTORY_TYPES = ['cloud', 'service'] +DEFAULT_INVENTORY_PROVIDER = INVENTORY_PROVIDERS[0] +CANDIDATE_KEYS = ['inventory_type', 'candidate_id', 'location_id', + 'location_type', 'cost'] +DEMAND_KEYS = ['inventory_provider', 'inventory_type', 'service_type', + 'service_id', 'service_resource_id', 'customer_id', + 'default_cost', 'candidates', 'region', 'complex', + 'required_candidates', 'excluded_candidates', + 'subdivision', 'flavor'] +CONSTRAINT_KEYS = ['type', 'demands', 'properties'] +CONSTRAINTS = { + # constraint_type: { + # split: split into individual constraints, one per demand + # required: list of required property names, + # optional: list of optional property names, + # thresholds: dict of property/base-unit pairs for threshold parsing + # allowed: dict of keys and allowed values (if controlled vocab); + # only use this for Conductor-controlled values! + # } + 'attribute': { + 'split': True, + 'required': ['evaluate'], + }, + 'distance_between_demands': { + 'required': ['distance'], + 'thresholds': { + 'distance': 'distance' + }, + }, + 'distance_to_location': { + 'split': True, + 'required': ['distance', 'location'], + 'thresholds': { + 'distance': 'distance' + }, + }, + 'instance_fit': { + 'split': True, + 'required': ['controller'], + 'optional': ['request'], + }, + 'inventory_group': {}, + 'region_fit': { + 'split': True, + 'required': ['controller'], + 'optional': ['request'], + }, + 'zone': { + 'required': ['qualifier', 'category'], + 'allowed': {'qualifier': ['same', 'different'], + 'category': ['disaster', 'region', 'complex', + 'time', 'maintenance']}, + }, +} + + +class TranslatorException(Exception): + pass + + +class Translator(object): + """Template translator. + + Takes an input template and translates it into + something the solver can use. Calls the data service + as needed, giving it the inventory provider as context. + Presently the only inventory provider is A&AI. Others + may be added in the future. + """ + + def __init__(self, conf, plan_name, plan_id, template): + self.conf = conf + self._template = copy.deepcopy(template) + self._plan_name = plan_name + self._plan_id = plan_id + self._translation = None + self._valid = False + self._ok = False + + # Set up the RPC service(s) we want to talk to. + self.data_service = self.setup_rpc(self.conf, "data") + + def setup_rpc(self, conf, topic): + """Set up the RPC Client""" + # TODO(jdandrea): Put this pattern inside music_messaging? + transport = messaging.get_transport(conf=conf) + target = music_messaging.Target(topic=topic) + client = music_messaging.RPCClient(conf=conf, + transport=transport, + target=target) + return client + + def create_components(self): + # TODO(jdandrea): Make deep copies so the template is untouched + self._version = self._template.get("homing_template_version") + self._parameters = self._template.get("parameters", {}) + self._locations = self._template.get("locations", {}) + self._demands = self._template.get("demands", {}) + self._constraints = self._template.get("constraints", {}) + self._optmization = self._template.get("optimization", {}) + self._reservations = self._template.get("reservation", {}) + + if type(self._version) is datetime.date: + self._version = str(self._version) + + def validate_components(self): + """Cursory validation of template components. + + More detailed validation happens while parsing each component. + """ + self._valid = False + + # Check version + if self._version not in VERSIONS: + raise TranslatorException( + "conductor_template_version must be one " + "of: {}".format(', '.join(VERSIONS))) + + # Check top level structure + components = { + "parameters": { + "name": "Parameter", + "content": self._parameters, + }, + "locations": { + "name": "Location", + "keys": LOCATION_KEYS, + "content": self._locations, + }, + "demands": { + "name": "Demand", + "content": self._demands, + }, + "constraints": { + "name": "Constraint", + "keys": CONSTRAINT_KEYS, + "content": self._constraints, + }, + "optimization": { + "name": "Optimization", + "content": self._optmization, + }, + "reservations": { + "name": "Reservation", + "content": self._reservations, + } + } + for name, component in components.items(): + name = component.get('name') + keys = component.get('keys', None) + content = component.get('content') + + if type(content) is not dict: + raise TranslatorException( + "{} section must be a dictionary".format(name)) + for content_name, content_def in content.items(): + if not keys: + continue + + for key in content_def: + if key not in keys: + raise TranslatorException( + "{} {} has an invalid key {}".format( + name, content_name, key)) + + demand_keys = self._demands.keys() + location_keys = self._locations.keys() + for constraint_name, constraint in self._constraints.items(): + + # Require a single demand (string), or a list of one or more. + demands = constraint.get('demands') + if isinstance(demands, six.string_types): + demands = [demands] + if not isinstance(demands, list) or len(demands) < 1: + raise TranslatorException( + "Demand list for Constraint {} must be " + "a list of names or a string with one name".format( + constraint_name)) + if not set(demands).issubset(demand_keys): + raise TranslatorException( + "Undefined Demand(s) {} in Constraint '{}'".format( + list(set(demands).difference(demand_keys)), + constraint_name)) + + properties = constraint.get('properties', None) + if properties: + location = properties.get('location', None) + if location: + if location not in location_keys: + raise TranslatorException( + "Location {} in Constraint {} is undefined".format( + location, constraint_name)) + + self._valid = True + + def _parse_parameters(self, obj, path=[]): + """Recursively parse all {get_param: X} occurrences + + This modifies obj in-place. If you want to keep the original, + pass in a deep copy. + """ + # Ok to start with a string ... + if isinstance(path, six.string_types): + # ... but the breadcrumb trail goes in an array. + path = [path] + + # Traverse a list + if type(obj) is list: + for idx, val in enumerate(obj, start=0): + # Add path to the breadcrumb trail + new_path = list(path) + new_path[-1] += "[{}]".format(idx) + + # Look at each element. + obj[idx] = self._parse_parameters(val, new_path) + + # Traverse a dict + elif type(obj) is dict: + # Did we find a "{get_param: ...}" intrinsic? + if obj.keys() == ['get_param']: + param_name = obj['get_param'] + + # The parameter name must be a string. + if not isinstance(param_name, six.string_types): + path_str = ' > '.join(path) + raise TranslatorException( + "Parameter name '{}' not a string in path {}".format( + param_name, path_str)) + + # Parameter name must be defined. + if param_name not in self._parameters: + path_str = ' > '.join(path) + raise TranslatorException( + "Parameter '{}' undefined in path {}".format( + param_name, path_str)) + + # Return the value in place of the call. + return self._parameters.get(param_name) + + # Not an intrinsic. Traverse as usual. + for key in obj.keys(): + # Add path to the breadcrumb trail. + new_path = list(path) + new_path.append(key) + + # Look at each key/value pair. + obj[key] = self._parse_parameters(obj[key], new_path) + + # Return whatever we have after unwinding. + return obj + + def parse_parameters(self): + """Resolve all parameters references.""" + locations = copy.deepcopy(self._locations) + self._locations = self._parse_parameters(locations, 'locations') + + demands = copy.deepcopy(self._demands) + self._demands = self._parse_parameters(demands, 'demands') + + constraints = copy.deepcopy(self._constraints) + self._constraints = self._parse_parameters(constraints, 'constraints') + + reservations = copy.deepcopy(self._reservations) + self._reservations = self._parse_parameters(reservations, + 'reservations') + + def parse_locations(self, locations): + """Prepare the locations for use by the solver.""" + parsed = {} + for location, args in locations.items(): + ctxt = {} + response = self.data_service.call( + ctxt=ctxt, + method="resolve_location", + args=args) + + resolved_location = \ + response and response.get('resolved_location') + if not resolved_location: + raise TranslatorException( + "Unable to resolve location {}".format(location) + ) + parsed[location] = resolved_location + return parsed + + def parse_demands(self, demands): + """Validate/prepare demands for use by the solver.""" + if type(demands) is not dict: + raise TranslatorException("Demands must be provided in " + "dictionary form") + + # Look at each demand + demands_copy = copy.deepcopy(demands) + parsed = {} + for name, requirements in demands_copy.items(): + inventory_candidates = [] + for requirement in requirements: + for key in requirement: + if key not in DEMAND_KEYS: + raise TranslatorException( + "Demand {} has an invalid key {}".format( + requirement, key)) + + if 'candidates' in requirement: + # Candidates *must* specify an inventory provider + provider = requirement.get("inventory_provider") + if provider and provider not in INVENTORY_PROVIDERS: + raise TranslatorException( + "Unsupported inventory provider {} " + "in demand {}".format(provider, name)) + else: + provider = DEFAULT_INVENTORY_PROVIDER + + # Check each candidate + for candidate in requirement.get('candidates'): + # Must be a dictionary + if type(candidate) is not dict: + raise TranslatorException( + "Candidate found in demand {} that is " + "not a dictionary".format(name)) + + # Must have only supported keys + for key in candidate.keys(): + if key not in CANDIDATE_KEYS: + raise TranslatorException( + "Candidate with invalid key {} found " + "in demand {}".format(key, name) + ) + + # TODO(jdandrea): Check required/optional keys + + # Set the inventory provider if not already + candidate['inventory_provider'] = \ + candidate.get('inventory_provider', provider) + + # Set cost if not already (default cost is 0?) + candidate['cost'] = candidate.get('cost', 0) + + # Add to our list of parsed candidates + inventory_candidates.append(candidate) + + # candidates are specified through inventory providers + # Do the basic sanity checks for inputs + else: + # inventory provider MUST be specified + provider = requirement.get("inventory_provider") + if not provider: + raise TranslatorException( + "Inventory provider not specified " + "in demand {}".format(name) + ) + elif provider and provider not in INVENTORY_PROVIDERS: + raise TranslatorException( + "Unsupported inventory provider {} " + "in demand {}".format(provider, name) + ) + else: + provider = DEFAULT_INVENTORY_PROVIDER + requirement['provider'] = provider + + # inventory type MUST be specified + inventory_type = requirement.get('inventory_type') + if not inventory_type or inventory_type == '': + raise TranslatorException( + "Inventory type not specified for " + "demand {}".format(name) + ) + if inventory_type and \ + inventory_type not in INVENTORY_TYPES: + raise TranslatorException( + "Unknown inventory type {} specified for " + "demand {}".format(inventory_type, name) + ) + + # For service inventories, customer_id and + # service_type MUST be specified + if inventory_type == 'service': + customer_id = requirement.get('customer_id') + if not customer_id: + raise TranslatorException( + "Customer ID not specified for " + "demand {}".format(name) + ) + service_type = requirement.get('service_type') + if not service_type: + raise TranslatorException( + "Service Type not specified for " + "demand {}".format(name) + ) + + # TODO(jdandrea): Check required/optional keys for requirement + # elif 'inventory_type' in requirement: + # # For now this is just a stand-in candidate + # candidate = { + # 'inventory_provider': + # requirement.get('inventory_provider', + # DEFAULT_INVENTORY_PROVIDER), + # 'inventory_type': + # requirement.get('inventory_type', ''), + # 'candidate_id': '', + # 'location_id': '', + # 'location_type': '', + # 'cost': 0, + # } + # + # # Add to our list of parsed candidates + # inventory_candidates.append(candidate) + + # Ask conductor-data for one or more candidates. + ctxt = { + "plan_id": self._plan_id, + "plan_name": self._plan_name, + } + args = { + "demands": { + name: requirements, + } + } + + # Check if required_candidate and excluded candidate + # are mutually exclusive. + for requirement in requirements: + required_candidates = requirement.get("required_candidates") + excluded_candidates = requirement.get("excluded_candidates") + if (required_candidates and + excluded_candidates and + set(map(lambda entry: entry['candidate_id'], + required_candidates)) + & set(map(lambda entry: entry['candidate_id'], + excluded_candidates))): + raise TranslatorException( + "Required candidate list and excluded candidate" + " list are not mutually exclusive for demand" + " {}".format(name) + ) + + response = self.data_service.call( + ctxt=ctxt, + method="resolve_demands", + args=args) + + resolved_demands = \ + response and response.get('resolved_demands') + + required_candidates = resolved_demands\ + .get('required_candidates') + if not resolved_demands: + raise TranslatorException( + "Unable to resolve inventory " + "candidates for demand {}" + .format(name) + ) + resolved_candidates = resolved_demands.get(name) + for candidate in resolved_candidates: + inventory_candidates.append(candidate) + if len(inventory_candidates) < 1: + if not required_candidates: + raise TranslatorException( + "Unable to find any candidate for " + "demand {}".format(name) + ) + else: + raise TranslatorException( + "Unable to find any required " + "candidate for demand {}" + .format(name) + ) + parsed[name] = { + "candidates": inventory_candidates, + } + + return parsed + + def parse_constraints(self, constraints): + """Validate/prepare constraints for use by the solver.""" + if type(constraints) is not dict: + raise TranslatorException("Constraints must be provided in " + "dictionary form") + + # Look at each constraint. Properties must exist, even if empty. + constraints_copy = copy.deepcopy(constraints) + + parsed = {} + for name, constraint in constraints_copy.items(): + + if not constraint.get('properties'): + constraint['properties'] = {} + + constraint_type = constraint.get('type') + constraint_def = CONSTRAINTS.get(constraint_type) + + # Is it a supported type? + if constraint_type not in CONSTRAINTS: + raise TranslatorException( + "Unsupported type '{}' found in constraint " + "named '{}'".format(constraint_type, name)) + + # Now walk through the constraint's content + for key, value in constraint.items(): + # Must be a supported key + if key not in CONSTRAINT_KEYS: + raise TranslatorException( + "Invalid key '{}' found in constraint " + "named '{}'".format(key, name)) + + # For properties ... + if key == 'properties': + # Make sure all required properties are present + required = constraint_def.get('required', []) + for req_prop in required: + if req_prop not in value.keys(): + raise TranslatorException( + "Required property '{}' not found in " + "constraint named '{}'".format( + req_prop, name)) + if not value.get(req_prop) \ + or value.get(req_prop) == '': + raise TranslatorException( + "No value specified for property '{}' in " + "constraint named '{}'".format( + req_prop, name)) + + # Make sure there are no unknown properties + optional = constraint_def.get('optional', []) + for prop_name in value.keys(): + if prop_name not in required + optional: + raise TranslatorException( + "Unknown property '{}' in " + "constraint named '{}'".format( + prop_name, name)) + + # If a property has a controlled vocabulary, make + # sure its value is one of the allowed ones. + allowed = constraint_def.get('allowed', {}) + for prop_name, allowed_values in allowed.items(): + if prop_name in value.keys(): + prop_value = value.get(prop_name, '') + if prop_value not in allowed_values: + raise TranslatorException( + "Property '{}' value '{}' unsupported in " + "constraint named '{}' (must be one of " + "{})".format(prop_name, prop_value, + name, allowed_values)) + + # Break all threshold-formatted values into parts + thresholds = constraint_def.get('thresholds', {}) + for thr_prop, base_units in thresholds.items(): + if thr_prop in value.keys(): + expression = value.get(thr_prop) + thr = threshold.Threshold(expression, base_units) + value[thr_prop] = thr.parts + + # We already know we have one or more demands due to + # validate_components(). We still need to coerce the demands + # into a list in case only one demand was provided. + constraint_demands = constraint.get('demands') + if isinstance(constraint_demands, six.string_types): + constraint['demands'] = [constraint_demands] + + # Either split the constraint into parts, one per demand, + # or use it as-is + if constraint_def.get('split'): + for demand in constraint.get('demands', []): + constraint_demand = name + '_' + demand + parsed[constraint_demand] = copy.deepcopy(constraint) + parsed[constraint_demand]['name'] = name + parsed[constraint_demand]['demands'] = demand + else: + parsed[name] = copy.deepcopy(constraint) + parsed[name]['name'] = name + + return parsed + + def parse_optimization(self, optimization): + """Validate/prepare optimization for use by the solver.""" + + # WARNING: The template format for optimization is generalized, + # however the solver is very particular about the expected + # goal, functions, and operands. Therefore, for the time being, + # we are choosing to be highly conservative in what we accept + # at the template level. Once the solver can handle the more + # general form, we can make the translation pass using standard + # compiler techniques and tools like antlr (antlr4-python2-runtime). + + if not optimization: + LOG.debug("No objective function or " + "optimzation provided in the template") + return + + optimization_copy = copy.deepcopy(optimization) + parsed = { + "goal": "min", + "operation": "sum", + "operands": [], + } + + if type(optimization_copy) is not dict: + raise TranslatorException("Optimization must be a dictionary.") + + goals = optimization_copy.keys() + if goals != ['minimize']: + raise TranslatorException( + "Optimization must contain a single goal of 'minimize'.") + + funcs = optimization_copy['minimize'].keys() + if funcs != ['sum']: + raise TranslatorException( + "Optimization goal 'minimize' must " + "contain a single function of 'sum'.") + + operands = optimization_copy['minimize']['sum'] + if type(operands) is not list: + # or len(operands) != 2: + raise TranslatorException( + "Optimization goal 'minimize', function 'sum' " + "must be a list of exactly two operands.") + + def get_distance_between_args(operand): + args = operand.get('distance_between') + if type(args) is not list and len(args) != 2: + raise TranslatorException( + "Optimization 'distance_between' arguments must " + "be a list of length two.") + + got_demand = False + got_location = False + for arg in args: + if not got_demand and arg in self._demands.keys(): + got_demand = True + if not got_location and arg in self._locations.keys(): + got_location = True + if not got_demand or not got_location: + raise TranslatorException( + "Optimization 'distance_between' arguments {} must " + "include one valid demand name and one valid " + "location name.".format(args)) + + return args + + for operand in operands: + weight = 1.0 + args = None + + if operand.keys() == ['distance_between']: + # Value must be a list of length 2 with one + # location and one demand + args = get_distance_between_args(operand) + + elif operand.keys() == ['product']: + for product_op in operand['product']: + if threshold.is_number(product_op): + weight = product_op + elif type(product_op) is dict: + if product_op.keys() == ['distance_between']: + function = 'distance_between' + args = get_distance_between_args(product_op) + elif product_op.keys() == ['cloud_version']: + function = 'cloud_version' + args = product_op.get('cloud_version') + + if not args: + raise TranslatorException( + "Optimization products must include at least " + "one 'distance_between' function call and " + "one optional number to be used as a weight.") + + # We now have our weight/function_param. + parsed['operands'].append( + { + "operation": "product", + "weight": weight, + "function": function, + "function_param": args, + } + ) + return parsed + + def parse_reservations(self, reservations): + demands = self._demands + if type(reservations) is not dict: + raise TranslatorException("Reservations must be provided in " + "dictionary form") + + parsed = {} + if reservations: + parsed['counter'] = 0 + for name, reservation in reservations.items(): + if not reservation.get('properties'): + reservation['properties'] = {} + for demand in reservation.get('demands', []): + if demand in demands.keys(): + constraint_demand = name + '_' + demand + parsed['demands'] = {} + parsed['demands'][constraint_demand] = \ + copy.deepcopy(reservation) + parsed['demands'][constraint_demand]['name'] = name + parsed['demands'][constraint_demand]['demand'] = demand + + return parsed + + def do_translation(self): + """Perform the translation.""" + if not self.valid: + raise TranslatorException("Can't translate an invalid template.") + self._translation = { + "conductor_solver": { + "version": self._version, + "plan_id": self._plan_id, + "locations": self.parse_locations(self._locations), + "demands": self.parse_demands(self._demands), + "constraints": self.parse_constraints(self._constraints), + "objective": self.parse_optimization(self._optmization), + "reservations": self.parse_reservations(self._reservations), + } + } + + def translate(self): + """Translate the template for the solver.""" + self._ok = False + try: + self.create_components() + self.validate_components() + self.parse_parameters() + self.do_translation() + self._ok = True + except Exception as exc: + self._error_message = exc.message + + @property + def valid(self): + """Returns True if the template has been validated.""" + return self._valid + + @property + def ok(self): + """Returns True if the translation was successful.""" + return self._ok + + @property + def translation(self): + """Returns the translation if it was successful.""" + return self._translation + + @property + def error_message(self): + """Returns the last known error message.""" + return self._error_message + + +def main(): + template_name = 'some_template' + + path = os.path.abspath(conductor_root) + dir_path = os.path.dirname(path) + + # Prepare service-wide components (e.g., config) + conf = service.prepare_service( + [], config_files=[dir_path + '/../etc/conductor/conductor.conf']) + # conf.set_override('mock', True, 'music_api') + + t1 = threshold.Threshold("< 500 ms", "time") + t2 = threshold.Threshold("= 120 mi", "distance") + t3 = threshold.Threshold("160", "currency") + t4 = threshold.Threshold("60-80 Gbps", "throughput") + print('t1: {}\nt2: {}\nt3: {}\nt4: {}\n'.format(t1, t2, t3, t4)) + + template_file = dir_path + '/tests/data/' + template_name + '.yaml' + fd = open(template_file, "r") + template = yaml.load(fd) + + trns = Translator(conf, template_name, str(uuid.uuid4()), template) + trns.translate() + if trns.ok: + print(json.dumps(trns.translation, indent=2)) + else: + print("TESTING - Translator Error: {}".format(trns.error_message)) + +if __name__ == '__main__': + main() diff --git a/conductor/conductor/controller/translator_svc.py b/conductor/conductor/controller/translator_svc.py new file mode 100644 index 0000000..425ff36 --- /dev/null +++ b/conductor/conductor/controller/translator_svc.py @@ -0,0 +1,162 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import time + +import cotyledon +import futurist +from oslo_config import cfg +from oslo_log import log + +from conductor.common.music import api +from conductor.common.music import messaging as music_messaging +from conductor.controller import translator +from conductor.i18n import _LE, _LI +from conductor import messaging + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +CONTROLLER_OPTS = [ + cfg.IntOpt('polling_interval', + default=1, + min=1, + help='Time between checking for new plans. ' + 'Default value is 1.'), +] + +CONF.register_opts(CONTROLLER_OPTS, group='controller') + + +class TranslatorService(cotyledon.Service): + """Template Translator service. + + This service looks for untranslated templates and + preps them for solving by the Solver service. + """ + + # This will appear in 'ps xaf' + name = "Template Translator" + + def __init__(self, worker_id, conf, **kwargs): + """Initializer""" + LOG.debug("%s" % self.__class__.__name__) + super(TranslatorService, self).__init__(worker_id) + self._init(conf, **kwargs) + self.running = True + + def _init(self, conf, **kwargs): + self.conf = conf + self.Plan = kwargs.get('plan_class') + self.kwargs = kwargs + + # Set up the RPC service(s) we want to talk to. + self.data_service = self.setup_rpc(conf, "data") + + # Set up Music access. + self.music = api.API() + + def _gracefully_stop(self): + """Gracefully stop working on things""" + pass + + def _restart(self): + """Prepare to restart the service""" + pass + + def setup_rpc(self, conf, topic): + """Set up the RPC Client""" + # TODO(jdandrea): Put this pattern inside music_messaging? + transport = messaging.get_transport(conf=conf) + target = music_messaging.Target(topic=topic) + client = music_messaging.RPCClient(conf=conf, + transport=transport, + target=target) + return client + + def translate(self, plan): + """Translate the plan to a format the solver can use""" + # Update the translation field and set status to TRANSLATED. + try: + LOG.info(_LI("Requesting plan {} translation").format( + plan.id)) + trns = translator.Translator( + self.conf, plan.name, plan.id, plan.template) + trns.translate() + if trns.ok: + plan.translation = trns.translation + plan.status = self.Plan.TRANSLATED + LOG.info(_LI( + "Plan {} translated. Ready for solving").format( + plan.id)) + else: + plan.message = trns.error_message + plan.status = self.Plan.ERROR + LOG.error(_LE( + "Plan {} translation error encountered").format( + plan.id)) + except Exception as ex: + template = "An exception of type {0} occurred, arguments:\n{1!r}" + plan.message = template.format(type(ex).__name__, ex.args) + plan.status = self.Plan.ERROR + + plan.update() + + def __check_for_templates(self): + """Wait for the polling interval, then do the real template check.""" + + # Wait for at least poll_interval sec + polling_interval = self.conf.controller.polling_interval + time.sleep(polling_interval) + + # Look for plans with the status set to TEMPLATE + plans = self.Plan.query.all() + for plan in plans: + # If there's a template to be translated, do it! + if plan.status == self.Plan.TEMPLATE: + self.translate(plan) + break + elif plan.timedout: + # Move plan to error status? Create a new timed-out status? + # todo(snarayanan) + continue + + def run(self): + """Run""" + LOG.debug("%s" % self.__class__.__name__) + + # Look for templates to translate from within a thread + executor = futurist.ThreadPoolExecutor() + while self.running: + fut = executor.submit(self.__check_for_templates) + fut.result() + executor.shutdown() + + def terminate(self): + """Terminate""" + LOG.debug("%s" % self.__class__.__name__) + self.running = False + self._gracefully_stop() + super(TranslatorService, self).terminate() + + def reload(self): + """Reload""" + LOG.debug("%s" % self.__class__.__name__) + self._restart() -- cgit 1.2.3-korg