diff options
Diffstat (limited to 'engine')
-rw-r--r-- | engine/src/valet/engine/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/db_apis/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/db_apis/mem_db.py | 117 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/db_apis/music.py | 406 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/db_handler.py | 533 | ||||
-rw-r--r-- | engine/src/valet/engine/db_connect/locks.py | 152 |
7 files changed, 1262 insertions, 0 deletions
diff --git a/engine/src/valet/engine/__init__.py b/engine/src/valet/engine/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/db_connect/__init__.py b/engine/src/valet/engine/db_connect/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/db_connect/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/db_connect/db_apis/__init__.py b/engine/src/valet/engine/db_connect/db_apis/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/db_connect/db_apis/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/db_connect/db_apis/mem_db.py b/engine/src/valet/engine/db_connect/db_apis/mem_db.py new file mode 100644 index 0000000..b706c63 --- /dev/null +++ b/engine/src/valet/engine/db_connect/db_apis/mem_db.py @@ -0,0 +1,117 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy + + +class MemDB(object): + + def __init__(self, _config, _logger): + self.logger = _logger + + self.keyspace = _config.get("keyspace") + self.requests_table = _config.get("requests_table") + self.results_table = _config.get("results_table") + self.group_rules_table = _config.get("group_rules_table") + self.groups_table = _config.get("groups_table") + self.stacks_table = _config.get("stacks_table") + self.resources_table = _config.get("resources_table") + self.stack_id_map_table = _config.get("stack_id_map_table") + + self.requests = {} + self.results = {} + self.group_rules = {} + self.groups = {} + self.stacks = {} + self.resources = {} + self.stack_id_map = {} + + def read_all_rows(self, keyspace, table): + rows = {"result": {}} + + if table == self.requests_table: + for k, v in self.requests.iteritems(): + rows["result"][k] = copy.deepcopy(v) + elif table == self.results_table: + for k, v in self.results.iteritems(): + rows["result"][k] = copy.deepcopy(v) + elif table == self.group_rules_table: + for k, v in self.group_rules.iteritems(): + rows["result"][k] = copy.deepcopy(v) + elif table == self.groups_table: + for k, v in self.groups.iteritems(): + rows["result"][k] = copy.deepcopy(v) + + return rows + + def insert_atom(self, keyspace, table, data, name=None, value=None): + if table == self.requests_table: + self.requests[data['request_id']] = data + elif table == self.results_table: + self.results[data['request_id']] = data + elif table == self.group_rules_table: + self.group_rules[data['id']] = data + elif table == self.groups_table: + self.groups[data['id']] = data + elif table == self.resources_table: + self.resources[data['id']] = data + elif table == self.stacks_table: + self.stacks[data['id']] = data + elif table == self.stack_id_map_table: + self.stack_id_map[data['request_id']] = data + + def delete_atom(self, keyspace, table, pk_name, pk_value): + if table == self.requests_table: + if pk_value in self.requests.keys(): + del self.requests[pk_value] + elif table == self.groups_table: + if pk_value in self.groups.keys(): + del self.groups[pk_value] + elif table == self.results_table: + if pk_value in self.results.keys(): + del self.results[pk_value] + + def read_row(self, keyspace, table, pk_name, pk_value): + row = {"result": {}} + + if table == self.requests_table: + if pk_value in self.requests.keys(): + row["result"]["row 0"] = copy.deepcopy(self.requests[pk_value]) + elif table == self.results_table: + if pk_value in self.results.keys(): + row["result"]["row 0"] = copy.deepcopy(self.results[pk_value]) + elif table == self.resources_table: + if pk_value in self.resources.keys(): + row["result"]["row 0"] = copy.deepcopy(self.resources[pk_value]) + elif table == self.group_rules_table: + if pk_value in self.group_rules.keys(): + row["result"]["row 0"] = copy.deepcopy(self.group_rules[pk_value]) + elif table == self.stack_id_map_table: + if pk_value in self.stack_id_map.keys(): + row["result"]["row 0"] = copy.deepcopy(self.stack_id_map[pk_value]) + elif table == self.stacks_table: + if pk_value in self.stacks.keys(): + row["result"]["row 0"] = copy.deepcopy(self.stacks[pk_value]) + + return row + + def create_lock(self, _key): + return "$x--0000000000" + + def delete_lock(self, _key): + pass diff --git a/engine/src/valet/engine/db_connect/db_apis/music.py b/engine/src/valet/engine/db_connect/db_apis/music.py new file mode 100644 index 0000000..6135718 --- /dev/null +++ b/engine/src/valet/engine/db_connect/db_apis/music.py @@ -0,0 +1,406 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2016 AT&T +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import json +import requests + +from valet.utils.decryption import decrypt + + +class REST(object): + """Helper class for REST operations.""" + + def __init__(self, hosts, port, path, timeout, retries, + userid, password, ns, logger): + """Initializer. Accepts target host list, port, and path.""" + + self.hosts = hosts # List of IP or FQDNs + self.port = port # Port Number + self.path = path # Path starting with / + self.timeout = float(timeout) # REST request timeout in seconds + self.retries = retries # Retires before failing over to next Music server. + self.userid = userid + self.password = password + self.ns = ns + self.logger = logger # For logging + + self.urls = [] + for host in self.hosts: + # Must end without a slash + self.urls.append('http://%(host)s:%(port)s%(path)s' % { + 'host': host, + 'port': self.port, + 'path': self.path, + }) + + def __headers(self, content_type='application/json'): + """Returns HTTP request headers.""" + + headers = { + 'ns': self.ns, + 'accept': content_type, + 'content-type': content_type, + 'authorization': 'Basic %s' % base64.b64encode(self.userid + ':' + self.password) + } + + return headers + + def request(self, method='get', content_type='application/json', path='/', + data=None, raise400=True): + """ Performs HTTP request """ + + if method not in ('post', 'get', 'put', 'delete'): + raise KeyError("Method must be: post, get, put, or delete.") + + method_fn = getattr(requests, method) + + if data: + data_json = json.dumps(data) + else: + data_json = None + + response = None + timeout = False + err_message = "" + full_url = "" + for url in self.urls: + # Try each url in turn. First one to succeed wins. + full_url = url + path + + for attempt in range(self.retries): + # Ignore the previous exception. + try: + my_headers = self.__headers(content_type) + for header_key in my_headers: + if (type(my_headers[header_key]).__name__ == 'unicode'): + my_headers[header_key] = my_headers[header_key].encode('ascii', 'ignore') + response = method_fn(full_url, data=data_json, + headers=my_headers, + timeout=self.timeout) + if raise400 or not response.status_code == 400: + response.raise_for_status() + return response + + except requests.exceptions.Timeout as err: + err_message = err.message + response = requests.Response() + response.url = full_url + if not timeout: + self.logger.warning("Music: %s Timeout" % url, errorCode='availability') + timeout = True + + except requests.exceptions.RequestException as err: + err_message = err.message + self.logger.debug("Music: %s Request Exception" % url) + self.logger.debug(" method = %s" % method) + self.logger.debug(" timeout = %s" % self.timeout) + self.logger.debug(" err = %s" % err) + self.logger.debug(" full url = %s" % full_url) + self.logger.debug(" request data = %s" % data_json) + self.logger.debug(" request headers = %s" % my_headers) + self.logger.debug(" status code = %s" % response.status_code) + self.logger.debug(" response = %s" % response.text) + self.logger.debug(" response headers = %s" % response.headers) + + # If we get here, an exception was raised for every url, + # but we passed so we could try each endpoint. Raise status + # for the last attempt (for now) so that we report something. + if response is not None: + self.logger.debug("Music: Full Url: %s", full_url) + self.logger.debug("Music: %s ", err_message) + response.raise_for_status() + + +class Music(object): + """Wrapper for Music API""" + + def __init__(self, _config, _logger): + """Initializer. Accepts a lock_timeout for atomic operations.""" + + self.logger = _logger + + pw = decrypt(_config["engine"]["ek"], + _config["logging"]["lk"], + _config["db"]["dk"], + _config["music"]["password"]) + + kwargs = { + 'hosts': _config["music"]["hosts"], + 'port': _config["music"]["port"], + 'path': _config["music"]["path"], + 'timeout': _config["music"]["timeout"], + 'retries': _config["music"]["retries"], + 'userid': _config["music"]["userid"], + 'password': pw, + 'ns': _config["music"]["namespace"], + 'logger': _logger, + } + self.rest = REST(**kwargs) + + self.lock_names = [] + self.lock_timeout = _config["music"]["lock_timeout"] + + self.replication_factor = _config["music"]["replication_factor"] + + @staticmethod + def __row_url_path(keyspace, table, pk_name=None, pk_value=None): + """Returns a Music-compliant row URL path.""" + + path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % { + 'keyspace': keyspace, + 'table': table, + } + + if pk_name and pk_value: + path += '?%s=%s' % (pk_name, pk_value) + + return path + + def create_keyspace(self, keyspace): + """Creates a keyspace.""" + + data = { + 'replicationInfo': { + # 'class': 'NetworkTopologyStrategy', + # 'dc1': self.replication_factor, + 'class': 'SimpleStrategy', + 'replication_factor': self.replication_factor, + }, + 'durabilityOfWrites': True, + 'consistencyInfo': { + 'type': 'eventual', + }, + } + + path = '/keyspaces/%s' % keyspace + response = self.rest.request(method='post', path=path, data=data) + + return response.ok + + def drop_keyspace(self, keyspace): + """Drops a keyspace.""" + + data = { + 'consistencyInfo': { + 'type': 'eventual', + }, + } + + path = '/keyspaces/%s' % keyspace + response = self.rest.request(method='delete', path=path, data=data) + + return response.ok + + def create_table(self, keyspace, table, schema): + """Creates a table.""" + + data = { + 'fields': schema, + 'consistencyInfo': { + 'type': 'eventual', + }, + } + self.logger.debug(data) + + path = '/keyspaces/%(keyspace)s/tables/%(table)s' % { + 'keyspace': keyspace, + 'table': table, + } + + response = self.rest.request(method='post', path=path, data=data) + + return response.ok + + def create_index(self, keyspace, table, index_field, index_name=None): + """Creates an index for the referenced table.""" + + data = None + if index_name: + data = { + 'index_name': index_name, + } + + pstr = '/keyspaces/%(keyspace)s/tables/%(table)s/index/%(index_field)s' + path = pstr % { + 'keyspace': keyspace, + 'table': table, + 'index_field': index_field, + } + + response = self.rest.request(method='post', path=path, data=data) + + return response.ok + + def version(self): + """Returns version string.""" + + path = '/version' + response = self.rest.request(method='get', content_type='text/plain', path=path) + + return response.text + + def create_lock(self, lock_name): + """Returns the lock id. Use for acquiring and releasing.""" + + path = '/locks/create/%s' % lock_name + response = self.rest.request(method='post', path=path) + + return json.loads(response.text)["lock"]["lock"] + + def acquire_lock(self, lock_id): + """Acquire a lock.""" + + path = '/locks/acquire/%s' % lock_id + response = self.rest.request(method='get', path=path, raise400=False) + + return json.loads(response.text)["status"] == "SUCCESS" + + def release_lock(self, lock_id): + """Release a lock.""" + + path = '/locks/release/%s' % lock_id + response = self.rest.request(method='delete', path=path) + + return response.ok + + def delete_lock(self, lock_name): + """Deletes a lock by name.""" + + path = '/locks/delete/%s' % lock_name + response = self.rest.request(method='delete', path=path, raise400=False) + + return response.ok + + def delete_all_locks(self): + """Delete all locks created during the lifetime of this object.""" + + # TODO(JD): Shouldn't this really be part of internal cleanup? + # FIXME: It can be several API calls. Any way to do in one fell swoop? + for lock_name in self.lock_names: + self.delete_lock(lock_name) + + def create_row(self, keyspace, table, values): + """Create a row.""" + + # self.logger.debug("MUSIC: create_row "+ table) + + data = { + 'values': values, + 'consistencyInfo': { + 'type': 'eventual', + }, + } + + path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % { + 'keyspace': keyspace, + 'table': table, + } + response = self.rest.request(method='post', path=path, data=data) + + return response.ok + + def insert_atom(self, keyspace, table, values, name=None, value=None): + """Atomic create/update row.""" + + data = { + 'values': values, + 'consistencyInfo': { + 'type': 'atomic', + } + } + + path = self.__row_url_path(keyspace, table, name, value) + method = 'post' + + # self.logger.debug("MUSIC: Method: %s ", (method.upper())) + # self.logger.debug("MUSIC: Path: %s", (path)) + # self.logger.debug("MUSIC: Data: %s", (data)) + + self.rest.request(method=method, path=path, data=data) + + def update_row_eventually(self, keyspace, table, values): + """Update a row. Not atomic.""" + + data = { + 'values': values, + 'consistencyInfo': { + 'type': 'eventual', + }, + } + + path = self.__row_url_path(keyspace, table) + response = self.rest.request(method='post', path=path, data=data) + + return response.ok + + def delete_row_eventually(self, keyspace, table, pk_name, pk_value): + """Delete a row. Not atomic.""" + + data = { + 'consistencyInfo': { + 'type': 'eventual', + }, + } + + path = self.__row_url_path(keyspace, table, pk_name, pk_value) + response = self.rest.request(method='delete', path=path, data=data) + + return response.ok + + def delete_atom(self, keyspace, table, pk_name, pk_value): + """Atomic delete row.""" + + data = { + 'consistencyInfo': { + 'type': 'atomic', + } + } + path = self.__row_url_path(keyspace, table, pk_name, pk_value) + self.rest.request(method='delete', path=path, data=data) + + def read_row(self, keyspace, table, pk_name, pk_value): + """Read one row based on a primary key name/value.""" + + path = self.__row_url_path(keyspace, table, pk_name, pk_value) + response = self.rest.request(path=path) + return response.json() + + def read_all_rows(self, keyspace, table): + """Read all rows.""" + + return self.read_row(keyspace, table, pk_name=None, pk_value=None) diff --git a/engine/src/valet/engine/db_connect/db_handler.py b/engine/src/valet/engine/db_connect/db_handler.py new file mode 100644 index 0000000..2165b8a --- /dev/null +++ b/engine/src/valet/engine/db_connect/db_handler.py @@ -0,0 +1,533 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json +import operator + +from valet.engine.db_connect.locks import Locks, now + + +class DBHandler(object): + + def __init__(self, _db, _config, _logger): + self.keyspace = _config.get("keyspace") + self.requests_table = _config.get("requests_table") + self.results_table = _config.get("results_table") + self.group_rules_table = _config.get("group_rules_table") + self.groups_table = _config.get("groups_table") + self.stacks_table = _config.get("stacks_table") + self.resources_table = _config.get("resources_table") + self.stack_id_map_table = _config.get("stack_id_map_table") + self.regions_table = _config.get("regions_table") + + self.db = _db + + self.logger = _logger + + def get_requests(self): + """Get requests from valet-api.""" + + request_list = [] + + try: + rows = self.db.read_all_rows(self.keyspace, self.requests_table) + except Exception as e: + self.logger.error("DB: while reading requests: " + str(e)) + return [] + + if rows is not None and len(rows) > 0: + for key, row in rows.iteritems(): + if key == "status": + if row == "FAILURE": + self.logger.error("DB: Failure in " + self.requests_table) + return [] + continue + elif key == "error": + continue + elif key == "result": + for _, dbrow in row.iteritems(): + request_list.append(dbrow) + + if len(request_list) > 0: + # NOTE(Gueyoung): Sort by timestamp if timestamp is based UDT. + # Currently, ping's timestamp is always -1, while all others 0. + # This is to provide the priority to ping request. + request_list.sort(key=operator.itemgetter("timestamp")) + + return request_list + + def return_request(self, _req_id, _status, _result): + """Finalize the request by + + Create result in results table and delete handled request from requests table. + """ + + # TODO(Gueyoung): avoid duplicated results. + + # Wait randomly with unique seed (Valet instance ID). + # random.seed(_seed) + # r = random.randint(1, 100) + # delay = float(r) / 100.0 + # time.sleep(delay) + + if not self._create_result(_req_id, _status, _result): + return False + + if not self._delete_request(_req_id): + return False + + return True + + def get_results(self): + """Get results.""" + + result_list = [] + + try: + rows = self.db.read_all_rows(self.keyspace, self.results_table) + except Exception as e: + self.logger.error("DB: while reading results: " + str(e)) + return None + + if rows is not None and len(rows) > 0: + for key, row in rows.iteritems(): + if key == "status": + continue + elif key == "error": + continue + elif key == "result": + for _, dbrow in row.iteritems(): + result_list.append(dbrow) + + return result_list + + def _create_result(self, _req_id, _status, _result): + """Return result of request by putting it into results table.""" + + data = { + 'request_id': _req_id, + 'status': json.dumps(_status), + 'result': json.dumps(_result), + 'timestamp': now() + } + try: + self.db.insert_atom(self.keyspace, self.results_table, data) + except Exception as e: + self.logger.error("DB: while putting placement result: " + str(e)) + return False + + return True + + def _delete_request(self, _req_id): + """Delete finished request.""" + + try: + self.db.delete_atom(self.keyspace, self.requests_table, + 'request_id', _req_id) + except Exception as e: + self.logger.error("DB: while deleting handled request: " + str(e)) + return False + + return True + + def clean_expired_regions(self): + """Delete regions from the regions table that have expired. + + Return the list of locked regions.""" + + locked_regions = [] + + try: + result = self.db.read_row(self.keyspace, self.regions_table, None, None)["result"] + except Exception as e: + self.logger.error("DB: while reading locked regions: " + str(e)) + return None + + for _, data in sorted(result.iteritems()): + if int(data["expire_time"]) < now(): + + self.logger.warning("lock on %s has timed out and is revoked" % data["region_id"]) + + Locks.unlock(self, data["region_id"]) + + if not self.delete_region(data["region_id"]): + return None + else: + locked_regions.append(data["region_id"]) + + return locked_regions + + def delete_region(self, region_id): + """Delete from regions table.""" + + try: + self.db.delete_atom(self.keyspace, self.regions_table, + 'region_id', region_id) + except Exception as e: + self.logger.error("DB: while deleting expired lock: " + str(e)) + return False + + return True + + def add_region(self, region_id, expire_time, update=False): + """Add/update locking info into region table.""" + + data = { + "region_id": region_id, + "locked_by": "hostname", + "expire_time": expire_time + } + + name = value = None + if update: + name = "region_id" + value = region_id + + try: + self.db.insert_atom(self.keyspace, self.regions_table, data, name, value) + except Exception as e: + self.logger.error("DB: while adding locked region: " + str(e)) + return False + + return True + + def create_stack_id_map(self, _req_id, _stack_id): + """Create request map entry.""" + + data = { + 'request_id': _req_id, + 'stack_id': _stack_id + } + try: + self.db.insert_atom(self.keyspace, self.stack_id_map_table, data) + except Exception as e: + self.logger.error("DB: while creating request map: " + str(e)) + return False + + return True + + def get_stack_id_map(self, _req_id): + """Get stack id.""" + + try: + row = self.db.read_row(self.keyspace, self.stack_id_map_table, + "request_id", _req_id) + except Exception as e: + self.logger.error("DB: while reading stack_id: " + str(e)) + return None + + if len(row) > 0: + if "result" in row.keys(): + if len(row["result"]) > 0: + return row["result"][row["result"].keys()[0]] + else: + return {} + else: + return {} + else: + return {} + + def delete_stack_id_map(self, _req_id): + """Delete map of confirmed or rollbacked request.""" + + try: + self.db.delete_atom(self.keyspace, self.stack_id_map_table, + 'request_id', _req_id) + except Exception as e: + self.logger.error("DB: while deleting request id map: " + str(e)) + return False + + return True + + def get_group_rules(self): + """Get all valet group rules.""" + + rule_list = [] + + try: + rows = self.db.read_all_rows(self.keyspace, self.group_rules_table) + except Exception as e: + self.logger.error("DB: while reading group rules: " + str(e)) + return None + + if len(rows) > 0: + for key, row in rows.iteritems(): + if key == "result": + for _, dbrow in row.iteritems(): + rule_list.append(dbrow) + + return rule_list + + def get_group_rule(self, _id): + """Get valet group rule.""" + + try: + row = self.db.read_row(self.keyspace, self.group_rules_table, "id", _id) + except Exception as e: + self.logger.error("DB: while reading group rule: " + str(e)) + return None + + if len(row) > 0: + if "result" in row.keys(): + if len(row["result"]) > 0: + return row["result"][row["result"].keys()[0]] + else: + return {} + else: + return {} + else: + return {} + + def create_group_rule(self, _name, _scope, _type, _level, _members, _desc): + """Create a group rule.""" + + data = { + 'id': _name, + 'app_scope': _scope, + 'type': _type, + 'level': _level, + 'members': json.dumps(_members), + 'description': _desc, + 'groups': json.dumps([]), + 'status': "enabled" + } + try: + self.db.insert_atom(self.keyspace, self.group_rules_table, data) + except Exception as e: + self.logger.error("DB: while creating a group rule: " + str(e)) + return False + + return True + + def get_valet_groups(self): + """Get all valet groups.""" + + group_list = [] + + try: + rows = self.db.read_all_rows(self.keyspace, self.groups_table) + except Exception as e: + self.logger.error("DB: while reading groups: " + str(e)) + return None + + if len(rows) > 0: + for key, row in rows.iteritems(): + if key == "result": + for _, dbrow in row.iteritems(): + group_list.append(dbrow) + + return group_list + + def create_valet_group(self, _id, _g_info): + """Create a group.""" + + data = { + 'id': _id, + 'uuid': _g_info.get("uuid"), + 'type': _g_info.get("group_type"), + 'level': _g_info.get("level"), + 'factory': _g_info.get("factory"), + 'rule_id': _g_info.get("rule_id"), + 'metadata': json.dumps(_g_info.get("metadata")), + 'server_list': json.dumps(_g_info.get("server_list")), + 'member_hosts': json.dumps(_g_info.get("member_hosts")), + 'status': _g_info.get("status") + } + try: + self.db.insert_atom(self.keyspace, self.groups_table, data) + except Exception as e: + self.logger.error("DB: while creating a group: " + str(e)) + return False + + return True + + def update_valet_group(self, _id, _g_info): + """Update group.""" + + data = { + 'id': _id, + 'uuid': _g_info.get("uuid"), + 'type': _g_info.get("group_type"), + 'level': _g_info.get("level"), + 'factory': _g_info.get("factory"), + 'rule_id': _g_info.get("rule_id"), + 'metadata': json.dumps(_g_info.get("metadata")), + 'server_list': json.dumps(_g_info.get("server_list")), + 'member_hosts': json.dumps(_g_info.get("member_hosts")), + 'status': _g_info.get("status") + } + try: + self.db.insert_atom(self.keyspace, self.groups_table, data, + name='id', value=_id) + except Exception as e: + self.logger.error("DB: while updating group: " + str(e)) + return False + + return True + + def delete_valet_group(self, _id): + """Delete finished request.""" + + try: + self.db.delete_atom(self.keyspace, self.groups_table, 'id', _id) + except Exception as e: + self.logger.error("DB: while deleting valet group: " + str(e)) + return False + + return True + + def get_resource(self, _dc_id): + """Get datacenter's resource.""" + + try: + row = self.db.read_row(self.keyspace, self.resources_table, "id", _dc_id) + except Exception as e: + self.logger.error("DB: while reading datacenter resource: " + str(e)) + return None + + if len(row) > 0: + if "result" in row.keys(): + if len(row["result"]) > 0: + return row["result"][row["result"].keys()[0]] + else: + return {} + else: + return {} + else: + return {} + + def create_resource(self, _k, _url, _requests, _resource): + """Create a new resource status.""" + + data = { + 'id': _k, + 'url': _url, + 'requests': json.dumps(_requests), + 'resource': json.dumps(_resource) + } + try: + self.db.insert_atom(self.keyspace, self.resources_table, data) + except Exception as e: + self.logger.error("DB: while inserting resource status: " + str(e)) + return False + + return True + + def update_resource(self, _k, _url, _requests, _resource): + """Update resource status.""" + + data = { + 'id': _k, + 'url': _url, + 'requests': json.dumps(_requests), + 'resource': json.dumps(_resource) + } + try: + self.db.insert_atom(self.keyspace, self.resources_table, data, + name='id', value=_k) + except Exception as e: + self.logger.error("DB: while updating resource status: " + str(e)) + return False + + return True + + def get_stack(self, _id): + """Get stack info.""" + + try: + row = self.db.read_row(self.keyspace, self.stacks_table, 'id', _id) + except Exception as e: + self.logger.error("DB: while getting stack info: " + str(e)) + return None + + if len(row) > 0: + if "result" in row.keys(): + if len(row["result"]) > 0: + return row["result"][row["result"].keys()[0]] + else: + return {} + else: + return {} + else: + return {} + + def create_stack(self, _id, _status, _dc, _name, _uuid, + _tenant_id, _metadata, + _servers, _old_servers, + _state, _old_state): + """Store new stack info.""" + + data = { + 'id': _id, + 'last_status': _status, + 'datacenter': _dc, + 'stack_name': _name, + 'uuid': _uuid, + 'tenant_id': _tenant_id, + 'metadata': json.dumps(_metadata), + 'servers': json.dumps(_servers), + 'prior_servers': json.dumps(_old_servers), + 'state': _state, + 'prior_state': _old_state + } + try: + self.db.insert_atom(self.keyspace, self.stacks_table, data) + except Exception as e: + self.logger.error("DB: while storing app: " + str(e)) + return False + + return True + + def delete_stack(self, _id): + """Delete stack.""" + + try: + self.db.delete_atom(self.keyspace, self.stacks_table, 'id', _id) + except Exception as e: + self.logger.error("DB: while deleting app: " + str(e)) + return False + + return True + + def update_stack(self, _id, _status, _dc, _name, _uuid, + _tenant_id, _metadata, + _servers, _old_servers, + _state, _old_state): + """Store updated stack info.""" + + data = { + 'id': _id, + 'last_status': _status, + 'datacenter': _dc, + 'stack_name': _name, + 'uuid': _uuid, + 'tenant_id': _tenant_id, + 'metadata': json.dumps(_metadata), + 'servers': json.dumps(_servers), + 'prior_servers': json.dumps(_old_servers), + 'state': _state, + 'prior_state': _old_state + } + try: + self.db.insert_atom(self.keyspace, self.stacks_table, data, + name='id', value=_id) + except Exception as e: + self.logger.error("DB: while updating stack: " + str(e)) + return False + + return True diff --git a/engine/src/valet/engine/db_connect/locks.py b/engine/src/valet/engine/db_connect/locks.py new file mode 100644 index 0000000..b367e8b --- /dev/null +++ b/engine/src/valet/engine/db_connect/locks.py @@ -0,0 +1,152 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import re +import time + +from valet.utils.logger import Logger + + +def now(): + return int(round(time.time() * 1000)) + + +def later(minutes=0, seconds=0): + # Consider 20 sec as a lead time. + seconds -= 20 + return int(round(time.time() * 1000)) + (minutes * 60 + seconds) * 1000 + + +class Locks(object): + """Manage locking as a semaphore. + + A region lock that manages locks and the region table to + lock the entire time while working on a region. + """ + + Lockspace = "engine" + + def __init__(self, dbh, timeout=None): + self.dbh = dbh + self.db = dbh.db + self.timeout = timeout + + self.locked_regions = [] + self.expired = 0 + self.locked = False + self.region = None + self.key = None + + def set_regions(self): + """Set locked regions.""" + + lr = self.dbh.clean_expired_regions() + if lr is None: + return False + + self.locked_regions = lr + + return True + + def _add_region(self, region): + """Set when to expire and update/add to region table.""" + + self.expired = later(seconds=self.timeout) + self.region = region + + if not self.dbh.add_region(self.region, self.expired): + return None + + return "yes" + + def is_my_turn(self, region): + """Try for a lock, unless you know its already locked. + + If you already have the lock, just update the expire time.""" + + if self.expired < now(): + self.locked = False + + if self.locked: + if not self.region == region: + return "no" + + return self._add_region(region) + + if region in self.locked_regions: + return "no" + + self.db.logger.debug("try lock region: " + region) + + if self._add_region(region) is None: + return None + + status = self.got_lock(region) + if status is None: + return None + + if status == "fail": + self.locked = False + return "no" + + self.locked = True + + return "yes" + + def got_lock(self, key): + """I got lock if I get the first (0) lock""" + + self.key = '%s.%s.%s' % (self.dbh.keyspace, Locks.Lockspace, key) + + try: + lock_id = self.db.create_lock(self.key) + except Exception as e: + Logger.get_logger('debug').error("DB: while creating lock: " + str(e)) + return None + + if 0 == int(re.search('-(\d+)$', lock_id).group(1)): + return "ok" + else: + return "fail" + + def done_with_my_turn(self): + """Release lock and clear from region table.""" + + if not self.locked: + return "ok" + + try: + self.db.delete_lock(self.key) + except Exception as e: + Logger.get_logger('debug').error("DB: while deleting lock: " + str(e)) + return None + + if not self.dbh.delete_region(self.region): + return None + + self.locked = False + self.region = None + + return "ok" + + @staticmethod + def unlock(dbh, key): + """Removes the lock for a key.""" + + key = '%s.%s.%s' % (dbh.keyspace, Locks.Lockspace, key) + dbh.db.delete_lock(key) |