From d8b010d217720a10114f5db0efb1d95f2a65f8df Mon Sep 17 00:00:00 2001 From: krishnaa96 Date: Wed, 5 May 2021 15:54:01 +0530 Subject: Add db driver to switch between music and etcd Issue-ID: OPTFRA-947 Signed-off-by: krishnaa96 Change-Id: Ie91756469a52e2262e85e0baec98855ee666d98a --- conductor/conductor/common/__init__.py | 3 +- conductor/conductor/common/db_backend.py | 53 +++++++++++ .../conductor/common/models/country_latency.py | 20 ++-- conductor/conductor/common/models/group_rules.py | 17 ++-- conductor/conductor/common/models/groups.py | 9 +- conductor/conductor/common/models/order_lock.py | 20 ++-- conductor/conductor/common/models/plan.py | 5 +- .../conductor/common/models/region_placeholders.py | 12 +-- conductor/conductor/common/music/api.py | 90 +++++++---------- conductor/conductor/common/music/model/base.py | 12 +-- conductor/conductor/common/music/model/search.py | 10 +- conductor/conductor/common/music/voting.py | 4 +- .../common/utils/conductor_logging_util.py | 34 ++++--- conductor/conductor/controller/service.py | 6 +- conductor/conductor/controller/translator_svc.py | 4 +- conductor/conductor/messaging.py | 4 +- conductor/conductor/reservation/service.py | 45 +++++---- .../conductor/solver/rest/latency_data_loader.py | 106 +++++++-------------- conductor/conductor/solver/service.py | 6 +- .../tests/unit/common/models/test_order_lock.py | 2 + .../conductor/tests/unit/controller/test_rpc.py | 4 +- .../tests/unit/controller/test_translator_svc.py | 5 +- .../tests/unit/reservation/test_service.py | 5 +- .../tests/unit/solver/optimizer/test_greedy.py | 5 +- .../unit/solver/optimizer/test_random_pick.py | 5 +- .../tests/unit/solver/test_order_lock_service.py | 4 +- .../tests/unit/solver/test_solver_parser.py | 4 +- 27 files changed, 251 insertions(+), 243 deletions(-) create mode 100644 conductor/conductor/common/db_backend.py diff --git a/conductor/conductor/common/__init__.py b/conductor/conductor/common/__init__.py index 9bcf381..030b4a6 100644 --- a/conductor/conductor/common/__init__.py +++ b/conductor/conductor/common/__init__.py @@ -21,6 +21,7 @@ from oslo_log import log as logging +from conductor.common import db_backend from conductor.common.music import api LOG = logging.getLogger(__name__) @@ -36,7 +37,7 @@ def music_api(configuration): 'version': configuration.get('version'), 'replication_factor': configuration.get('replication_factor'), } - api_instance = api.API(**kwargs) + api_instance = db_backend.get_client(**kwargs) # Create the keyspace if necessary # TODO(jdandrea): Use oslo.config with a [music] section diff --git a/conductor/conductor/common/db_backend.py b/conductor/conductor/common/db_backend.py new file mode 100644 index 0000000..e763f8d --- /dev/null +++ b/conductor/conductor/common/db_backend.py @@ -0,0 +1,53 @@ +# +# ------------------------------------------------------------------------- +# Copyright (C) 2021 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from oslo_config import cfg + +from conductor.common.etcd.api import EtcdAPI +from conductor.common.music.api import MockAPI +from conductor.common.music.api import MusicAPI + +CONF = cfg.CONF + +DB_BACKEND_OPTS = [ + cfg.StrOpt('db_backend', + default='music', + help='DB backend to use for conductor.'), + cfg.BoolOpt('music_mock', + default=False, + help='use mock api.'), +] + +CONF.register_opts(DB_BACKEND_OPTS, group='db_options') + +global DB_API + + +def get_client(): + """Wrapper for Music and Music Mock API""" + + global DB_API + + if CONF.db_options.db_backend == "etcd": + DB_API = EtcdAPI() + elif CONF.db_options.db_backend == "music": + if CONF.db_options.music_mock: + DB_API = MockAPI() + DB_API = MusicAPI() + return DB_API diff --git a/conductor/conductor/common/models/country_latency.py b/conductor/conductor/common/models/country_latency.py index 6bbe735..a780e80 100644 --- a/conductor/conductor/common/models/country_latency.py +++ b/conductor/conductor/common/models/country_latency.py @@ -17,8 +17,8 @@ # ------------------------------------------------------------------------- # +from conductor.common import db_backend from conductor.common.music.model import base -from conductor.common.music import api class CountryLatency(base.Base): @@ -28,7 +28,7 @@ class CountryLatency(base.Base): id = None country_name = None - groups = None # type: List[Any] # + groups = None # Status PARKED = "parked" @@ -42,7 +42,7 @@ class CountryLatency(base.Base): """Return schema.""" schema = { 'id': 'text', - 'country_name':'text', + 'country_name': 'text', 'groups': 'list', 'PRIMARY KEY': '(id)' } @@ -65,31 +65,31 @@ class CountryLatency(base.Base): def values(self): """Valu-es""" value_dict = { - #'id': self.id, + # 'id': self.id, 'country_name': self.country_name, - 'groups':self.groups + 'groups': self.groups } return value_dict def delete(self, country_id): """Update country latency""" - return api.MUSIC_API.row_delete( + return db_backend.DB_API.row_delete( self.__keyspace__, self.__tablename__, self.pk_name(), country_id, True) def update(self, country_name, updated_fields): """Update country latency""" - api.MUSIC_API.row_complex_field_update( + db_backend.DB_API.row_complex_field_update( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), country_name, updated_fields) - #def insert(self): + # def insert(self): # return \ - # api.MUSIC_API.row_insert_by_condition( + # DB_API.row_insert_by_condition( # self.__keyspace__, self.__tablename__, self.pk_name(), # self.pk_value(), self.values(), self.PARKED) - def __init__(self, country_name=None,groups=None,_insert=False): + def __init__(self, country_name=None, groups=None, _insert=False): """Initializer""" super(CountryLatency, self).__init__() diff --git a/conductor/conductor/common/models/group_rules.py b/conductor/conductor/common/models/group_rules.py index 453842c..10028ab 100644 --- a/conductor/conductor/common/models/group_rules.py +++ b/conductor/conductor/common/models/group_rules.py @@ -17,8 +17,8 @@ # ------------------------------------------------------------------------- # +from conductor.common import db_backend from conductor.common.music.model import base -from conductor.common.music import api class GroupRules(base.Base): @@ -28,7 +28,7 @@ class GroupRules(base.Base): id = None group = None - rule = None # type: List[Any] # + rule = None # Status PARKED = "parked" @@ -42,7 +42,7 @@ class GroupRules(base.Base): """Return schema.""" schema = { 'id': 'text', - 'group':'text', + 'group': 'text', 'rule': 'map', 'PRIMARY KEY': '(id)' } @@ -72,17 +72,18 @@ class GroupRules(base.Base): def update(self, group, updated_fields): """Update country latency""" - api.MUSIC_API.row_complex_field_update( + db_backend.DB_API.row_complex_field_update( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), group, updated_fields) def insert(self): return \ - api.MUSIC_API.row_insert_by_condition( - self.__keyspace__, self.__tablename__, self.pk_name(), - self.pk_value(),self.values(), self.PARKED) + db_backend.DB_API.row_insert_by_condition( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), self.values(), self.PARKED + ) - def __init__(self, id=None, group=None,rule=None,_insert=False): + def __init__(self, id=None, group=None, rule=None, _insert=False): """Initializer""" super(GroupRules, self).__init__() diff --git a/conductor/conductor/common/models/groups.py b/conductor/conductor/common/models/groups.py index b70a747..56081d1 100644 --- a/conductor/conductor/common/models/groups.py +++ b/conductor/conductor/common/models/groups.py @@ -17,13 +17,12 @@ # ------------------------------------------------------------------------- # +from conductor.common import db_backend from conductor.common.music.model import base -from conductor.common.music import api class Groups(base.Base): - __tablename__ = "groups" __keyspace__ = None @@ -43,7 +42,7 @@ class Groups(base.Base): """Return schema.""" schema = { 'id': 'text', - 'group':'text', + 'group': 'text', 'countries': 'map', 'PRIMARY KEY': '(id)' } @@ -73,13 +72,13 @@ class Groups(base.Base): def update(self, group, updated_fields): """Update country latency""" - api.MUSIC_API.row_complex_field_update( + db_backend.DB_API.row_complex_field_update( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), group, updated_fields) def insert(self): return \ - api.MUSIC_API.row_insert_by_condition( + db_backend.DB_API.row_insert_by_condition( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), self.values(), self.PARKED) diff --git a/conductor/conductor/common/models/order_lock.py b/conductor/conductor/common/models/order_lock.py index ccbdd51..f9ad465 100644 --- a/conductor/conductor/common/models/order_lock.py +++ b/conductor/conductor/common/models/order_lock.py @@ -17,11 +17,9 @@ # ------------------------------------------------------------------------- # -import json -import time -from conductor.common.models import validate_uuid4 +from conductor.common import db_backend from conductor.common.music.model import base -from conductor.common.music import api + class OrderLock(base.Base): @@ -70,9 +68,9 @@ class OrderLock(base.Base): return self.id def values(self): - """Valu-es""" + """Values""" value_dict = { - 'id' : self.id, + 'id': self.id, 'plans': self.plans, 'is_spinup_completed': self.is_spinup_completed, 'spinup_completed_timestamp': self.spinup_completed_timestamp @@ -81,20 +79,20 @@ class OrderLock(base.Base): def update(self, plan_id, updated_fields, values=None): """Update order lock""" - api.MUSIC_API.row_complex_field_update( + db_backend.DB_API.row_complex_field_update( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), plan_id, updated_fields, values) def insert(self): return \ - api.MUSIC_API.row_insert_by_condition( - self.__keyspace__, self.__tablename__, self.pk_name(), - self.pk_value(), self.values(), self.PARKED) + db_backend.DB_API.row_insert_by_condition( + self.__keyspace__, self.__tablename__, self.pk_name(), + self.pk_value(), self.values(), self.PARKED) def __init__(self, id=None, plans=None, is_spinup_completed=False, spinup_completed_timestamp=None, _insert=False): """Initializer""" super(OrderLock, self).__init__() - # Breaking here with errot: Can't set attribute (TODO: Ikram/Rupali) + # Breaking here with error: Can't set attribute self.id = id self.plans = plans self.is_spinup_completed = is_spinup_completed diff --git a/conductor/conductor/common/models/plan.py b/conductor/conductor/common/models/plan.py index de5af5b..ca44ded 100644 --- a/conductor/conductor/common/models/plan.py +++ b/conductor/conductor/common/models/plan.py @@ -21,7 +21,6 @@ import json import time -import os from conductor.common.models import validate_uuid4 from conductor.common.music.model import base @@ -67,7 +66,7 @@ class Plan(base.Base): # Status TEMPLATE = "template" # Template ready for translation - TRANSLATING = "translating" # Translating the template + TRANSLATING = "translating" # Translating the template TRANSLATED = "translated" # Translation ready for solving SOLVING = "solving" # Search for solutions in progress # Search complete, solution with n>0 recommendations found @@ -220,7 +219,7 @@ class Plan(base.Base): self.updated = updated or current_time_millis() self.name = name self.timeout = timeout - self.recommend_max = recommend_max + self.recommend_max = str(recommend_max) self.message = message or "" # owners should be empty when the plan is created self.translation_owner = translation_owner or {} diff --git a/conductor/conductor/common/models/region_placeholders.py b/conductor/conductor/common/models/region_placeholders.py index 850ae42..e522f8a 100644 --- a/conductor/conductor/common/models/region_placeholders.py +++ b/conductor/conductor/common/models/region_placeholders.py @@ -17,13 +17,12 @@ # ------------------------------------------------------------------------- # +from conductor.common import db_backend from conductor.common.music.model import base -from conductor.common.music import api class RegionPlaceholders(base.Base): - __tablename__ = "region_placeholders" __keyspace__ = None @@ -43,7 +42,7 @@ class RegionPlaceholders(base.Base): """Return schema.""" schema = { 'id': 'text', - 'region_name':'text', + 'region_name': 'text', 'countries': 'map', 'PRIMARY KEY': '(id)' } @@ -75,13 +74,12 @@ class RegionPlaceholders(base.Base): def delete(self, region_id): """Update country latency""" - return api.MUSIC_API.row_delete(self.__keyspace__, self.__tablename__, self.pk_name(), - region_id, True) - + return db_backend.DB_API.row_delete(self.__keyspace__, self.__tablename__, self.pk_name(), + region_id, True) def update(self, region_name, updated_fields): """Update country latency""" - api.MUSIC_API.row_complex_field_update( + db_backend.DB_API.row_complex_field_update( self.__keyspace__, self.__tablename__, self.pk_name(), self.pk_value(), region_name, updated_fields) diff --git a/conductor/conductor/common/music/api.py b/conductor/conductor/common/music/api.py index dba852a..c8fa231 100644 --- a/conductor/conductor/common/music/api.py +++ b/conductor/conductor/common/music/api.py @@ -20,8 +20,8 @@ """Music Data Store API""" import copy -import logging import json +import logging import time from oslo_config import cfg @@ -29,15 +29,13 @@ from oslo_log import log from conductor.common import rest from conductor.common.utils import basic_auth_util -from conductor.i18n import _LE, _LI # pylint: disable=W0212 -from conductor.common.utils import cipherUtils +from conductor.i18n import _LE # pylint: disable=W0212 +from conductor.i18n import _LI LOG = log.getLogger(__name__) CONF = cfg.CONF -global MUSIC_API - MUSIC_API_OPTS = [ cfg.StrOpt('server_url', default='http://controller:8080/MUSIC/rest/v2', @@ -71,7 +69,7 @@ MUSIC_API_OPTS = [ help='Use mock API'), cfg.StrOpt('music_topology', default='SimpleStrategy'), - #TODO(larry); make the config inputs more generic + # TODO(larry); make the config inputs more generic cfg.StrOpt('first_datacenter_name', help='Name of the first data center'), cfg.IntOpt('first_datacenter_replicas', @@ -111,7 +109,6 @@ class MusicAPI(object): def __init__(self): """Initializer.""" - global MUSIC_API # set the urllib log level to ERROR logging.getLogger('urllib3').setLevel(logging.ERROR) @@ -129,8 +126,8 @@ class MusicAPI(object): port = CONF.music_api.port or 8080 path = CONF.music_api.path or '/MUSIC/rest' version = CONF.version - server_url = 'http://{}:{}/{}'.format( - host, port, version, path.rstrip('/').lstrip('/')) + server_url = 'http://{}:{}/{}/{}'.format( + host, port, path.rstrip('/').lstrip('/'), version) kwargs = { 'server_url': server_url, @@ -173,8 +170,6 @@ class MusicAPI(object): self.third_datacenter_name = CONF.music_api.third_datacenter_name self.third_datacenter_replicas = CONF.music_api.third_datacenter_replicas - MUSIC_API = self - def __del__(self): """Deletion.""" if type(self.lock_ids) is dict: @@ -242,23 +237,22 @@ class MusicAPI(object): Supports atomic operations, Returns a payload of data and lock_name (if any). """ - #if atomic: + # if atomic: # lock_name = self.lock_create(keyspace, table, pk_value) - #else: + # else: # lock_name = None - #lock_id = self.lock_ids.get(lock_name) + # lock_id = self.lock_ids.get(lock_name) data = { 'consistencyInfo': { 'type': 'atomic', } } - if condition: data['conditions'] = condition - #, 'lock_name': lock_name + # , 'lock_name': lock_name return {'data': data} def payload_delete(self, payload): @@ -385,7 +379,7 @@ class MusicAPI(object): LOG.debug("Updating row with pk_value {} in table " "{}, keyspace {}".format(pk_value, table, keyspace)) response = self.rest.request(method='put', path=path, data=data) - #self.payload_delete(payload) + # self.payload_delete(payload) if response is not None and CONF.music_api.music_new_version: response_json = json.loads(response.content) response_status = response_json.get("status") @@ -437,29 +431,29 @@ class MusicAPI(object): common_values = copy.deepcopy(values_when_id_non_exist) common_values.pop('status', None) - if (CONF.music_api.music_new_version): + if CONF.music_api.music_new_version: # Conditional Insert request body sends to new version of MUSIC (2.5.5 and lator) data = { - "primaryKey": pk_name, - "primaryKeyValue": pk_value, - - "casscadeColumnName": "plans", - "tableValues": { - "id": pk_value, - "is_spinup_completed": values.get('is_spinup_completed') - }, - "casscadeColumnData": { - "key": plan_id, - "value": common_values - }, - "conditions": { - "exists": { - "status": values_when_id_exist.get('status') - }, - "nonexists": { - "status": values_when_id_non_exist.get('status') - } - } + "primaryKey": pk_name, + "primaryKeyValue": pk_value, + + "casscadeColumnName": "plans", + "tableValues": { + "id": pk_value, + "is_spinup_completed": values.get('is_spinup_completed') + }, + "casscadeColumnData": { + "key": plan_id, + "value": common_values + }, + "conditions": { + "exists": { + "status": values_when_id_exist.get('status') + }, + "nonexists": { + "status": values_when_id_non_exist.get('status') + } + } } else: @@ -468,7 +462,7 @@ class MusicAPI(object): "primaryKeyValue": pk_value, "cascadeColumnKey": plan_id, "cascadeColumnName": "plans", - "values":{ + "values": { "id": pk_value, "is_spinup_completed": values.get('is_spinup_completed') }, @@ -480,7 +474,7 @@ class MusicAPI(object): } } - #conditional/update/keyspaces/conductor_order_locks/tables/order_locks + # conditional/update/keyspaces/conductor_order_locks/tables/order_locks path = '/conditional/insert/keyspaces/%(keyspace)s/tables/%(table)s' % { 'keyspace': keyspace, 'table': table, @@ -502,7 +496,7 @@ class MusicAPI(object): def row_complex_field_update(self, keyspace, table, pk_name, pk_value, plan_id, updated_fields, values): - if (CONF.music_api.music_new_version): + if CONF.music_api.music_new_version: # new version of MUSIC data = { "primaryKey": pk_name, @@ -533,7 +527,6 @@ class MusicAPI(object): "order_locks table, response from MUSIC {}".format(plan_id, updated_fields, pk_value, response)) return response and response.ok - @staticmethod def _table_path_generate(keyspace, table): path = '/keyspaces/%(keyspace)s/tables/%(table)s/' % { @@ -587,12 +580,8 @@ class MockAPI(object): """Initializer.""" LOG.info(_LI("Initializing Music Mock API")) - global MUSIC_API - self.music['keyspaces'] = {} - MUSIC_API = self - @property def _keyspaces(self): return self.music.get('keyspaces') @@ -701,12 +690,3 @@ class MockAPI(object): if CONF.music_api.debug: LOG.debug("Requesting version info") return "v1-mock" - - -def API(): - """Wrapper for Music and Music Mock API""" - - # FIXME(jdandrea): Follow more formal practices for defining/using mocks - if CONF.music_api.mock: - return MockAPI() - return MusicAPI() diff --git a/conductor/conductor/common/music/model/base.py b/conductor/conductor/common/music/model/base.py index 9e8205f..d393cbe 100644 --- a/conductor/conductor/common/music/model/base.py +++ b/conductor/conductor/common/music/model/base.py @@ -29,7 +29,7 @@ import six from conductor.common.classes import abstractclassmethod from conductor.common.classes import classproperty -from conductor.common.music import api +from conductor.common import db_backend from conductor.common.music.model import search LOG = logging.getLogger(__name__) @@ -67,14 +67,14 @@ class Base(object): """Create table""" kwargs = cls.__kwargs() kwargs['schema'] = cls.schema() - api.MUSIC_API.table_create(**kwargs) + db_backend.DB_API.table_create(**kwargs) # Create indexes for the table del kwargs['schema'] if cls.indexes(): for index in cls.indexes(): kwargs['index'] = index - api.MUSIC_API.index_create(**kwargs) + db_backend.DB_API.index_create(**kwargs) @abstractclassmethod def atomic(cls): @@ -123,7 +123,7 @@ class Base(object): setattr(self, pk_name, the_id) else: kwargs['pk_value'] = kwargs['values'][pk_name] - response = api.MUSIC_API.row_create(**kwargs) + response = db_backend.DB_API.row_create(**kwargs) return response def update(self, condition=None): @@ -141,7 +141,7 @@ class Base(object): if kwargs['table'] != ('order_locks'): if pk_name in kwargs['values']: kwargs['values'].pop(pk_name) - return api.MUSIC_API.row_update(**kwargs) + return db_backend.DB_API.row_update(**kwargs) def delete(self): """Delete row""" @@ -149,7 +149,7 @@ class Base(object): kwargs['pk_name'] = self.pk_name() kwargs['pk_value'] = self.pk_value() kwargs['atomic'] = self.atomic() - api.MUSIC_API.row_delete(**kwargs) + db_backend.DB_API.row_delete(**kwargs) @classmethod def filter_by(cls, **kwargs): diff --git a/conductor/conductor/common/music/model/search.py b/conductor/conductor/common/music/model/search.py index 3cb665e..f530d70 100644 --- a/conductor/conductor/common/music/model/search.py +++ b/conductor/conductor/common/music/model/search.py @@ -24,7 +24,7 @@ import inspect from oslo_config import cfg from oslo_log import log as logging -from conductor.common.music import api +from conductor.common import db_backend # FIXME(jdandrea): Keep for the __init__ # from conductor.common.classes import get_class @@ -59,7 +59,7 @@ class Query(object): """Convert query response rows to objects""" results = [] pk_name = self.model.pk_name() # pylint: disable=E1101 - for row_id, row in rows.items():# pylint: disable=W0612 + for row_id, row in rows.items(): # pylint: disable=W0612 the_id = row.pop(pk_name) result = self.model(_insert=False, **row) setattr(result, pk_name, the_id) @@ -70,21 +70,21 @@ class Query(object): """Return object with pk_name matching pk_value""" pk_name = self.model.pk_name() kwargs = self.__kwargs() - rows = api.MUSIC_API.row_read( + rows = db_backend.DB_API.row_read( pk_name=pk_name, pk_value=pk_value, **kwargs) return (self.__rows_to_objects(rows).first()) def all(self): """Return all objects""" kwargs = self.__kwargs() - rows = api.MUSIC_API.row_read(**kwargs) + rows = db_backend.DB_API.row_read(**kwargs) return self.__rows_to_objects(rows) def get_plan_by_col(self, pk_name, pk_value): # Before using this method, create an index the column (except the primary key) # you want to filter by. kwargs = self.__kwargs() - rows = api.MUSIC_API.row_read( + rows = db_backend.DB_API.row_read( pk_name=pk_name, pk_value=pk_value, **kwargs) return self.__rows_to_objects(rows) diff --git a/conductor/conductor/common/music/voting.py b/conductor/conductor/common/music/voting.py index c9c02ed..f65b425 100644 --- a/conductor/conductor/common/music/voting.py +++ b/conductor/conductor/common/music/voting.py @@ -21,7 +21,7 @@ import time from oslo_config import cfg -from conductor.common.music import api +from conductor.common import db_backend from conductor import service CONF = cfg.CONF @@ -38,7 +38,7 @@ def main(): CONF.set_override('debug', True, 'music_api') CONF.set_override('mock', True, 'music_api') CONF.set_override('hostnames', ['music2'], 'music_api') - music = api.API() + music = db_backend.get_client() print("Music version %s" % music.version()) # Randomize the name so that we don't step on each other. diff --git a/conductor/conductor/common/utils/conductor_logging_util.py b/conductor/conductor/common/utils/conductor_logging_util.py index b6ba105..52b5bf0 100644 --- a/conductor/conductor/common/utils/conductor_logging_util.py +++ b/conductor/conductor/common/utils/conductor_logging_util.py @@ -19,39 +19,47 @@ import json import logging -from conductor.common.music import api + +from conductor.common import db_backend + class LoggerFilter(logging.Filter): transaction_id = None plan_id = None + def filter(self, record): record.transaction_id = self.transaction_id record.plan_id = self.plan_id return True + def getTransactionId(keyspace, plan_id): - """ get transaction id from a pariticular plan in MUSIC """ + """get transaction id from a pariticular plan in MUSIC """ - rows = api.API().row_read(keyspace, "plans", "id", plan_id) + rows = db_backend.get_client().row_read(keyspace, "plans", "id", plan_id) if 'result' in rows: rows = rows['result'] for row_id, row_value in rows.items(): - template = row_value['template'] - if template: - data = json.loads(template) - if "transaction-id" in data: - return data["transaction-id"] + template = row_value['template'] + if template: + data = json.loads(template) + if "transaction-id" in data: + return data["transaction-id"] + def setLoggerFilter(logger, keyspace, plan_id): - #formatter = logging.Formatter('%(asctime)s %(transaction_id)s %(levelname)s %(name)s: [-] %(plan_id)s %(message)s') - generic_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|%(levelname)s|%(module)s|%(name)s:' + generic_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|%(levelname)s|%(module)s|%(' + 'name)s: ' ' [-] plan id: %(plan_id)s [-] %(message)s') - audit_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|COMPLETE' + audit_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(' + 'thread)d||Conductor|N/A|COMPLETE ' '|200|sucessful||%(levelname)s|||0|%(module)s|||||||||%(name)s : [-] ' 'plan id: %(plan_id)s [-] %(message)s') - metric_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(thread)d||Conductor|N/A|N/A|N/A|' - 'COMPLETE|200|sucessful||%(levelname)s|||0|%(module)s||||||||||%(name)s : [-] ' + metric_formatter = logging.Formatter('%(asctime)s|%(asctime)s|%(transaction_id)s||%(' + 'thread)d||Conductor|N/A|N/A|N/A| ' + 'COMPLETE|200|sucessful||%(levelname)s|||0|%(module)s||||||||||%(name)s : [' + '-] ' 'plan id: %(plan_id)s [-] %(message)s') error_formatter = logging.Formatter('%(asctime)s|%(transaction_id)s|%(thread)d|Conductor|N/A|N/A|N/A|ERROR|500' '|N/A|%(name)s : [-] plan id: %(plan_id)s [-] %(message)s') diff --git a/conductor/conductor/controller/service.py b/conductor/conductor/controller/service.py index d28c37f..01ffe40 100644 --- a/conductor/conductor/controller/service.py +++ b/conductor/conductor/controller/service.py @@ -19,9 +19,9 @@ import cotyledon -from conductor.common.models import plan +from conductor.common import db_backend from conductor.common.models import order_lock -from conductor.common.music import api +from conductor.common.models import plan from conductor.common.music import messaging as music_messaging from conductor.common.music.model import base from conductor.controller import rpc @@ -72,7 +72,7 @@ class ControllerServiceLauncher(object): self.conf = conf # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() self.music.keyspace_create(keyspace=conf.keyspace) # Dynamically create a plan class for the specified keyspace diff --git a/conductor/conductor/controller/translator_svc.py b/conductor/conductor/controller/translator_svc.py index 651e4da..7c155ca 100644 --- a/conductor/conductor/controller/translator_svc.py +++ b/conductor/conductor/controller/translator_svc.py @@ -27,7 +27,7 @@ from oslo_config import cfg from oslo_log import log from conductor.common.config_loader import load_config_file -from conductor.common.music import api +from conductor.common import db_backend from conductor.common.music import messaging as music_messaging from conductor.common.utils import conductor_logging_util as log_util from conductor.controller.generic_objective_translator import GenericObjectiveTranslator @@ -88,7 +88,7 @@ class TranslatorService(cotyledon.Service): self.data_service = self.setup_rpc(conf, "data") # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() self.translation_owner_condition = { "translation_owner": socket.gethostname() diff --git a/conductor/conductor/messaging.py b/conductor/conductor/messaging.py index 84a34a9..60545ae 100644 --- a/conductor/conductor/messaging.py +++ b/conductor/conductor/messaging.py @@ -19,7 +19,7 @@ from oslo_config import cfg -from conductor.common import music +from conductor.common import db_backend from conductor.common.music.messaging import component DEFAULT_URL = "__default__" @@ -52,7 +52,7 @@ def get_transport(conf, url=None, optional=False, cache=True): # Yes, we know an API is not a transport. Cognitive dissonance FTW! # TODO(jdandrea): try/except to catch problems keyspace = conf.messaging_server.keyspace - transport = music.api.API() + transport = db_backend.get_client() transport.keyspace_create(keyspace=keyspace) except Exception: if not optional or url: diff --git a/conductor/conductor/reservation/service.py b/conductor/conductor/reservation/service.py index 6a990f5..8c5a390 100644 --- a/conductor/conductor/reservation/service.py +++ b/conductor/conductor/reservation/service.py @@ -17,22 +17,23 @@ # ------------------------------------------------------------------------- # -import cotyledon -import json -import time import socket +import time + +import cotyledon from oslo_config import cfg from oslo_log import log +from conductor.common import db_backend +from conductor.common.models import order_lock from conductor.common.models import plan -from conductor.common.music import api from conductor.common.music import messaging as music_messaging from conductor.common.music.model import base -from conductor.i18n import _LE, _LI +from conductor.common.utils import conductor_logging_util as log_util +from conductor.i18n import _LE +from conductor.i18n import _LI from conductor import messaging from conductor import service -from conductor.common.utils import conductor_logging_util as log_util -from conductor.common.models import order_lock LOG = log.getLogger(__name__) @@ -77,7 +78,7 @@ class ReservationServiceLauncher(object): self.conf = conf # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() self.music.keyspace_create(keyspace=conf.keyspace) # Dynamically create a plan class for the specified keyspace @@ -93,7 +94,7 @@ class ReservationServiceLauncher(object): def run(self): kwargs = {'plan_class': self.Plan, - 'order_locks': self.OrderLock} + 'order_locks': self.OrderLock} svcmgr = cotyledon.ServiceManager() svcmgr.add(ReservationService, workers=self.conf.reservation.workers, @@ -124,7 +125,6 @@ class ReservationService(cotyledon.Service): "status": self.Plan.RESERVING } - def _init(self, conf, **kwargs): """Set up the necessary ingredients.""" self.conf = conf @@ -137,7 +137,7 @@ class ReservationService(cotyledon.Service): self.data_service = self.setup_rpc(conf, "data") # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() # Number of retries for reservation/release self.reservation_retries = self.conf.reservation.reserve_retries @@ -155,7 +155,7 @@ class ReservationService(cotyledon.Service): def millisec_to_sec(self, millisec): """Convert milliseconds to seconds""" - return millisec/1000 + return millisec / 1000 def _reset_reserving_status(self): """Reset plans being reserved so they can be reserved again. @@ -272,7 +272,8 @@ class ReservationService(cotyledon.Service): for p in plans: # when a plan is in RESERVING status more than timeout value if p.status == self.Plan.RESERVING and \ - (self.current_time_seconds() - self.millisec_to_sec(p.updated)) > self.conf.reservation.timeout: + (self.current_time_seconds() - self.millisec_to_sec( + p.updated)) > self.conf.reservation.timeout: # change the plan status to SOLVED for another VM to reserve p.status = self.Plan.SOLVED p.update(condition=self.reservating_status_condition) @@ -370,7 +371,7 @@ class ReservationService(cotyledon.Service): candidates.append(candidate) sdwan_candidate_list.append(candidate) - #TODO(larry) combine the two reservation logic as one, make the code service independent + # TODO(larry) combine the two reservation logic as one, make the code service independent if service_model == "ADIOD": is_success = self.try_reservation_call( method="reserve", @@ -400,7 +401,7 @@ class ReservationService(cotyledon.Service): # order_lock spin-up rollback for decision in solution.get('recommendations'): - candidate = list(decision.values())[0].get('candidate') # Python 3 Conversion -- dict object to list object + candidate = list(decision.values())[0].get('candidate') if candidate.get('inventory_type') == 'cloud': # TODO(larry) change the code to get('conflict_id') instead of 'location_id' conflict_id = candidate.get('conflict_id') @@ -413,7 +414,8 @@ class ReservationService(cotyledon.Service): # move plan to translated if p.reservation_counter >= self.conf.reservation.max_reservation_counter: p.status = self.Plan.ERROR - p.message = _LE("Tried {} times. Plan {} is unable to reserve").format(self.conf.reservation.max_reservation_counter, p.id) + p.message = _LE("Tried {} times. Plan {} is unable to reserve") \ + .format(self.conf.reservation.max_reservation_counter, p.id) LOG.error(p.message) else: p.status = self.Plan.TRANSLATED @@ -430,8 +432,8 @@ class ReservationService(cotyledon.Service): # TODO(larry): Should be replaced by the new api from MUSIC while 'FAILURE' in _is_success: _is_success = p.update(condition=self.reservation_owner_condition) - LOG.info(_LI("Rollback Failed, Changing the template status from reserving to error, " - "atomic update response from MUSIC {}").format(_is_success)) + LOG.info(_LI("Rollback Failed, Changing the template status from reserving to " + "error. atomic update response from MUSIC {}").format(_is_success)) break # reservation failed continue @@ -446,13 +448,13 @@ class ReservationService(cotyledon.Service): controller=controller, request=request, reservation_name=None - ) + ) if not is_success: # order_lock spin-up rollback for decision in solution.get('recommendations'): - candidate = list(decision.values())[0].get('candidate') # Python 3 Conversion -- dict object to list object + candidate = list(decision.values())[0].get('candidate') if candidate.get('inventory_type') == 'cloud': conflict_id = candidate.get('conflict_id') order_record = self.OrderLock.query.get_plan_by_col("id", conflict_id)[0] @@ -482,7 +484,8 @@ class ReservationService(cotyledon.Service): LOG.debug("Plan {} Reservation complete".format(p.id)) p.status = self.Plan.DONE - while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(p.updated)) <= self.conf.reservation.timeout: + while 'FAILURE' in _is_success and (self.current_time_seconds() - self.millisec_to_sec(p.updated)) \ + <= self.conf.reservation.timeout: _is_success = p.update(condition=self.reservation_owner_condition) LOG.info(_LI("Reservation is complete, changing the template status from reserving to done, " "atomic update response from MUSIC {}").format(_is_success)) diff --git a/conductor/conductor/solver/rest/latency_data_loader.py b/conductor/conductor/solver/rest/latency_data_loader.py index d0b7e9d..c148b6b 100644 --- a/conductor/conductor/solver/rest/latency_data_loader.py +++ b/conductor/conductor/solver/rest/latency_data_loader.py @@ -17,103 +17,65 @@ # ------------------------------------------------------------------------- # -import csv import collections import json -from conductor.common.models import region_placeholders -from conductor.common.music import api + +from conductor.common import db_backend class LatencyDataLoader(object): def __init__(self): - rph = region_placeholders.RegionPlaceholders() - music = api.API() + music = db_backend.get_client() print("Music version %s" % music.version()) - # load data into region place holder def load_into_rph(self, json_data): - datamap = collections.OrderedDict() - group_map = collections.OrderedDict() - datamap = json.loads(json_data) - - - #for i, j in enumerate(datamap): - # group_map[j['group']] = j['countries'] - - music = api.API() + datamap = collections.OrderedDict() + group_map = collections.OrderedDict() + datamap = json.loads(json_data) - #for row in group_map: - # music.row_create() + # for i, j in enumerate(datamap): + # group_map[j['group']] = j['countries'] - kwargs = {'keyspace': 'conductor_inam', 'table': 'region_placeholders', 'pk_name': 'id'} - for row in enumerate(datamap): - kwargs['pk_value'] = id() - kwargs['values'] = {'region_name': row['group'], 'countries': row['countries']} - music.row_create(**kwargs) + music = db_backend.get_client() + # for row in group_map: + # music.row_create() + kwargs = {'keyspace': 'conductor_inam', 'table': 'region_placeholders', 'pk_name': 'id'} + for row in enumerate(datamap): + kwargs['pk_value'] = id() + kwargs['values'] = {'region_name': row['group'], 'countries': row['countries']} + music.row_create(**kwargs) - print(group_map) - + print(group_map) def load_into_country_letancy(self, json_data): - datamap = collections.OrderedDict() - group_map = collections.OrderedDict() - datamap = json.loads(json_data) - - - #for i, j in enumerate(datamap): - # group_map[j['group']] = j['countries'] - - music = api.API() - - #for row in group_map: - # music.row_create() - - kwargs = {'keyspace': 'conductor_inam', 'table': 'country_latency', 'pk_name': 'id'} - for row in enumerate(datamap): - kwargs['pk_value'] = id() - kwargs['values'] = {'country_name': row['country_name'], 'groups': row['groups']} - music.row_create(**kwargs) - - - - print(group_map) - - - - - + datamap = collections.OrderedDict() + group_map = collections.OrderedDict() + datamap = json.loads(json_data) + # for i, j in enumerate(datamap): + # group_map[j['group']] = j['countries'] + music = db_backend.get_client() + # for row in group_map: + # music.row_create() + kwargs = {'keyspace': 'conductor_inam', 'table': 'country_latency', 'pk_name': 'id'} + for row in enumerate(datamap): + kwargs['pk_value'] = id() + kwargs['values'] = {'country_name': row['country_name'], 'groups': row['groups']} + music.row_create(**kwargs) + print(group_map) - - - - - - - - - - - - - - - -#json_string = '[{"group": "EMEA-CORE1", "countries" : "FRA|DEU|NLD|GBR1"},' \ +# json_string = '[{"group": "EMEA-CORE1", "countries" : "FRA|DEU|NLD|GBR1"},' \ # '{"group": "EMEA-CORE2", "countries" : "FRA|DEU|NLD|GBR2"},' \ # '{"group": "EMEA-CORE3", "countries" : "FRA|DEU|NLD|GBR3"},' \ # '{"group": "EMEA-CORE4", "countries" : "FRA|DEU|NLD|GBR4"}]' -#test = LatencyDataLoader() -#test.parseJSON(json_string) - - - +# test = LatencyDataLoader() +# test.parseJSON(json_string) diff --git a/conductor/conductor/solver/service.py b/conductor/conductor/solver/service.py index b6ed5dc..f0bb14b 100644 --- a/conductor/conductor/solver/service.py +++ b/conductor/conductor/solver/service.py @@ -28,6 +28,7 @@ import traceback from oslo_config import cfg from oslo_log import log +from conductor.common import db_backend from conductor.common.models import country_latency from conductor.common.models import order_lock from conductor.common.models.order_lock import OrderLock @@ -35,7 +36,6 @@ from conductor.common.models import order_lock_history from conductor.common.models import plan from conductor.common.models import region_placeholders from conductor.common.models import triage_tool -from conductor.common.music import api from conductor.common.music import messaging as music_messaging from conductor.common.music.model import base import conductor.common.prometheus_metrics as PC @@ -132,7 +132,7 @@ class SolverServiceLauncher(object): self.conf = conf # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() self.music.keyspace_create(keyspace=conf.keyspace) # Dynamically create a plan class for the specified keyspace @@ -223,7 +223,7 @@ class SolverService(cotyledon.Service): # self.optimizer = optimizer.Optimizer(conf) # Set up Music access. - self.music = api.API() + self.music = db_backend.get_client() self.solver_owner_condition = { "solver_owner": socket.gethostname() } diff --git a/conductor/conductor/tests/unit/common/models/test_order_lock.py b/conductor/conductor/tests/unit/common/models/test_order_lock.py index f8a6d06..9884095 100644 --- a/conductor/conductor/tests/unit/common/models/test_order_lock.py +++ b/conductor/conductor/tests/unit/common/models/test_order_lock.py @@ -44,5 +44,7 @@ class TestOrder_Lock(unittest.TestCase): self.assertEqual(self.schema, self.orderLock.schema()) + + if __name__ == '__main__': unittest.main() diff --git a/conductor/conductor/tests/unit/controller/test_rpc.py b/conductor/conductor/tests/unit/controller/test_rpc.py index d2d6a89..8b0ea75 100644 --- a/conductor/conductor/tests/unit/controller/test_rpc.py +++ b/conductor/conductor/tests/unit/controller/test_rpc.py @@ -21,17 +21,17 @@ import unittest import uuid +from conductor.common import db_backend from conductor.controller.rpc import ControllerRPCEndpoint as rpc from conductor import service from conductor.common.models import plan from conductor.common.music.model import base -from conductor.common.music import api from oslo_config import cfg from mock import patch def plan_prepare(conf): - music = api.API() + music = db_backend.get_client() music.keyspace_create(keyspace=conf.keyspace) plan_tmp = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") diff --git a/conductor/conductor/tests/unit/controller/test_translator_svc.py b/conductor/conductor/tests/unit/controller/test_translator_svc.py index f256991..6ee7193 100644 --- a/conductor/conductor/tests/unit/controller/test_translator_svc.py +++ b/conductor/conductor/tests/unit/controller/test_translator_svc.py @@ -26,15 +26,16 @@ import futurist from mock import patch from mock import PropertyMock + +from conductor.common import db_backend from conductor.controller.translator_svc import TranslatorService from conductor.common.models import plan -from conductor.common.music import api from conductor.common.music.model import base from oslo_config import cfg def plan_prepare(conf): - music = api.API() + music = db_backend.get_client() music.keyspace_create(keyspace=conf.keyspace) plan_tmp = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") diff --git a/conductor/conductor/tests/unit/reservation/test_service.py b/conductor/conductor/tests/unit/reservation/test_service.py index a8e7687..6d3dbbe 100644 --- a/conductor/conductor/tests/unit/reservation/test_service.py +++ b/conductor/conductor/tests/unit/reservation/test_service.py @@ -20,19 +20,20 @@ """Test class for reservation service""" import unittest + +from conductor.common import db_backend from conductor.reservation.service import ReservationServiceLauncher as ReservationServiceLauncher from conductor.reservation.service import ReservationService from conductor.common.models import plan from conductor.common.music.model import base from oslo_config import cfg -from conductor.common.music import api import uuid from mock import patch import json def plan_prepare(conf): cfg.CONF.set_override('certificate_authority_bundle_file', '../AAF_RootCA.cer', 'music_api') - music = api.API() + music = db_backend.get_client() music.keyspace_create(keyspace=conf.keyspace) plan_tmp = base.create_dynamic_model( keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan") diff --git a/conductor/conductor/tests/unit/solver/optimizer/test_greedy.py b/conductor/conductor/tests/unit/solver/optimizer/test_greedy.py index 1e087fa..7177c35 100644 --- a/conductor/conductor/tests/unit/solver/optimizer/test_greedy.py +++ b/conductor/conductor/tests/unit/solver/optimizer/test_greedy.py @@ -20,7 +20,8 @@ """Test class for optimizer greedy.py""" import unittest -from conductor.common.music import api + +from conductor.common import db_backend from conductor.solver.optimizer.greedy import Greedy from oslo_config import cfg from mock import patch @@ -32,7 +33,7 @@ class TestGreedy(unittest.TestCase): @patch('conductor.common.music.model.base.Base.table_create') @patch('conductor.common.music.model.base.Base.insert') def setUp(self, conf, _requests=None, _begin_time=None): - self.music = api.API() + self.music = db_backend.get_client() self.conf = cfg.CONF self.greedy = Greedy(self.conf) self._objective = None diff --git a/conductor/conductor/tests/unit/solver/optimizer/test_random_pick.py b/conductor/conductor/tests/unit/solver/optimizer/test_random_pick.py index 4a0dd8b..dd30204 100644 --- a/conductor/conductor/tests/unit/solver/optimizer/test_random_pick.py +++ b/conductor/conductor/tests/unit/solver/optimizer/test_random_pick.py @@ -20,7 +20,8 @@ """Test class for optimizer random_pick.py""" import unittest -from conductor.common.music import api + +from conductor.common import db_backend from conductor.solver.optimizer.random_pick import RandomPick from oslo_config import cfg from mock import patch @@ -31,7 +32,7 @@ class TestRandomPick(unittest.TestCase): @patch('conductor.common.music.model.base.Base.table_create') @patch('conductor.common.music.model.base.Base.insert') def setUp(self, conf, _requests=None, _begin_time=None): - self.music = api.API() + self.music = db_backend.get_client() self.conf = cfg.CONF self.randomPick = RandomPick(self.conf) diff --git a/conductor/conductor/tests/unit/solver/test_order_lock_service.py b/conductor/conductor/tests/unit/solver/test_order_lock_service.py index 0d7c0db..a68959f 100644 --- a/conductor/conductor/tests/unit/solver/test_order_lock_service.py +++ b/conductor/conductor/tests/unit/solver/test_order_lock_service.py @@ -22,8 +22,8 @@ import mock import unittest import uuid +from conductor.common import db_backend from conductor.common.models.order_lock import OrderLock -from conductor.common.music import api from conductor.solver.orders_lock.orders_lock_service import OrdersLockingService from oslo_config import cfg @@ -32,7 +32,7 @@ class TestOrdersLockingService(unittest.TestCase): def setUp(self): # Initialize music API cfg.CONF.set_override('certificate_authority_bundle_file', '../AAF_RootCA.cer', 'music_api') - music = api.API() + music = db_backend.get_client() cfg.CONF.set_override('keyspace', 'conductor') music.keyspace_create(keyspace=cfg.CONF.keyspace) self.order_lock_svc = OrdersLockingService() diff --git a/conductor/conductor/tests/unit/solver/test_solver_parser.py b/conductor/conductor/tests/unit/solver/test_solver_parser.py index e5ba50b..5e4f981 100644 --- a/conductor/conductor/tests/unit/solver/test_solver_parser.py +++ b/conductor/conductor/tests/unit/solver/test_solver_parser.py @@ -20,7 +20,7 @@ import mock import unittest -from conductor.common.music import api +from conductor.common import db_backend from conductor.solver.request import demand from conductor.solver.request.parser import Parser as SolverRequestParser from conductor.solver.optimizer.constraints import access_distance as access_dist @@ -34,7 +34,7 @@ class TestSolverParser(unittest.TestCase): def setUp(self): # Initialize music API - music = api.API() + music = db_backend.get_client() cfg.CONF.set_override('keyspace', 'conductor') music.keyspace_create(keyspace=cfg.CONF.keyspace) self.sp = SolverRequestParser() -- cgit 1.2.3-korg