diff options
author | rl001m <ruilu@research.att.com> | 2018-09-20 11:36:14 -0400 |
---|---|---|
committer | rl001m <ruilu@research.att.com> | 2018-09-21 10:07:25 -0400 |
commit | 4a4ffec30cf335fc7ea84daea95308ea454f2376 (patch) | |
tree | b9147bdf552c9766ea4ff24cf572c1d4bf5318bf /conductor | |
parent | 933d21d61f23b9c854b883eb357aea612cf1df38 (diff) |
Digestion to ONAP
This commit contains features added across previous iteration.
Unfortunatley, its hard to separate out all the features / defect fixes
under the current setup unless an automated ingestion and integration
mechanims is in place.
Following are the details of each feature / defect fix.
- (OPTFRA-347) New Music Response Changes
Required changes for upgrading to music version 3.x
- (OPTFRA-346) Triage Tool Feature Changes
Triage tool for finding out issues with candidates and how they were dropped
- (OPTFRA-280) Order Locking Feature Changes
Orders from SO need to be parked (and later retriggered) in
scenarios where the orchestration of the already given solution
is in progress.
- (OPTFRA-281) Latency Based Homing
This features enables homing based on 'network latency' defined in
has's internally maintained tables instead of plain distance.
Other defect fixes / performance improvements
- AAI Timeout give up change
Defect fix related to dropping candidates when AAI timed out
and didn't retrun within 3 tries. has will now give up in these
scenarios.
- Defect fix replaced all() query with status based query. There was no need to reachout to Music to grab the whole plans
table - we now go only by the value of the status field.
Issue-ID: OPTFRA-280
Change-Id: I47fa7651c5c81763770771d6f7e7ff4ab155d18e
Signed-off-by: rl001m <ruilu@research.att.com>
Diffstat (limited to 'conductor')
76 files changed, 5075 insertions, 619 deletions
diff --git a/conductor/README.rst b/conductor/README.rst new file mode 100644 index 0000000..6d67fd0 --- /dev/null +++ b/conductor/README.rst @@ -0,0 +1,38 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +========= +Conductor +========= + +Conductor is the implementation of the Homing Service. + +Given the description of what needs to be deployed (demands) and the placement requirements (constraints), Conductor determines placement candidates that meet all constraints while optimizing the resource usage of the AIC infrastructure. A customer request may be satisfied by deploying new VMs in AIC (AIC inventory) or by using existing service instances with enough remaining capacity (service inventory). + +The formal project name is *Conductor*. From a canonical standpoint, Conductor is known as a *homing service*, in the same way OpenStack Heat is an orchestration service, or Nova is a compute service. References to *Homing Conductor* are a misnomer. Instead, refer to the project name (Conductor) or the canonical name (homing service). + +* Free software: License TBD +* `PyPI`_ - package installation +* `Python/Linux Distribution Notes`_ +* `Conductor Template Guide`_ +* `Example Templates`_ +* `Homing API`_ +* `Bugs`_ - issue tracking +* `Source`_ + diff --git a/conductor/conductor/api/controllers/root.py b/conductor/conductor/api/controllers/root.py index abf3f20..f0590b3 100644 --- a/conductor/conductor/api/controllers/root.py +++ b/conductor/conductor/api/controllers/root.py @@ -19,6 +19,7 @@ import pecan +from conductor import version as conductor_version from conductor.api.controllers import errors from conductor.api.controllers.v1 import root as v1 @@ -36,7 +37,7 @@ class RootController(object): def index(self): """Catchall for all methods""" base_url = pecan.request.application_url - available = [{'tag': 'v1', 'date': '2016-11-01T00:00:00Z', }] + available = [{'tag': 'v1', 'date': '2016-11-01T00:00:00Z', },{'tag': 'v1', 'date': '2018-02-01T00:00:00Z', }] collected = [version_descriptor(base_url, v['tag'], v['date']) for v in available] versions = {'versions': collected} @@ -46,12 +47,14 @@ class RootController(object): def version_descriptor(base_url, version, released_on): """Version Descriptor""" url = version_url(base_url, version) + version_current = conductor_version.version_info.version_string() return { + 'id': version, + 'version': "of-has:{}".format( str(version_current)), 'links': [ {'href': url, 'rel': 'self', }, - {'href': 'https://wiki.onap.org/pages' - '/viewpage.action?pageId=16005528', + {'href': 'https://wiki.onap.org/pages/viewpage.action?pageId=16005528', 'rel': 'describedby', 'type': 'text/html', }], 'media-types': [ {'base': 'application/json', 'type': MEDIA_TYPE_JSON % version, }], @@ -62,4 +65,4 @@ def version_descriptor(base_url, version, released_on): def version_url(base_url, version_number): """Version URL""" - return '%s/%s' % (base_url, version_number) + return '%s/%s' % (base_url, version_number)
\ No newline at end of file diff --git a/conductor/conductor/api/controllers/v1/latency_country_rules_loader.py b/conductor/conductor/api/controllers/v1/latency_country_rules_loader.py new file mode 100644 index 0000000..37ee456 --- /dev/null +++ b/conductor/conductor/api/controllers/v1/latency_country_rules_loader.py @@ -0,0 +1,132 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + +from notario import decorators +from notario.validators import types +from oslo_log import log +import pecan +from conductor.api import latency_dataloader + +from conductor.api.controllers import error +from conductor.api.controllers import string_or_dict +from conductor.i18n import _, _LI +from oslo_config import cfg + +CONF = cfg.CONF + + +LOG = log.getLogger(__name__) + +class LatencyCountryRulesBaseController(object): + + def load(self, args): + + ctx = {} + #method = 'release_orders' + method = 'load' + client = pecan.request.controller + latency_dataloader.LatencyDataLoader().load_into_country_letancy(args) + + response = "OK" + + return response + +class LatencyCountryRulesLoaderController(LatencyCountryRulesBaseController): + + @classmethod + def allow(cls): + """Allowed methods""" + return 'POST' + + @pecan.expose(generic=True, template='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', template='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + + @index.when(method='POST', template='json') + # @validate(CREATE_SCHEMA, '/errors/schema') + def index_post(self): + + args = pecan.request.json + + if check_basic_auth(): + response = self.load(args) + if not response: + error('/errors/server_error', _('Unable to insert')) + else: + pecan.response.status = 201 + return response + + +def check_basic_auth(): + ''' + :return: boolean + ''' + + try: + if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): + LOG.debug("Authorized username and password") + plan = True + else: + plan = False + auth_str = pecan.request.headers['Authorization'] + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + LOG.error("Incorrect username={} / password={}".format(list_id_pw[0],list_id_pw[1])) + except: + error('/errors/basic_auth_error', _('Unauthorized: The request does not provide any HTTP authentication (basic authetnication)')) + + if not plan: + error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) + return plan + + +def verify_user(authstr): + """ + authenticate user as per config file + :param headers: + :return boolean value + """ + user_dict = dict() + auth_str = authstr + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + user_dict['username'] = list_id_pw[0] + user_dict['password'] = list_id_pw[1] + password = CONF.conductor_api.password + username = CONF.conductor_api.username + if username == user_dict['username'] and password == user_dict['password']: + return True + else: + return False + diff --git a/conductor/conductor/api/controllers/v1/latency_loader.py b/conductor/conductor/api/controllers/v1/latency_loader.py new file mode 100644 index 0000000..f5c5336 --- /dev/null +++ b/conductor/conductor/api/controllers/v1/latency_loader.py @@ -0,0 +1,52 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from oslo_log import log +import pecan +from pecan import expose +from conductor.api import latency_dataloader + + + +LOG = log.getLogger(__name__) + +class LatencyLoadController(object): + + + def __init__(self): + pass + + + @expose(generic=True, template='json') + def index(self): + return 'call to get method' + + + + @index.when(method='POST', template='json') + def index_POST(self, **kwargs): + json_data = kwargs['data'] + test = latency_dataloader.LatencyDataLoader().load_into_rph(json_data) + + return kwargs['data'] + + + + +pecan.route(LatencyLoadController, 'data-loader', latency_dataloader.LatencyDataLoader())
\ No newline at end of file diff --git a/conductor/conductor/api/controllers/v1/latency_reduction_loader.py b/conductor/conductor/api/controllers/v1/latency_reduction_loader.py new file mode 100644 index 0000000..ac7c16e --- /dev/null +++ b/conductor/conductor/api/controllers/v1/latency_reduction_loader.py @@ -0,0 +1,132 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + +from notario import decorators +from notario.validators import types +from oslo_log import log +import pecan +from conductor.api import latency_dataloader + +from conductor.api.controllers import error +from conductor.api.controllers import string_or_dict +from conductor.i18n import _, _LI +from oslo_config import cfg + +CONF = cfg.CONF + + +LOG = log.getLogger(__name__) + +class LatencyLoaderBaseController(object): + + def load(self, args): + + ctx = {} + #method = 'release_orders' + method = 'load_latency_rules' + client = pecan.request.controller + latency_dataloader.LatencyDataLoader().load_into_rph(args) + + response = "OK" + + return response + +class LatencyLoaderController(LatencyLoaderBaseController): + + @classmethod + def allow(cls): + """Allowed methods""" + return 'POST' + + @pecan.expose(generic=True, template='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', template='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + + @index.when(method='POST', template='json') + # @validate(CREATE_SCHEMA, '/errors/schema') + def index_post(self): + + args = pecan.request.json + + if check_basic_auth(): + response = self.load(args) + if not response: + error('/errors/server_error', _('Unable to insert')) + else: + pecan.response.status = 201 + return response + + +def check_basic_auth(): + ''' + :return: boolean + ''' + + try: + if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): + LOG.debug("Authorized username and password") + plan = True + else: + plan = False + auth_str = pecan.request.headers['Authorization'] + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + LOG.error("Incorrect username={} / password={}".format(list_id_pw[0],list_id_pw[1])) + except: + error('/errors/basic_auth_error', _('Unauthorized: The request does not provide any HTTP authentication (basic authetnication)')) + + if not plan: + error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) + return plan + + +def verify_user(authstr): + """ + authenticate user as per config file + :param headers: + :return boolean value + """ + user_dict = dict() + auth_str = authstr + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + user_dict['username'] = list_id_pw[0] + user_dict['password'] = list_id_pw[1] + password = CONF.conductor_api.password + username = CONF.conductor_api.username + if username == user_dict['username'] and password == user_dict['password']: + return True + else: + return False + diff --git a/conductor/conductor/api/controllers/v1/plans.py b/conductor/conductor/api/controllers/v1/plans.py index b9f7717..3858069 100644 --- a/conductor/conductor/api/controllers/v1/plans.py +++ b/conductor/conductor/api/controllers/v1/plans.py @@ -36,6 +36,7 @@ from oslo_config import cfg CONF = cfg.CONF + LOG = log.getLogger(__name__) CONDUCTOR_API_OPTS = [ @@ -43,14 +44,14 @@ CONDUCTOR_API_OPTS = [ default='', help='Base URL for plans.'), cfg.StrOpt('username', - default='', + default='admin1', help='username for plans.'), cfg.StrOpt('password', - default='', + default='plan.15', help='password for plans.'), cfg.BoolOpt('basic_auth_secure', - default=True, - help='auth toggling.') + default=True, + help='auth toggling.') ] CONF.register_opts(CONDUCTOR_API_OPTS, group='conductor_api') @@ -60,6 +61,7 @@ CREATE_SCHEMA = ( (decorators.optional('id'), types.string), (decorators.optional('limit'), types.integer), (decorators.optional('name'), types.string), + (decorators.optional('num_solution'), types.string), ('template', string_or_dict), (decorators.optional('template_url'), types.string), (decorators.optional('timeout'), types.integer), @@ -87,8 +89,8 @@ class PlansBaseController(object): basic_auth_flag = CONF.conductor_api.basic_auth_secure if plan_id == 'healthcheck' or \ - not basic_auth_flag or \ - (basic_auth_flag and check_basic_auth()): + not basic_auth_flag or \ + (basic_auth_flag and check_basic_auth()): return self.plan_getid(plan_id) def plan_getid(self, plan_id): @@ -145,7 +147,6 @@ class PlansBaseController(object): args.get('name'))) client = pecan.request.controller - transaction_id = pecan.request.headers.get('transaction-id') if transaction_id: args['template']['transaction-id'] = transaction_id @@ -291,11 +292,10 @@ class PlansController(PlansBaseController): basic_auth_flag = CONF.conductor_api.basic_auth_secure - # Create the plan only when the basic authentication is disabled or pass the authenticaiton check + # Create the plan only when the basic authentication is disabled or pass the authentication check if not basic_auth_flag or \ - (basic_auth_flag and check_basic_auth()): + (basic_auth_flag and check_basic_auth()): plan = self.plan_create(args) - if not plan: error('/errors/server_error', _('Unable to create Plan.')) else: @@ -307,12 +307,11 @@ class PlansController(PlansBaseController): """Pecan subcontroller routing callback""" return PlansItemController(uuid4), remainder - def check_basic_auth(): """ Returns True/False if the username/password of Basic Auth match/not match :return boolean value - """ + """ try: if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): @@ -324,22 +323,19 @@ def check_basic_auth(): user_pw = auth_str.split(' ')[1] decode_user_pw = base64.b64decode(user_pw) list_id_pw = decode_user_pw.split(':') - LOG.error("Incorrect username={} / password={}".format(list_id_pw[0], list_id_pw[1])) + LOG.error("Incorrect username={}/password={}".format(list_id_pw[0],list_id_pw[1])) except: error('/errors/basic_auth_error', _('Unauthorized: The request does not ' 'provide any HTTP authentication (basic authentication)')) - plan = False if not plan: error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) - return plan def verify_user(authstr): """ authenticate user as per config file - :param authstr: :return boolean value """ user_dict = dict() @@ -351,9 +347,6 @@ def verify_user(authstr): user_dict['password'] = list_id_pw[1] password = CONF.conductor_api.password username = CONF.conductor_api.username - - print ("Expected username/password: {}/{}".format(username, password)) - if username == user_dict['username'] and password == user_dict['password']: return True else: diff --git a/conductor/conductor/api/controllers/v1/release_orders.py b/conductor/conductor/api/controllers/v1/release_orders.py new file mode 100644 index 0000000..47200dd --- /dev/null +++ b/conductor/conductor/api/controllers/v1/release_orders.py @@ -0,0 +1,128 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + +from notario import decorators +from notario.validators import types +from oslo_log import log +import pecan + +from conductor.api.controllers import error +from conductor.api.controllers import string_or_dict +from conductor.i18n import _, _LI +from oslo_config import cfg + +CONF = cfg.CONF + + +LOG = log.getLogger(__name__) + +class LocksBaseController(object): + + def release_orders(self, args): + + ctx = {} + method = 'release_orders' + client = pecan.request.controller + response = client.call(ctx, method, args) + + return response + +class LocksController(LocksBaseController): + + @classmethod + def allow(cls): + """Allowed methods""" + return 'POST' + + @pecan.expose(generic=True, template='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', template='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + + @index.when(method='POST', template='json') + # @validate(CREATE_SCHEMA, '/errors/schema') + def index_post(self): + + args = pecan.request.json + + if check_basic_auth(): + response = self.release_orders(args) + if not response: + error('/errors/server_error', _('Unable to release orders')) + else: + pecan.response.status = 201 + return response + + +def check_basic_auth(): + ''' + :return: boolean + ''' + + try: + if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): + LOG.debug("Authorized username and password") + plan = True + else: + plan = False + auth_str = pecan.request.headers['Authorization'] + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + LOG.error("Incorrect username={} / password={}".format(list_id_pw[0],list_id_pw[1])) + except: + error('/errors/basic_auth_error', _('Unauthorized: The request does not provide any HTTP authentication (basic authetnication)')) + + if not plan: + error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) + return plan + + +def verify_user(authstr): + """ + authenticate user as per config file + :param headers: + :return boolean value + """ + user_dict = dict() + auth_str = authstr + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + user_dict['username'] = list_id_pw[0] + user_dict['password'] = list_id_pw[1] + password = CONF.conductor_api.password + username = CONF.conductor_api.username + if username == user_dict['username'] and password == user_dict['password']: + return True + else: + return False + diff --git a/conductor/conductor/api/controllers/v1/root.py b/conductor/conductor/api/controllers/v1/root.py index 87b4a35..f0c2a53 100644 --- a/conductor/conductor/api/controllers/v1/root.py +++ b/conductor/conductor/api/controllers/v1/root.py @@ -18,12 +18,17 @@ # from oslo_log import log -import pecan from pecan import secure from conductor.api.controllers import error from conductor.api.controllers.v1 import plans +from conductor.api.controllers.v1 import release_orders +from conductor.api.controllers.v1 import latency_reduction_loader +from conductor.api.controllers.v1 import latency_country_rules_loader +from conductor.api.controllers.v1 import triage from conductor.i18n import _ +import pecan + LOG = log.getLogger(__name__) @@ -32,6 +37,9 @@ class V1Controller(secure.SecureController): """Version 1 API controller root.""" plans = plans.PlansController() + triage_tool = triage.TriageController() + load_latency_rules = latency_reduction_loader.LatencyLoaderController() + load_latency_country_rules = latency_country_rules_loader.LatencyCountryRulesLoaderController() @classmethod def check_permissions(cls): @@ -45,3 +53,6 @@ class V1Controller(secure.SecureController): message = _('The %s method is not allowed.') % pecan.request.method kwargs = {} error('/errors/not_allowed', message, **kwargs) + +#TODO(larry): understand pecan and change the following code +pecan.route(V1Controller, 'release-orders', release_orders.LocksController()) diff --git a/conductor/conductor/api/controllers/v1/test_rules.py b/conductor/conductor/api/controllers/v1/test_rules.py new file mode 100644 index 0000000..23808e5 --- /dev/null +++ b/conductor/conductor/api/controllers/v1/test_rules.py @@ -0,0 +1,128 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + +from notario import decorators +from notario.validators import types +from oslo_log import log +import pecan + +from conductor.api.controllers import error +from conductor.api.controllers import string_or_dict +from conductor.i18n import _, _LI +from oslo_config import cfg + +CONF = cfg.CONF + + +LOG = log.getLogger(__name__) + +class TestRuleBaseController(object): + + def test_rule(self, args): + + ctx = {} + method = 'test_rule' + client = pecan.request.controller + response = client.call(ctx, method, args) + + return response + +class TestRuleController(TestRuleBaseController): + + @classmethod + def allow(cls): + """Allowed methods""" + return 'POST' + + @pecan.expose(generic=True, template='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', template='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + + @index.when(method='POST', template='json') + # @validate(CREATE_SCHEMA, '/errors/schema') + def index_post(self): + + args = pecan.request.json + + if check_basic_auth(): + response = self.test_rule(args) + if not response: + error('/errors/server_error', _('Unable to release orders')) + else: + pecan.response.status = 201 + return response + + +def check_basic_auth(): + ''' + :return: boolean + ''' + + try: + if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): + LOG.debug("Authorized username and password") + plan = True + else: + plan = False + auth_str = pecan.request.headers['Authorization'] + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + LOG.error("Incorrect username={} / password={}".format(list_id_pw[0],list_id_pw[1])) + except: + error('/errors/basic_auth_error', _('Unauthorized: The request does not provide any HTTP authentication (basic authetnication)')) + + if not plan: + error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) + return plan + + +def verify_user(authstr): + """ + authenticate user as per config file + :param headers: + :return boolean value + """ + user_dict = dict() + auth_str = authstr + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + user_dict['username'] = list_id_pw[0] + user_dict['password'] = list_id_pw[1] + password = CONF.conductor_api.password + username = CONF.conductor_api.username + if username == user_dict['username'] and password == user_dict['password']: + return True + else: + return False + diff --git a/conductor/conductor/api/controllers/v1/triage.py b/conductor/conductor/api/controllers/v1/triage.py new file mode 100644 index 0000000..60511a1 --- /dev/null +++ b/conductor/conductor/api/controllers/v1/triage.py @@ -0,0 +1,214 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + + +from oslo_log import log +import pecan + +from conductor.api.controllers import error +from conductor.i18n import _, _LI +from oslo_config import cfg + +CONF = cfg.CONF + + +LOG = log.getLogger(__name__) + +# Triage_API_OPTS = [ +# cfg.StrOpt('server_url', +# default='', +# help='Base URL for tData.'), +# cfg.StrOpt('username', +# default='admin1', +# help='username for tData.'), +# cfg.StrOpt('password', +# default='plan.15', +# help='password for tData.'), +# cfg.BoolOpt('basic_auth_secure', +# default=True, +# help='auth toggling.') +# ] +# +# CONF.register_opts(Triage_API_OPTS, group='triage_api') +# +# CREATE_SCHEMA = ( +# (decorators.optional('id'), types.string), +# (decorators.optional('name'), types.string), +# ('children', string_or_dict), +# ) + + +class TriageBaseController(object): + """Triage Base Controller - Common Methods""" + + def triage_get(self, id=None): + + basic_auth_flag = CONF.conductor_api.basic_auth_secure + + if id == 'healthcheck' or \ + not basic_auth_flag or \ + (basic_auth_flag and check_basic_auth()): + return self.data_getid(id) + + def data_getid(self, id): + ctx = {} + method = 'triage_get' + if id: + args = {'id': id} + LOG.debug('triage data {} requested.'.format(id)) + else: + args = {} + LOG.debug('All data in triage requested.') + + triage_data_list = [] + client = pecan.request.controller + result = client.call(ctx, method, args) + triage = result + + for tData in triage['triageData']: + thetData_id = tData.get('id') + triage_data_list.append(tData) + + if id: + if len(triage_data_list) == 1: + return triage_data_list[0] + else: + # For a single triage, we return None if not found + return None + else: + # For all tData, it's ok to return an empty list + return triage_data_list + + +class TraigeDataItemController(TriageBaseController): + """Triage Data Item Controller /v1/triage/{id}""" + + def __init__(self, uuid4): + """Initializer.""" + self.uuid = uuid4 + self.triage = self.triage_get(id=self.uuid) + + if not self.triage: + error('/errors/not_found', + _('DAta {} not found').format(self.uuid)) + pecan.request.context['id'] = self.uuid + + @classmethod + def allow(cls): + """Allowed methods""" + return 'GET' + + @pecan.expose(generic=True, data='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', triage='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + @index.when(method='GET', triage='json') + def index_get(self): + """Get triage data """ + return self.triage + +class TriageController(TriageBaseController): + """tData Controller /v1/triage""" + + @classmethod + def allow(cls): + """Allowed methods""" + return 'GET' + + @pecan.expose(generic=True, triage='json') + def index(self): + """Catchall for unallowed methods""" + message = _('The {} method is not allowed.').format( + pecan.request.method) + kwargs = {'allow': self.allow()} + error('/errors/not_allowed', message, **kwargs) + + @index.when(method='OPTIONS', triage='json') + def index_options(self): + """Options""" + pecan.response.headers['Allow'] = self.allow() + pecan.response.status = 204 + + @index.when(method='GET', triage='json') + def index_get(self): + """Get all the tData""" + tData = self.triage_get() + return {"tData": tData} + + @pecan.expose() + def _lookup(self, uuid4, *remainder): + """Pecan subcontroller routing callback""" + return TraigeDataItemController(uuid4), remainder + + +def check_basic_auth(): + ''' + :return: boolean + ''' + + try: + if pecan.request.headers['Authorization'] and verify_user(pecan.request.headers['Authorization']): + LOG.debug("Authorized username and password") + triage = True + else: + triage = False + auth_str = pecan.request.headers['Authorization'] + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + LOG.error("Incorrect username={} / password={}".format(list_id_pw[0],list_id_pw[1])) + except: + error('/errors/basic_auth_error', _('Unauthorized: The request does not provide any HTTP authentication (basic authetnication)')) + + if not triage: + error('/errors/authentication_error', _('Invalid credentials: username or password is incorrect')) + return triage + + +def verify_user(authstr): + """ + authenticate user as per config file + :param headers: + :return boolean value + """ + user_dict = dict() + auth_str = authstr + user_pw = auth_str.split(' ')[1] + decode_user_pw = base64.b64decode(user_pw) + list_id_pw = decode_user_pw.split(':') + user_dict['username'] = list_id_pw[0] + user_dict['password'] = list_id_pw[1] + password = CONF.conductor_api.password + username = CONF.conductor_api.username + if username == user_dict['username'] and password == user_dict['password']: + return True + else: + return False diff --git a/conductor/conductor/api/latency_dataloader.py b/conductor/conductor/api/latency_dataloader.py new file mode 100644 index 0000000..7279704 --- /dev/null +++ b/conductor/conductor/api/latency_dataloader.py @@ -0,0 +1,118 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import csv +from oslo_config import cfg +from oslo_log import log +import collections +import json +from conductor.common.models.region_placeholders import RegionPlaceholders +from conductor.common.models.country_latency import CountryLatency +from conductor.common.music import api +from conductor.common.music.model import base + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + +class LatencyDataLoader(object): + + def __init__(self): + self.Region_PlaceHolder = base.create_dynamic_model(keyspace=CONF.keyspace, baseclass=RegionPlaceholders, classname="RegionPlaceholders") + self.Country_Latency = base.create_dynamic_model(keyspace=CONF.keyspace, baseclass=CountryLatency,classname="CountryLatency") + + + + # load data into region place holder + def load_into_rph(self, data): + LOG.debug("load_into_rph") + datamap = collections.OrderedDict() + group_map = collections.OrderedDict() + datamap = data #= json.loads(json_data) + + region_placeholders = self.Region_PlaceHolder.query.all() + regions_id_list = list() + for region in region_placeholders: + regions_id_list.append(region.id) + + LOG.debug("Removing all existing data from region place holders table") + for region_id in regions_id_list: + replace_holder_row = self.Region_PlaceHolder() + LOG.debug("Removing "+region_id) + response = replace_holder_row.delete(region_id) + LOG.debug("Removed " +str(response) ) + + + + for i, j in enumerate(datamap): + group_map[j['group']] = j['countries'] + + LOG.debug("inserting data into region place holders table") + for k, v in group_map.items(): + group = k + countries = {k:v} + LOG.debug("inserting region "+group) + replace_holder_row = self.Region_PlaceHolder(group,countries) + response = replace_holder_row.insert() + LOG.debug("inserted " + str(response)) + + + def load_into_country_letancy(self, data): + LOG.debug("load_into_country_letancy") + datamap = collections.OrderedDict() # Ordered Dict because the order of rows is important + group_map = collections.OrderedDict() + datamap = data#json.loads(data) + + + #before inserting the new data, remove the existing data from the country_latency table + country_latency_data = self.Country_Latency.query.all() + country_id_list = list() + for country in country_latency_data: + country_id_list.append(country.id) + + LOG.debug("Removing all existing data from country latency table") + for country_id in country_id_list: + replace_holder_row = self.Country_Latency() + LOG.debug("removing " + country_id) + response = replace_holder_row.delete(country_id) + LOG.debug("removed " + str(response)) + + + + + for i, j in enumerate(datamap): + group_map[j['country_name']] = j['groups'] + + LOG.debug("inserting data into country latency table") + for k, v in group_map.items(): + country_name = k + group = list() + for g in v.split('|'): + group.append(g) + + groups = group + LOG.debug("inserting country " + country_name) + country_rules_holder_row = self.Country_Latency(country_name,groups) + response = country_rules_holder_row.insert(); + LOG.debug("inserted " + str(response)) + + + + + + diff --git a/conductor/conductor/api/middleware.py b/conductor/conductor/api/middleware.py index d7686d8..a04ea30 100644 --- a/conductor/conductor/api/middleware.py +++ b/conductor/conductor/api/middleware.py @@ -22,7 +22,6 @@ response with one formatted so the client can parse it. Based on pecan.middleware.errordocument """ - import json from lxml import etree @@ -33,8 +32,26 @@ import webob from conductor import i18n from conductor.i18n import _ +from conductor import version +from oslo_config import cfg + +CONF = cfg.CONF + + LOG = log.getLogger(__name__) +VERSION_AUTH_OPTS = [ + cfg.BoolOpt('version_auth_flag', + default=False, + help='version auth toggling.'), + cfg.StrOpt('version_auth_token', + default='', + help='version auth token') +] + +CONF.register_opts(VERSION_AUTH_OPTS, group='version_auth') + + class ParsableErrorMiddleware(object): """Replace error body with something the client can parse.""" @@ -58,7 +75,29 @@ class ParsableErrorMiddleware(object): # Request for this state, modified by replace_start_response() # and used when an error is being reported. state = {} - + latest_version = version.version_info.version_string() + latest_version_semantic = latest_version.split('.') + minor_version = latest_version_semantic[1] + patch_version = latest_version_semantic[2] + req_minor_version = environ.get('HTTP_X_MINORVERSION') + req_patch_version = environ.get('HTTP_X_PATCHVERSION') + version_auth_flag = CONF.version_auth.version_auth_flag + conf_version_auth_token = CONF.version_auth.version_auth_token + version_auth_token = environ.get('HTTP_VERSION_AUTH_TOKEN') + if req_minor_version is not None: + if int(req_minor_version) <= int(minor_version): + minor_version = req_minor_version + else: + raise Exception(( + 'Expecting minor version less than or equal to %s' % minor_version + )) + if req_patch_version is not None: + if int(req_patch_version) <= int(patch_version): + patch_version = req_patch_version + else: + raise Exception(( + 'Not expecting a patch version but the entered patch version is not acceptable if it is not less than or equal to %s' % patch_version + )) def replacement_start_response(status, headers, exc_info=None): """Overrides the default response to make errors parsable.""" try: @@ -80,6 +119,12 @@ class ParsableErrorMiddleware(object): ] # Save the headers in case we need to modify them. state['headers'] = headers + + if not version_auth_flag or \ + (version_auth_flag and version_auth_token == conf_version_auth_token): + state['headers'].append(('X-MinorVersion', minor_version)) + state['headers'].append(('X-PatchVersion', patch_version)) + state['headers'].append(('X-LatestVersion', latest_version)) return start_response(status, headers, exc_info) app_iter = self.app(environ, replacement_start_response) @@ -124,8 +169,14 @@ class ParsableErrorMiddleware(object): if six.PY3: body = body.encode('utf-8') + state['headers'].append(('Content-Length', str(len(body)))) state['headers'].append(('Content-Type', content_type)) + if not version_auth_flag or \ + (version_auth_flag and version_auth_token == conf_version_auth_token): + state['headers'].append(('X-minorVersion', minor_version)) + state['headers'].append(('X-patchVersion', patch_version)) + state['headers'].append(('X-latestVersion', latest_version)) body = [body] else: body = app_iter diff --git a/conductor/conductor/common/models/country_latency.py b/conductor/conductor/common/models/country_latency.py new file mode 100644 index 0000000..6bbe735 --- /dev/null +++ b/conductor/conductor/common/models/country_latency.py @@ -0,0 +1,113 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from conductor.common.music.model import base +from conductor.common.music import api + + +class CountryLatency(base.Base): + + __tablename__ = "country_latency" + __keyspace__ = None + + id = None + country_name = None + groups = None # type: List[Any] # + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'country_name':'text', + 'groups': 'list<text>', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + #'id': self.id, + 'country_name': self.country_name, + 'groups':self.groups + } + return value_dict + + def delete(self, country_id): + """Update country latency""" + return api.MUSIC_API.row_delete( + self.__keyspace__, self.__tablename__, self.pk_name(), + country_id, True) + + def update(self, country_name, updated_fields): + """Update country latency""" + api.MUSIC_API.row_complex_field_update( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), country_name, updated_fields) + + #def insert(self): + # return \ + # api.MUSIC_API.row_insert_by_condition( + # self.__keyspace__, self.__tablename__, self.pk_name(), + # self.pk_value(), self.values(), self.PARKED) + + def __init__(self, country_name=None,groups=None,_insert=False): + """Initializer""" + super(CountryLatency, self).__init__() + + self.country_name = country_name + self.groups = groups + + if _insert: + self.insert() + + def __repr__(self): + """Object representation""" + return '<CountryLatency {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['country_name'] = self.country_name, + json_['groups'] = self.groups + + return json_ diff --git a/conductor/conductor/common/models/group_rules.py b/conductor/conductor/common/models/group_rules.py new file mode 100644 index 0000000..fb7453f --- /dev/null +++ b/conductor/conductor/common/models/group_rules.py @@ -0,0 +1,107 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from conductor.common.music.model import base +from conductor.common.music import api + + +class GroupRules(base.Base): + + __tablename__ = "group_rules" + __keyspace__ = None + + id = None + group = None + rule = None # type: List[Any] # + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'group':'text', + 'rule': 'map<text,text>', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'id': self.id, + 'group': self.group + } + return value_dict + + def update(self, group, updated_fields): + """Update country latency""" + api.MUSIC_API.row_complex_field_update( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), group, updated_fields) + + def insert(self): + return \ + api.MUSIC_API.row_insert_by_condition( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), self.values(), self.PARKED) + + def __init__(self, id=None, group=None,rule=None,_insert=False): + """Initializer""" + super(GroupRules, self).__init__() + + self.id = id + self.group = group + self.rule = rule + + if _insert: + self.insert() + + def __repr__(self): + """Object representation""" + return '<GroupRules {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['group'] = self.group, + json_['rule'] = self.rule + + return json_ diff --git a/conductor/conductor/common/models/groups.py b/conductor/conductor/common/models/groups.py new file mode 100644 index 0000000..b70a747 --- /dev/null +++ b/conductor/conductor/common/models/groups.py @@ -0,0 +1,107 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from conductor.common.music.model import base +from conductor.common.music import api + + +class Groups(base.Base): + + + __tablename__ = "groups" + __keyspace__ = None + + id = None + groups = None + countries = None + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'group':'text', + 'countries': 'map<text,text>', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'id': self.id, + 'group': self.group + } + return value_dict + + def update(self, group, updated_fields): + """Update country latency""" + api.MUSIC_API.row_complex_field_update( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), group, updated_fields) + + def insert(self): + return \ + api.MUSIC_API.row_insert_by_condition( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), self.values(), self.PARKED) + + def __init__(self, id=None, group=None, countries=None, _insert=False): + """Initializer""" + super(Groups, self).__init__() + self.id = id + self.group = group + self.countries = countries + + if _insert: + self.insert() + + def __repr__(self): + """Object representation""" + return '<Groups {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['group'] = self.group + json_['countries'] = self.countries + + return json_ diff --git a/conductor/conductor/common/models/order_lock.py b/conductor/conductor/common/models/order_lock.py new file mode 100644 index 0000000..ccbdd51 --- /dev/null +++ b/conductor/conductor/common/models/order_lock.py @@ -0,0 +1,118 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import time +from conductor.common.models import validate_uuid4 +from conductor.common.music.model import base +from conductor.common.music import api + +class OrderLock(base.Base): + + __tablename__ = "order_locks" + __keyspace__ = None + + id = None + plans = None + is_spinup_completed = None + spinup_completed_timestamp = None + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + SPINING = [PARKED, UNDER_SPIN_UP] + REHOMABLE = [REHOME, COMPLETED] + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'plans': 'map<text, text>', + 'is_spinup_completed': 'boolean', + 'spinup_completed_timestamp': 'bigint', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'id' : self.id, + 'plans': self.plans, + 'is_spinup_completed': self.is_spinup_completed, + 'spinup_completed_timestamp': self.spinup_completed_timestamp + } + return value_dict + + def update(self, plan_id, updated_fields, values=None): + """Update order lock""" + api.MUSIC_API.row_complex_field_update( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), plan_id, updated_fields, values) + + def insert(self): + return \ + api.MUSIC_API.row_insert_by_condition( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), self.values(), self.PARKED) + + def __init__(self, id=None, plans=None, is_spinup_completed=False, spinup_completed_timestamp=None, _insert=False): + """Initializer""" + super(OrderLock, self).__init__() + # Breaking here with errot: Can't set attribute (TODO: Ikram/Rupali) + self.id = id + self.plans = plans + self.is_spinup_completed = is_spinup_completed + self.spinup_completed_timestamp = spinup_completed_timestamp + + if _insert: + self.insert() + + def __repr__(self): + """Object representation""" + return '<OrderLock {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['plans'] = self.plans + json_['is_spinup_completed'] = self.is_spinup_completed + json_['spinup_completed_timestamp'] = self.spinup_completed_timestamp + + return json_ diff --git a/conductor/conductor/common/models/order_lock_history.py b/conductor/conductor/common/models/order_lock_history.py new file mode 100644 index 0000000..79d79bf --- /dev/null +++ b/conductor/conductor/common/models/order_lock_history.py @@ -0,0 +1,111 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import time +from conductor.common.models import validate_uuid4 +from conductor.common.music.model import base +from conductor.common.music import api + +class OrderLockHistory(base.Base): + + __tablename__ = "order_locks_history" + __keyspace__ = None + + id = None + conflict_id = None + plans = None + is_spinup_completed = None + spinup_completed_timestamp = None + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + SPINING = [PARKED, UNDER_SPIN_UP] + REHOMABLE = [REHOME, COMPLETED] + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'conflict_id': 'text', + 'plans': 'map<text, text>', + 'is_spinup_completed': 'boolean', + 'spinup_completed_timestamp': 'bigint', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'conflict_id' : self.conflict_id, + 'plans': self.plans, + 'is_spinup_completed': self.is_spinup_completed, + 'spinup_completed_timestamp': self.spinup_completed_timestamp + } + if self.id: + value_dict['id'] = self.id + return value_dict + + def __init__(self, id=None, conflict_id=None, plans=None, is_spinup_completed=False, spinup_completed_timestamp=None, _insert=False): + """Initializer""" + super(OrderLockHistory, self).__init__() + # Breaking here with errot: Can't set attribute (TODO: Ikram/Rupali) + self.conflict_id = conflict_id + self.plans = plans + self.is_spinup_completed = is_spinup_completed + self.spinup_completed_timestamp = spinup_completed_timestamp + + if _insert: + self.insert() + + def __repr__(self): + """Object representation""" + return '<OrderLock {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['conflict_id'] = self.conflict_id + json_['plans'] = self.plans + json_['is_spinup_completed'] = self.is_spinup_completed + json_['spinup_completed_timestamp'] = self.spinup_completed_timestamp + + return json_ diff --git a/conductor/conductor/common/models/plan.py b/conductor/conductor/common/models/plan.py index 8affdff..1c293c0 100644 --- a/conductor/conductor/common/models/plan.py +++ b/conductor/conductor/common/models/plan.py @@ -21,6 +21,7 @@ import json import time +import os from conductor.common.models import validate_uuid4 from conductor.common.music.model import base @@ -55,6 +56,7 @@ class Plan(base.Base): message = None translation_owner = None translation_counter = None + translation_begin_timestamp = None solver_owner = None solver_counter = None reservation_owner = None @@ -65,7 +67,7 @@ class Plan(base.Base): # Status TEMPLATE = "template" # Template ready for translation - TRANSLATING = "translating" # Translating the template + TRANSLATING = "translating" # Translating the template TRANSLATED = "translated" # Translation ready for solving SOLVING = "solving" # Search for solutions in progress # Search complete, solution with n>0 recommendations found @@ -77,10 +79,14 @@ class Plan(base.Base): RESERVING = "reserving" # Final state, Solved and Reserved resources (if required) DONE = "done" + # if any cloud candidate in the solution under spin-up in MSO, + # the plan goes to 'waiting spinup' state + WAITING_SPINUP = "waiting spinup" + STATUS = [TEMPLATE, TRANSLATING, TRANSLATED, SOLVING, SOLVED, NOT_FOUND, - ERROR, RESERVING, DONE, ] - WORKING = [TEMPLATE, TRANSLATING, TRANSLATED, SOLVING, RESERVING, ] - FINISHED = [TRANSLATED, SOLVED, NOT_FOUND, ERROR, DONE, ] + ERROR, WAITING_SPINUP, RESERVING, DONE, ] + WORKING = [TEMPLATE, TRANSLATING, SOLVING, RESERVING, ] + FINISHED = [TRANSLATED, SOLVED, NOT_FOUND, ERROR, DONE, WAITING_SPINUP] @classmethod def schema(cls): @@ -92,7 +98,7 @@ class Plan(base.Base): 'updated': 'bigint', # Last update time in msec from epoch 'name': 'text', # Plan name/alias 'timeout': 'int', # Timeout in seconds - 'recommend_max': 'int', # Max recommendations + 'recommend_max': 'text', # Max recommendations 'message': 'text', # Message (e.g., error or other info) 'template': 'text', # Plan template 'translation': 'text', # Translated template for the solver @@ -103,6 +109,7 @@ class Plan(base.Base): 'translation_counter': 'int', 'solver_counter': 'int', 'reservation_counter': 'int', + 'translation_begin_timestamp': 'bigint', 'PRIMARY KEY': '(id)', } return schema @@ -147,6 +154,17 @@ class Plan(base.Base): def working(self): return self.status in self.WORKING + def rehome_plan(self): + """reset the field values when rehoming a plan""" + self.status = self.TEMPLATE + self.updated = current_time_millis() + self.message = "" + self.translation_counter = 0 + self.solver_counter = 0 + self.reservation_counter = 0 + self.translation = {} + self.solution = {} + def update(self, condition=None): """Update plan @@ -170,6 +188,7 @@ class Plan(base.Base): 'solution': json.dumps(self.solution), 'translation_owner': self.translation_owner, 'translation_counter': self.translation_counter, + 'translation_begin_timestamp': self.translation_begin_timestamp, 'solver_owner': self.solver_owner, 'solver_counter': self.solver_counter, 'reservation_owner': self.reservation_owner, @@ -183,9 +202,9 @@ class Plan(base.Base): id=None, created=None, updated=None, status=None, message=None, translation=None, solution=None, translation_owner=None, solver_owner=None, - reservation_owner=None, translation_counter = None, - solver_counter = None, reservation_counter = None, - _insert=True): + reservation_owner=None, translation_counter=None, + solver_counter=None, reservation_counter=None, + translation_begin_timestamp=None, _insert=True): """Initializer""" super(Plan, self).__init__() self.status = status or self.TEMPLATE @@ -203,6 +222,7 @@ class Plan(base.Base): self.translation_counter = translation_counter or 0 self.solver_counter = solver_counter or 0 self.reservation_counter = reservation_counter or 0 + self.translation_begin_timestamp = translation_begin_timestamp if _insert: if validate_uuid4(id): @@ -240,10 +260,4 @@ class Plan(base.Base): json_['solver_counter'] = self.solver_counter json_['reservation_owner'] = self.reservation_owner json_['reservation_counter'] = self.reservation_counter - json_['translation_owner'] = self.translation_owner - json_['translation_counter'] = self.translation_counter - json_['solver_owner'] = self.solver_owner - json_['solver_counter'] = self.solver_counter - json_['reservation_owner'] = self.reservation_owner - json_['reservation_counter'] = self.reservation_counter return json_ diff --git a/conductor/conductor/common/models/region_placeholders.py b/conductor/conductor/common/models/region_placeholders.py new file mode 100644 index 0000000..b3845a0 --- /dev/null +++ b/conductor/conductor/common/models/region_placeholders.py @@ -0,0 +1,108 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from conductor.common.music.model import base +from conductor.common.music import api + + +class RegionPlaceholders(base.Base): + + + __tablename__ = "region_placeholders" + __keyspace__ = None + + id = None + region_name = None + countries = None + + # Status + PARKED = "parked" + UNDER_SPIN_UP = "under-spin-up" + COMPLETED = "completed" + REHOME = "rehome" + FAILED = "failed" + + @classmethod + def schema(cls): + """Return schema.""" + schema = { + 'id': 'text', + 'region_name':'text', + 'countries': 'map<text,text>', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'region_name': self.region_name, + 'countries': self.countries + } + if self.id: + value_dict['id'] = self.id + return value_dict + + def delete(self, region_id): + """Update country latency""" + return api.MUSIC_API.row_delete(self.__keyspace__, self.__tablename__, self.pk_name(), + region_id, True) + + + def update(self, region_name, updated_fields): + """Update country latency""" + api.MUSIC_API.row_complex_field_update( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), region_name, updated_fields) + + def __init__(self, region_name=None, countries=None, _insert=False): + """Initializer""" + super(RegionPlaceholders, self).__init__() + self.region_name = region_name + self.countries = countries + + if _insert: + return self.insert() + + def __repr__(self): + """Object representation""" + return '<RegionPlaceholders {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['region_name'] = self.region_name + json_['countries'] = self.countries + + return json_ diff --git a/conductor/conductor/common/models/triage_tool.py b/conductor/conductor/common/models/triage_tool.py new file mode 100644 index 0000000..2f5f0b5 --- /dev/null +++ b/conductor/conductor/common/models/triage_tool.py @@ -0,0 +1,102 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +from conductor.common.models import validate_uuid4 +from conductor.common.music.model import base +from conductor.common.music import api + +class TriageTool(base.Base): + __tablename__ = "triage_tool" + __keyspace__ = None + id =None + name = None + optimization_type = None + triage_solver = None + triage_translator = None + @classmethod + def schema(cls): + schema = { + 'id': 'text', + 'name': 'text', + "optimization_type" : 'text', + 'triage_solver': 'text', + 'triage_translator': 'text', + 'PRIMARY KEY': '(id)' + } + return schema + + @classmethod + def atomic(cls): + """Use atomic operations""" + return True + + @classmethod + def pk_name(cls): + """Primary key name""" + return 'id' + + def pk_value(self): + """Primary key value""" + return self.id + + def values(self): + """Valu-es""" + value_dict = { + 'id': self.id, + 'name': self.name, + 'optimization_type' : self.optimization_type, + 'triage_translator': json.dumps(self.triage_translator), + 'triage_solver': json.dumps(self.triage_solver) + } + return value_dict + + def __init__(self, id=None, name=None, optimization_type=None, triage_solver=None, triage_translator=None, _insert=False): + + super(TriageTool, self).__init__() + self.id = id + self.optimization_type = optimization_type + #self.triage_solver = triage_solver + #self.triage_translator = triage_translator + self.name = name + if triage_solver is not None: + self.triage_solver = json.loads(triage_solver) + else: + self.triage_solver = triage_solver + if triage_translator is not None: + self.triage_translator = json.loads(triage_translator) + else: + self.triage_translator = triage_translator + # if _insert: + # self.insert() + + def __repr__(self): + """Object representation""" + return '<Triage Tool {}>'.format(self.id) + + def __json__(self): + """JSON representation""" + json_ = {} + json_[id] = self.id, + json_['optimization_type'] = self.optimization_type, + json_['triage_solver'] = self.triage_solver, + json_['triage_translator'] = self.triage_translator, + json_['name'] = self.name + + return json_ diff --git a/conductor/conductor/common/music/api.py b/conductor/conductor/common/music/api.py index 2517b8c..c212a29 100644 --- a/conductor/conductor/common/music/api.py +++ b/conductor/conductor/common/music/api.py @@ -18,16 +18,19 @@ # """Music Data Store API""" -import base64 + import copy import logging +import json import time -from conductor.common import rest -from conductor.i18n import _LE, _LI # pylint: disable=W0212 from oslo_config import cfg from oslo_log import log +from conductor.common import rest +from conductor.common.utils import basic_auth_util +from conductor.i18n import _LE, _LI # pylint: disable=W0212 + LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -67,6 +70,7 @@ MUSIC_API_OPTS = [ help='Use mock API'), cfg.StrOpt('music_topology', default='SimpleStrategy'), + #TODO(larry); make the config inputs more generic cfg.StrOpt('first_datacenter_name', help='Name of the first data center'), cfg.IntOpt('first_datacenter_replicas', @@ -79,16 +83,11 @@ MUSIC_API_OPTS = [ help='Name of the third data center'), cfg.IntOpt('third_datacenter_replicas', help='Number of replicas in third data center'), - cfg.BoolOpt('music_new_version', - help='new or old version'), - cfg.StrOpt('music_version', - help='for version'), - cfg.StrOpt('aafuser', - help='for header value'), - cfg.StrOpt('aafpass', - help='for header value'), - cfg.StrOpt('aafns', - help='for header value'), + cfg.BoolOpt('music_new_version', help='new or old version'), + cfg.StrOpt('music_version', help='for version'), + cfg.StrOpt('aafuser', help='username value that used for creating basic authorization header'), + cfg.StrOpt('aafpass', help='password value that used for creating basic authorization header'), + cfg.StrOpt('aafns', help='AAF namespace field used in MUSIC request header'), ] CONF.register_opts(MUSIC_API_OPTS, group='music_api') @@ -107,6 +106,9 @@ class MusicAPI(object): """Initializer.""" global MUSIC_API + # set the urllib log level to ERROR + logging.getLogger('urllib3').setLevel(logging.ERROR) + LOG.info(_LI("Initializing Music API")) server_url = CONF.music_api.server_url.rstrip('/') if not server_url: @@ -117,6 +119,7 @@ class MusicAPI(object): host = next(iter(CONF.music_api.hostnames or []), 'controller') port = CONF.music_api.port or 8080 path = CONF.music_api.path or '/MUSIC/rest' + version = CONF.version server_url = 'http://{}:{}/{}'.format( host, port, version, path.rstrip('/').lstrip('/')) @@ -132,15 +135,13 @@ class MusicAPI(object): MUSIC_version = CONF.music_api.music_version.split(".") self.rest.session.headers['content-type'] = 'application/json' + self.rest.session.headers['X-minorVersion'] = MUSIC_version[1] self.rest.session.headers['X-patchVersion'] = MUSIC_version[2] self.rest.session.headers['ns'] = CONF.music_api.aafns - # auth_str = 'Basic {}'.format(base64.encodestring( - # '{}:{}'.format(CONF.music_api.aafuser, - # CONF.music_api.aafpass)).strip()) - # self.rest.session.headers['Authorization'] = auth_str - self.rest.session.headers['userId'] = CONF.music_api.aafuser self.rest.session.headers['password'] = CONF.music_api.aafpass + self.rest.session.headers['Authorization'] = basic_auth_util.encode(CONF.music_api.aafuser, + CONF.music_api.aafpass) self.lock_ids = {} @@ -149,6 +150,7 @@ class MusicAPI(object): self.replication_factor = CONF.music_api.replication_factor self.music_topology = CONF.music_api.music_topology + # TODO(larry) make the code more generic self.first_datacenter_name = CONF.music_api.first_datacenter_name self.first_datacenter_replicas = CONF.music_api.first_datacenter_replicas self.second_datacenter_name = CONF.music_api.second_datacenter_name @@ -191,6 +193,7 @@ class MusicAPI(object): def _lock_id_create(self, lock_name): """Returns the lock id. Use for acquiring and releasing.""" + path = '/locks/create/%s' % lock_name response = self.rest.request(method='post', content_type='text/plain', path=path) @@ -200,6 +203,7 @@ class MusicAPI(object): return lock_id def _lock_id_acquire(self, lock_id): + """Acquire a lock by id. Returns True if successful.""" path = '/locks/acquire/%s' % lock_id response = self.rest.request(method='get', @@ -220,7 +224,7 @@ class MusicAPI(object): pk_value=None, atomic=False, condition=None): """Initialize payload for Music requests. - Supports atomic operations. + Supports atomic operations, Returns a payload of data and lock_name (if any). """ #if atomic: @@ -228,17 +232,18 @@ class MusicAPI(object): #else: # lock_name = None - # lock_id = self.lock_ids.get(lock_name) + #lock_id = self.lock_ids.get(lock_name) data = { 'consistencyInfo': { 'type': 'atomic' if condition else 'eventual', } } + if condition: data['conditions'] = condition - # , 'lock_name': lock_name + #, 'lock_name': lock_name return {'data': data} def payload_delete(self, payload): @@ -252,10 +257,12 @@ class MusicAPI(object): self.lock_delete(lock_name) def keyspace_create(self, keyspace): + """Creates a keyspace.""" payload = self.payload_init() data = payload.get('data') data['durabilityOfWrites'] = True + replication_info = { 'class': self.music_topology, } @@ -276,6 +283,11 @@ class MusicAPI(object): if CONF.music_api.debug: LOG.debug("Creating keyspace {}".format(keyspace)) response = self.rest.request(method='post', path=path, data=data) + + if response and CONF.music_api.music_new_version: + result = response.json().get('result') + return result + return response and response.ok def keyspace_delete(self, keyspace): @@ -329,7 +341,7 @@ class MusicAPI(object): return response and response.ok def row_create(self, keyspace, table, # pylint: disable=R0913 - pk_name, pk_value, values, atomic=False): + pk_name, pk_value, values, atomic=False, conditional=False): """Create a row.""" payload = self.payload_init(keyspace, table, pk_value, atomic) data = payload.get('data') @@ -358,7 +370,12 @@ class MusicAPI(object): LOG.debug("Updating row with pk_value {} in table " "{}, keyspace {}".format(pk_value, table, keyspace)) response = self.rest.request(method='put', path=path, data=data) - # self.payload_delete(payload) + #self.payload_delete(payload) + if response is not None and CONF.music_api.music_new_version: + response_json = json.loads(response.content) + response_status = response_json.get("status") + return response_status + return response and response.ok and response.content def row_read(self, keyspace, table, pk_name=None, pk_value=None): @@ -368,6 +385,11 @@ class MusicAPI(object): LOG.debug("Reading row with pk_value {} from table " "{}, keyspace {}".format(pk_value, table, keyspace)) response = self.rest.request(path=path) + + if response is not None and CONF.music_api.music_new_version: + result = response.json().get('result') or {} + return result + return response and response.json() def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False): @@ -383,6 +405,109 @@ class MusicAPI(object): self.payload_delete(payload) return response and response.ok + def row_insert_by_condition(self, keyspace, table, pk_name, pk_value, values, exists_status): + + """Insert a row with certain condition.""" + # Get the plan id from plans field + plan_id = next(iter(values.get('plans'))) + + # If id does not exist in order_locks table, insert the 'values_when_id_non_exist' + values_when_id_non_exist = values.get('plans')[plan_id] + + # If id exists in order_locks table, insert the 'values_when_id_exist' + values_when_id_exist = copy.deepcopy(values_when_id_non_exist) + values_when_id_exist['status'] = exists_status + + # Common values for new MUSIC api + common_values = copy.deepcopy(values_when_id_non_exist) + common_values.pop('status', None) + + if (CONF.music_api.music_new_version): + # Conditional Insert request body sends to new version of MUSIC (2.5.5 and lator) + data = { + "primaryKey": pk_name, + "primaryKeyValue": pk_value, + + "casscadeColumnName": "plans", + "tableValues": { + "id": pk_value, + "is_spinup_completed": values.get('is_spinup_completed') + }, + "casscadeColumnData": { + "key": plan_id, + "value": common_values + }, + "conditions": { + "exists": { + "status": values_when_id_exist.get('status') + }, + "nonexists": { + "status": values_when_id_non_exist.get('status') + } + } + } + + else: + data = { + "primaryKey": pk_name, + "primaryKeyValue": pk_value, + "cascadeColumnKey": plan_id, + "cascadeColumnName": "plans", + "values":{ + "id": pk_value, + "is_spinup_completed": values.get('is_spinup_completed') + }, + "nonExistsCondition": { + "value": values_when_id_non_exist + }, + "existsCondition": { + "value": values_when_id_exist + } + } + + #conditional/update/keyspaces/conductor_order_locks/tables/order_locks + path = '/conditional/insert/keyspaces/%(keyspace)s/tables/%(table)s' % { + 'keyspace': keyspace, + 'table': table, + } + response = self.rest.request(method='post', path=path, data=data) + return response + + + def row_complex_field_update(self, keyspace, table, pk_name, pk_value, plan_id, updated_fields, values): + + if (CONF.music_api.music_new_version): + # new version of MUSIC + data = { + "primaryKey": pk_name, + "primaryKeyValue": pk_value, + "casscadeColumnName": "plans", + "tableValues": values, + "casscadeColumnData": { + "key": plan_id, + "value": updated_fields + } + } + else: + data = { + "primaryKey": pk_name, + "primaryKeyValue": pk_value, + "cascadeColumnName": "plans", + "planId": plan_id, + "updateStatus": updated_fields, + "values": values + } + + path = '/conditional/update/keyspaces/%(keyspace)s/tables/%(table)s' % { + 'keyspace': keyspace, + 'table': table, + } + response = self.rest.request(method='put', path=path, data=data) + LOG.debug("Updated the order {} status to {} for conflict_id {} in " + "order_locks table, response from MUSIC {}".format(plan_id, updated_fields, pk_value, response)) + return response and response.ok + + @staticmethod def _table_path_generate(keyspace, table): path = '/keyspaces/%(keyspace)s/tables/%(table)s/' % { @@ -438,9 +563,6 @@ class MockAPI(object): global MUSIC_API - # set the urllib log level to ERROR - logging.getLogger('urllib3').setLevel(logging.ERROR) - self.music['keyspaces'] = {} MUSIC_API = self diff --git a/conductor/conductor/common/music/messaging/component.py b/conductor/conductor/common/music/messaging/component.py index ccfbdcf..6ecbe18 100644 --- a/conductor/conductor/common/music/messaging/component.py +++ b/conductor/conductor/common/music/messaging/component.py @@ -18,6 +18,7 @@ # import inspect +import json import sys import time import socket @@ -46,16 +47,15 @@ MESSAGING_SERVER_OPTS = [ help='Wait interval while checking for a message response. ' 'Default value is 1 second.'), cfg.IntOpt('response_timeout', - default=20, + default=120, min=1, help='Overall message response timeout. ' - 'Default value is 20 seconds.'), + 'Default value is 120 seconds.'), cfg.IntOpt('timeout', - default=600, + default=300, min=1, help='Timeout for detecting a VM is down, and other VMs can pick the plan up. ' - 'This value should be larger than solver_timeout' - 'Default value is 10 minutes. (integer value)'), + 'Default value is 5 minutes. (integer value)'), cfg.IntOpt('workers', default=1, min=1, @@ -105,8 +105,8 @@ class Target(object): baseclass=message.Message, classname=self.topic) if not self._topic_class: - raise RuntimeError("Error setting the topic class " - "for the messaging layer.") + RuntimeError("Error setting the topic class " + "for the messaging layer.") @property def topic(self): @@ -208,8 +208,7 @@ class RPCClient(object): # Check message status within a thread executor = futurist.ThreadPoolExecutor() started_at = time.time() - while (time.time() - started_at) <= \ - self.conf.messaging_server.timeout: + while (time.time() - started_at) <= self.conf.messaging_server.response_timeout: fut = executor.submit(self.__check_rpc_status, rpc_id, method) rpc = fut.result() if rpc and rpc.finished: @@ -276,6 +275,7 @@ class RPCService(cotyledon.Service): self.kwargs = kwargs self.RPC = self.target.topic_class self.name = "{}, topic({})".format(RPCSVRNAME, self.target.topic) + self.messaging_owner_condition = { "owner": socket.gethostname() } @@ -345,14 +345,13 @@ class RPCService(cotyledon.Service): # Find the first msg marked as enqueued. if msg.working and \ - (self.current_time_seconds() - self.millisec_to_sec(msg.updated)) > \ - self.conf.messaging_server.timeout: + (self.current_time_seconds() - self.millisec_to_sec(msg.updated))\ + > self.conf.messaging_server.response_timeout: msg.status = message.Message.ENQUEUED msg.update(condition=self.working_status_condition) if not msg.enqueued: continue - if 'plan_name' in msg.ctxt.keys(): LOG.info('Plan name: {}'.format(msg.ctxt['plan_name'])) elif 'plan_name' in msg.args.keys(): @@ -364,7 +363,7 @@ class RPCService(cotyledon.Service): # All update should have a condition (status == enqueued) _is_updated = msg.update(condition=self.enqueued_status_condition) - if 'FAILURE' in _is_updated: + if not _is_updated or 'FAILURE' in _is_updated: continue # RPC methods must not start/end with an underscore. @@ -410,6 +409,7 @@ class RPCService(cotyledon.Service): failure = None try: + # Add the template to conductor.plan table # Methods return an opaque dictionary result = method(msg.ctxt, msg.args) @@ -440,9 +440,11 @@ class RPCService(cotyledon.Service): LOG.debug("Message {} method {}, response: {}".format( msg.id, msg.method, msg.response)) - _is_success = 'FAILURE | Could not acquire lock' - while 'FAILURE | Could not acquire lock' in _is_success: - _is_success = msg.update(condition=self.messaging_owner_condition) + _is_success = 'FAILURE' + while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(msg.updated)) <= self.conf.messaging_server.response_timeout: + _is_success = msg.update() + LOG.info(_LI("updating the message status from working to {}, " + "atomic update response from MUSIC {}").format(msg.status, _is_success)) except Exception: LOG.exception(_LE("Can not send reply for message {} " @@ -472,9 +474,6 @@ class RPCService(cotyledon.Service): # Listen for messages within a thread executor = futurist.ThreadPoolExecutor() while self.running: - # Delay time (Seconds) for MUSIC requests. - time.sleep(self.conf.delay_time) - fut = executor.submit(self.__check_for_messages) fut.result() executor.shutdown() diff --git a/conductor/conductor/common/music/model/base.py b/conductor/conductor/common/music/model/base.py index 89f4f71..0b6f638 100644 --- a/conductor/conductor/common/music/model/base.py +++ b/conductor/conductor/common/music/model/base.py @@ -101,6 +101,7 @@ class Base(object): kwargs['values'] = self.values() kwargs['atomic'] = self.atomic() pk_name = kwargs['pk_name'] + if pk_name not in kwargs['values']: # TODO(jdandrea): Make uuid4() generation a default method in Base. the_id = str(uuid.uuid4()) @@ -109,7 +110,8 @@ class Base(object): setattr(self, pk_name, the_id) else: kwargs['pk_value'] = kwargs['values'][pk_name] - api.MUSIC_API.row_create(**kwargs) + response = api.MUSIC_API.row_create(**kwargs) + return response def update(self, condition=None): """Update row""" @@ -123,8 +125,9 @@ class Base(object): kwargs['condition'] = condition # FIXME(jdandrea): Do we need this test/pop clause? pk_name = kwargs['pk_name'] - if pk_name in kwargs['values']: - kwargs['values'].pop(pk_name) + if kwargs['table'] != ('order_locks'): + if pk_name in kwargs['values']: + kwargs['values'].pop(pk_name) return api.MUSIC_API.row_update(**kwargs) def delete(self): diff --git a/conductor/conductor/common/music/model/search.py b/conductor/conductor/common/music/model/search.py index 9680da9..3cb665e 100644 --- a/conductor/conductor/common/music/model/search.py +++ b/conductor/conductor/common/music/model/search.py @@ -59,7 +59,7 @@ class Query(object): """Convert query response rows to objects""" results = [] pk_name = self.model.pk_name() # pylint: disable=E1101 - for row_id, row in rows.items(): # pylint: disable=W0612 + for row_id, row in rows.items():# pylint: disable=W0612 the_id = row.pop(pk_name) result = self.model(_insert=False, **row) setattr(result, pk_name, the_id) @@ -72,30 +72,21 @@ class Query(object): kwargs = self.__kwargs() rows = api.MUSIC_API.row_read( pk_name=pk_name, pk_value=pk_value, **kwargs) - - if 'result' in rows: - return (self.__rows_to_objects(rows['result']).first()) - else: - return (self.__rows_to_objects(rows).first()) + return (self.__rows_to_objects(rows).first()) def all(self): """Return all objects""" kwargs = self.__kwargs() rows = api.MUSIC_API.row_read(**kwargs) - - # Accommodate both Music 2.1 and 2.2 responses - if 'result' in rows: - return self.__rows_to_objects(rows['result']) - else: - return self.__rows_to_objects(rows) + return self.__rows_to_objects(rows) - def get_plan_by_id(self, plan_id): - """Return the plan with specific id""" - plan = self.one(plan_id) - - items = Results([]) - items.append(plan) - return items + def get_plan_by_col(self, pk_name, pk_value): + # Before using this method, create an index the column (except the primary key) + # you want to filter by. + kwargs = self.__kwargs() + rows = api.MUSIC_API.row_read( + pk_name=pk_name, pk_value=pk_value, **kwargs) + return self.__rows_to_objects(rows) def filter_by(self, **kwargs): """Filter objects""" @@ -103,7 +94,6 @@ class Query(object): # We need to get all items and then go looking for what we want. all_items = self.all() filtered_items = Results([]) - # For every candidate ... for item in all_items: passes = True diff --git a/conductor/conductor/common/rest.py b/conductor/conductor/common/rest.py index 7bbe5af..585a628 100644 --- a/conductor/conductor/common/rest.py +++ b/conductor/conductor/common/rest.py @@ -137,7 +137,9 @@ class REST(object): response = None for attempt in range(self.retries): if attempt > 0: - LOG.warn(_LW("Retry #{}/{}").format( + # No need to show 400 bad requests from Music - Ignorable when lock cannot be received at one particular point in time + if "MUSIC" not in full_url: + LOG.warn(_LW("Retry #{}/{}").format( attempt + 1, self.retries)) try: @@ -149,8 +151,10 @@ class REST(object): response.close() if not response.ok: - LOG.warn("Response Status: {} {}".format( - response.status_code, response.reason)) + # No need to show 400 bad requests from Music - Ignorable when lock cannot be received at one particular point in time + if "MUSIC" not in full_url: + LOG.warn("Response Status: {} {}".format( + response.status_code, response.reason)) if self.log_debug and response.text: try: response_dict = json.loads(response.text) @@ -167,6 +171,8 @@ class REST(object): # That means we must check the object type vs treating it as a bool. # More info: https://github.com/kennethreitz/requests/issues/2002 if isinstance(response, requests.models.Response) and not response.ok: - LOG.error(_LE("Status {} {} after {} retries for URL: {}").format( - response.status_code, response.reason, self.retries, full_url)) + # No need to show 400 bad requests from Music - Ignorable when lock cannot be received at one particular point in time + if "MUSIC" not in full_url: + LOG.error(_LE("Status {} {} after {} retries for URL: {}").format( + response.status_code, response.reason, self.retries, full_url)) return response diff --git a/conductor/conductor/common/utils/__init__.py b/conductor/conductor/common/utils/__init__.py index e69de29..e69de29 100755..100644 --- a/conductor/conductor/common/utils/__init__.py +++ b/conductor/conductor/common/utils/__init__.py diff --git a/conductor/conductor/common/utils/basic_auth_util.py b/conductor/conductor/common/utils/basic_auth_util.py new file mode 100644 index 0000000..a94418f --- /dev/null +++ b/conductor/conductor/common/utils/basic_auth_util.py @@ -0,0 +1,35 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import base64 + +from conductor.i18n import _, _LI, _LE +from oslo_log import log + +LOG = log.getLogger(__name__) + + +def encode(user_id, password): + """ Provide the basic authencation encoded value in an 'Authorization' Header """ + + user_pass = user_id + ':' + password + base64_val = base64.b64encode(user_pass) + authorization_val = _LE("Basic {}".format(base64_val)) + + return authorization_val diff --git a/conductor/conductor/common/utils/conductor_logging_util.py b/conductor/conductor/common/utils/conductor_logging_util.py index 13da6ff..b6ba105 100755..100644 --- a/conductor/conductor/common/utils/conductor_logging_util.py +++ b/conductor/conductor/common/utils/conductor_logging_util.py @@ -1,3 +1,22 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + import json import logging from conductor.common.music import api @@ -12,25 +31,30 @@ class LoggerFilter(logging.Filter): def getTransactionId(keyspace, plan_id): """ get transaction id from a pariticular plan in MUSIC """ + rows = api.API().row_read(keyspace, "plans", "id", plan_id) - if 'result' in rows: rows = rows['result'] - for row_id, row_value in rows.items(): - template = row_value['template'] - if template: - data = json.loads(template) - if "transaction-id" in data: - return data["transaction-id"] + template = row_value['template'] + if template: + data = json.loads(template) + if "transaction-id" in data: + return data["transaction-id"] def setLoggerFilter(logger, keyspace, plan_id): #formatter = logging.Formatter('%(asctime)s %(transaction_id)s %(levelname)s %(name)s: [-] %(plan_id)s %(message)s') - generic_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|%(levelname)s|%(module)s|%(name)s: [-] plan id: %(plan_id)s [-] %(message)s') - audit_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|COMPLETE|200|sucessful||%(levelname)s|||0|%(module)s|||||||||%(name)s : [-] plan id: %(plan_id)s [-] %(message)s') - metric_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|N/A|N/A|COMPLETE|200|sucessful||%(levelname)s|||0|%(module)s||||||||||%(name)s : [-] plan id: %(plan_id)s [-] %(message)s') - error_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|Conductor|N/A|N/A|N/A|ERROR|500|N/A|%(name)s : [-] plan id: %(plan_id)s [-] %(message)s') + generic_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|%(levelname)s|%(module)s|%(name)s:' + ' [-] plan id: %(plan_id)s [-] %(message)s') + audit_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|COMPLETE' + '|200|sucessful||%(levelname)s|||0|%(module)s|||||||||%(name)s : [-] ' + 'plan id: %(plan_id)s [-] %(message)s') + metric_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|N/A|N/A|' + 'COMPLETE|200|sucessful||%(levelname)s|||0|%(module)s||||||||||%(name)s : [-] ' + 'plan id: %(plan_id)s [-] %(message)s') + error_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|Conductor|N/A|N/A|N/A|ERROR|500' + '|N/A|%(name)s : [-] plan id: %(plan_id)s [-] %(message)s') logger_filter = LoggerFilter() logger_filter.transaction_id = getTransactionId(keyspace, plan_id) diff --git a/conductor/conductor/conf/adiod_controller.py b/conductor/conductor/conf/adiod_controller.py new file mode 100644 index 0000000..ef31f1c --- /dev/null +++ b/conductor/conductor/conf/adiod_controller.py @@ -0,0 +1,31 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from oslo_config import cfg +from conductor.i18n import _ + +ADC_CONTROLLER_EXT_MANAGER_OPTS = [ + cfg.ListOpt('extensions', + default=['sdnc_adiod'], + help=_('Extensions list to use')), +] + + +def register_extension_manager_opts(cfg=cfg.CONF): + cfg.register_opts(ADC_CONTROLLER_EXT_MANAGER_OPTS, 'adiod_controller') diff --git a/conductor/conductor/controller/rpc.py b/conductor/conductor/controller/rpc.py index 113e340..94770aa 100644 --- a/conductor/conductor/controller/rpc.py +++ b/conductor/conductor/controller/rpc.py @@ -17,10 +17,17 @@ # ------------------------------------------------------------------------- # +import json import uuid +from conductor.solver.orders_lock.orders_lock_service import OrdersLockingService +from conductor.solver.triage_tool.triage_tool_service import TriageToolService from oslo_log import log +from oslo_config import cfg + LOG = log.getLogger(__name__) +CONF = cfg.CONF + class ControllerRPCEndpoint(object): """Controller Endpoint""" @@ -28,13 +35,15 @@ class ControllerRPCEndpoint(object): def __init__(self, conf, plan_class): self.conf = conf self.Plan = plan_class + self.OrdersLockingService = OrdersLockingService() + self.TriageToolService = TriageToolService() def plan_create(self, ctx, arg): """Create a plan""" name = arg.get('name', str(uuid.uuid4())) LOG.info('Plan name: {}'.format(name)) timeout = arg.get('timeout', self.conf.controller.timeout) - recommend_max = arg.get('limit', self.conf.controller.limit) + recommend_max = arg.get('num_solution', self.conf.controller.limit) template = arg.get('template', None) status = self.Plan.TEMPLATE new_plan = self.Plan(name, timeout, recommend_max, template, @@ -62,7 +71,7 @@ class ControllerRPCEndpoint(object): """Delete one or more plans""" plan_id = arg.get('plan_id') if plan_id: - plans = self.Plan.query.get_plan_by_id(plan_id) + plans = self.Plan.query.get_plan_by_col('id', plan_id) else: plans = self.Plan.query.all() for the_plan in plans: @@ -77,7 +86,7 @@ class ControllerRPCEndpoint(object): """Get one or more plans""" plan_id = arg.get('plan_id') if plan_id: - plans = self.Plan.query.get_plan_by_id(plan_id) + plans = self.Plan.query.get_plan_by_col('id', plan_id) else: plans = self.Plan.query.all() @@ -100,3 +109,56 @@ class ControllerRPCEndpoint(object): 'response': {"plans": plan_list}, 'error': False} return rtn + + def triage_get(self, ctx, arg): + id = arg.get('id') + if id: + triage_data = self.TriageToolService._get_plans_by_id(id) + if not triage_data.triage_solver == None or type(triage_data.triage_solver) == "NoneType": + triage_solver = json.loads(triage_data.triage_solver) + else: + triage_solver = triage_data.triage_solver + + triage_data_list =[] + triage_data_json = { + "id":triage_data.id, + "name":triage_data.name, + "solver_triage": triage_solver, + "translator_triage":triage_data.triage_translator, + "optimization_type": json.loads(triage_data.optimization_type) + } + if hasattr(triage_data, 'message'): + triage_data_json["message"] = triage_data.message + + triage_data_list.append(triage_data_json) + + rtn = { + 'response': {"triageData": triage_data_list}, + 'error': False} + return rtn + + def release_orders(self, ctx, arg): + rehome_decisions = [] + release_orders = arg.get("release-locks") + LOG.info("Following Orders were received in this release call from MSO:{}".format(release_orders)) + + for release_order in release_orders: + rehome_decisions = self.OrdersLockingService.rehomes_for_service_resource(release_order['status'], + release_order['service-resource-id'], + rehome_decisions) + + self.OrdersLockingService.do_rehome(rehome_decisions) + + if not rehome_decisions: + response_msg = "Orders have been released, but no plans are effected in Conductor" + else: + response_msg = rehome_decisions + + LOG.info(response_msg) + rtn = { + 'response': { + "status": "success", + "message": response_msg + } + } + return rtn diff --git a/conductor/conductor/controller/service.py b/conductor/conductor/controller/service.py index 58f9d93..d28c37f 100644 --- a/conductor/conductor/controller/service.py +++ b/conductor/conductor/controller/service.py @@ -18,10 +18,9 @@ # import cotyledon -from oslo_config import cfg -from oslo_log import log from conductor.common.models import plan +from conductor.common.models import order_lock from conductor.common.music import api from conductor.common.music import messaging as music_messaging from conductor.common.music.model import base @@ -29,6 +28,8 @@ from conductor.controller import rpc from conductor.controller import translator_svc from conductor import messaging from conductor import service +from oslo_config import cfg +from oslo_log import log LOG = log.getLogger(__name__) @@ -56,10 +57,6 @@ CONTROLLER_OPTS = [ 'mode. When set to False, controller will flush any ' 'abandoned messages at startup. The controller always ' 'restarts abandoned template translations at startup.'), - cfg.IntOpt('weight1', - default=1), - cfg.IntOpt('weight2', - default=1), ] CONF.register_opts(CONTROLLER_OPTS, group='controller') @@ -71,7 +68,6 @@ CONF.register_opts(OPTS) class ControllerServiceLauncher(object): """Launcher for the controller service.""" - def __init__(self, conf): self.conf = conf @@ -82,9 +78,13 @@ class ControllerServiceLauncher(object): # Dynamically create a plan class for the specified keyspace self.Plan = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") + self.OrderLock = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=order_lock.OrderLock, classname="OrderLock") if not self.Plan: raise + if not self.OrderLock: + raise def run(self): transport = messaging.get_transport(self.conf) @@ -102,7 +102,8 @@ class ControllerServiceLauncher(object): workers=self.conf.controller.workers, args=(self.conf,), kwargs=kwargs) - kwargs = {'plan_class': self.Plan, } + kwargs = {'plan_class': self.Plan, + 'order_locks': self.OrderLock} svcmgr.add(translator_svc.TranslatorService, workers=self.conf.controller.workers, args=(self.conf,), kwargs=kwargs) diff --git a/conductor/conductor/controller/translator.py b/conductor/conductor/controller/translator.py index 4f0ed20..a13c273 100644 --- a/conductor/conductor/controller/translator.py +++ b/conductor/conductor/controller/translator.py @@ -28,8 +28,11 @@ import yaml from conductor import __file__ as conductor_root from conductor import messaging from conductor import service + from conductor.common import threshold from conductor.common.music import messaging as music_messaging +from conductor.data.plugins.triage_translator.triage_translator_data import TraigeTranslatorData +from conductor.data.plugins.triage_translator.triage_translator import TraigeTranslator from oslo_config import cfg from oslo_log import log @@ -42,13 +45,14 @@ LOCATION_KEYS = ['latitude', 'longitude', 'host_name', 'clli_code'] INVENTORY_PROVIDERS = ['aai'] INVENTORY_TYPES = ['cloud', 'service', 'transport'] DEFAULT_INVENTORY_PROVIDER = INVENTORY_PROVIDERS[0] -CANDIDATE_KEYS = ['inventory_type', 'candidate_id', 'location_id', - 'location_type', 'cost'] -DEMAND_KEYS = ['inventory_provider', 'inventory_type', 'service_type', - 'service_id', 'service_resource_id', 'customer_id', - 'default_cost', 'candidates', 'region', 'complex', - 'required_candidates', 'excluded_candidates', - 'existing_placement', 'subdivision', 'flavor', 'attributes'] +CANDIDATE_KEYS = ['candidate_id', 'cost', 'inventory_type', 'location_id', + 'location_type'] +DEMAND_KEYS = ['attributes', 'candidates', 'complex', 'conflict_identifier', + 'customer_id', 'default_cost', 'excluded_candidates', + 'existing_placement', 'flavor', 'inventory_provider', + 'inventory_type', 'port_key', 'region', 'required_candidates', + 'service_id', 'service_resource_id', 'service_subscription', + 'service_type', 'subdivision', 'unique', 'vlan_key'] CONSTRAINT_KEYS = ['type', 'demands', 'properties'] CONSTRAINTS = { # constraint_type: { @@ -133,7 +137,8 @@ class Translator(object): self._translation = None self._valid = False self._ok = False - + self.triageTranslatorData= TraigeTranslatorData() + self.triageTranslator = TraigeTranslator() # Set up the RPC service(s) we want to talk to. self.data_service = self.setup_rpc(self.conf, "data") @@ -493,7 +498,13 @@ class Translator(object): args = { "demands": { name: requirements, - } + }, + "plan_info":{ + "plan_id": self._plan_id, + "plan_name": self._plan_name + }, + "triage_translator_data": self.triageTranslatorData.__dict__ + } # Check if required_candidate and excluded candidate @@ -512,7 +523,6 @@ class Translator(object): " list are not mutually exclusive for demand" " {}".format(name) ) - response = self.data_service.call( ctxt=ctxt, method="resolve_demands", @@ -520,10 +530,13 @@ class Translator(object): resolved_demands = \ response and response.get('resolved_demands') + triage_data_trans = \ + response and response.get('trans') required_candidates = resolved_demands \ .get('required_candidates') if not resolved_demands: + self.triageTranslator.thefinalCallTrans(triage_data_trans) raise TranslatorException( "Unable to resolve inventory " "candidates for demand {}" @@ -534,11 +547,13 @@ class Translator(object): inventory_candidates.append(candidate) if len(inventory_candidates) < 1: if not required_candidates: + self.triageTranslator.thefinalCallTrans(triage_data_trans) raise TranslatorException( "Unable to find any candidate for " "demand {}".format(name) ) else: + self.triageTranslator.thefinalCallTrans(triage_data_trans) raise TranslatorException( "Unable to find any required " "candidate for demand {}" @@ -547,7 +562,7 @@ class Translator(object): parsed[name] = { "candidates": inventory_candidates, } - + self.triageTranslator.thefinalCallTrans(triage_data_trans) return parsed def validate_hpa_constraints(self, req_prop, value): @@ -560,7 +575,7 @@ class Translator(object): or not para.get('flavorProperties') \ or para.get('id') == '' \ or para.get('type') == '' \ - or not isinstance(para.get('directives'), list) \ + or not isinstance(para.get('directives'), list) \ or para.get('flavorProperties') == '': raise TranslatorException( "HPA requirements need at least " @@ -754,6 +769,28 @@ class Translator(object): "Optimization goal 'minimize', function 'sum' " "must be a list of exactly two operands.") + def get_latency_between_args(operand): + args = operand.get('latency_between') + if type(args) is not list and len(args) != 2: + raise TranslatorException( + "Optimization 'latency_between' arguments must " + "be a list of length two.") + + got_demand = False + got_location = False + for arg in args: + if not got_demand and arg in self._demands.keys(): + got_demand = True + if not got_location and arg in self._locations.keys(): + got_location = True + if not got_demand or not got_location: + raise TranslatorException( + "Optimization 'latency_between' arguments {} must " + "include one valid demand name and one valid " + "location name.".format(args)) + + return args + def get_distance_between_args(operand): args = operand.get('distance_between') if type(args) is not list and len(args) != 2: @@ -791,8 +828,11 @@ class Translator(object): for product_op in operand['product']: if threshold.is_number(product_op): weight = product_op - elif type(product_op) is dict: - if product_op.keys() == ['distance_between']: + elif isinstance(product_op, dict): + if product_op.keys() == ['latency_between']: + function = 'latency_between' + args = get_latency_between_args(product_op) + elif product_op.keys() == ['distance_between']: function = 'distance_between' args = get_distance_between_args(product_op) elif product_op.keys() == ['aic_version']: @@ -814,8 +854,11 @@ class Translator(object): for nested_product_op in nested_operand['product']: if threshold.is_number(nested_product_op): nested_weight = nested_weight * int(nested_product_op) - elif type(nested_product_op) is dict: - if nested_product_op.keys() == ['distance_between']: + elif isinstance(nested_product_op, dict): + if nested_product_op.keys() == ['latency_between']: + function = 'latency_between' + args = get_latency_between_args(nested_product_op) + elif nested_product_op.keys() == ['distance_between']: function = 'distance_between' args = get_distance_between_args(nested_product_op) parsed['operands'].append( @@ -827,16 +870,6 @@ class Translator(object): } ) - elif type(product_op) is unicode: - if product_op == 'W1': - # get this weight from configuration file - weight = self.conf.controller.weight1 - elif product_op == 'W2': - # get this weight from configuration file - weight = self.conf.controller.weight2 - elif product_op == 'cost': - function = 'cost' - if not args: raise TranslatorException( "Optimization products must include at least " @@ -869,23 +902,31 @@ class Translator(object): def parse_reservations(self, reservations): demands = self._demands - if type(reservations) is not dict: + if not isinstance(reservations, dict): raise TranslatorException("Reservations must be provided in " "dictionary form") - parsed = {} if reservations: parsed['counter'] = 0 - for name, reservation in reservations.items(): - if not reservation.get('properties'): - reservation['properties'] = {} - for demand in reservation.get('demands', []): - if demand in demands.keys(): - constraint_demand = name + '_' + demand - parsed['demands'] = {} - parsed['demands'][constraint_demand] = copy.deepcopy(reservation) - parsed['demands'][constraint_demand]['name'] = name - parsed['demands'][constraint_demand]['demand'] = demand + parsed['demands'] = {} + + for key, value in reservations.items(): + + if key == "service_model": + parsed['service_model'] = value + + elif key == "service_candidates": + for name, reservation_details in value.items(): + if not reservation_details.get('properties'): + reservation_details['properties'] = {} + for demand in reservation_details.get('demands', []): + if demand in demands.keys(): + reservation_demand = name + '_' + demand + parsed['demands'][reservation_demand] = copy.deepcopy(reservation_details) + parsed['demands'][reservation_demand]['name'] = name + parsed['demands'][reservation_demand]['demands'] = demand + else: + raise TranslatorException("Demand {} must be provided in demands section".format(demand)) return parsed @@ -894,7 +935,9 @@ class Translator(object): if not self.valid: raise TranslatorException("Can't translate an invalid template.") - request_type = self._parameters.get("request_type") or "" + request_type = self._parameters.get("request_type") \ + or self._parameters.get("REQUEST_TYPE") \ + or "" self._translation = { "conductor_solver": { @@ -903,6 +946,7 @@ class Translator(object): "request_type": request_type, "locations": self.parse_locations(self._locations), "demands": self.parse_demands(self._demands), + "objective": self.parse_optimization(self._optmization), "constraints": self.parse_constraints(self._constraints), "objective": self.parse_optimization(self._optmization), "reservations": self.parse_reservations(self._reservations), diff --git a/conductor/conductor/controller/translator_svc.py b/conductor/conductor/controller/translator_svc.py index e139b5c..62e885b 100644 --- a/conductor/conductor/controller/translator_svc.py +++ b/conductor/conductor/controller/translator_svc.py @@ -17,8 +17,10 @@ # ------------------------------------------------------------------------- # -import time +import json +import os import socket +import time import cotyledon import futurist @@ -106,12 +108,11 @@ class TranslatorService(cotyledon.Service): Use this only when the translator service is not running concurrently. """ - plans = self.Plan.query.all() + plans = self.Plan.query.get_plan_by_col("status", self.Plan.TRANSLATING) for the_plan in plans: - if the_plan.status == self.Plan.TRANSLATING: - the_plan.status = self.Plan.TEMPLATE - # Use only in active-passive mode, so don't have to be atomic - the_plan.update() + the_plan.status = self.Plan.TEMPLATE + # Use only in active-passive mode, so don't have to be atomic + the_plan.update() def _restart(self): """Prepare to restart the service""" @@ -140,6 +141,7 @@ class TranslatorService(cotyledon.Service): trns = translator.Translator( self.conf, plan.name, plan.id, plan.template) trns.translate() + if trns.ok: plan.translation = trns.translation plan.status = self.Plan.TRANSLATED @@ -161,9 +163,11 @@ class TranslatorService(cotyledon.Service): plan.message = template.format(type(ex).__name__, ex.args) plan.status = self.Plan.ERROR - _is_success = 'FAILURE | Could not acquire lock' - while 'FAILURE | Could not acquire lock' in _is_success: + _is_success = 'FAILURE' + while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(plan.updated)) <= self.conf.messaging_server.timeout: _is_success = plan.update(condition=self.translation_owner_condition) + LOG.info(_LI("Changing the template status from translating to {}, " + "atomic update response from MUSIC {}").format(plan.status, _is_success)) def __check_for_templates(self): """Wait for the polling interval, then do the real template check.""" @@ -171,9 +175,16 @@ class TranslatorService(cotyledon.Service): # Wait for at least poll_interval sec polling_interval = self.conf.controller.polling_interval time.sleep(polling_interval) - # Look for plans with the status set to TEMPLATE - plans = self.Plan.query.all() + + # Instead of using the query.all() method, now creating an index for 'status' + # field in conductor.plans table, and query plans by status columns + template_plans = self.Plan.query.get_plan_by_col("status", self.Plan.TEMPLATE) + translating_plans = self.Plan.query.get_plan_by_col("status", self.Plan.TRANSLATING) + + # combine the plans with status = 'template' and 'translating' together + plans = template_plans + translating_plans + for plan in plans: # If there's a template to be translated, do it! if plan.status == self.Plan.TEMPLATE: @@ -190,12 +201,15 @@ class TranslatorService(cotyledon.Service): plan.status = self.Plan.TRANSLATING plan.translation_counter += 1 plan.translation_owner = socket.gethostname() + plan.translation_begin_timestamp = int(round(time.time() * 1000)) _is_updated = plan.update(condition=self.template_status_condition) + log_util.setLoggerFilter(LOG, self.conf.keyspace, plan.id) LOG.info(_LE("Plan {} is trying to update the status from 'template' to 'translating'," - " get {} response from MUSIC") \ - .format(plan.id, _is_updated)) - if 'SUCCESS' in _is_updated: - log_util.setLoggerFilter(LOG, self.conf.keyspace, plan.id) + " get {} response from MUSIC").format(plan.id, _is_updated)) + if not _is_updated: + break + + if _is_updated and 'SUCCESS' in _is_updated: self.translate(plan) break @@ -206,21 +220,20 @@ class TranslatorService(cotyledon.Service): plan.update(condition=self.translating_status_condition) break + + elif plan.timedout: - # Move plan to error status? Create a new timed-out status? - # todo(snarayanan) + # TODO(jdandrea): How to tell all involved to stop working? + # Not enough to just set status. continue def run(self): """Run""" LOG.debug("%s" % self.__class__.__name__) - # Look for templates to translate from within a thread executor = futurist.ThreadPoolExecutor() - while self.running: - # Delay time (Seconds) for MUSIC requests. - time.sleep(self.conf.delay_time) + while self.running: fut = executor.submit(self.__check_for_templates) fut.result() executor.shutdown() diff --git a/conductor/conductor/data/plugins/inventory_provider/aai.py b/conductor/conductor/data/plugins/inventory_provider/aai.py index 8634b3f..fdf914a 100644 --- a/conductor/conductor/data/plugins/inventory_provider/aai.py +++ b/conductor/conductor/data/plugins/inventory_provider/aai.py @@ -21,10 +21,15 @@ import re import time import uuid +import json +from oslo_config import cfg +from oslo_log import log + from conductor.common import rest from conductor.data.plugins import constants from conductor.data.plugins.inventory_provider import base from conductor.data.plugins.inventory_provider import hpa_utils +from conductor.data.plugins.triage_translator.triage_translator import TraigeTranslator from conductor.i18n import _LE, _LI from oslo_config import cfg from oslo_log import log @@ -71,6 +76,7 @@ AAI_OPTS = [ help='Certificate Authority Bundle file in pem format. ' 'Must contain the appropriate trust chain for the ' 'Certificate file.'), + #TODO(larry): follow-up with ONAP people on this (AA&I basic auth username and password?) cfg.StrOpt('username', default='', help='Username for AAI.'), @@ -105,6 +111,7 @@ class AAI(base.InventoryProviderBase): self.retries = self.conf.aai.aai_retries self.username = self.conf.aai.username self.password = self.conf.aai.password + self.triage_translator=TraigeTranslator() # Cache is initially empty self._aai_cache = {} @@ -232,19 +239,15 @@ class AAI(base.InventoryProviderBase): cloud_region_version = region.get('cloud-region-version') cloud_owner = region.get('cloud-owner') - complex_name = region.get('complex-name') cloud_type = region.get('cloud-type') cloud_zone = region.get('cloud-zone') - # Added for HPA support - flavors = self._get_flavors(cloud_owner, cloud_region_id) - physical_location_list = self._get_aai_rel_link_data(data = region, related_to = 'complex', search_key = 'complex.physical-location-id') if len(physical_location_list) > 0: physical_location_id = physical_location_list[0].get('d_value') if not (cloud_region_version and - cloud_region_id and complex_name): + cloud_region_id): continue rel_link_data_list = \ self._get_aai_rel_link_data( @@ -255,6 +258,7 @@ class AAI(base.InventoryProviderBase): LOG.error(_LE("Region {} has more than one complex"). format(cloud_region_id)) LOG.debug("Region {}: {}".format(cloud_region_id, region)) + continue rel_link_data = rel_link_data_list[0] complex_id = rel_link_data.get("d_value") @@ -279,15 +283,19 @@ class AAI(base.InventoryProviderBase): state = complex_info.get('state') region = complex_info.get('region') country = complex_info.get('country') + complex_name = complex_info.get('complex-name') - if not (latitude and longitude and city and country): - keys = ('latitude', 'longitude', 'city', 'country') + if not (latitude and longitude and city and country + and complex_name): + keys = ('latitude', 'longitude', 'city', 'country', + 'complex_name') missing_keys = \ list(set(keys).difference(complex_info.keys())) LOG.error(_LE("Complex {} is missing {}, link: {}"). format(complex_id, missing_keys, complex_link)) LOG.debug("Complex {}: {}". format(complex_id, complex_info)) + continue cache['cloud_region'][cloud_region_id] = { 'cloud_region_version': cloud_region_version, @@ -305,9 +313,14 @@ class AAI(base.InventoryProviderBase): 'state': state, 'region': region, 'country': country, - }, - 'flavors': flavors + } } + + # Added for HPA support + if self.conf.HPA_enabled: + flavors = self._get_flavors(cloud_owner, cloud_region_id) + cache['cloud_region'][cloud_region_id]['flavors'] = flavors + LOG.debug("Candidate with cloud_region_id '{}' selected " "as a potential candidate - ".format(cloud_region_id)) LOG.debug("Done with region '{}' ".format(cloud_region_id)) @@ -737,11 +750,9 @@ class AAI(base.InventoryProviderBase): for attribute_key, attribute_values in template_attributes.items(): if attribute_key and (attribute_key == 'service-type' or - attribute_key == 'equipment-role' or attribute_key == 'model-invariant-id' or - attribute_key == 'model-version-id'): - continue - - if not attribute_values: + attribute_key == 'equipment-role' or + attribute_key == 'model-invariant-id' or + attribute_key == 'model-version-id'): continue match_type = 'any' @@ -790,37 +801,73 @@ class AAI(base.InventoryProviderBase): candidate['cost'] = self.conf.data.existing_placement_cost candidate['existing_placement'] = 'true' - def resolve_demands(self, demands): + def resovle_conflict_id(self, conflict_identifier, candidate): + + # Initialize the conflict_id_list + conflict_id_list = list() + # conflict_id is separated by pipe (|) + separator = '|' + + for conflict_element in conflict_identifier: + # if the conflict_element is a dictionary with key = 'get_candidate_attribute', + # then add candidate's coressponding value to conflict_id string + if isinstance(conflict_element, dict) and 'get_candidate_attribute' in conflict_element: + attribute_name = conflict_element.get('get_candidate_attribute') + conflict_id_list.append(candidate[attribute_name] + separator) + elif isinstance(conflict_element, unicode): + conflict_id_list.append(conflict_element + separator) + + return ''.join(conflict_id_list) + + + def resolve_demands(self, demands, plan_info, triage_translator_data): """Resolve demands into inventory candidate lists""" + self.triage_translator.getPlanIdNAme(plan_info['plan_name'], plan_info['plan_id'],triage_translator_data) + resolved_demands = {} for name, requirements in demands.items(): + self.triage_translator.addDemandsTriageTranslator(name, triage_translator_data) resolved_demands[name] = [] for requirement in requirements: inventory_type = requirement.get('inventory_type').lower() + # used for VLAN tagging feature + service_subscription = requirement.get('service_subscription') + candidate_uniqueness = requirement.get('unique', 'true') attributes = requirement.get('attributes') #TODO: may need to support multiple service_type and customer_id in the futrue #TODO: make it consistent for dash and underscore + # For 1802 templates and later if attributes: - service_type = attributes.get('service-type') + # catch equipment-role and service-type from template equipment_role = attributes.get('equipment-role') + service_type = attributes.get('service-type') if equipment_role: service_type = equipment_role - customer_id = attributes.get('customer-id') + # catch global-customer-id and customer-id from template global_customer_id = attributes.get('global-customer-id') + customer_id = attributes.get('customer-id') if global_customer_id: customer_id = global_customer_id + model_invariant_id = attributes.get('model-invariant-id') model_version_id = attributes.get('model-version-id') + service_role = attributes.get('service-role') + # For 1712 template and earlier else: - service_type = requirement.get('service_type') - equipment_role = service_type - customer_id = requirement.get('customer_id') + service_type = equipment_role = requirement.get('service_type') + customer_id = global_customer_id = requirement.get('customer_id') # region_id is OPTIONAL. This will restrict the initial # candidate set to come from the given region id restricted_region_id = requirement.get('region') restricted_complex_id = requirement.get('complex') + # Used for order locking feature + # by defaut, conflict id is the combination of candidate id, service type and vnf-e2e-key + conflict_identifier = requirement.get('conflict_identifier') + # VLAN tagging fields + vlan_key = requirement.get('vlan_key') + port_key = requirement.get('port_key') # get required candidates from the demand required_candidates = requirement.get("required_candidates") @@ -870,10 +917,6 @@ class AAI(base.InventoryProviderBase): candidate['cloud_owner'] = \ region['cloud_owner'] - # Added vim-id for short-term workaround - candidate['vim-id'] = \ - region['cloud_owner'] + '_' + region_id - candidate['physical_location_id'] = \ region['complex']['complex_id'] candidate['complex_name'] = \ @@ -890,10 +933,15 @@ class AAI(base.InventoryProviderBase): region['complex']['region'] candidate['country'] = \ region['complex']['country'] + candidate['uniqueness'] = candidate_uniqueness - # Added for HPA - candidate['flavors'] = \ - region['flavors'] + # Added vim-id for short-term workaround + if self.conf.HPA_enabled: + candidate['vim-id'] = \ + region['cloud_owner'] + '_' + region_id + # Added for HPA + candidate['flavors'] = \ + region['flavors'] if self.check_sriov_automation( candidate['cloud_region_version'], name, @@ -911,8 +959,13 @@ class AAI(base.InventoryProviderBase): cloud_region_attr['physical-location-id'] = region['physical_location_id'] if attributes and (not self.match_inventory_attributes(attributes, cloud_region_attr, candidate['candidate_id'])): + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason='attributes and match invetory attributes') continue + if conflict_identifier: + candidate['conflict_id'] = self.resovle_conflict_id(conflict_identifier, candidate) + if self.match_candidate_attribute( candidate, "candidate_id", restricted_region_id, name, @@ -955,6 +1008,8 @@ class AAI(base.InventoryProviderBase): break if not has_required_candidate: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="has_required_candidate") continue # add candidate to demand candidates @@ -964,24 +1019,22 @@ class AAI(base.InventoryProviderBase): and customer_id: # First level query to get the list of generic vnfs - #TODO: extract the common part from the two calls vnf_by_model_invariant = list() if attributes and model_invariant_id: + + raw_path = '/network/generic-vnfs/' \ + '?model-invariant-id={}&depth=0'.format(model_invariant_id) if model_version_id: - path = self._aai_versioned_path( - '/network/generic-vnfs/' - '?model-invariant-id={}&model-version-id={}&depth=0'.format(model_invariant_id, model_version_id)) - else: - path = self._aai_versioned_path( - '/network/generic-vnfs/' - '?model-invariant-id={}&depth=0'.format(model_invariant_id)) + raw_path = '/network/generic-vnfs/' \ + '?model-invariant-id={}&model-version-id={}&depth=0'.format(model_invariant_id, model_version_id) + path = self._aai_versioned_path(raw_path) vnf_by_model_invariant = self.first_level_service_call(path, name, service_type) vnf_by_service_type = list() if service_type or equipment_role: path = self._aai_versioned_path( '/network/generic-vnfs/' - '?prov-status=PROV&equipment-role={}&depth=0'.format(service_type)) + '?equipment-role={}&depth=0'.format(service_type)) vnf_by_service_type = self.first_level_service_call(path, name, service_type) generic_vnf = vnf_by_model_invariant + vnf_by_service_type @@ -1008,6 +1061,9 @@ class AAI(base.InventoryProviderBase): candidate['cost'] = self.conf.data.service_candidate_cost candidate['cloud_owner'] = '' candidate['cloud_region_version'] = '' + candidate['vlan_key'] = vlan_key + candidate['port_key'] = port_key + candidate['uniqueness'] = candidate_uniqueness # start populating the candidate candidate['host_id'] = vnf.get("vnf-name") @@ -1023,6 +1079,8 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "GENERIC-VNF", vnf) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Generic -vnf error") continue rl_data = rl_data_list[0] @@ -1032,8 +1090,7 @@ class AAI(base.InventoryProviderBase): cloud_owner = rl_data.get('d_value') candidate['cloud_owner'] = cloud_owner - if not cloud_owner: - continue + search_key = "cloud-region.cloud-region-id" @@ -1047,14 +1104,19 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "GENERIC-VNF", vnf) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason=" generic-vnf error") continue rl_data = rl_data_list[0] cloud_region_id = rl_data.get('d_value') candidate['location_id'] = cloud_region_id # Added vim-id for short-term workaround - candidate['vim-id'] = \ - candidate['cloud_owner'] + '_' + cloud_region_id + if self.conf.HPA_enabled: + if not cloud_owner: + continue + candidate['vim-id'] = \ + candidate['cloud_owner'] + '_' + cloud_region_id # get AIC version for service candidate if cloud_region_id: @@ -1100,6 +1162,8 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "GENERIC-VNF", vnf) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name,triage_translator_data, + reason=" match_vserver_attribute generic-vnf") continue rl_data = rl_data_list[0] vs_cust_id = rl_data.get('d_value') @@ -1118,6 +1182,8 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "GENERIC-VNF", vnf) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name,triage_translator_data, + reason="multiple_item_error generic-vnf") continue rl_data = rl_data_list[0] vs_service_instance_id = rl_data.get('d_value') @@ -1126,6 +1192,7 @@ class AAI(base.InventoryProviderBase): candidate['candidate_id'] = \ vs_service_instance_id else: # vserver is for a different customer + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, reason= "vserver is for a different customer") continue # Second level query to get the pserver from vserver @@ -1137,6 +1204,8 @@ class AAI(base.InventoryProviderBase): LOG.error(_LE("{} VSERVER link information not " "available from A&AI").format(name)) LOG.debug("Related link data: {}".format(rl_data)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="VSERVER link information not") continue # move ahead with the next vnf vs_path = self._get_aai_path_from_link(vs_link) @@ -1144,12 +1213,16 @@ class AAI(base.InventoryProviderBase): LOG.error(_LE("{} VSERVER path information not " "available from A&AI - {}"). format(name, vs_path)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="VSERVER path information not available from A&AI") continue # move ahead with the next vnf path = self._aai_versioned_path(vs_path) response = self._request( path=path, context="demand, VSERVER", value="{}, {}".format(name, vs_path)) if response is None or response.status_code != 200: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason=response.status_code) continue body = response.json() @@ -1163,42 +1236,74 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, "item", "VSERVER", body) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="item VSERVER") continue rl_data = rl_data_list[0] ps_link = rl_data.get('link') # Third level query to get cloud region from pserver if not ps_link: - LOG.debug(_LE("{} pserver related link " - "not found in A&AI: {} using cloud-region "). + LOG.error(_LE("{} pserver related link " + "not found in A&AI: {}"). format(name, rl_data)) - if not (cloud_owner and cloud_region_id): - LOG.error("{} cloud-owner or cloud-region not " - "available from A&AI". - format(name)) - continue # move ahead with the next vnf - cloud_region_uri = \ - '/cloud-infrastructure/cloud-regions/cloud-region' \ - '/?cloud-owner=' + cloud_owner\ - + '&cloud-region-id=' + cloud_region_id - path = self._aai_versioned_path(cloud_region_uri) - response = self._request('get', - path=path, - data=None) - if response is None or response.status_code != 200: + # if HPA_feature is disabled + if not self.conf.HPA_enabled: + # (ecomp2onap-feature2) Triage Tool Feature Changes + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], + candidate['location_id'], name, + triage_translator_data, + reason="ps link not found") continue - body = response.json() + else: + if not (cloud_owner and cloud_region_id): + LOG.error("{} cloud-owner or cloud-region not " + "available from A&AI". + format(name)) + # (ecomp2onap-feature2) Triage Tool Feature Changes + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], + candidate['location_id'], name, + triage_translator_data, + reason="Cloud owner and cloud region " + "id not found") + continue # move ahead with the next vnf + cloud_region_uri = \ + '/cloud-infrastructure/cloud-regions/cloud-region' \ + '/?cloud-owner=' + cloud_owner \ + + '&cloud-region-id=' + cloud_region_id + path = self._aai_versioned_path(cloud_region_uri) + response = self._request('get', + path=path, + data=None) + if response is None or response.status_code != 200: + # (ecomp2onap-feature2) Triage Tool Feature Changes + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], + candidate['location_id'], name, + triage_translator_data, + reason=response) + continue + body = response.json() else: ps_path = self._get_aai_path_from_link(ps_link) if not ps_path: LOG.error(_LE("{} pserver path information " "not found in A&AI: {}"). format(name, ps_link)) + # (ecomp2onap-feature2) Triage Tool Feature Changes + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], + candidate['location_id'], name, + triage_translator_data, + reason="ps path not found") continue # move ahead with the next vnf path = self._aai_versioned_path(ps_path) response = self._request( path=path, context="PSERVER", value=ps_path) if response is None or response.status_code != 200: + # (ecomp2onap-feature2) Triage Tool Feature Changes + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], + candidate['location_id'], name, + triage_translator_data, + reason=response) continue body = response.json() @@ -1214,6 +1319,8 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "PSERVER", body) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="PSERVER error") continue rl_data = rl_data_list[0] complex_list.append(rl_data) @@ -1222,6 +1329,8 @@ class AAI(base.InventoryProviderBase): len(complex_list) < 1: LOG.error("Complex information not " "available from A&AI") + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Complex information not available from A&AI") continue # In the scenario where no pserver information is available @@ -1232,6 +1341,8 @@ class AAI(base.InventoryProviderBase): self._log_multiple_item_error( name, service_type, related_to, search_key, "GENERIC-VNF", vnf) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Generic-vnf error") continue rl_data = complex_list[0] @@ -1243,6 +1354,8 @@ class AAI(base.InventoryProviderBase): LOG.debug("{} complex information not " "available from A&AI - {}". format(name, complex_link)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Complex information not available from A&AI") continue # move ahead with the next vnf else: complex_info = self._get_complex( @@ -1253,6 +1366,8 @@ class AAI(base.InventoryProviderBase): LOG.debug("{} complex information not " "available from A&AI - {}". format(name, complex_link)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Complex information not available from A&AI") continue # move ahead with the next vnf candidate['physical_location_id'] = \ complex_id @@ -1278,6 +1393,8 @@ class AAI(base.InventoryProviderBase): vnf['physical-location-id'] = complex_id if attributes and not self.match_inventory_attributes(attributes, vnf, candidate['candidate_id']): + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="attibute check error") continue self.assign_candidate_existing_placement(candidate, existing_placement) @@ -1295,6 +1412,8 @@ class AAI(base.InventoryProviderBase): break if has_excluded_candidate: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="excluded candidate") continue # Pick only candidates in the required list @@ -1311,6 +1430,8 @@ class AAI(base.InventoryProviderBase): break if not has_required_candidate: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="has_required_candidate candidate") continue # add the candidate to the demand @@ -1328,13 +1449,159 @@ class AAI(base.InventoryProviderBase): restricted_complex_id, name, inventory_type): + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="match candidate attribute") + + continue + else: + resolved_demands[name].append(candidate) + + elif inventory_type == 'transport' \ + and customer_id and service_type and \ + service_subscription and service_role: + + ''' + GET /aai/v11/business/customers/customer/31739f3e-526b-11e6-beb8-9e71128cae77/service-subscriptions/ + service-subscription/MISVPN%20Transport/service-instances?service-type=TRANSPORT&service-role=MISVPN + Sample demand section for transport services: + "TRANSPORT_DEMAND_1": [ + { + "attributes": { + "global-customer-id": "31739f3e-526b-11e6-beb8-9e71128cae77", + "service-type": "TRANSPORT", + "service-role": "MISVPN" + }, + "inventory_provider": "aai", + "inventory_type": "transport", + "service_subscription": "MISVPN%20Transport" + } + ] + ''' + path = self._aai_versioned_path('business/customers/customer/{}/service-subscriptions/' + 'service-subscription/{}/service-instances' + '?service-type={}&service-role={}'.format(customer_id, + service_subscription, + service_type, + service_role)) + + response = self._request('get', path=path, data=None) + + if response is None or response.status_code != 200: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason=response.status_code) + continue + + body = response.json() + + transport_vnfs = body.get('service-instance', []) + + for vnf in transport_vnfs: + + # create a default candidate + candidate = dict() + candidate['inventory_provider'] = 'aai' + candidate['service_resource_id'] = service_resource_id + candidate['inventory_type'] = 'transport' + candidate['candidate_id'] = '' + candidate['location_id'] = '' + candidate['location_type'] = 'att_aic' + candidate['uniqueness'] = candidate_uniqueness + candidate['cost'] = self.conf.data.transport_candidate_cost + + vnf_service_instance_id = vnf.get('service-instance-id') + if vnf_service_instance_id: + candidate['candidate_id'] = vnf_service_instance_id + else: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name,triage_translator_data, + reason="service-instance-id error ") + + continue + + related_to = "zone" + zone_link = self._get_aai_rel_link( + data=vnf, related_to=related_to) + + if not zone_link: + LOG.error("Zone information not available" + "from A&AI for transport candidates") + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="Zone information not available from A&AI for transport candidates") + + continue + + zone_aai_path = self._get_aai_path_from_link(zone_link) + + response = self._request('get', path=zone_aai_path, data=None) + + if response is None or response.status_code != 200: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason=response.status_code) + + continue + body = response.json() + + candidate['zone_id'] = body.get('zone-id') + candidate['zone_name'] = body.get('zone-name') + + related_to = "complex" + search_key = "complex.physical-location-id" + rel_link_data_list = self._get_aai_rel_link_data( + data=body, + related_to=related_to, + search_key=search_key + ) + + if len(rel_link_data_list) > 1: + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="rel_link_data_list error") + + continue + rel_link_data = rel_link_data_list[0] + complex_id = rel_link_data.get("d_value") + complex_link = rel_link_data.get('link') + + if not (complex_link and complex_id): + LOG.debug("{} complex information not " + "available from A&AI - {}". + format(name, complex_link)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="complex information not available from A&AI") continue else: + complex_info = self._get_complex( + complex_link=complex_link, + complex_id=complex_id + ) + if not complex_info: + LOG.debug("{} complex information not " + "available from A&AI - {}". + format(name, complex_link)) + self.triage_translator.collectDroppedCandiate(candidate['candidate_id'], candidate['location_id'], name, triage_translator_data, + reason="complex information not available from A&AI") + continue # move ahead with the next vnf + candidate['physical_location_id'] = \ + complex_id + candidate['complex_name'] = \ + complex_info.get('complex-name') + candidate['latitude'] = \ + complex_info.get('latitude') + candidate['longitude'] = \ + complex_info.get('longitude') + candidate['state'] = \ + complex_info.get('state') + candidate['country'] = \ + complex_info.get('country') + candidate['city'] = \ + complex_info.get('city') + candidate['region'] = \ + complex_info.get('region') + + # add candidate to demand candidates resolved_demands[name].append(candidate) + else: LOG.error("Unknown inventory_type " " {}".format(inventory_type)) - return resolved_demands def match_hpa(self, candidate, features): @@ -1343,3 +1610,4 @@ class AAI(base.InventoryProviderBase): directives = hpa_provider.match_flavor() return directives + diff --git a/conductor/conductor/data/plugins/inventory_provider/extensions.py b/conductor/conductor/data/plugins/inventory_provider/extensions.py index 18f4c4b..3b13b94 100644 --- a/conductor/conductor/data/plugins/inventory_provider/extensions.py +++ b/conductor/conductor/data/plugins/inventory_provider/extensions.py @@ -34,7 +34,7 @@ class Manager(stevedore.named.NamedExtensionManager): def __init__(self, conf, namespace): super(Manager, self).__init__( namespace, conf.inventory_provider.extensions, - invoke_on_load=True, name_order=True) + invoke_on_load=True, name_order=True, propagate_map_exceptions=True) LOG.info(_LI("Loaded inventory provider extensions: %s"), self.names()) def initialize(self): diff --git a/conductor/conductor/data/plugins/service_controller/extensions.py b/conductor/conductor/data/plugins/service_controller/extensions.py index f309102..388d527 100644 --- a/conductor/conductor/data/plugins/service_controller/extensions.py +++ b/conductor/conductor/data/plugins/service_controller/extensions.py @@ -34,7 +34,7 @@ class Manager(stevedore.named.NamedExtensionManager): def __init__(self, conf, namespace): super(Manager, self).__init__( namespace, conf.service_controller.extensions, - invoke_on_load=True, name_order=True) + invoke_on_load=True, name_order=True, propagate_map_exceptions=True) LOG.info(_LI("Loaded service controller extensions: %s"), self.names()) def initialize(self): diff --git a/conductor/conductor/data/plugins/service_controller/sdnc.py b/conductor/conductor/data/plugins/service_controller/sdnc.py index cc3118b..3242384 100644 --- a/conductor/conductor/data/plugins/service_controller/sdnc.py +++ b/conductor/conductor/data/plugins/service_controller/sdnc.py @@ -113,6 +113,7 @@ class SDNC(base.ServiceControllerBase): if response is None: LOG.error(_LE("No response from SDN-C ({}: {})"). format(context, value)) + raise Exception('SDN-C query {} timed out'.format(path)) elif response.status_code != 200: LOG.error(_LE("SDN-C request ({}: {}) returned HTTP " "status {} {}, link: {}{}"). @@ -121,6 +122,80 @@ class SDNC(base.ServiceControllerBase): self.base, path)) return response + def reserve_candidates(self, candidate_list, request): + + path = '/operations/DHVCAPACITY-API:service-capacity-check-operation' + action_type = "RESERVE" + change_type = "New-Start" + + e2evpnkey = request.get('e2evpnkey') + dhv_service_instance = request.get('dhv_service_instance') + + vnf_input_list = [] + + for candidate in candidate_list: + + # SDN-GC does not reserve cloud candidates + if candidate.get("inventory_type") == "cloud": + continue + + vnf_input = {} + # VNF input parameters common to all service_type + request = candidate.get('request') + vnf_input["device-type"] = request.get('service_type') + vnf_input['dhv-site-effective-bandwidth'] = request.get('dhv_site_effective_bandwidth') + + if candidate.get('location_id') == "AAIAIC25": + vnf_input["cloud-region-id"] = "" + else: + vnf_input["cloud-region-id"] = candidate.get('location_id') + + if "service_resource_id" in candidate: + vnf_input["cust-service-instance-id"] = candidate.get('service_resource_id') + + vnf_input["vnf-host-name"] = candidate.get('host_id') + vnf_input["infra-service-instance-id"] = candidate.get('candidate_id') + + vnf_input_list.append(vnf_input) + + data = { + "input": { + "service-capacity-check-operation": { + "sdnc-request-header": { + "request-id": + "59c36776-249b-4966-b911-9a89a63d1676" + }, + "capacity-check-information": { + "service-instance-id": dhv_service_instance, + "service": "DHV SD-WAN", + "action-type": action_type, + "change-type": change_type, + "e2e-vpn-key": e2evpnkey, + "vnf-list": { + "vnf": vnf_input_list + } + } + } + } + } + + try: + response = self._request('post', path=path, data=data) + if response is None or response.status_code != 200: + return + body = response.json() + response_code = body.get("output"). \ + get("service-capacity-check-response"). \ + get("response-code") + + if response_code == "200": + return candidate_list + + except Exception as exc: + LOG.error("SD-WAN reservation, SDNC unknown error: {}". + format(exc)) + return + def filter_candidates(self, request, candidate_list, constraint_name, constraint_type, request_type): """Reduce candidate list based on SDN-C intelligence""" @@ -135,15 +210,6 @@ class SDNC(base.ServiceControllerBase): LOG.error(_LE("Constraint {} has an unknown type {}"). format(constraint_name, constraint_type)) - change_type = "" - if request_type == "speed-changed": - change_type = "Change-Speed" - elif request_type == "initial" or request_type == "": - change_type = "New-Start" - else: - LOG.error(_LE("Constraint {} has an unknown request type {}"). - format(constraint_name, request_type)) - # VNF input params common to all services service_type = request.get('service_type') e2evpnkey = request.get('e2evpnkey') @@ -197,6 +263,25 @@ class SDNC(base.ServiceControllerBase): vnf_input["infra-service-instance-id"] = dhv_service_instance for candidate in candidate_list: + + # generate the value of change_type based on the request type (inital or speed changed) + # and existing placement + # For New Start (or initial): New-Start + # For Change Speed No Rehome : Change-Speed + # For Change Speed Rehome: Rehome + change_type = "" + if request_type == "initial" or request_type == "": + change_type = "New-Start" + elif request_type == "speed changed": + existing_placement = str(candidate.get('existing_placement')) + if existing_placement == 'false': + change_type = "Rehome" + elif existing_placement == 'true': + change_type = "Change-Speed" + else: + LOG.error(_LE("Constraint {} has an unknown request type {}"). + format(constraint_name, request_type)) + # VNF input parameters common to all service_type vnf_input["device-type"] = service_type # If the candidate region id is AAIAIC25 and region_fit constraint @@ -305,4 +390,4 @@ class SDNC(base.ServiceControllerBase): format(constraint_name, constraint_type, exc)) return - return selected_candidates
\ No newline at end of file + return selected_candidates diff --git a/conductor/conductor/data/plugins/triage_translator/__init__.py b/conductor/conductor/data/plugins/triage_translator/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/conductor/conductor/data/plugins/triage_translator/__init__.py diff --git a/conductor/conductor/data/plugins/triage_translator/triage_translator.py b/conductor/conductor/data/plugins/triage_translator/triage_translator.py new file mode 100644 index 0000000..f660ad5 --- /dev/null +++ b/conductor/conductor/data/plugins/triage_translator/triage_translator.py @@ -0,0 +1,85 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json + +from conductor.common.models.triage_tool import TriageTool +from conductor.common.music.model import base +from oslo_config import cfg +from StringIO import StringIO + +CONF = cfg.CONF +io = StringIO() + + +class TraigeTranslator(object): + + + def getPlanIdNAme(self, plan_name, plan_id, triage_translator_data): + triage_translator_data['plan_name'] = plan_name + triage_translator_data['plan_id'] = plan_id + def addDemandsTriageTranslator(self, name, triage_translator_data): + if not 'dropped_candidates' in triage_translator_data.keys(): + triage_translator_data['dropped_candidates'] = [] + dropped_candidate_details = {} + dropped_candidate_details['name'] = name + dropped_candidate_details['translation_dropped'] = [] + dropped_candidate_details['latency_dropped'] = [] + triage_translator_data['dropped_candidates'].append(dropped_candidate_details) + else: + for dc in triage_translator_data['dropped_candidates']: + print name + if not dc['name'] == name: + dropped_candidate_details = {} + dropped_candidate_details['name'] = name + dropped_candidate_details['translation_dropped'] = [] + triage_translator_data['dropped_candidates'].append(dropped_candidate_details) + + def collectDroppedCandiate(self, candidate_id, location_id, name, triage_translator_data, reason): + drop_can = {} + drop_can['candidate_id'] = candidate_id + if drop_can['candidate_id'] == "null": + drop_can['candidate_id']= None + drop_can['location_id'] = location_id + drop_can['reason'] = reason + for dropped_c in triage_translator_data['dropped_candidates']: + if dropped_c['name'] == name: + dropped_c['translation_dropped'].append(drop_can) + + def thefinalCallTrans(self, triage_translator_data): + triage_translator = {} + triage_translator['plan_id'] = triage_translator_data['plan_id'] + triage_translator['plan_name'] = triage_translator_data['plan_name'] + triage_translator['translator_triage']= {} + triage_translator['translator_triage']['dropped_candidates'] = [] + + for td in triage_translator_data['translator_triage']: + for a in td: + triage_translator['translator_triage']['dropped_candidates'].append(a) + tria_final = triage_translator['translator_triage'] + triage_translator_dataTool = base.create_dynamic_model( + keyspace=CONF.keyspace, baseclass=TriageTool, classname="TriageTool") + + triage_translator = json.dumps(tria_final ) + triageTransDatarow = triage_translator_dataTool(id=triage_translator_data['plan_id'], name=triage_translator_data['plan_name'], + triage_translator=triage_translator) + response = triageTransDatarow.insert() + + diff --git a/conductor/conductor/data/plugins/triage_translator/triage_translator_data.py b/conductor/conductor/data/plugins/triage_translator/triage_translator_data.py new file mode 100644 index 0000000..4d19095 --- /dev/null +++ b/conductor/conductor/data/plugins/triage_translator/triage_translator_data.py @@ -0,0 +1,23 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +class TraigeTranslatorData(object): + def __init__(self): + self.plan_name = None, + self.plan_id = None diff --git a/conductor/conductor/data/service.py b/conductor/conductor/data/service.py index d6ea20f..c0007fc 100644 --- a/conductor/conductor/data/service.py +++ b/conductor/conductor/data/service.py @@ -52,13 +52,13 @@ DATA_OPTS = [ 'mode. When set to False, data will flush any abandoned ' 'messages at startup.'), cfg.FloatOpt('existing_placement_cost', - default=-8000.0, - help='Default value is -8000, which is the diameter of the earth. ' - 'The distance cannot larger than this value'), + default=-8000.0, + help='Default value is -8000, which is the diameter of the earth. ' + 'The distance cannot larger than this value'), cfg.FloatOpt('cloud_candidate_cost', - default=2.0), + default=2.0), cfg.FloatOpt('service_candidate_cost', - default=1.0), + default=1.0), ] CONF.register_opts(DATA_OPTS, group='data') @@ -73,6 +73,7 @@ class DataServiceLauncher(object): self.conf = conf self.init_extension_managers(conf) + def init_extension_managers(self, conf): """Initialize extension managers.""" self.ip_ext_manager = ( @@ -112,6 +113,11 @@ class DataEndpoint(object): self.vc_ext_manager = vc_ext_manager self.sc_ext_manager = sc_ext_manager self.plugin_cache = {} + self.triage_data_trans = { + 'plan_id': None, + 'plan_name': None, + 'translator_triage': [] + } def get_candidate_location(self, ctx, arg): # candidates should have lat long info already @@ -223,7 +229,7 @@ class DataEndpoint(object): discard_set.add(candidate.get("candidate_id")) return discard_set - # (TODO:Larry) merge this function with the "get_candidate_discard_set" + #(TODO:Larry) merge this function with the "get_candidate_discard_set" def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib): discard_set = set() @@ -239,8 +245,10 @@ class DataEndpoint(object): (candidate.get(value_attrib) not in service_requests): discard_set.add(candidate.get("candidate_id")) + return discard_set + def get_inventory_group_candidates(self, ctx, arg): candidate_list = arg["candidate_list"] resolved_candidate = arg["resolved_candidate"] @@ -590,18 +598,36 @@ class DataEndpoint(object): error = False demands = arg.get('demands') + plan_info = arg.get('plan_info') + triage_translator_data = arg.get('triage_translator_data') resolved_demands = None results = self.ip_ext_manager.map_method( 'resolve_demands', - demands + demands, plan_info, triage_translator_data ) if results and len(results) > 0: resolved_demands = results[0] + if self.triage_data_trans['plan_id']== None : + self.triage_data_trans['plan_name'] = triage_translator_data['plan_name'] + self.triage_data_trans['plan_id'] = triage_translator_data['plan_id'] + self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates']) + elif (not self.triage_data_trans['plan_id'] == triage_translator_data['plan_id']) : + self.triage_data_trans = { + 'plan_id': None, + 'plan_name': None, + 'translator_triage': [] + } + self.triage_data_trans['plan_name'] = triage_translator_data['plan_name'] + self.triage_data_trans['plan_id'] = triage_translator_data['plan_id'] + self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates']) + else: + self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates']) else: error = True - return {'response': {'resolved_demands': resolved_demands}, - 'error': error} + return {'response': {'resolved_demands': resolved_demands, + 'trans': self.triage_data_trans}, + 'error': error } def resolve_location(self, ctx, arg): diff --git a/conductor/conductor/reservation/service.py b/conductor/conductor/reservation/service.py index ad26b98..01a7453 100644 --- a/conductor/conductor/reservation/service.py +++ b/conductor/conductor/reservation/service.py @@ -18,6 +18,7 @@ # import cotyledon +import json import time import socket from oslo_config import cfg @@ -31,7 +32,7 @@ from conductor.i18n import _LE, _LI from conductor import messaging from conductor import service from conductor.common.utils import conductor_logging_util as log_util - +from conductor.common.models import order_lock LOG = log.getLogger(__name__) @@ -44,7 +45,7 @@ reservation_OPTS = [ help='Number of workers for reservation service. ' 'Default value is 1.'), cfg.IntOpt('reserve_retries', - default=3, + default=1, help='Number of times reservation/release ' 'should be attempted.'), cfg.IntOpt('timeout', @@ -82,13 +83,17 @@ class ReservationServiceLauncher(object): # Dynamically create a plan class for the specified keyspace self.Plan = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") - + self.OrderLock = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=order_lock.OrderLock, classname="OrderLock") if not self.Plan: raise + if not self.OrderLock: + raise def run(self): - kwargs = {'plan_class': self.Plan} + kwargs = {'plan_class': self.Plan, + 'order_locks': self.OrderLock} svcmgr = cotyledon.ServiceManager() svcmgr.add(ReservationService, workers=self.conf.reservation.workers, @@ -126,6 +131,7 @@ class ReservationService(cotyledon.Service): self.kwargs = kwargs self.Plan = kwargs.get('plan_class') + self.OrderLock = kwargs.get('order_locks') # Set up the RPC service(s) we want to talk to. self.data_service = self.setup_rpc(conf, "data") @@ -156,12 +162,11 @@ class ReservationService(cotyledon.Service): Use this only when the reservation service is not running concurrently. """ - plans = self.Plan.query.all() + plans = self.Plan.query.get_plan_by_col("status", self.Plan.RESERVING) for the_plan in plans: - if the_plan.status == self.Plan.RESERVING: - the_plan.status = self.Plan.SOLVED - # Use only in active-passive mode, so don't have to be atomic - the_plan.update() + the_plan.status = self.Plan.SOLVED + # Use only in active-passive mode, so don't have to be atomic + the_plan.update() def _restart(self): """Prepare to restart the service""" @@ -239,6 +244,7 @@ class ReservationService(cotyledon.Service): LOG.debug("%s" % self.__class__.__name__) # TODO(snarayanan): This is really meant to be a control loop # As long as self.running is true, we process another request. + while self.running: # Delay time (Seconds) for MUSIC requests. @@ -252,12 +258,22 @@ class ReservationService(cotyledon.Service): translation = None p = None # requests_to_reserve = dict() - plans = self.Plan.query.all() + + # Instead of using the query.all() method, now creating an index for 'status' + # field in conductor.plans table, and query plans by status columns + solved_plans = self.Plan.query.get_plan_by_col("status", self.Plan.SOLVED) + reserving_plans = self.Plan.query.get_plan_by_col("status", self.Plan.RESERVING) + + # combine the plans with status = 'solved' and 'reserving' together + plans = solved_plans + reserving_plans + found_solved_template = False for p in plans: + # when a plan is in RESERVING status more than timeout value if p.status == self.Plan.RESERVING and \ (self.current_time_seconds() - self.millisec_to_sec(p.updated)) > self.conf.reservation.timeout: + # change the plan status to SOLVED for another VM to reserve p.status = self.Plan.SOLVED p.update(condition=self.reservating_status_condition) break @@ -267,16 +283,17 @@ class ReservationService(cotyledon.Service): found_solved_template = True break + if not solution: + if found_solved_template: + message = _LE("Plan {} status is solved, yet " + "the solution wasn't found").format(p.id) + LOG.error(message) + p.status = self.Plan.ERROR + p.message = message + p.update(condition=self.solved_status_condition) + continue - if found_solved_template and not solution: - message = _LE("Plan {} status is solved, yet " - "the solution wasn't found").format(p.id) - LOG.error(message) - p.status = self.Plan.ERROR - p.message = message - p.update(condition=self.solved_status_condition) - continue # continue looping - elif found_solved_template and p and p.reservation_counter >= self.conf.reservation.max_reservation_counter: + if found_solved_template and p and p.reservation_counter >= self.conf.reservation.max_reservation_counter: message = _LE("Tried {} times. Plan {} is unable to reserve") \ .format(self.conf.reservation.max_reservation_counter, p.id) LOG.error(message) @@ -284,8 +301,6 @@ class ReservationService(cotyledon.Service): p.message = message p.update(condition=self.solved_status_condition) continue - elif not solution: - continue # continue looping log_util.setLoggerFilter(LOG, self.conf.keyspace, p.id) @@ -296,9 +311,14 @@ class ReservationService(cotyledon.Service): p.reservation_owner = socket.gethostname() _is_updated = p.update(condition=self.solved_status_condition) + if not _is_updated: + continue + if 'FAILURE' in _is_updated: continue + LOG.info(_LI("Reservation starts, changing the template status from solved to reserving, " + "atomic update response from MUSIC {}").format(_is_updated)) LOG.info(_LI("Plan {} with request id {} is reserving by machine {}. Tried to reserve it for {} times."). format(p.id, p.name, p.reservation_owner, p.reservation_counter)) @@ -306,7 +326,7 @@ class ReservationService(cotyledon.Service): # if plan needs reservation proceed with reservation # else set status to done. reservations = None - _is_success = 'FAILURE | Could not acquire lock' + _is_success = "FAILURE" if translation: conductor_solver = translation.get("conductor_solver") @@ -320,85 +340,139 @@ class ReservationService(cotyledon.Service): recommendations = solution.get("recommendations") reservation_list = list() + # TODO(larry) combine the two reservation logic as one, make the code service independent + sdwan_candidate_list = list() + service_model = reservations.get("service_model") - for reservation, resource in reservations.get("demands", - {}).items(): + for reservation, resource in reservations.get("demands", {}).items(): candidates = list() - reservation_demand = resource.get("demand") + reservation_demand = resource.get("demands") reservation_name = resource.get("name") reservation_type = resource.get("type") reservation_properties = resource.get("properties") if reservation_properties: - controller = reservation_properties.get( - "controller") + controller = reservation_properties.get("controller") request = reservation_properties.get("request") for recommendation in recommendations: for demand, r_resource in recommendation.items(): if demand == reservation_demand: # get selected candidate from translation - selected_candidate_id = \ - r_resource.get("candidate")\ - .get("candidate_id") - demands = \ - translation.get("conductor_solver")\ - .get("demands") - for demand_name, d_resource in \ - demands.items(): + selected_candidate_id = r_resource.get("candidate").get("candidate_id") + demands = translation.get("conductor_solver").get("demands") + for demand_name, d_resource in demands.items(): if demand_name == demand: - for candidate in d_resource\ - .get("candidates"): - if candidate\ - .get("candidate_id") ==\ - selected_candidate_id: - candidates\ - .append(candidate) + for candidate in d_resource.get("candidates"): + if candidate.get("candidate_id") == selected_candidate_id: + candidate['request'] = request + candidates.append(candidate) + sdwan_candidate_list.append(candidate) + + #TODO(larry) combine the two reservation logic as one, make the code service independent + if service_model == "ADIOD": + is_success = self.try_reservation_call( + method="reserve", + candidate_list=candidates, + reservation_type=service_model, + controller=controller, + request=request, + reservation_name=None + ) + + # if reservation succeed continue with next candidate + if is_success: + curr_reservation = dict() + curr_reservation['candidate_list'] = candidates + curr_reservation['reservation_name'] = reservation_name + curr_reservation['reservation_type'] = reservation_type + curr_reservation['controller'] = controller + curr_reservation['request'] = request + reservation_list.append(curr_reservation) + + else: + # begin roll back of all reserved resources on + # the first failed reservation + rollback_status = \ + self.rollback_reservation(reservation_list) + + # order_lock spin-up rollback + for decision in solution.get('recommendations'): + + candidate = decision.values()[0].get('candidate') + if candidate.get('inventory_type') == 'cloud': + # TODO(larry) change the code to get('conflict_id') instead of 'location_id' + conflict_id = candidate.get('conflict_id') + order_record = self.OrderLock.query.get_plan_by_col("id", conflict_id)[0] + if order_record: + order_record.delete() + # statuses + if rollback_status: + # released all reservations, + # move plan to translated + if p.reservation_counter >= self.conf.reservation.max_reservation_counter: + p.status = self.Plan.ERROR + p.message = _LE("Tried {} times. Plan {} is unable to reserve").format(self.conf.reservation.max_reservation_counter, p.id) + LOG.error(p.message) + else: + p.status = self.Plan.TRANSLATED + # TODO(larry): Should be replaced by the new api from MUSIC + while 'FAILURE' in _is_success: + _is_success = p.update(condition=self.reservation_owner_condition) + LOG.info(_LI("Rolling back the template from reserving to {} status, " + "atomic update response from MUSIC {}").format(p.status, _is_success)) + del reservation_list[:] + else: + LOG.error("Reservation rollback failed") + p.status = self.Plan.ERROR + p.message = "Reservation release failed" + # TODO(larry): Should be replaced by the new api from MUSIC + while 'FAILURE' in _is_success: + _is_success = p.update(condition=self.reservation_owner_condition) + LOG.info(_LI("Rollback Failed, Changing the template status from reserving to error, " + "atomic update response from MUSIC {}").format(_is_success)) + break # reservation failed + + continue + # continue with reserving the next candidate + + # TODO(larry) combine the two reservation logic as one, make the code service independent + if service_model == "DHV": is_success = self.try_reservation_call( method="reserve", - candidate_list=candidates, - reservation_name=reservation_name, - reservation_type=reservation_type, + candidate_list=sdwan_candidate_list, + reservation_type=service_model, controller=controller, - request=request) - - # if reservation succeeds continue with next candidate - if is_success: - curr_reservation = dict() - curr_reservation['candidate_list'] = candidates - curr_reservation['reservation_name'] = \ - reservation_name - curr_reservation['reservation_type'] = \ - reservation_type - curr_reservation['controller'] = controller - curr_reservation['request'] = request - reservation_list.append(curr_reservation) - else: - # begin roll back of all reserved resources on - # the first failed reservation - rollback_status = \ - self.rollback_reservation(reservation_list) - # statuses - if rollback_status: - # released all reservations, - # move plan to translated - p.status = self.Plan.TRANSLATED - # TODO(larry): Should be replaced by the new api from MUSIC - while 'FAILURE | Could not acquire lock' in _is_success: - _is_success = p.update(condition=self.reservation_owner_condition) - del reservation_list[:] - else: - LOG.error("Reservation rollback failed") + request=request, + reservation_name=None + ) + + if not is_success: + # order_lock spin-up rollback + for decision in solution.get('recommendations'): + + candidate = decision.values()[0].get('candidate') + if candidate.get('inventory_type') == 'cloud': + conflict_id = candidate.get('conflict_id') + order_record = self.OrderLock.query.get_plan_by_col("id", conflict_id)[0] + if order_record: + order_record.delete() + + if p.reservation_counter >= self.conf.reservation.max_reservation_counter: p.status = self.Plan.ERROR - p.message = "Reservation release failed" - # TODO(larry): Should be replaced by the new api from MUSIC - while 'FAILURE | Could not acquire lock' in _is_success: - _is_success = p.update(condition=self.reservation_owner_condition) - break # reservation failed + p.message = _LE("Tried {} times. Plan {} is unable to reserve").format( + self.conf.reservation.max_reservation_counter, p.id) + LOG.error(p.message) + else: + p.status = self.Plan.TRANSLATED - continue - # continue with reserving the next candidate + # TODO(larry): Should be replaced by the new api from MUSIC + while 'FAILURE' in _is_success: + _is_success = p.update(condition=self.reservation_owner_condition) + LOG.info(_LI("Rolling back the template from reserving to {} status, " + "atomic update response from MUSIC {}").format(p.status, _is_success)) + del reservation_list[:] # verify if all candidates have been reserved if p.status == self.Plan.RESERVING: @@ -408,9 +482,10 @@ class ReservationService(cotyledon.Service): LOG.debug("Plan {} Reservation complete".format(p.id)) p.status = self.Plan.DONE - while 'FAILURE | Could not acquire lock' in _is_success: + while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(p.updated)) <= self.conf.reservation.timeout: _is_success = p.update(condition=self.reservation_owner_condition) - + LOG.info(_LI("Reservation is complete, changing the template status from reserving to done, " + "atomic update response from MUSIC {}").format(_is_success)) continue # done reserving continue to loop @@ -425,4 +500,3 @@ class ReservationService(cotyledon.Service): """Reload""" LOG.debug("%s" % self.__class__.__name__) self._restart() - diff --git a/conductor/conductor/service.py b/conductor/conductor/service.py index d5bf348..df5bffc 100644 --- a/conductor/conductor/service.py +++ b/conductor/conductor/service.py @@ -17,7 +17,6 @@ # ------------------------------------------------------------------------- # -# import socket import sys # from keystoneauth1 import loading as ka_loading @@ -47,9 +46,12 @@ OPTS = [ default='conductor', help='Music keyspace for content'), cfg.IntOpt('delay_time', - default=2, - help='Delay time (Seconds) for MUSIC requests. Set it to 2 seconds ' - 'by default.'), + default=2, + help='Delay time (Seconds) for MUSIC requests. Set it to 2 seconds ' + 'by default.'), + #TODO(larry): move to a new section [feature_supported] in config file + cfg.BoolOpt('HPA_enabled', + default=True) ] cfg.CONF.register_opts(OPTS) diff --git a/conductor/conductor/solver/optimizer/best_first.py b/conductor/conductor/solver/optimizer/best_first.py index 65e435d..3b579c6 100755 --- a/conductor/conductor/solver/optimizer/best_first.py +++ b/conductor/conductor/solver/optimizer/best_first.py @@ -1,4 +1,3 @@ -#!/bin/python # # ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property @@ -18,7 +17,6 @@ # ------------------------------------------------------------------------- # - import copy import operator from oslo_log import log @@ -143,7 +141,7 @@ class BestFirst(search.Search): best_resource = None for candidate in candidate_list: _decision_path.decisions[demand.name] = candidate - _objective.compute(_decision_path) + _objective.compute(_decision_path) #TODO call the compute of latencyBetween if _objective.goal == "min": if _decision_path.total_value < bound_value: bound_value = _decision_path.total_value diff --git a/conductor/conductor/solver/optimizer/fit_first.py b/conductor/conductor/solver/optimizer/fit_first.py index 1316658..62e011d 100755 --- a/conductor/conductor/solver/optimizer/fit_first.py +++ b/conductor/conductor/solver/optimizer/fit_first.py @@ -1,4 +1,3 @@ -#!/bin/python # # ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property @@ -18,13 +17,13 @@ # ------------------------------------------------------------------------- # - from oslo_log import log import sys import time from conductor.solver.optimizer import decision_path as dpath from conductor.solver.optimizer import search +from conductor.solver.triage_tool.triage_data import TriageData LOG = log.getLogger(__name__) @@ -34,10 +33,12 @@ class FitFirst(search.Search): def __init__(self, conf): search.Search.__init__(self, conf) - def search(self, _demand_list, _objective, _request, _begin_time): + def search(self, _demand_list, _objective, _request): decision_path = dpath.DecisionPath() decision_path.set_decisions({}) + _begin_time = int(round(time.time())) + # Begin the recursive serarch return self._find_current_best( _demand_list, _objective, decision_path, _request, _begin_time) @@ -45,8 +46,12 @@ class FitFirst(search.Search): def _find_current_best(self, _demand_list, _objective, _decision_path, _request, _begin_time): - _current_time = int(round(time.time())) - if (_current_time - _begin_time) > self.conf.solver.solver_timeout: + self.triageSolver.getSortedDemand(_demand_list) + # Termination condition: + # when order takes a long time to solve (more than 'timeout' value) + # then jump out of the recursion + if (int(round(time.time())) - _begin_time) > \ + self.conf.solver.solver_timeout: return None # _demand_list is common across all recursions @@ -62,7 +67,6 @@ class FitFirst(search.Search): # call constraints to whittle initial candidates # candidate_list meets all constraints for the demand candidate_list = self._solve_constraints(_decision_path, _request) - # find the best candidate among the list # bound_value keeps track of the max value discovered @@ -89,17 +93,14 @@ class FitFirst(search.Search): if _objective.goal is None: best_resource = candidate - # @Shankar, the following string value was renamed to 'min_cloud_version' in ONAP version (probably to - # ignore the word 'aic' like in other places). Looks like this will break things up in ECOMP. - # Renamed to older value 'min_aic'. elif _objective.goal == "min_aic": # convert the unicode to string candidate_version = candidate \ .get("cloud_region_version").encode('utf-8') if _decision_path.total_value < bound_value or \ (_decision_path.total_value == bound_value and - self._compare_version(candidate_version, - version_value) > 0): + self._compare_version(candidate_version, + version_value) > 0): bound_value = _decision_path.total_value version_value = candidate_version best_resource = candidate @@ -123,6 +124,7 @@ class FitFirst(search.Search): # candidate) back in the list so that it can be picked # up in the next iteration of the recursion _demand_list.insert(0, demand) + self.triageSolver.rollBackStatus(_decision_path.current_demand,_decision_path) return None # return None back to the recursion else: # best resource is found, add to the decision path diff --git a/conductor/conductor/solver/optimizer/optimizer.py b/conductor/conductor/solver/optimizer/optimizer.py index 39d2bcb..a36124c 100755 --- a/conductor/conductor/solver/optimizer/optimizer.py +++ b/conductor/conductor/solver/optimizer/optimizer.py @@ -1,4 +1,3 @@ -#!/bin/python # # ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property @@ -18,11 +17,12 @@ # ------------------------------------------------------------------------- # - from oslo_config import cfg from oslo_log import log +import copy import time + from conductor import service # from conductor.solver.optimizer import decision_path as dpath # from conductor.solver.optimizer import best_first @@ -30,6 +30,7 @@ from conductor import service from conductor.solver.optimizer import fit_first from conductor.solver.optimizer import random_pick from conductor.solver.request import demand +from conductor.solver.triage_tool.triage_data import TriageData LOG = log.getLogger(__name__) @@ -65,49 +66,84 @@ class Optimizer(object): # that cleanup. Also, Shankar has confirmed solver/simulators folder needs # to go away. Commenting out for now - may be should be removed permanently. # Shankar (TODO). - # else: - # ''' for simulation ''' - # req_sim = request_simulator.RequestSimulator(self.conf) - # req_sim.generate_requests() - # self.requests = req_sim.requests + # ''' for simulation ''' + # req_sim = request_simulator.RequestSimulator(self.conf) + # req_sim.generate_requests() + # self.requests = req_sim.requests - def get_solution(self): - LOG.debug("search start") + def get_solution(self, num_solutions): + LOG.debug("search start") for rk in self.requests: request = self.requests[rk] LOG.debug("--- request = {}".format(rk)) + decision_list = list() + LOG.debug("1. sort demands") demand_list = self._sort_demands(request) - for d in demand_list: LOG.debug(" demand = {}".format(d.name)) LOG.debug("2. search") - st = time.time() - - if not request.objective.goal: - LOG.debug("No objective function is provided. " - "Random pick algorithm is used") - self.search = random_pick.RandomPick(self.conf) - best_path = self.search.search(demand_list, request) - else: - LOG.debug("Fit first algorithm is used") - self.search = fit_first.FitFirst(self.conf) - best_path = self.search.search(demand_list, - request.objective, request, - self._begin_time) - - if best_path is not None: - self.search.print_decisions(best_path) - else: - LOG.debug("no solution found") - LOG.debug("search delay = {} sec".format(time.time() - st)) - return best_path + + while (num_solutions == 'all' or num_solutions > 0): + + LOG.debug("searching for the solution {}".format(len(decision_list) + 1)) + + st = time.time() + _copy_demand_list = copy.deepcopy(demand_list) + + if not request.objective.goal: + LOG.debug("No objective function is provided. " + "Random pick algorithm is used") + self.search = random_pick.RandomPick(self.conf) + best_path = self.search.search(demand_list, request) + else: + LOG.debug("Fit first algorithm is used") + self.search = fit_first.FitFirst(self.conf) + best_path = self.search.search(demand_list, + request.objective, request) + + if best_path is not None: + self.search.print_decisions(best_path) + else: + LOG.debug("no solution found") + break + + LOG.debug("search delay = {} sec".format(time.time() - st)) + + # add the current solution to decision_list + decision_list.append(best_path.decisions) + + #remove the candidate with "uniqueness = true" + demand_list = copy.deepcopy(_copy_demand_list) + self._remove_unique_candidate(request, best_path, demand_list) + + if num_solutions != 'all': + num_solutions -= 1 + self.search.triageSolver.getSolution(decision_list) + return decision_list + + def _remove_unique_candidate(self, _request, current_decision, demand_list): + + # This method is to remove previous solved/used candidate from consideration + # when Conductor needs to provide multiple solutions to the user/client + + for demand_name, candidate_attr in current_decision.decisions.items(): + candidate_uniqueness = candidate_attr.get('uniqueness') + if candidate_uniqueness and candidate_uniqueness == 'true': + # if the candidate uniqueness is 'false', then remove + # that solved candidate from the translated candidates list + _request.demands[demand_name].resources.pop(candidate_attr.get('candidate_id')) + # update the demand_list + for demand in demand_list: + if(getattr(demand, 'name') == demand_name): + demand.resources = _request.demands[demand_name].resources def _sort_demands(self, _request): + LOG.debug(" _sort_demands") demand_list = [] # first, find loc-demand dependencies @@ -115,13 +151,22 @@ class Optimizer(object): open_demand_list = [] for key in _request.constraints: c = _request.constraints[key] - if c.constraint_type == "distance_to_location": + if c.constraint_type == "access_distance": for dk in c.demand_list: if _request.demands[dk].sort_base != 1: _request.demands[dk].sort_base = 1 open_demand_list.append(_request.demands[dk]) for op in _request.objective.operand_list: - if op.function.func_type == "distance_between": + if op.function.func_type == "latency_between": #TODO do i need to include the region_group here? + if isinstance(op.function.loc_a, demand.Location): + if _request.demands[op.function.loc_z.name].sort_base != 1: + _request.demands[op.function.loc_z.name].sort_base = 1 + open_demand_list.append(op.function.loc_z) + elif isinstance(op.function.loc_z, demand.Location): + if _request.demands[op.function.loc_a.name].sort_base != 1: + _request.demands[op.function.loc_a.name].sort_base = 1 + open_demand_list.append(op.function.loc_a) + elif op.function.func_type == "distance_between": if isinstance(op.function.loc_a, demand.Location): if _request.demands[op.function.loc_z.name].sort_base != 1: _request.demands[op.function.loc_z.name].sort_base = 1 @@ -162,7 +207,8 @@ class Optimizer(object): for key in _request.constraints: c = _request.constraints[key] - if c.constraint_type == "distance_between_demands": + # FIXME(snarayanan): "aic" only to be known by conductor-data + if c.constraint_type == "aic_distance": if d.name in c.demand_list: for dk in c.demand_list: if dk != d.name and \ @@ -172,7 +218,25 @@ class Optimizer(object): _request.demands[dk]) for op in _request.objective.operand_list: - if op.function.func_type == "distance_between": + if op.function.func_type == "latency_between": #TODO + if op.function.loc_a.name == d.name: + if op.function.loc_z.name in \ + _request.demands.keys(): + if _request.demands[ + op.function.loc_z.name].sort_base != 1: + _request.demands[ + op.function.loc_z.name].sort_base = 1 + _open_demand_list.append(op.function.loc_z) + elif op.function.loc_z.name == d.name: + if op.function.loc_a.name in \ + _request.demands.keys(): + if _request.demands[ + op.function.loc_a.name].sort_base != 1: + _request.demands[ + op.function.loc_a.name].sort_base = 1 + _open_demand_list.append(op.function.loc_a) + + elif op.function.func_type == "distance_between": if op.function.loc_a.name == d.name: if op.function.loc_z.name in \ _request.demands.keys(): @@ -201,14 +265,3 @@ class Optimizer(object): break return not_sorted_demand - -# Used for testing. This file is in .gitignore and will NOT be checked in. -CONFIG_FILE = '' - -''' for unit test ''' -if __name__ == "__main__": - # Prepare service-wide components (e.g., config) - conf = service.prepare_service([], config_files=[CONFIG_FILE]) - - opt = Optimizer(conf) - opt.get_solution() diff --git a/conductor/conductor/solver/optimizer/random_pick.py b/conductor/conductor/solver/optimizer/random_pick.py index 2896757..3130d8f 100644 --- a/conductor/conductor/solver/optimizer/random_pick.py +++ b/conductor/conductor/solver/optimizer/random_pick.py @@ -31,13 +31,22 @@ class RandomPick(search.Search): search.Search.__init__(self, conf) def search(self, _demand_list, _request): + decision_path = dpath.DecisionPath() decision_path.set_decisions({}) + return self._find_current_best(_demand_list, decision_path, _request) def _find_current_best(self, _demand_list, _decision_path, _request): + for demand in _demand_list: - r_index = randint(0, len(demand.resources) - 1) - best_resource = demand.resources[demand.resources.keys()[r_index]] + # apply the constraints on all candidates first + _decision_path.current_demand = demand + candidate_list = self._solve_constraints(_decision_path, _request) + + # random pick one candidate + r_index = randint(0, len(candidate_list) - 1) + best_resource = candidate_list[r_index] _decision_path.decisions[demand.name] = best_resource + return _decision_path diff --git a/conductor/conductor/solver/optimizer/search.py b/conductor/conductor/solver/optimizer/search.py index 9d138e4..9c4fe46 100755 --- a/conductor/conductor/solver/optimizer/search.py +++ b/conductor/conductor/solver/optimizer/search.py @@ -1,4 +1,3 @@ -#!/bin/python # # ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property @@ -18,11 +17,11 @@ # ------------------------------------------------------------------------- # - from operator import itemgetter from oslo_log import log from conductor.solver.optimizer import decision_path as dpath +from conductor.solver.triage_tool.triage_data import TriageData LOG = log.getLogger(__name__) @@ -31,6 +30,7 @@ class Search(object): def __init__(self, conf): self.conf = conf + self.triageSolver = TriageData() def search(self, _demand_list, _objective): decision_path = dpath.DecisionPath() @@ -42,40 +42,63 @@ class Search(object): def _solve_constraints(self, _decision_path, _request): candidate_list = [] + solver={} for key in _decision_path.current_demand.resources: resource = _decision_path.current_demand.resources[key] candidate_list.append(resource) + self.assignNodeId(candidate_list, _decision_path.current_demand.name) + self.triageSolver.aasignNodeIdToCandidate(candidate_list, _decision_path.current_demand, _request.request_id, + _request.plan_id) for constraint in _decision_path.current_demand.constraint_list: LOG.debug("Evaluating constraint = {}".format(constraint.name)) LOG.debug("Available candidates before solving " "constraint {}".format(candidate_list)) + candidates_before = candidate_list + solver['candidate_before_list'] = candidate_list candidate_list =\ constraint.solve(_decision_path, candidate_list, _request) LOG.debug("Available candidates after solving " "constraint {}".format(candidate_list)) + solver['constraint_name_for_can'] = constraint.name + solver['solver_demand_name'] = _decision_path.current_demand.name + solver['candidate_after_list']=candidate_list + candidate_after = candidate_list if len(candidate_list) == 0: LOG.error("No candidates found for demand {} " "when constraint {} was evaluated " "".format(_decision_path.current_demand, constraint.name) ) + self.dropped_candidate( solver['candidate_before_list'], candidate_after, constraint.name, _decision_path.current_demand.name) break + self.dropped_candidate(solver['candidate_before_list'], candidate_after, constraint.name, + _decision_path.current_demand.name) + self.triageSolver.checkCandidateAfter(solver) if len(candidate_list) > 0: self._set_candidate_cost(candidate_list) - return candidate_list def _set_candidate_cost(self, _candidate_list): - for c in _candidate_list: - if c["inventory_type"] == "service": - c["cost"] = "1" - else: - c["cost"] = "2" _candidate_list[:] = sorted(_candidate_list, key=itemgetter("cost")) - + def dropped_candidate(self,candidates_before, candidate_after, constraint_name, demand_name): + dropped_candidate = [] + for dc in candidates_before: + if dc not in candidate_after: + dropped_details={} + dropped_details['constraint_name_dropped'] = constraint_name + dropped_details['name'] = demand_name + dc['constraints'].append(dropped_details) + dropped_candidate.append(dc) + self.triageSolver.droppedCadidatesStatus(dropped_candidate) + def assignNodeId(self, candidate_list, demand_name): + for cr in candidate_list: + if not 'node_id' in cr: + cr['name'] = demand_name + cr['node_id'] = (demand_name + '|' + cr['candidate_id']) + cr['constraints'] = [] def print_decisions(self, _best_path): if _best_path: msg = "--- demand = {}, chosen resource = {} at {}" @@ -88,3 +111,5 @@ class Search(object): LOG.debug(msg.format(_best_path.total_value)) msg = "--- total cost of decision = {}" LOG.debug(msg.format(_best_path.total_cost)) + + diff --git a/conductor/conductor/solver/orders_lock/__init__.py b/conductor/conductor/solver/orders_lock/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/conductor/conductor/solver/orders_lock/__init__.py diff --git a/conductor/conductor/solver/orders_lock/orders_lock_service.py b/conductor/conductor/solver/orders_lock/orders_lock_service.py new file mode 100644 index 0000000..7cb4de9 --- /dev/null +++ b/conductor/conductor/solver/orders_lock/orders_lock_service.py @@ -0,0 +1,192 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import time +from oslo_log import log +from oslo_config import cfg +from conductor.common.models.plan import Plan +from conductor.common.models.order_lock import OrderLock +from conductor.common.music.model import base + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + +class OrdersLockingService(object): + + def __init__(self): + self.Plan = base.create_dynamic_model(keyspace=CONF.keyspace, baseclass=Plan, classname="Plan") + self.OrderLock = base.create_dynamic_model( + keyspace=CONF.keyspace, baseclass=OrderLock, classname="OrderLock") + + def get_order_by_resource_id(self, service_resource_id): + return self.OrderLock.query.get(service_resource_id) + + def _get_plans_by_id(self, order_id): + + order_locks = self.OrderLock.query.get_plan_by_col("id", order_id) + order_record = order_locks[0] + if order_record: + LOG.debug("Getting Order lock record {} based on conflict id {}".format(order_record, order_id)) + return getattr(order_record, 'plans') + + def _update_order_status(self, rehome_status, plans, order_lock): + + updated_plan_statuses = dict() + for plan_id, plan_attributes in plans.items(): + # Convert the string into dictionary for plans field + plan_dict = json.loads(plan_attributes) + # Change the status to 'completed' if previous status is 'under_spin_up' + # else change to 'rehome' + + if rehome_status == OrderLock.FAILED: + new_status = OrderLock.FAILED + else: + if plan_dict.get('status') == OrderLock.UNDER_SPIN_UP: + new_status = OrderLock.COMPLETED + else: + new_status = OrderLock.REHOME + + updated_fields = { + "status": new_status, + "updated": str(self.current_time_millis()) + } + values = { + "id": order_lock.id, + "is_spinup_completed": True, + "spinup_completed_timestamp": self.current_time_millis() + } + order_lock.update(plan_id, updated_fields, values) + updated_plan_statuses[plan_id] = new_status + + return updated_plan_statuses + + # TODO(Saisree) + def update_order_status_and_get_effected_plans(self, rehome_status, service_resource_id): + # A music call to orders-lock table to update the status of the plans AND get this + # list of effected plans for this service_sersource_id - hardcoded + + effected_plans = dict() + order_locks = self.OrderLock.query.all() + + for order_lock_record in order_locks: + + plans = getattr(order_lock_record, 'plans') + for plan_id, plan_attributes in plans.items(): + # Convert the string into dictionary for plans field + plan_dict = json.loads(plan_attributes) + + # check if the service_resource_id is matched and the status is 'under spin-up' + if plan_dict.get('service_resource_id', None) == service_resource_id and \ + plan_dict.get('status', None) == OrderLock.UNDER_SPIN_UP: + + # update the status of the plans in order_locks table + self._update_order_status(rehome_status, plans, order_lock_record) + + # get the latest plans from order_locks table + effected_plans = self._get_plans_by_id(getattr(order_lock_record, 'id')) + break + + return effected_plans + + def _update_failed_plan(self, plan_id, service_resource_id): + + # update the waiting/pending plan status to 'error' with + # proper error message if MSO spin-up fails + p = self.Plan.query.get_plan_by_col("id", plan_id)[0] + if p and p.status == p.WAITING_SPINUP: + p.status = p.ERROR + p.message = "Error due to failed cloud candidate " \ + "spin-up in MSO (service_resource_id: {}).".format(service_resource_id) + p.update() + + # TODO(Ikram) + def rehomes_for_service_resource(self, rehome_status, service_resource_id, rehome_decisions): + # Sample of expected output from this method is as follows + # rehomes = { + # {"plan_id": "p1", "should_rehome": True}, + # {"plan_id": "p2", "should_rehome": True}, + # {"plan_id": "p3", "should_rehome": True} + # } + + effected_plans = self.update_order_status_and_get_effected_plans(rehome_status, service_resource_id) + LOG.debug("The effected plan list {} for service_resource_id" + " {} MSO release call".format(effected_plans, service_resource_id)) + + if effected_plans: + + for plan_id, plan_attribute in effected_plans.items(): + rehome_this_plan = True + rehome_decision_record = dict() + # Should we take a decision just on the basis of this? What if the status of the plan was not + # successfully set to 'rehome' but it actually DOES need a rehome based on the data from the order Q? + + #convert the string to JSON format + plan = json.loads(plan_attribute) + + if plan['status'] == OrderLock.FAILED: + self._update_failed_plan(plan_id, service_resource_id) + continue + + elif plan['status'] != OrderLock.REHOME: + LOG.info("Though effected, but will not retrigger " + "the plan {}, since plan.status {} is not 'rehome'.".format(plan_id, plan['status'])) + # What if the plan should be rehomed based on the Order Q data - this means, this infromation is stale + # Q must be clearned up - i.e. update plan status in the order Q to + # completed (or whatever plan.status is) + continue + + order_locks = self.OrderLock.query.all() + + # Go through the order_lock table + for order_lock_record in order_locks: + plans = getattr(order_lock_record, 'plans') + if plan_id in plans: + # Convert the string into dictionary for plans field + plan_dict = json.loads(plans[plan_id]) + # check if there is a record that is not able to 'rehome' (which means the status in order_locks.plans is not 'rehome' or 'under-spin-up' + if plan_dict.get('status', None) not in OrderLock.REHOMABLE: + rehome_this_plan = False + break + + rehome_decision_record['plan_id'] = plan_id + rehome_decision_record['should_rehome'] = rehome_this_plan + rehome_decisions.append(rehome_decision_record) + + return rehome_decisions + + def do_rehome(self, rehome_decisions): + + for decision in rehome_decisions: + if decision.get('should_rehome'): + LOG.info("Retriggering plan {}.".format(decision['plan_id'])) + self.retrigger_plan(decision['plan_id']) + else: + LOG.info("Will not retrigger plan {}.".format(decision['plan_id'])) + + def retrigger_plan(self, plan_id): + # update plan table set status to 'template' for plan_id + plan = self.Plan.query.get_plan_by_col("id", plan_id)[0] + plan.rehome_plan() + plan.update() + return + + def current_time_millis(self): + """Current time in milliseconds.""" + return int(round(time.time() * 1000))
\ No newline at end of file diff --git a/conductor/conductor/solver/request/functions/aic_version.py b/conductor/conductor/solver/request/functions/aic_version.py index feed1f5..b86cf08 100755..100644 --- a/conductor/conductor/solver/request/functions/aic_version.py +++ b/conductor/conductor/solver/request/functions/aic_version.py @@ -1,5 +1,21 @@ -#!/usr/bin/env python - +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# class AICVersion(object): diff --git a/conductor/conductor/solver/request/functions/cost.py b/conductor/conductor/solver/request/functions/cost.py index 2e1a29d..5d0affa 100755..100644 --- a/conductor/conductor/solver/request/functions/cost.py +++ b/conductor/conductor/solver/request/functions/cost.py @@ -1,5 +1,21 @@ -#!/usr/bin/env python - +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# class Cost(object): diff --git a/conductor/conductor/solver/request/functions/latency_between.py b/conductor/conductor/solver/request/functions/latency_between.py new file mode 100644 index 0000000..15f5489 --- /dev/null +++ b/conductor/conductor/solver/request/functions/latency_between.py @@ -0,0 +1,35 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from conductor.solver.utils import utils + +class LatencyBetween(object): + def __init__(self, _type): + self.func_type = _type + + self.loc_a = None + self.loc_z = None + self.region_group = None + + def compute(self, _loc_a, _loc_z): + latency = utils.compute_latency_score(_loc_a, _loc_z, self.region_group) + + return latency + + diff --git a/conductor/conductor/solver/request/objective.py b/conductor/conductor/solver/request/objective.py index d957581..526a889 100755 --- a/conductor/conductor/solver/request/objective.py +++ b/conductor/conductor/solver/request/objective.py @@ -1,7 +1,6 @@ -#!/usr/bin/env python # # ------------------------------------------------------------------------- -# Copyright (c) 2015-2017 AT&T Intellectual Property +# Copyright (c) 2015-2018 AT&T Intellectual Property # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +17,6 @@ # ------------------------------------------------------------------------- # - from conductor.solver.request import demand # from conductor.solver.resource import region # from conductor.solver.resource import service @@ -54,12 +52,13 @@ class Operand(object): def compute(self, _decision_path, _request): value = 0.0 cei = _request.cei - if self.function.func_type == "distance_between": + if self.function.func_type == "latency_between": if isinstance(self.function.loc_a, demand.Location): if self.function.loc_z.name in \ _decision_path.decisions.keys(): resource = \ _decision_path.decisions[self.function.loc_z.name] + candidate_cost = resource.get('cost') loc = None # if isinstance(resource, region.Region): # loc = resource.location @@ -67,12 +66,14 @@ class Operand(object): # loc = resource.region.location loc = cei.get_candidate_location(resource) value = \ - self.function.compute(self.function.loc_a.value, loc) + self.function.compute(self.function.loc_a.value, loc) \ + + candidate_cost elif isinstance(self.function.loc_z, demand.Location): if self.function.loc_a.name in \ _decision_path.decisions.keys(): resource = \ _decision_path.decisions[self.function.loc_a.name] + candidate_cost = resource.get('cost') loc = None # if isinstance(resource, region.Region): # loc = resource.location @@ -80,7 +81,8 @@ class Operand(object): # loc = resource.region.location loc = cei.get_candidate_location(resource) value = \ - self.function.compute(self.function.loc_z.value, loc) + self.function.compute(self.function.loc_z.value, loc) \ + + candidate_cost else: if self.function.loc_a.name in \ _decision_path.decisions.keys() and \ @@ -105,24 +107,73 @@ class Operand(object): value = self.function.compute(loc_a, loc_z) - elif self.function.func_type == "cost": - for demand_name, candidate_info in _decision_path.decisions.items(): - value += float(candidate_info['cost']) - elif self.function.func_type == "hpa_score": # Currently only minimize objective goal is supported # Higher the HPA score the better. # Invert HPA Score if goal is minimize invert = -1 - # # if self.function.goal == "max": # invert = 1 - for demand_name, candidate_info in _decision_path.decisions.items(): hpa_score = invert * float(candidate_info.get('hpa_score', 0)) value += hpa_score + elif self.function.func_type == "distance_between": + if isinstance(self.function.loc_a, demand.Location): + if self.function.loc_z.name in \ + _decision_path.decisions.keys(): + resource = \ + _decision_path.decisions[self.function.loc_z.name] + candidate_cost = resource.get('cost') + loc = None + # if isinstance(resource, region.Region): + # loc = resource.location + # elif isinstance(resource, service.Service): + # loc = resource.region.location + loc = cei.get_candidate_location(resource) + value = \ + self.function.compute(self.function.loc_a.value, loc) \ + + candidate_cost + elif isinstance(self.function.loc_z, demand.Location): + if self.function.loc_a.name in \ + _decision_path.decisions.keys(): + resource = \ + _decision_path.decisions[self.function.loc_a.name] + candidate_cost = resource.get('cost') + loc = None + # if isinstance(resource, region.Region): + # loc = resource.location + # elif isinstance(resource, service.Service): + # loc = resource.region.location + loc = cei.get_candidate_location(resource) + value = \ + self.function.compute(self.function.loc_z.value, loc) \ + + candidate_cost + else: + if self.function.loc_a.name in \ + _decision_path.decisions.keys() and \ + self.function.loc_z.name in \ + _decision_path.decisions.keys(): + resource_a = \ + _decision_path.decisions[self.function.loc_a.name] + loc_a = None + # if isinstance(resource_a, region.Region): + # loc_a = resource_a.location + # elif isinstance(resource_a, service.Service): + # loc_a = resource_a.region.location + loc_a = cei.get_candidate_location(resource_a) + resource_z = \ + _decision_path.decisions[self.function.loc_z.name] + loc_z = None + # if isinstance(resource_z, region.Region): + # loc_z = resource_z.location + # elif isinstance(resource_z, service.Service): + # loc_z = resource_z.region.location + loc_z = cei.get_candidate_location(resource_z) + + value = self.function.compute(loc_a, loc_z) + if self.operation == "product": value *= self.weight diff --git a/conductor/conductor/solver/request/parser.py b/conductor/conductor/solver/request/parser.py index da031cc..fcdb2d5 100755 --- a/conductor/conductor/solver/request/parser.py +++ b/conductor/conductor/solver/request/parser.py @@ -20,6 +20,7 @@ # import json +import collections import operator import random @@ -42,36 +43,38 @@ from conductor.solver.request.functions import aic_version from conductor.solver.request.functions import cost from conductor.solver.request.functions import distance_between from conductor.solver.request.functions import hpa_score +from conductor.solver.request.functions import latency_between +from conductor.solver.request import objective +from conductor.solver.triage_tool.traige_latency import TriageLatency from oslo_log import log -# from conductor.solver.request.functions import distance_between -# from conductor.solver.request import objective -# from conductor.solver.resource import region -# from conductor.solver.resource import service -# from conductor.solver.utils import constraint_engine_interface as cei -# from conductor.solver.utils import utils - LOG = log.getLogger(__name__) # FIXME(snarayanan): This is really a SolverRequest (or Request) object class Parser(object): + demands = None # type: Dict[Any, Any] + locations = None # type: Dict[Any, Any] + obj_func_param = None + def __init__(self, _region_gen=None): self.demands = {} self.locations = {} self.region_gen = _region_gen self.constraints = {} self.objective = None + self.obj_func_param = list() self.cei = None self.request_id = None self.request_type = None + self.region_group = None # def get_data_engine_interface(self): # self.cei = cei.ConstraintEngineInterface() # FIXME(snarayanan): This should just be parse_template - def parse_template(self, json_template=None): + def parse_template(self, json_template=None, country_groups=None, regions_maps=None): if json_template is None: LOG.error("No template specified") return "Error" @@ -238,11 +241,36 @@ class Parser(object): self.objective = objective.Objective() self.objective.goal = input_objective["goal"] self.objective.operation = input_objective["operation"] + self.latencyTriage = TriageLatency() + + LOG.info("objective function params") + for operand_data in input_objective["operands"]: + if operand_data["function"] == "latency_between": + self.obj_func_param.append(operand_data["function_param"][1]) + LOG.info("done - objective function params") for operand_data in input_objective["operands"]: operand = objective.Operand() operand.operation = operand_data["operation"] operand.weight = float(operand_data["weight"]) - if operand_data["function"] == "distance_between": + if operand_data["function"] == "latency_between": + LOG.debug("Processing objective function latency_between") + self.latencyTriage.takeOpimaztionType(operand_data["function"]) + func = latency_between.LatencyBetween("latency_between") + func.region_group = self.assign_region_group_weight(country_groups, regions_maps) + param = operand_data["function_param"][0] + if param in self.locations: + func.loc_a = self.locations[param] + elif param in self.demands: + func.loc_a = self.demands[param] + param = operand_data["function_param"][1] + if param in self.locations: + func.loc_z = self.locations[param] + elif param in self.demands: + func.loc_z = self.demands[param] + operand.function = func + elif operand_data["function"] == "distance_between": + LOG.debug("Processing objective function distance_between") + self.latencyTriage.takeOpimaztionType(operand_data["function"]) func = distance_between.DistanceBetween("distance_between") param = operand_data["function_param"][0] if param in self.locations: @@ -269,6 +297,217 @@ class Parser(object): operand.function = func self.objective.operand_list.append(operand) + self.latencyTriage.updateTriageLatencyDB(self.plan_id, self.request_id) + + def assign_region_group_weight(self, countries, regions): + """ assign the latency group value to the country and returns a map""" + LOG.info("Processing Assigning Latency Weight to Countries ") + + countries = self.resolve_countries(countries, regions, + self.get_candidate_country_list()) # resolve the countries based on region type + region_latency_weight = collections.OrderedDict() + weight = 0 + + if countries is None: + LOG.info("No countries available to assign latency weight ") + return region_latency_weight + + try: + l_weight = '' + for i, e in enumerate(countries): + if e is None: continue + for k, x in enumerate(e.split(',')): + region_latency_weight[x] = weight + l_weight += x + " : " + str(weight) + l_weight += ',' + weight = weight + 1 + LOG.info("Latency Weights Assigned ") + LOG.info(l_weight) + except Exception as err: + LOG.info("Exception while assigning the latency weights " + err) + print(err) + return region_latency_weight + + def get_candidate_country_list(self): + LOG.info("Processing Get Candidate Countries from demands ") + candidate_country_list = list() + try: + + candidate_countries = '' + for demand_id, demands in self.demands.items(): + candidate_countries += demand_id + for candidte in demands.resources.values(): + candidate_country_list.append(candidte["country"]) + candidate_countries += candidte["country"] + candidate_countries += ',' + + LOG.info("Available Candidate Countries " + candidate_countries) + except Exception as err: + print(err) + return candidate_country_list + + def resolve_countries(self, countries_list, regions_map, candidates_country_list): + # check the map with given location and retrieve the values + LOG.info("Resolving Countries ") + if countries_list is None: + LOG.info("No Countries are available ") + return countries_list + + countries_list = self.filter_invalid_rules(countries_list, regions_map) + + if countries_list is not None and countries_list.__len__() > 0 and countries_list.__getitem__( + countries_list.__len__() - 1) == "*": + self.process_wildcard_rules(candidates_country_list, countries_list) + else: + self.process_without_wildcard_rules(candidates_country_list, countries_list) + + return countries_list + + def process_without_wildcard_rules(self, candidates_country_list, countries_list): + try: + temp_country_list = list() + for country_group in countries_list: + for country in country_group.split(','): + temp_country_list.append(country) + + tmpcl = '' + for cl in temp_country_list: + tmpcl += cl + tmpcl += ',' + + LOG.info("Countries List before diff " + tmpcl) + + ccl = '' + for cl in candidates_country_list: + ccl += cl + ccl += ',' + + LOG.info("Candidates Countries List before diff " + ccl) + + # filterout the candidates that does not match the countries list + # filter(lambda x: x not in countries_list, self.get_candidate_countries_list()) + LOG.info("Processing Difference between Candidate Countries and Latency Countries without *. ") + diff_bw_candidates_and_countries = list(set(candidates_country_list).difference(temp_country_list)) + candcl = '' + for cl in diff_bw_candidates_and_countries: + candcl += cl + candcl += ',' + + LOG.info("Available countries after processing diff between " + candcl) + + self.drop_no_latency_rule_candidates(diff_bw_candidates_and_countries) + except Exception as error: + print(error) + + def drop_no_latency_rule_candidates(self, diff_bw_candidates_and_countries): + + cadidate_list_ = list() + temp_candidates = dict() + + for demand_id, demands in self.demands.items(): + LOG.info("demand id " + demand_id) + for candidte in demands.resources.values(): + LOG.info("candidate id " + candidte['candidate_id']) + dem_candidate = {demand_id: demands} + temp_candidates.update(dem_candidate) + + droped_candidates = '' + for demand_id, demands in temp_candidates.items(): + droped_candidates += demand_id + for candidate in demands.resources.values(): + if demand_id in self.obj_func_param and candidate["country"] in diff_bw_candidates_and_countries: + droped_candidates += candidate['candidate_id'] + droped_candidates += ',' + self.latencyTriage.latencyDroppedCandiate(candidate['candidate_id'], demand_id, reason="diff_bw_candidates_and_countries,Latecy weight ") + self.demands[demand_id].resources.pop(candidate['candidate_id']) + LOG.info("dropped " + droped_candidates) + + # for demand_id, candidate_list in self.demands: + # LOG.info("Candidates for demand " + demand_id) + # cadidate_list_ = self.demands[demand_id]['candidates'] + # droped_candidates = '' + # xlen = cadidate_list_.__len__() - 1 + # len = xlen + # # LOG.info("Candidate List Length "+str(len)) + # for i in range(len + 1): + # # LOG.info("iteration " + i) + # LOG.info("Candidate Country " + cadidate_list_[xlen]["country"]) + # if cadidate_list_[xlen]["country"] in diff_bw_candidates_and_countries: + # droped_candidates += cadidate_list_[xlen]["country"] + # droped_candidates += ',' + # self.demands[demand_id]['candidates'].remove(cadidate_list_[xlen]) + # # filter(lambda candidate: candidate in candidate_list["candidates"]) + # # LOG.info("Droping Cadidate not eligible for latency weight. Candidate ID " + cadidate_list_[xlen]["candidate_id"] + " Candidate Country: "+cadidate_list_[xlen]["country"]) + # xlen = xlen - 1 + # if xlen < 0: break + # LOG.info("Dropped Candidate Countries " + droped_candidates + " from demand " + demand_id) + + def process_wildcard_rules(self, candidates_country_list, countries_list, ): + LOG.info("Processing the rules for " + countries_list.__getitem__(countries_list.__len__() - 1)) + candidate_countries = '' + countries_list.remove(countries_list.__getitem__( + countries_list.__len__() - 1)) # remove the wildcard and replace it with available candidates countries + temp_country_list = list() + for country_group in countries_list: + for country in country_group.split(','): + temp_country_list.append(country) + temp_countries = '' + for cl in temp_country_list: + temp_countries += cl + temp_countries += ',' + LOG.info("Countries before diff " + temp_countries) + ccl = '' + for cl in candidates_country_list: + ccl += cl + ccl += ',' + LOG.info("Candidates Countries List before diff " + ccl) + diff_bw_candidates_and_countries = list(set(candidates_country_list).difference(temp_country_list)) + LOG.info("Processing Difference between Candidate Countries and Latency Countries for * . ") + for c_group in diff_bw_candidates_and_countries: + candidate_countries += c_group + candidate_countries += ',' + LOG.info("Difference: " + candidate_countries[:-1]) + if candidate_countries.__len__() > 0: + LOG.info(candidate_countries[:-1]) + countries_list.append(candidate_countries[:-1]) # append the list of difference to existing countries + ac = '' + for cl in countries_list: + ac += cl + ac += ',' + LOG.info("Available countries after processing diff between " + ac) + + def filter_invalid_rules(self, countries_list, regions_map): + invalid_rules = list(); + for i, e in enumerate(countries_list): + if e is None: continue + + for k, region in enumerate(e.split(',')): + LOG.info("Processing the Rule for " + region) + if region.__len__() != 3: + if region == "*": + continue + region_list = regions_map.get(region) + + if region_list is None: + LOG.info("Invalid region " + region) + invalid_rules.append(region) + continue + countries_list.remove(countries_list[i]) + countries_list.insert(i, region_list) + for ir in invalid_rules: + LOG.info("Filtering out invalid rules from countries list ") + LOG.info("invalid rule " + ir) + + countries_list = list(filter(lambda country: (country not in invalid_rules), countries_list)) + + available_countries = '' + for cl in countries_list: + available_countries += cl + available_countries += ',' + + LOG.info("Available countries after the filteration " + available_countries[:-1]) + + return countries_list def get_data_from_aai_simulator(self): loc = demand.Location("uCPE") diff --git a/conductor/conductor/solver/rest/LR.csv b/conductor/conductor/solver/rest/LR.csv new file mode 100644 index 0000000..17d9106 --- /dev/null +++ b/conductor/conductor/solver/rest/LR.csv @@ -0,0 +1,4 @@ +EMEA-CORE," FRA, DEU, NLD, GBR"," France, Germany, Netherlans, United Kingdm" +Nordics," DNK, FIN, NOR, SWE"," Denmark, Finland, Norway, Sweden" +Pakistan, PAK, Pakistan +SP-POR-MOR," MAR, PRT, ESP"," Morocco, Portogal, Espain"
\ No newline at end of file diff --git a/conductor/conductor/solver/rest/LR.json b/conductor/conductor/solver/rest/LR.json new file mode 100644 index 0000000..e5c07e3 --- /dev/null +++ b/conductor/conductor/solver/rest/LR.json @@ -0,0 +1,8 @@ +[ +{"group": "EMEA-CORE1", "countries" : "FRA|DEU|NLD|GBR1"}, +{"group": "EMEA-CORE2", "countries" : "FRA|DEU|NLD|GBR2"}, +{"group": "EMEA-CORE3", "countries" : "FRA|DEU|NLD|GBR3"}, +{"group": "EMEA-CORE4", "countries" : "FRA|DEU|NLD|GBR4"}, +{"group": "EMEA-CORE5", "countries" : "FRA|DEU|NLD|GBR5"}, +{"group": "EMEA-CORE6", "countries" : "FRA|DEU|NLD|GBR6"} +]
\ No newline at end of file diff --git a/conductor/conductor/solver/rest/latency_data_loader.py b/conductor/conductor/solver/rest/latency_data_loader.py new file mode 100644 index 0000000..d0b7e9d --- /dev/null +++ b/conductor/conductor/solver/rest/latency_data_loader.py @@ -0,0 +1,119 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import csv +import collections +import json +from conductor.common.models import region_placeholders +from conductor.common.music import api + + +class LatencyDataLoader(object): + + def __init__(self): + rph = region_placeholders.RegionPlaceholders() + music = api.API() + print("Music version %s" % music.version()) + + + # load data into region place holder + def load_into_rph(self, json_data): + datamap = collections.OrderedDict() + group_map = collections.OrderedDict() + datamap = json.loads(json_data) + + + #for i, j in enumerate(datamap): + # group_map[j['group']] = j['countries'] + + music = api.API() + + #for row in group_map: + # music.row_create() + + kwargs = {'keyspace': 'conductor_inam', 'table': 'region_placeholders', 'pk_name': 'id'} + for row in enumerate(datamap): + kwargs['pk_value'] = id() + kwargs['values'] = {'region_name': row['group'], 'countries': row['countries']} + music.row_create(**kwargs) + + + + print(group_map) + + + def load_into_country_letancy(self, json_data): + datamap = collections.OrderedDict() + group_map = collections.OrderedDict() + datamap = json.loads(json_data) + + + #for i, j in enumerate(datamap): + # group_map[j['group']] = j['countries'] + + music = api.API() + + #for row in group_map: + # music.row_create() + + kwargs = {'keyspace': 'conductor_inam', 'table': 'country_latency', 'pk_name': 'id'} + for row in enumerate(datamap): + kwargs['pk_value'] = id() + kwargs['values'] = {'country_name': row['country_name'], 'groups': row['groups']} + music.row_create(**kwargs) + + + + print(group_map) + + + + + + + + + + + + + + + + + + + + + + + + + + +#json_string = '[{"group": "EMEA-CORE1", "countries" : "FRA|DEU|NLD|GBR1"},' \ +# '{"group": "EMEA-CORE2", "countries" : "FRA|DEU|NLD|GBR2"},' \ +# '{"group": "EMEA-CORE3", "countries" : "FRA|DEU|NLD|GBR3"},' \ +# '{"group": "EMEA-CORE4", "countries" : "FRA|DEU|NLD|GBR4"}]' + +#test = LatencyDataLoader() +#test.parseJSON(json_string) + + + diff --git a/conductor/conductor/solver/service.py b/conductor/conductor/solver/service.py index 56ec683..5fc9d29 100644 --- a/conductor/conductor/solver/service.py +++ b/conductor/conductor/solver/service.py @@ -17,22 +17,34 @@ # ------------------------------------------------------------------------- # -import socket -import time +import collections import cotyledon -from conductor import messaging -from conductor import service -from conductor.common.models import plan +import json +import time +import traceback +import json +import socket +import json +from oslo_config import cfg +from oslo_log import log + +from conductor.common.models import plan, region_placeholders, country_latency, group_rules, groups +from conductor.common.models import order_lock +from conductor.common.models import order_lock_history from conductor.common.music import api from conductor.common.music import messaging as music_messaging from conductor.common.music.model import base from conductor.i18n import _LE, _LI +from conductor import messaging +from conductor import service from conductor.solver.optimizer import optimizer from conductor.solver.request import parser from conductor.solver.utils import constraint_engine_interface as cei -from oslo_config import cfg -from oslo_log import log +from conductor.common.utils import conductor_logging_util as log_util +from conductor.common.models.order_lock import OrderLock +from conductor.common.models import triage_tool +from conductor.common.models.triage_tool import TriageTool # To use oslo.log in services: # @@ -113,8 +125,8 @@ CONF.register_opts(OPTS) class SolverServiceLauncher(object): """Launcher for the solver service.""" - def __init__(self, conf): + self.conf = conf # Set up Music access. @@ -124,12 +136,49 @@ class SolverServiceLauncher(object): # Dynamically create a plan class for the specified keyspace self.Plan = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") + self.OrderLock =base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=order_lock.OrderLock, classname="OrderLock") + self.OrderLockHistory = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=order_lock_history.OrderLockHistory, classname="OrderLockHistory") + self.RegionPlaceholders = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=region_placeholders.RegionPlaceholders, classname="RegionPlaceholders") + self.CountryLatency = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=country_latency.CountryLatency, classname="CountryLatency") + self.TriageTool = base.create_dynamic_model( + keyspace=conf.keyspace, baseclass=triage_tool.TriageTool ,classname = "TriageTool") + #self.Groups = base.create_dynamic_model( + # keyspace=conf.keyspace, baseclass=groups.Groups, classname="Groups") + #self.GroupRules = base.create_dynamic_model( + # keyspace=conf.keyspace, baseclass=group_rules.GroupRules, classname="GroupRules") if not self.Plan: raise + if not self.OrderLock: + raise + if not self.OrderLockHistory: + raise + if not self.RegionPlaceholders: + raise + if not self.CountryLatency: + raise + if not self.TriageTool: + raise + #if not self.Groups: + # raise + #if not self.GroupRules: + # raise def run(self): - kwargs = {'plan_class': self.Plan} + kwargs = {'plan_class': self.Plan, + 'order_locks': self.OrderLock, + 'order_locks_history': self.OrderLockHistory, + 'region_placeholders': self.RegionPlaceholders, + 'country_latency': self.CountryLatency, + 'triage_tool': self.TriageTool + #'groups': self.Groups, + #'group_rules': self.GroupRules + } + # kwargs = {'plan_class': self.Plan} svcmgr = cotyledon.ServiceManager() svcmgr.add(SolverService, workers=self.conf.solver.workers, @@ -143,8 +192,12 @@ class SolverService(cotyledon.Service): # This will appear in 'ps xaf' name = "Conductor Solver" + regions = collections.OrderedDict() + countries = list() + def __init__(self, worker_id, conf, **kwargs): """Initializer""" + LOG.debug("%s" % self.__class__.__name__) super(SolverService, self).__init__(worker_id) self._init(conf, **kwargs) @@ -156,7 +209,15 @@ class SolverService(cotyledon.Service): self.kwargs = kwargs self.Plan = kwargs.get('plan_class') - + self.OrderLock = kwargs.get('order_locks') + self.OrderLockHistory = kwargs.get('order_locks_history') + #self.OrderLock =kwargs.get('order_locks') + self.RegionPlaceholders = kwargs.get('region_placeholders') + self.CountryLatency = kwargs.get('country_latency') + self.TriageTool = kwargs.get('triage_tool') + + # self.Groups = kwargs.get('groups') + #self.GroupRules = kwargs.get('group_rules') # Set up the RPC service(s) we want to talk to. self.data_service = self.setup_rpc(conf, "data") @@ -166,7 +227,6 @@ class SolverService(cotyledon.Service): # Set up Music access. self.music = api.API() - self.solver_owner_condition = { "solver_owner": socket.gethostname() } @@ -193,12 +253,12 @@ class SolverService(cotyledon.Service): Use this only when the solver service is not running concurrently. """ - plans = self.Plan.query.all() + + plans = self.Plan.query.get_plan_by_col("status", self.Plan.SOLVING) for the_plan in plans: - if the_plan.status == self.Plan.SOLVING: - the_plan.status = self.Plan.TRANSLATED - # Use only in active-passive mode, so don't have to be atomic - the_plan.update() + the_plan.status = self.Plan.TRANSLATED + # Use only in active-passive mode, so don't have to be atomic + the_plan.update() def _restart(self): """Prepare to restart the service""" @@ -219,92 +279,167 @@ class SolverService(cotyledon.Service): return client def run(self): + """Run""" LOG.debug("%s" % self.__class__.__name__) # TODO(snarayanan): This is really meant to be a control loop # As long as self.running is true, we process another request. + while self.running: + # Delay time (Seconds) for MUSIC requests. time.sleep(self.conf.delay_time) + # plans = Plan.query().all() # Find the first plan with a status of TRANSLATED. # Change its status to SOLVING. # Then, read the "translated" field as "template". json_template = None p = None + requests_to_solve = dict() - plans = self.Plan.query.all() + regions_maps = dict() + country_groups = list() + + # Instead of using the query.all() method, now creating an index for 'status' + # field in conductor.plans table, and query plans by status columns + translated_plans = self.Plan.query.get_plan_by_col("status", self.Plan.TRANSLATED) + solving_plans = self.Plan.query.get_plan_by_col("status", self.Plan.SOLVING) + + + # combine the plans with status = 'translated' and 'solving' together + plans = translated_plans + solving_plans + found_translated_template = False + for p in plans: if p.status == self.Plan.TRANSLATED: json_template = p.translation found_translated_template = True break elif p.status == self.Plan.SOLVING and \ - (self.current_time_seconds() - self.millisec_to_sec( - p.updated)) > self.conf.solver.timeout: + (self.current_time_seconds() - self.millisec_to_sec(p.updated)) > self.conf.solver.timeout: p.status = self.Plan.TRANSLATED p.update(condition=self.solving_status_condition) break - if found_translated_template and not json_template: - message = _LE("Plan {} status is translated, yet " - "the translation wasn't found").format(p.id) - LOG.error(message) - p.status = self.Plan.ERROR - p.message = message - p.update(condition=self.translated_status_condition) + + if not json_template: + if found_translated_template: + message = _LE("Plan {} status is translated, yet " + "the translation wasn't found").format(p.id) + LOG.error(message) + p.status = self.Plan.ERROR + p.message = message + p.update(condition=self.translated_status_condition) continue - elif found_translated_template and p and p.solver_counter >= self.conf.solver.max_solver_counter: - message = _LE("Tried {} times. Plan {} is unable to solve") \ - .format(self.conf.solver.max_solver_counter, p.id) + + if found_translated_template and p and p.solver_counter >= self.conf.solver.max_solver_counter: + message = _LE("Tried {} times. Plan {} is unable to solve")\ + .format(self.conf.solver.max_solver_counter, p.id) LOG.error(message) p.status = self.Plan.ERROR p.message = message p.update(condition=self.translated_status_condition) continue - elif not json_template: - continue - p.status = self.Plan.SOLVING + log_util.setLoggerFilter(LOG, self.conf.keyspace, p.id) + p.status = self.Plan.SOLVING p.solver_counter += 1 p.solver_owner = socket.gethostname() _is_updated = p.update(condition=self.translated_status_condition) + if not _is_updated: + continue + # other VMs have updated the status and start solving the plan if 'FAILURE' in _is_updated: continue + LOG.info(_LI("Sovling starts, changing the template status from translated to solving, " + "atomic update response from MUSIC {}").format(_is_updated)) + LOG.info(_LI("Plan {} with request id {} is solving by machine {}. Tried to solve it for {} times."). format(p.id, p.name, p.solver_owner, p.solver_counter)) - _is_success = 'FAILURE | Could not acquire lock' - + _is_success = "FAILURE" request = parser.Parser() request.cei = self.cei + request.request_id = p.name + request.plan_id = p.id + # getting the number of solutions need to provide + num_solution = getattr(p, 'recommend_max', '1') + if num_solution.isdigit(): + num_solution = int(num_solution) + + #TODO(inam/larry): move this part of logic inside of parser and don't apply it to distance_between + try: + # getting region placeholders from database and insert/put into regions_maps dictionary + region_placeholders = self.RegionPlaceholders.query.all() + for region in region_placeholders: + regions_maps.update(region.countries) + + # getting country groups from database and insert into the country_groups list + customer_loc = '' + location_list = json_template["conductor_solver"]["locations"] + for location_id, location_info in location_list.items(): + customer_loc = location_info['country'] + + countries = self.CountryLatency.query.get_plan_by_col("country_name", customer_loc) + LOG.info("Customer Location for Latency Reduction " + customer_loc) + + if len(countries) == 0: + LOG.info("country is not present is country latency table, looking for * wildcard entry") + countries = self.CountryLatency.query.get_plan_by_col("country_name","*") + if len(countries) != 0: + LOG.info("Found '*' wild card entry in country latency table") + else: + msg = "No '*' wild card entry found in country latency table. No solution will be provided" + LOG.info(msg) + p.message = msg + + for country in countries: + country_groups = country.groups + + LOG.info("Done getting Latency Country DB Groups ") + except Exception as error_msg: + LOG.error("Exception thrown while reading region_placeholders and country groups information " + "from database. Exception message: {}".format(error_msg)) + try: - request.parse_template(json_template) + request.parse_template(json_template, country_groups, regions_maps) request.assgin_constraints_to_demands() requests_to_solve[p.id] = request - opt = optimizer.Optimizer(self.conf, _requests=requests_to_solve, _begin_time=self.millisec_to_sec(p.updated)) - solution = opt.get_solution() + opt = optimizer.Optimizer(self.conf, _requests=requests_to_solve) + solution_list = opt.get_solution(num_solution) except Exception as err: message = _LE("Plan {} status encountered a " "parsing error: {}").format(p.id, err.message) - LOG.error(message) + LOG.error(traceback.print_exc()) p.status = self.Plan.ERROR p.message = message - while 'FAILURE | Could not acquire lock' in _is_success: + while 'FAILURE' in _is_success: _is_success = p.update(condition=self.solver_owner_condition) + LOG.info(_LI("Encountered a parsing error, changing the template status from solving to error, " + "atomic update response from MUSIC {}").format(_is_success)) + continue + LOG.info("Preparing the recommendations ") + # checking if the order is 'initial' or 'speed changed' one + is_speed_change = False + if request and request.request_type == 'speed changed': + is_speed_change = True + recommendations = [] - if not solution or not solution.decisions: + if not solution_list or len(solution_list) < 1: + # when order takes too much time to solve if (int(round(time.time())) - self.millisec_to_sec(p.updated)) > self.conf.solver.solver_timeout: message = _LI("Plan {} is timed out, exceed the expected " "time {} seconds").format(p.id, self.conf.solver.timeout) + # when no solution found else: message = _LI("Plan {} search failed, no " "recommendations found by machine {}").format(p.id, p.solver_owner) @@ -312,69 +447,247 @@ class SolverService(cotyledon.Service): # Update the plan status p.status = self.Plan.NOT_FOUND p.message = message - while 'FAILURE | Could not acquire lock' in _is_success: + while 'FAILURE' in _is_success: _is_success = p.update(condition=self.solver_owner_condition) + LOG.info(_LI("Plan serach failed, changing the template status from solving to not found, " + "atomic update response from MUSIC {}").format(_is_success)) else: # Assemble recommendation result JSON - for demand_name in solution.decisions: - resource = solution.decisions[demand_name] - is_rehome = "false" if resource.get("existing_placement") == 'true' else "true" - location_id = "" if resource.get("cloud_region_version") == '2.5' else resource.get("location_id") - - rec = { - # FIXME(shankar) A&AI must not be hardcoded here. - # Also, account for more than one Inventory Provider. - "inventory_provider": "aai", - "service_resource_id": - resource.get("service_resource_id"), - "candidate": { - "candidate_id": resource.get("candidate_id"), - "inventory_type": resource.get("inventory_type"), - "cloud_owner": resource.get("cloud_owner"), - "location_type": resource.get("location_type"), - "location_id": location_id, - "is_rehome": is_rehome, - "vim-id": resource.get("vim-id"), - }, - "attributes": { - "physical-location-id": - resource.get("physical_location_id"), - "cloud_owner": resource.get("cloud_owner"), - 'aic_version': resource.get("cloud_region_version")}, - } - if rec["candidate"]["inventory_type"] == "service": - rec["attributes"]["host_id"] = resource.get("host_id") - rec["candidate"]["host_id"] = resource.get("host_id") - - if rec["candidate"]["inventory_type"] == "cloud": - if resource.get("all_directives") and resource.get("flavor_map"): - rec["attributes"]["directives"] = \ - self.set_flavor_in_flavor_directives( - resource.get("flavor_map"), resource.get("all_directives")) - # TODO(snarayanan): Add total value to recommendations? - # msg = "--- total value of decision = {}" - # LOG.debug(msg.format(_best_path.total_value)) - # msg = "--- total cost of decision = {}" - # LOG.debug(msg.format(_best_path.total_cost)) - - recommendations.append({demand_name: rec}) + for solution in solution_list: + current_rec = dict() + for demand_name in solution: + resource = solution[demand_name] + + if not is_speed_change: + is_rehome = "false" + else: + is_rehome = "false" if resource.get("existing_placement") == 'true' else "true" + + location_id = "" if resource.get("cloud_region_version") == '2.5' else resource.get("location_id") + + rec = { + # FIXME(shankar) A&AI must not be hardcoded here. + # Also, account for more than one Inventory Provider. + "inventory_provider": "aai", + "service_resource_id": + resource.get("service_resource_id"), + "candidate": { + "candidate_id": resource.get("candidate_id"), + "inventory_type": resource.get("inventory_type"), + "cloud_owner": resource.get("cloud_owner"), + "location_type": resource.get("location_type"), + "location_id": location_id, + "is_rehome": is_rehome, + }, + "attributes": { + "physical-location-id": + resource.get("physical_location_id"), + "cloud_owner": resource.get("cloud_owner"), + 'aic_version': resource.get("cloud_region_version")}, + } + + if resource.get('vim-id'): + rec["candidate"]['vim-id'] = resource.get('vim-id') + + if rec["candidate"]["inventory_type"] == "service": + rec["attributes"]["host_id"] = resource.get("host_id") + rec["attributes"]["service_instance_id"] = resource.get("candidate_id") + rec["candidate"]["host_id"] = resource.get("host_id") + + if resource.get('vlan_key'): + rec["attributes"]['vlan_key'] = resource.get('vlan_key') + if resource.get('port_key'): + rec["attributes"]['port_key'] = resource.get('port_key') + + elif rec["candidate"]["inventory_type"] == "cloud": + if resource.get("all_directives") and resource.get("flavor_map"): + rec["attributes"]["directives"] = \ + self.set_flavor_in_flavor_directives( + resource.get("flavor_map"), resource.get("all_directives")) + if resource.get('conflict_id'): + rec["candidate"]["conflict_id"] = resource.get("conflict_id") + + + # TODO(snarayanan): Add total value to recommendations? + # msg = "--- total value of decision = {}" + # LOG.debug(msg.format(_best_path.total_value)) + # msg = "--- total cost of decision = {}" + # LOG.debug(msg.format(_best_path.total_cost)) + current_rec[demand_name] = rec + + recommendations.append(current_rec) # Update the plan with the solution p.solution = { "recommendations": recommendations } - p.status = self.Plan.SOLVED - while 'FAILURE | Could not acquire lock' in _is_success: - _is_success = p.update(condition=self.solver_owner_condition) - LOG.info(_LI("Plan {} search complete, solution with {} " - "recommendations found by machine {}"). - format(p.id, len(recommendations), p.solver_owner)) + + # multiple spin-ups logic + ''' + go through list of recommendations in the solution + for cloud candidates, check if (cloud-region-id + e2evnfkey) is in the order_locks table + if so, insert the row with status 'parked' in order_locks, changes plan status to 'pending' in plans table (or other status value) + otherwise, insert the row with status 'locked' in order_locks, and change status to 'solved' in plans table - continue reservation + ''' + + # clean up the data/record in order_locks table, deleting all records that failed from MSO + order_locks = self.OrderLock.query.all() + for order_lock_record in order_locks: + + plans = getattr(order_lock_record, 'plans') + for plan_id, plan_attributes in plans.items(): + plan_dict = json.loads(plan_attributes) + + if plan_dict.get('status', None) == OrderLock.FAILED: + order_lock_record.delete() + LOG.info(_LI("The order lock record {} with status {} is deleted (due to failure spinup from MSO) from order_locks table"). + format(order_lock_record, plan_dict.get('status'))) + break + + inserted_order_records_dict = dict() + available_dependenies_set = set() + + is_inserted_to_order_locks = True + is_conflict_id_missing = False + is_order_translated_before_spinup = False + + for solution in solution_list: + + for demand_name, candidate in solution.items(): + if candidate.get('inventory_type') == 'cloud': + conflict_id = candidate.get('conflict_id') + service_resource_id = candidate.get('service_resource_id') + # TODO(larry): add more logic for missing conflict_id in template + if not conflict_id: + is_conflict_id_missing = True + break + + available_dependenies_set.add(conflict_id) + # check if conflict_id exists in order_locks table + order_lock_record = self.OrderLock.query.get_plan_by_col("id", conflict_id) + if order_lock_record: + is_spinup_completed = getattr(order_lock_record[0], 'is_spinup_completed') + spinup_completed_timestamp = getattr(order_lock_record[0], 'spinup_completed_timestamp') + if is_spinup_completed and spinup_completed_timestamp > p.translation_begin_timestamp: + is_order_translated_before_spinup = True + break + elif not is_spinup_completed: + inserted_order_records_dict[conflict_id] = service_resource_id + + if is_conflict_id_missing: + message = _LE("Missing conflict identifier field for cloud candidates in the template, " + "could not insert into order_locks table") + LOG.debug(message) + p.status = self.Plan.SOLVED + + elif is_order_translated_before_spinup: + message = _LE("Retriggering Plan {} due to the new order arrives before the " + "spinup completion of the old order ").format(p.id) + LOG.debug(message) + p.rehome_plan() + + elif len(inserted_order_records_dict) > 0: + + new_dependenies_set = available_dependenies_set - set(inserted_order_records_dict.keys()) + dependencies = ','.join(str(s) for s in new_dependenies_set) + + for conflict_id, service_resource_id in inserted_order_records_dict.items(): + plan = { + p.id: { + "status": OrderLock.UNDER_SPIN_UP, + "created": self.current_time_millis(), + "updated": self.current_time_millis(), + "service_resource_id": service_resource_id + } + } + + if dependencies: + plan[p.id]['dependencies'] = dependencies + + order_lock_row = self.OrderLock(id=conflict_id, plans=plan) + response = order_lock_row.insert() + + # TODO(larry): add more logs for inserting order lock record (insert/update) + LOG.info(_LI("Inserting the order lock record to order_locks table in MUSIC, " + "conditional insert operation response from MUSIC {}").format(response)) + if response and response.status_code == 200: + body = response.json() + LOG.info("Succcessfully inserted the record in order_locks table with " + "the following response message {}".format(body)) + else: + is_inserted_to_order_locks = False + else: + for solution in solution_list: + for demand_name, candidate in solution.items(): + if candidate.get('inventory_type') == 'cloud': + conflict_id = candidate.get('conflict_id') + service_resource_id = candidate.get('service_resource_id') + + order_lock_record = self.OrderLock.query.get_plan_by_col("id", conflict_id) + if order_lock_record: + deleting_record = order_lock_record[0] + plans = getattr(deleting_record, 'plans') + is_spinup_completed = getattr(deleting_record, 'is_spinup_completed') + spinup_completed_timestamp = getattr(deleting_record, 'spinup_completed_timestamp') + + if is_spinup_completed: + # persist the record in order_locks_history table + order_lock_history_record = self.OrderLockHistory(conflict_id=conflict_id, plans=plans, + is_spinup_completed=is_spinup_completed, + spinup_completed_timestamp=spinup_completed_timestamp) + LOG.debug("Inserting the history record with conflict id {} to order_locks_history table".format(conflict_id)) + order_lock_history_record.insert() + # remove the older record + LOG.debug("Deleting the order lock record {} from order_locks table".format(deleting_record)) + deleting_record.delete() + + plan = { + p.id: { + "status": OrderLock.UNDER_SPIN_UP, + "created": self.current_time_millis(), + "updated": self.current_time_millis(), + "service_resource_id": service_resource_id + } + } + order_lock_row = self.OrderLock(id=conflict_id, plans=plan) + response = order_lock_row.insert() + # TODO(larry): add more logs for inserting order lock record (insert/update) + LOG.info(_LI("Inserting the order lock record to order_locks table in MUSIC, " + "conditional insert operation response from MUSIC {}").format(response)) + if response and response.status_code == 200: + body = response.json() + LOG.info("Succcessfully inserted the record in order_locks table " + "with the following response message {}".format(body)) + else: + is_inserted_to_order_locks = False + + if not is_inserted_to_order_locks: + message = _LE("Plan {} status encountered an " + "error while inserting order lock message to MUSIC.").format(p.id) + LOG.error(message) + p.status = self.Plan.ERROR + p.message = message + + elif p.status == self.Plan.SOLVING: + if len(inserted_order_records_dict) > 0: + LOG.info(_LI("The plan with id {} is parked in order_locks table, waiting for MSO release calls"). + format(p.id)) + p.status = self.Plan.WAITING_SPINUP + else: + LOG.info(_LI("The plan with id {} is inserted in order_locks table."). + format(p.id)) + p.status = self.Plan.SOLVED + + while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(p.updated)) <= self.conf.solver.timeout: + _is_success = p.update(condition=self.solver_owner_condition) + LOG.info(_LI("Plan search complete, changing the template status from solving to {}, " + "atomic update response from MUSIC {}").format(p.status, _is_success)) + + LOG.info(_LI("Plan {} search complete, {} solution(s) found by machine {}"). + format(p.id, len(solution_list), p.solver_owner)) LOG.debug("Plan {} detailed solution: {}". format(p.id, p.solution)) - LOG.info("Plan name: {}". - format(p.name)) - - # Check status, update plan with response, SOLVED or ERROR + LOG.info("Plan name: {}".format(p.name)) def terminate(self): """Terminate""" @@ -388,6 +701,10 @@ class SolverService(cotyledon.Service): LOG.debug("%s" % self.__class__.__name__) self._restart() + def current_time_millis(self): + """Current time in milliseconds.""" + return int(round(time.time() * 1000)) + def set_flavor_in_flavor_directives(self, flavor_map, directives): ''' Insert the flavor name inside the flavor_map into flavor_directives diff --git a/conductor/conductor/solver/triage_tool/__init__.py b/conductor/conductor/solver/triage_tool/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/conductor/conductor/solver/triage_tool/__init__.py diff --git a/conductor/conductor/solver/triage_tool/traige_latency.py b/conductor/conductor/solver/triage_tool/traige_latency.py new file mode 100644 index 0000000..3a7fb0c --- /dev/null +++ b/conductor/conductor/solver/triage_tool/traige_latency.py @@ -0,0 +1,77 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json +import unicodedata + +from conductor.common.models.triage_tool import TriageTool +from conductor.common.music.model import base +from oslo_config import cfg +from StringIO import StringIO + +CONF = cfg.CONF +io = StringIO() + + +class TriageLatency(object): + + def __init__(self): + self.TriageTool = base.create_dynamic_model( + keyspace=CONF.keyspace, baseclass=TriageTool, classname="TriageTool") + self.optimzation={} + self.latency_dropped = [] + + + def takeOpimaztionType(self, optimation_type): + self.optimzation['opimization_type'] = optimation_type + + def latencyDroppedCandiate(self, candidate_id, demand_id, reason): + candiate_dropped = {} + candiate_dropped['demand_id'] = demand_id + candiate_dropped['candidate_id'] = candidate_id + candiate_dropped['reason'] = reason + self.latency_dropped.append(candiate_dropped) + + def updateTriageLatencyDB(self, plan_id, request_id): + if self.optimzation['opimization_type'] == "distance_between": + optimization_type = self.optimzation['opimization_type'] + op = json.dumps(optimization_type) + triage_dropped_list = self.TriageTool.query.get_plan_by_col("id", plan_id) + triageRowUpdate = triage_dropped_list[0] + triageRowUpdate.optimization_type = op + triageRowUpdate.update() + elif self.optimzation['opimization_type'] == "latency_between": + latency_dropped = {} + optimization_type = self.optimzation['opimization_type'] + latency_dropped['dropped_cadidtes'] = self.latency_dropped + op= json.dumps(optimization_type) + triageRowUpdate = self.TriageTool.query.get_plan_by_col("id", plan_id)[0] + triageRowUpdate.optimization_type = op + copy_translator = copy.copy(triageRowUpdate.triage_translator) + copy_tra = unicodedata.normalize('NFKD', copy_translator).encode('ascii', 'ignore') + cop_ta = json.loads(copy_tra) + for tt in cop_ta['translator_triage']['dropped_candidates']: + for tl in latency_dropped['dropped_cadidtes']: + if tt['name'] == tl['demand_id']: + tt['translator_triage']['lantency_dropped'].append(tl) + + triaL = json.dumps(latency_dropped) + triageRowUpdate.triage_translator = triaL + triageRowUpdate.update() diff --git a/conductor/conductor/solver/triage_tool/triage_data.py b/conductor/conductor/solver/triage_tool/triage_data.py new file mode 100644 index 0000000..007efcb --- /dev/null +++ b/conductor/conductor/solver/triage_tool/triage_data.py @@ -0,0 +1,224 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json + +from conductor.common.models.triage_tool import TriageTool +from conductor.common.music.model import base +from oslo_config import cfg + + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + + +CONF = cfg.CONF +io = StringIO() +class TriageData(object): + def __init__(self): + self.TriageTool = base.create_dynamic_model( + keyspace=CONF.keyspace, baseclass=TriageTool, classname="TriageTool") + + self.triage = {} + + self.triage['candidates'] = [] + self.triage['final_candidate']= {} + self.children = {'children' :[]} + self.triage['plan_id'] = None + self.triage['request_id'] = None + self.sorted_demand = [] + + def getSortedDemand(self, sorted_demand): + for d in sorted_demand: + if not d.name in self.sorted_demand: + # print d.name + self.sorted_demand.append(d.name) + self.sorted_demand + + def aasignNodeIdToCandidate(self, candiate, current_demand, request, plan_id): + demand_name = current_demand.name + self.triage['plan_id'] = plan_id + self.triage['request_id'] = request + candidateCopyStructure = [] + candidateCopy = copy.copy(candiate) + for cs in candidateCopy: + candidate_stru = {} + # candidate_stru['complex_name'] = cs['complex_name'] + # candidate_stru['inventory_type'] = cs['inventory_type'] + # candidate_stru['candidate_id'] = cs['candidate_id'] + # candidate_stru['physical_location_id'] = cs['physical_location_id'] + # candidate_stru['location_id'] = cs['location_id'] + candidate_stru['node_id'] = cs['node_id'] + candidate_stru['constraints'] =cs['constraints'] + candidate_stru['name'] = cs['name'] + candidateCopyStructure.append(candidate_stru) + for cr in candidateCopyStructure: + if not cr in self.triage['candidates']: + for c in current_demand.constraint_list: + constraint = {} + constraint['name'] = c.name + constraint['status'] = None + constraint['constraint_type'] = c.constraint_type + cr['constraints'].append(constraint) + self.triage['candidates'].append(cr) + + + def checkCandidateAfter(self,solver): + for ca in solver['candidate_after_list']: + for resource_candidate in self.triage['candidates']: + if ca['node_id'] == resource_candidate['node_id']: + for rcl in resource_candidate['constraints']: + if (rcl['name'] == solver['constraint_name_for_can']): + rcl['status'] = "passed" + return self.triage + + def droppedCadidatesStatus(self, dropped_candidate): + for dc in dropped_candidate: + for ca in self.triage['candidates']: + if dc['node_id'] == ca['node_id']: + ca['type'] ='dropped' + for cca in ca['constraints']: + for dl in dc['constraints']: + if 'constraint_name_dropped' in dl.keys(): + if(cca['name'] == dl['constraint_name_dropped']): + dc['status'] = "dropped" + return self.triage + + def rollBackStatus(self, demanHadNoCandidate, decisionWeneedtoRollback): + if len(decisionWeneedtoRollback.decisions) >0: + count = self.sorted_demand.index(demanHadNoCandidate.name) + count = count-1 + if count == 0: + decision_rolba = decisionWeneedtoRollback.decisions.values() + for x in decision_rolba: + for canrb in self.triage['candidates']: + if x['node_id'] == canrb['node_id'] : + canrb['type'] = "rollback" + # The folloing print statement was missing a test case - run tox to see + #print x['node_id'], ' +++ ', canrb['node_id'] + children = [] + for childCand in self.triage['candidates']: + if demanHadNoCandidate.name == childCand['name']: + children.append(childCand) + canrb['children'] = children + + elif len(decisionWeneedtoRollback.decisions) == 0: + self.triage['name'] = demanHadNoCandidate.name + self.triage['message'] = "this is parent demand and has no resource to rollback " + else: + decision_rolba = decisionWeneedtoRollback.decisions + #print decision_rolba[count] + candRollBack = decision_rolba[count] + for resource_rollback in self.triage['candidates']: + if candRollBack['node_id'] == resource_rollback['node_id']: + resource_rollback['type'] = "rollback" + + + def getSolution(self, decision_list): + + if len(decision_list) == 0: + self.children['children']=(self.triage['candidates']) + self.triage['final_candidate']= self.children + triaP = json.dumps(self.triage['final_candidate']) + else: + self.triage['candidates'] = [i for n, i in enumerate(self.triage['candidates']) if i not in self.triage['candidates'][n+1:]] + + counter = 0 + d1 = []; d2 = []; d3 = []; d4 = []; d5 = []; d6 = [] + for fc in decision_list: + for final_cand in fc.values(): + for final_resou in self.triage['candidates']: + if final_cand['node_id'] == final_resou['node_id']: + if 'type' in final_resou.keys() : + if not final_resou['type'] == "dropped": + final_resou['type'] = 'solution' + final_resou['children'] = [] + else: + final_resou['type'] = 'solution' + final_resou['children'] = [] + + elif not 'type' in final_resou.keys(): + final_resou['type'] = 'not tried' + # + for cand in self.triage['candidates']: + if cand['name'] == self.sorted_demand[0]: + d1.append(cand) + elif cand['name'] == self.sorted_demand[1]: + d2.append(cand) + elif self.sorted_demand[2] == cand['name']: + d3.append(cand) + elif self.sorted_demand[3] == cand['name']: + d4.append(cand) + elif self.sorted_demand[4] == cand['name']: + d5.append(cand) + elif self.sorted_demand[5] == cand['name']: + d6.append(cand) + else: + break + if len(d1) > 0: + for d1c in d1: + if d1c['type'] == 'solution': + d1c['children'] = (d2) + if len(d1c['children']) == 0: + break + else: + for d2c in d1c['children']: + if d2c['type'] == 'solution': + d2c['children'] = (d3) + if len(d2c['children']) == 0: + break + else: + for d3c in d2c['children']: + if d3c['type'] == 'solution': + d3c['children'] = (d4) + if len(d3c['children']) == 0: + break + else: + for d4c in d3c['children']: + if d4c['type'] == 'solution': + d4c['children'] = (d5) + if len(d4c['children']) == 0: + break + else: + for d5c in d4c['children']: + if d5c['type'] == 'solution': + d5c['children'] = (d6) + + + self.children['children']=(d1) + self.triage['final_candidate'] = self.children + triaP = json.dumps(self.triage['final_candidate']) + triageRowUpdate = self.TriageTool.query.get_plan_by_col("id", self.triage['plan_id'])[0] + triageRowUpdate.triage_solver = triaP + triageRowUpdate.update() + # triageDatarow = self.TriageTool(id=self.triage['plan_id'], name=self.triage['request_id'], + # triage_solver=triaP) + # response = triageDatarow.insert() + + + + + + + + + diff --git a/conductor/conductor/solver/triage_tool/triage_tool_service.py b/conductor/conductor/solver/triage_tool/triage_tool_service.py new file mode 100644 index 0000000..839d105 --- /dev/null +++ b/conductor/conductor/solver/triage_tool/triage_tool_service.py @@ -0,0 +1,48 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import time +from oslo_log import log +from oslo_config import cfg +from conductor.common.models.plan import Plan +from conductor.common.models.triage_tool import TriageTool +from conductor.common.music.model import base + +CONF = cfg.CONF +LOG = log.getLogger(__name__) +class TriageToolService(object): + + def __init__(self): + self.Plan = base.create_dynamic_model(keyspace=CONF.keyspace, baseclass=Plan, classname="Plan") + self.TriageTool = base.create_dynamic_model( + keyspace=CONF.keyspace, baseclass=TriageTool, classname="TriageTool") + + + # def get_order_by_req_id(self, name): + # return self.TriageTool.query.get(name) + + def _get_plans_by_id(self, id): + + triage_info = self.TriageTool.query.get_plan_by_col("id", id) + triage_data = triage_info[0] + return triage_data + + + diff --git a/conductor/conductor/solver/utils/utils.py b/conductor/conductor/solver/utils/utils.py index 5cec51f..06de301 100755 --- a/conductor/conductor/solver/utils/utils.py +++ b/conductor/conductor/solver/utils/utils.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # # ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property @@ -18,8 +17,11 @@ # ------------------------------------------------------------------------- # - import math +from oslo_log import log + + +LOG = log.getLogger(__name__) def compute_air_distance(_src, _dst): @@ -30,12 +32,14 @@ def compute_air_distance(_src, _dst): output: air distance as km """ distance = 0.0 + latency_score = 0.0 if _src == _dst: return distance radius = 6371.0 # km + dlat = math.radians(_dst[0] - _src[0]) dlon = math.radians(_dst[1] - _src[1]) a = math.sin(dlat / 2.0) * math.sin(dlat / 2.0) + \ @@ -48,6 +52,21 @@ def compute_air_distance(_src, _dst): return distance +def compute_latency_score(_src,_dst, _region_group): + """Compute the Network latency score between src and dst""" + earth_half_circumference = 20000 + region_group_weight = _region_group.get(_dst[2]) + + if region_group_weight == 0 or region_group_weight is None : + LOG.debug("Computing the latency score based on distance between : ") + latency_score = compute_air_distance(_src,_dst) + elif _region_group > 0 : + LOG.debug("Computing the latency score ") + latency_score = compute_air_distance(_src, _dst) + region_group_weight * earth_half_circumference + LOG.debug("Finished Computing the latency score: "+str(latency_score)) + return latency_score + + def convert_km_to_miles(_km): return _km * 0.621371 diff --git a/conductor/conductor/tests/unit/api/controller/test_root.py b/conductor/conductor/tests/unit/api/controller/test_root.py index 7a2d233..f39f0ae 100644 --- a/conductor/conductor/tests/unit/api/controller/test_root.py +++ b/conductor/conductor/tests/unit/api/controller/test_root.py @@ -19,7 +19,7 @@ """Test case for RootController /""" import json - +from conductor import version from conductor.tests.unit.api import base_api @@ -29,7 +29,11 @@ class TestRoot(base_api.BaseApiTest): actual_response = self.app.get('/') req_json_file = './conductor/tests/unit/api/controller/versions.json' expected_response = json.loads(open(req_json_file).read()) - # print('GOT:%s' % actual_response) + + versions = expected_response.get('versions') + for version_obj in versions: + version_obj['version'] = "of-has:{}".format(version.version_info.version_string()) + self.assertEqual(200, actual_response.status_int) self.assertJsonEqual(expected_response, json.loads(actual_response.body)) diff --git a/conductor/conductor/tests/unit/api/controller/versions.json b/conductor/conductor/tests/unit/api/controller/versions.json index 885ce01..0ccd626 100644 --- a/conductor/conductor/tests/unit/api/controller/versions.json +++ b/conductor/conductor/tests/unit/api/controller/versions.json @@ -1,26 +1,35 @@ { - "versions": [ - { - "id": "v1", - "media-types": [ - { - "type": "application/vnd.onap.has-v1+json", - "base": "application/json" - } - ], - "updated": "2016-11-01T00:00:00Z", - "links": [ - { - "rel": "self", - "href": "http://localhost/v1" - }, - { - "type": "text/html", - "rel": "describedby", - "href": "https://wiki.onap.org/pages/viewpage.action?pageId=16005528" - } - ], - "status": "EXPERIMENTAL" - } - ] + "versions": [{ + "id": "v1", + "links": [{ + "href": "http://localhost/v1", + "rel": "self" + }, { + "href": "https://wiki.onap.org/pages/viewpage.action?pageId=16005528", + "rel": "describedby", + "type": "text/html" + }], + "media-types": [{ + "base": "application/json", + "type": "application/vnd.onap.has-v1+json" + }], + "status": "EXPERIMENTAL", + "updated": "2016-11-01T00:00:00Z" + }, { + "id": "v1", + "links": [{ + "href": "http://localhost/v1", + "rel": "self" + }, { + "href": "https://wiki.onap.org/pages/viewpage.action?pageId=16005528", + "rel": "describedby", + "type": "text/html" + }], + "media-types": [{ + "base": "application/json", + "type": "application/vnd.onap.has-v1+json" + }], + "status": "EXPERIMENTAL", + "updated": "2018-02-01T00:00:00Z" + }] }
\ No newline at end of file diff --git a/conductor/conductor/tests/unit/controller/test_translator.py b/conductor/conductor/tests/unit/controller/test_translator.py index c61e57e..9682c7d 100644 --- a/conductor/conductor/tests/unit/controller/test_translator.py +++ b/conductor/conductor/tests/unit/controller/test_translator.py @@ -18,6 +18,7 @@ # """Test classes for translator""" +import mock import os import unittest import uuid @@ -26,6 +27,7 @@ import yaml from conductor import __file__ as conductor_root from conductor.controller.translator import Translator from conductor.controller.translator import TranslatorException +from conductor.data.plugins.triage_translator.triage_translator import TraigeTranslator from mock import patch from oslo_config import cfg @@ -124,6 +126,7 @@ class TestNoExceptionTranslator(unittest.TestCase): @patch('conductor.common.music.messaging.component.RPCClient.call') def test_parse_demands_with_candidate(self, mock_call): + TraigeTranslator.thefinalCallTrans = mock.MagicMock(return_value=None) demands = { "vGMuxInfra": [{ "inventory_provider": "aai", @@ -164,6 +167,7 @@ class TestNoExceptionTranslator(unittest.TestCase): @patch('conductor.common.music.messaging.component.RPCClient.call') def test_parse_demands_without_candidate(self, mock_call): + TraigeTranslator.thefinalCallTrans = mock.MagicMock(return_value=None) demands = { "vGMuxInfra": [{ "inventory_provider": "aai", @@ -688,14 +692,27 @@ class TestNoExceptionTranslator(unittest.TestCase): @patch('conductor.controller.translator.Translator.create_components') def test_parse_reservation(self, mock_create): - expected_resv = {'counter': 0, 'demands': { - 'instance_vG': {'demands': {'vG': 'null'}, - 'properties': {}, - 'name': 'instance', - 'demand': 'vG'}}} + expected_resv = { + 'counter': 0, + 'demands': { + 'instance_vG': { + 'demands': 'vG', + 'name': 'instance', + 'properties': {} + } + }, + 'service_model': 'null' + } self.Translator._demands = {'vG': 'null'} resv = { - 'instance': {'demands': {'vG': 'null'}} + 'service_model': 'null', + 'service_candidates': { + 'instance': { + 'demands': { + 'vG': 'null' + } + } + } } self.assertEquals( self.Translator.parse_reservations(resv), expected_resv) diff --git a/conductor/conductor/tests/unit/controller/test_translator_svc.py b/conductor/conductor/tests/unit/controller/test_translator_svc.py index 4b24001..c94ad15 100644 --- a/conductor/conductor/tests/unit/controller/test_translator_svc.py +++ b/conductor/conductor/tests/unit/controller/test_translator_svc.py @@ -96,7 +96,7 @@ class TestTranslatorServiceNoException(unittest.TestCase): int(round(time.time()))) @patch('conductor.common.music.model.base.Base.insert') - @patch('conductor.common.music.model.search.Query.all') + @patch('conductor.common.music.model.search.Query.get_plan_by_col') @patch('conductor.common.music.model.base.Base.update') def test_reset_template_status(self, mock_call, mock_update, mock_insert): mock_plan = self.Plan(str(uuid.uuid4()), diff --git a/conductor/conductor/tests/unit/data/demands.json b/conductor/conductor/tests/unit/data/demands.json index 459a013..da97df6 100644 --- a/conductor/conductor/tests/unit/data/demands.json +++ b/conductor/conductor/tests/unit/data/demands.json @@ -26,5 +26,12 @@ "inventory_type": "cloud" } ] - } + }, + "triage_translator_data": { + "plan_id": "plan_abc", + "plan_name": "plan_name", + "translator_triage": [], + "dropped_candidates": [] + } + }
\ No newline at end of file diff --git a/conductor/conductor/tests/unit/data/plugins/inventory_provider/hpa_req_features.json b/conductor/conductor/tests/unit/data/plugins/inventory_provider/hpa_req_features.json index 9e2c507..ade034e 100644 --- a/conductor/conductor/tests/unit/data/plugins/inventory_provider/hpa_req_features.json +++ b/conductor/conductor/tests/unit/data/plugins/inventory_provider/hpa_req_features.json @@ -718,4 +718,4 @@ ] } ] -]
\ No newline at end of file +] diff --git a/conductor/conductor/tests/unit/data/plugins/inventory_provider/test_aai.py b/conductor/conductor/tests/unit/data/plugins/inventory_provider/test_aai.py index 0c04b07..a586bc3 100644 --- a/conductor/conductor/tests/unit/data/plugins/inventory_provider/test_aai.py +++ b/conductor/conductor/tests/unit/data/plugins/inventory_provider/test_aai.py @@ -22,12 +22,14 @@ import unittest import conductor.data.plugins.inventory_provider.aai as aai import mock from conductor.data.plugins.inventory_provider.aai import AAI +from conductor.data.plugins.triage_translator.triage_translator import TraigeTranslator from oslo_config import cfg class TestAAI(unittest.TestCase): def setUp(self): + CONF = cfg.CONF CONF.register_opts(aai.AAI_OPTS, group='aai') self.conf = CONF @@ -37,16 +39,20 @@ class TestAAI(unittest.TestCase): mock.patch.stopall() def test_get_version_from_string(self): + self.assertEqual("2.5", self.aai_ep._get_version_from_string("AAI2.5")) self.assertEqual("3.0", self.aai_ep._get_version_from_string("AAI3.0")) def test_aai_versioned_path(self): + self.assertEqual('/{}/cloud-infrastructure/cloud-regions/?depth=0'.format(self.conf.aai.server_url_version), self.aai_ep._aai_versioned_path("/cloud-infrastructure/cloud-regions/?depth=0")) self.assertEqual('/{}/query?format=id'.format(self.conf.aai.server_url_version), self.aai_ep._aai_versioned_path("/query?format=id")) + def test_resolve_clli_location(self): + req_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_clli_location.json' req_json = json.loads(open(req_json_file).read()) @@ -57,10 +63,11 @@ class TestAAI(unittest.TestCase): self.mock_get_request = mock.patch.object(AAI, '_request', return_value=response) self.mock_get_request.start() - self.assertEqual({'country': u'USA', 'latitude': u'40.39596', 'longitude': u'-74.135342'}, - self.aai_ep.resolve_clli_location("clli_code")) + self.assertEqual({'country': u'USA', 'latitude': u'40.39596', 'longitude': u'-74.135342'} , + self.aai_ep.resolve_clli_location("clli_code")) def test_get_inventory_group_pair(self): + req_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_inventory_group_pair.json' req_json = json.loads(open(req_json_file).read()) @@ -71,10 +78,11 @@ class TestAAI(unittest.TestCase): self.mock_get_request = mock.patch.object(AAI, '_request', return_value=response) self.mock_get_request.start() - self.assertEqual([[u'instance-1', u'instance-2']], - self.aai_ep.get_inventory_group_pairs("service_description")) + self.assertEqual([[u'instance-1', u'instance-2']] , + self.aai_ep.get_inventory_group_pairs("service_description")) def test_resolve_host_location(self): + req_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_host_name.json' req_json = json.loads(open(req_json_file).read()) @@ -92,11 +100,20 @@ class TestAAI(unittest.TestCase): self.mock_get_complex = mock.patch.object(AAI, '_get_complex', return_value=complex_json) self.mock_get_complex.start() - self.assertEqual({'country': u'USA', 'latitude': u'28.543251', 'longitude': u'-81.377112'}, + self.assertEqual({'country': u'USA', 'latitude': u'28.543251', 'longitude': u'-81.377112'} , self.aai_ep.resolve_host_location("host_name")) def test_resolve_demands(self): - self.assertEqual({}, self.aai_ep.resolve_demands(dict())) + + self.aai_ep.conf.HPA_enabled = True + TraigeTranslator.getPlanIdNAme = mock.MagicMock(return_value=None) + TraigeTranslator.addDemandsTriageTranslator = mock.MagicMock(return_value=None) + + plan_info = { + 'plan_name': 'name', + 'plan_id': 'id' + } + triage_translator_data = None demands_list_file = './conductor/tests/unit/data/plugins/inventory_provider/demand_list.json' demands_list = json.loads(open(demands_list_file).read()) @@ -118,8 +135,7 @@ class TestAAI(unittest.TestCase): req_response.ok = True req_response.json.return_value = demand_service_response - self.mock_first_level_service_call = mock.patch.object(AAI, 'first_level_service_call', - return_value=generic_vnf_list) + self.mock_first_level_service_call = mock.patch.object(AAI, 'first_level_service_call', return_value=generic_vnf_list) self.mock_first_level_service_call.start() self.mock_get_regions = mock.patch.object(AAI, '_get_regions', return_value=regions_response) @@ -136,30 +152,35 @@ class TestAAI(unittest.TestCase): self.assertEqual({u'demand_name': [ {'candidate_id': u'service-instance-id', 'city': None, 'cloud_owner': u'cloud-owner', - 'vim-id': 'cloud-owner_cloud-region-id', - 'cloud_region_version': '', 'complex_name': None, 'cost': 1.0, + 'uniqueness': 'true', + 'vim-id': u'cloud-owner_cloud-region-id', + 'vlan_key': None, 'cloud_region_version': '', 'complex_name': None, 'cost': 1.0, 'country': u'USA', 'existing_placement': 'false', 'host_id': u'vnf-name', 'inventory_provider': 'aai', 'inventory_type': 'service', 'latitude': u'28.543251', 'location_id': u'cloud-region-id', 'location_type': 'att_aic', - 'longitude': u'-81.377112', 'physical_location_id': 'test-id', + 'longitude': u'-81.377112', 'physical_location_id': u'test-id', + 'port_key': None, 'region': u'SE', 'service_resource_id': '', 'sriov_automation': 'false', 'state': None}, {'candidate_id': u'region-name', 'city': u'Middletown', 'cloud_owner': u'cloud-owner', - 'vim-id': 'cloud-owner_region-name', + 'uniqueness': 'true', + 'vim-id': u'cloud-owner_region-name', 'cloud_region_version': u'1.0', 'complex_name': u'complex-name', 'cost': 2.0, 'country': u'USA', 'existing_placement': 'false', 'inventory_provider': 'aai', 'inventory_type': 'cloud', 'latitude': u'50.34', 'location_id': u'region-name', 'location_type': 'att_aic', 'longitude': u'30.12', - 'physical_location_id': u'complex-id', 'region': u'USA', - 'service_resource_id': u'service-resource-id-123', + 'physical_location_id': u'complex-id', + 'region': u'USA', 'service_resource_id': u'service-resource-id-123', 'sriov_automation': 'false', 'state': u'NJ', 'flavors': flavor_info}]}, - self.aai_ep.resolve_demands(demands_list)) + self.aai_ep.resolve_demands(demands_list, plan_info=plan_info, + triage_translator_data=triage_translator_data)) def test_get_complex(self): + complex_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_get_complex.json' complex_json = json.loads(open(complex_json_file).read()) @@ -171,12 +192,12 @@ class TestAAI(unittest.TestCase): self.mock_get_request = mock.patch.object(AAI, '_request', return_value=response) self.mock_get_request.start() - self.assertEqual( - {u'city': u'Middletown', u'latitude': u'28.543251', u'longitude': u'-81.377112', u'country': u'USA', - u'region': u'SE'}, - self.aai_ep._get_complex("/v10/complex/complex_id", "complex_id")) + self.assertEqual({u'city': u'Middletown', u'latitude': u'28.543251', u'longitude': u'-81.377112', u'country': u'USA', u'region': u'SE'} , + self.aai_ep._get_complex("/v10/complex/complex_id", "complex_id")) + def test_check_network_roles(self): + network_role_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_network_role.json' network_role_json = json.loads(open(network_role_json_file).read()) @@ -187,10 +208,12 @@ class TestAAI(unittest.TestCase): self.mock_get_request = mock.patch.object(AAI, '_request', return_value=response) self.mock_get_request.start() - self.assertEqual(set(['test-cloud-value']), - self.aai_ep.check_network_roles("network_role_id")) + self.assertEqual(set(['test-cloud-value']) , + self.aai_ep.check_network_roles("network_role_id")) + def test_check_candidate_role(self): + candidate_role_json_file = './conductor/tests/unit/data/plugins/inventory_provider/_request_candidate_role.json' candidate_role_json = json.loads(open(candidate_role_json_file).read()) @@ -213,8 +236,7 @@ class TestAAI(unittest.TestCase): inventory_attributes['attr-1'] = 'attr-1-value1' self.assertEqual(True, - self.aai_ep.match_inventory_attributes(template_attributes, inventory_attributes, - "candidate-id")) + self.aai_ep.match_inventory_attributes(template_attributes, inventory_attributes, "candidate-id")) template_attributes['attr-1'] = { 'not': ['attr-1-value2'] @@ -259,6 +281,7 @@ class TestAAI(unittest.TestCase): self.aai_ep._refresh_cache()) def test_get_aai_rel_link(self): + relatonship_response_file = './conductor/tests/unit/data/plugins/inventory_provider/relationship_list.json' relatonship_response = json.loads(open(relatonship_response_file).read()) related_to = "service-instance" @@ -338,13 +361,13 @@ class TestAAI(unittest.TestCase): "score": 6}, "directives": [ { - "type": "sriovNICNetwork_directives", - "attributes": [ - { - "attribute_name": "A", - "attribute_value": "a" - } - ] + "type": "sriovNICNetwork_directives", + "attributes": [ + { + "attribute_name": "A", + "attribute_value": "a" + } + ] }, { "type": "sriovNICNetwork_directives", @@ -360,3 +383,5 @@ class TestAAI(unittest.TestCase): feature_json[4])) self.assertEqual(None, self.aai_ep.match_hpa(candidate_json['candidate_list'][1], feature_json[5])) + + diff --git a/conductor/conductor/tests/unit/data/test_service.py b/conductor/conductor/tests/unit/data/test_service.py index 7953f3f..c9104c4 100644 --- a/conductor/conductor/tests/unit/data/test_service.py +++ b/conductor/conductor/tests/unit/data/test_service.py @@ -42,6 +42,7 @@ class TestDataEndpoint(unittest.TestCase): vc_ext.Manager(cfg.CONF, 'conductor.vim_controller.plugin')) sc_ext_manager = ( sc_ext.Manager(cfg.CONF, 'conductor.service_controller.plugin')) + self.data_ep = DataEndpoint(ip_ext_manager, vc_ext_manager, sc_ext_manager) @@ -204,6 +205,7 @@ class TestDataEndpoint(unittest.TestCase): def test_reslove_demands(self, ext_mock, logutil_mock, info_mock, debug_mock, error_mock): + self.maxDiff = None req_json_file = './conductor/tests/unit/data/demands.json' req_json = yaml.safe_load(open(req_json_file).read()) ctxt = { @@ -212,14 +214,21 @@ class TestDataEndpoint(unittest.TestCase): } logutil_mock.return_value = uuid.uuid4() ext_mock.return_value = [] - expected_response = {'response': {'resolved_demands': None}, + expected_response = {'response': {'resolved_demands': None, 'trans': {'plan_id': None, + 'plan_name': None, + 'translator_triage': []}}, 'error': True} self.assertEqual(expected_response, self.data_ep.resolve_demands(ctxt, req_json)) return_value = req_json['demands']['vG'] ext_mock.return_value = [return_value] - expected_response = {'response': {'resolved_demands': return_value}, - 'error': False} + expected_response = { 'error': False, 'response': + { 'resolved_demands': + [{ 'attributes': + { 'customer-id': 'some_company', 'provisioning-status': 'provisioned' }, + 'inventory_provider': 'aai', 'inventory_type': 'service', 'service_type': 'vG' }, + { 'inventory_provider': 'aai', 'inventory_type': 'cloud' } ], + 'trans': { 'plan_id': 'plan_abc', 'plan_name': 'plan_name', 'translator_triage': [ [] ] } } } self.assertEqual(expected_response, self.data_ep.resolve_demands(ctxt, req_json)) diff --git a/conductor/conductor/tests/unit/solver/test_order_lock_service.py b/conductor/conductor/tests/unit/solver/test_order_lock_service.py new file mode 100644 index 0000000..0050a1d --- /dev/null +++ b/conductor/conductor/tests/unit/solver/test_order_lock_service.py @@ -0,0 +1,108 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +"""Test classes for order_locks_services""" + +import mock +import unittest +import uuid + +from conductor.common.models.order_lock import OrderLock +from conductor.solver.orders_lock.orders_lock_service import OrdersLockingService + + +class TestOrdersLockingService(unittest.TestCase): + def setUp(self): + self.order_lock_svc = OrdersLockingService() + + def test_update_order_status(self): + + rehome_status = OrderLock.COMPLETED + plan_id = uuid.uuid4() + plans = { + plan_id: '{"status": "under-spin-up"}' + } + order_lock_inst = OrderLock(id='conflict_id') + order_lock_inst.update = mock.MagicMock(return_value=None) + actual_response = self.order_lock_svc._update_order_status(rehome_status=rehome_status, + plans=plans, + order_lock=order_lock_inst) + + self.assertEquals(actual_response, {plan_id: OrderLock.COMPLETED}) + + def test_update_order_status_and_get_effected_plans(self): + + plan_id = uuid.uuid4() + plans = { + plan_id: '{' + '"status": "under-spin-up",' + '"service_resource_id": "resource-id"' + '}' + } + order_locks = [OrderLock(id='conflict_id', plans=plans)] + rehome_status = OrderLock.COMPLETED + + self.order_lock_svc.OrderLock.query = mock.MagicMock(return_value=None) + self.order_lock_svc.OrderLock.query.all = mock.MagicMock(return_value=order_locks) + self.order_lock_svc._update_order_status = mock.MagicMock(return_value=None) + self.order_lock_svc._get_plans_by_id = mock.MagicMock(return_value=plans) + + actual_response = self.order_lock_svc.update_order_status_and_get_effected_plans(rehome_status=rehome_status, + service_resource_id='resource-id') + self.assertEquals(actual_response, plans) + + def test_rehomes_for_service_resource(self): + + plan_id = uuid.uuid4() + plans = { + plan_id: '{' + '"status": "rehome",' + '"service_resource_id": "resource-id"' + '}' + } + order_locks = [OrderLock(id='conflict_id', plans=plans)] + + rehome_status = OrderLock.COMPLETED + self.order_lock_svc.update_order_status_and_get_effected_plans = mock.MagicMock(return_value=plans) + self.order_lock_svc.OrderLock.query = mock.MagicMock(return_value=None) + self.order_lock_svc.OrderLock.query.all = mock.MagicMock(return_value=order_locks) + + actual_response = self.order_lock_svc.rehomes_for_service_resource(rehome_status, 'resource-id', list()) + expect_response = [{'plan_id': plan_id, 'should_rehome': True}] + + self.assertEquals(actual_response, expect_response) + + def test_get_plans_by_id(self): + + plan_id = uuid.uuid4() + plans = { + plan_id: '{' + '"status": "under-spin-up",' + '"service_resource_id": "resource-id"' + '}' + } + order_locks = [OrderLock(id='conflict_id', plans=plans)] + self.order_lock_svc.OrderLock.query = mock.MagicMock(return_value=None) + self.order_lock_svc.OrderLock.query.get_plan_by_col = mock.MagicMock(return_value=order_locks) + actual_response = self.order_lock_svc._get_plans_by_id('order_id') + + self.assertEquals(actual_response, plans) + + +if __name__ == '__main__': + unittest.main() diff --git a/conductor/conductor/tests/unit/solver/test_solver_parser.py b/conductor/conductor/tests/unit/solver/test_solver_parser.py new file mode 100644 index 0000000..94b544f --- /dev/null +++ b/conductor/conductor/tests/unit/solver/test_solver_parser.py @@ -0,0 +1,149 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import mock +import unittest + +from conductor.solver.request import demand +from conductor.solver.request.parser import Parser as SolverRequestParser +from conductor.solver.optimizer.constraints import access_distance as access_dist +from conductor.solver.triage_tool.traige_latency import TriageLatency +from collections import OrderedDict + + +class TestSolverParser(unittest.TestCase): + + + def setUp(self): + self.sp = SolverRequestParser() + + c1 = access_dist.AccessDistance(_name = 'c1', _type = 't1', _demand_list = ['d1', 'd2']) + c2 = access_dist.AccessDistance(_name = 'c2', _type = 't1', _demand_list = ['d1']) + self.sp.constraints = {'c1': c1, 'c2': c2} + self.sp.demands = {'d1': demand.Demand('d1'), 'd2': demand.Demand('d2')} + + def test_assign_region_group_weight(self): + # input: + # two group of countries: + # USA and MEX belong to the first region group + # CHN and IND belong to the second region group + # output: + # a dictionary which assign corresspoding weight values to each country + # ('USA', 0), ('MEX', 0), ('CHN', 1), ('IND', 1) + countries = ['USA,MEX', 'CHN,IND'] + + self.sp.resolve_countries = mock.MagicMock(return_value=countries) + actual_response = self.sp.assign_region_group_weight(None, None) + self.assertEqual(actual_response, OrderedDict([('USA', 0), ('MEX', 0), ('CHN', 1), ('IND', 1)])) + + def test_filter_invalid_rules(self): + + # test case 1: + # No region placeholder + # input should be the same as output + countries = ['USA,MEX', 'CHN,IND'] + regions_map = dict() + actual_response = self.sp.filter_invalid_rules(countries, regions_map) + self.assertEqual(actual_response, ['USA,MEX', 'CHN,IND']) + + # test case 2 + # input: + # region placeholder => EUROPE: 'DEU,ITA' + # replacing all 'EUROPE' in countries parameter to 'DEU,ITA' + countries = ['EUROPE', 'CHN,IND'] + regions_map = dict() + regions_map['EUROPE'] = 'DEU,ITA' + actual_response = self.sp.filter_invalid_rules(countries, regions_map) + self.assertEqual(actual_response, ['DEU,ITA', 'CHN,IND']) + + def test_drop_no_latency_rule_candidates(self): + + # test case: + # one demand 'demand_1' contains two candidates (d1_candidate1 and d1_candidate2) + # candidate1 locates in USA and candidate2 locates in ITA + # the parameter 'diff_bw_candidates_and_countries' contains a list with one element 'ITA' + # this function should get rid of candidate2 (locates in ITA) from the demand1 candidate list + d1_candidate1 = dict() + d1_candidate1['candidate_id'] = 'd1_candidate1' + d1_candidate1['country'] = 'USA' + + d1_candidate2 = dict() + d1_candidate2['candidate_id'] = 'd1_candidate2' + d1_candidate2['country'] = 'ITA' + + test_demand_1 = demand.Demand('demand_1') + test_demand_1.resources['d1_candidate1'] = d1_candidate1 + test_demand_1.resources['d1_candidate2'] = d1_candidate2 + self.sp.demands = {'demand_1': test_demand_1} + + self.sp.obj_func_param = ['demand_1'] + diff_bw_candidates_and_countries = ['ITA'] + + self.sp.latencyTriage = TriageLatency() + self.sp.latencyTriage.latencyDroppedCandiate = mock.MagicMock(return_value=None) + + self.sp.drop_no_latency_rule_candidates(diff_bw_candidates_and_countries) + self.assertEqual(self.sp.demands['demand_1'].resources, {'d1_candidate1': {'candidate_id': 'd1_candidate1', 'country': 'USA'}}) + + def test_resolve_countries(self): + + countries_with_wildcard = ['USA,MEX', 'CHN,IND', '*'] + countries_without_wildcard = ['USA,MEX', 'CHN, IND', 'ITA'] + candidates_country_list = ['USA', 'CHN', 'USA', 'IND'] + + # test case 1 + # pass country list with wildcard in it + self.sp.filter_invalid_rules = mock.MagicMock(return_value=countries_with_wildcard) + actual_response = self.sp.resolve_countries(countries_with_wildcard, None, candidates_country_list) + self.assertEqual(actual_response, countries_with_wildcard) + + # test case 2 + # country list without wildcard rule + self.sp.filter_invalid_rules = mock.MagicMock(return_value=countries_without_wildcard) + actual_response = self.sp.resolve_countries(countries_without_wildcard, None, candidates_country_list) + self.assertEqual(actual_response, countries_without_wildcard) + + def test_assgin_constraints_to_demands(self): + # The 'side effect' or impact of sp.assgin_constraints_to_demands() method is to + # correctly change the instance variable (a dict) of self.sp.constraints to hold/contain + # against each constraint, the correct list of demaands as set in another instance varaible (dictornary) + # of self.sp.demands. That's what we are testing below. See how the two dictornaries are set in the setUp() + # method + + self.sp.assgin_constraints_to_demands() + returned_constraints = [c.name for c in self.sp.demands['d1'].constraint_list] + self.assertEqual(sorted(returned_constraints), ['c1', 'c2']) + + def test_sort_constraint_by_rank(self): + # The 'side effect' or impact of sp.assgin_constraints_to_demands() method is to + # correctly change the instance variable (a dict) of self.sp.constraints to hold/contain + # against each constraint, the correct list of demaands as set in another instance varaible (dictornary) + # of self.sp.demands. That's what we are testing below. See how the two dictornaries are set in the setUp() + # method + + self.sp.sort_constraint_by_rank() + returned_constraints = [c.name for c in self.sp.demands['d1'].constraint_list] + self.assertNotEqual(sorted(returned_constraints), ['c1', 'c3']) + + def tearDown(self): + self.sp.constraints = {} + self.sp.demands = {} + +if __name__ == "__main__": + unittest.main()
\ No newline at end of file diff --git a/conductor/conductor/tests/unit/test_aai.py b/conductor/conductor/tests/unit/test_aai.py index e255396..39ddf09 100644 --- a/conductor/conductor/tests/unit/test_aai.py +++ b/conductor/conductor/tests/unit/test_aai.py @@ -15,8 +15,10 @@ class TestConstaintAccessDistance(unittest.TestCase, AccessDistance): self.parserExpected = { "demands": {}, "locations": {}, + "obj_func_param": {}, "cei": "null", "region_gen": "null", + "region_group": {}, "request_id": "null", "request_type": "null", "objective": "null", diff --git a/conductor/setup.cfg b/conductor/setup.cfg index af691c7..5c87a2c 100644 --- a/conductor/setup.cfg +++ b/conductor/setup.cfg @@ -20,6 +20,7 @@ [metadata] name = of-has summary = ONAP Homing Service +description-file = README.rst author = AT&T author-email = ikram@research.att.com home-page = https://wiki.onap.org/pages/viewpage.action?pageId=16005528 |