summaryrefslogtreecommitdiffstats
path: root/engine/src
diff options
context:
space:
mode:
Diffstat (limited to 'engine/src')
-rw-r--r--engine/src/valet/engine/__init__.py18
-rw-r--r--engine/src/valet/engine/db_connect/__init__.py18
-rw-r--r--engine/src/valet/engine/db_connect/db_apis/__init__.py18
-rw-r--r--engine/src/valet/engine/db_connect/db_apis/mem_db.py117
-rw-r--r--engine/src/valet/engine/db_connect/db_apis/music.py406
-rw-r--r--engine/src/valet/engine/db_connect/db_handler.py533
-rw-r--r--engine/src/valet/engine/db_connect/locks.py152
7 files changed, 1262 insertions, 0 deletions
diff --git a/engine/src/valet/engine/__init__.py b/engine/src/valet/engine/__init__.py
new file mode 100644
index 0000000..bd50995
--- /dev/null
+++ b/engine/src/valet/engine/__init__.py
@@ -0,0 +1,18 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
diff --git a/engine/src/valet/engine/db_connect/__init__.py b/engine/src/valet/engine/db_connect/__init__.py
new file mode 100644
index 0000000..bd50995
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/__init__.py
@@ -0,0 +1,18 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
diff --git a/engine/src/valet/engine/db_connect/db_apis/__init__.py b/engine/src/valet/engine/db_connect/db_apis/__init__.py
new file mode 100644
index 0000000..bd50995
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/db_apis/__init__.py
@@ -0,0 +1,18 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
diff --git a/engine/src/valet/engine/db_connect/db_apis/mem_db.py b/engine/src/valet/engine/db_connect/db_apis/mem_db.py
new file mode 100644
index 0000000..b706c63
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/db_apis/mem_db.py
@@ -0,0 +1,117 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import copy
+
+
+class MemDB(object):
+
+ def __init__(self, _config, _logger):
+ self.logger = _logger
+
+ self.keyspace = _config.get("keyspace")
+ self.requests_table = _config.get("requests_table")
+ self.results_table = _config.get("results_table")
+ self.group_rules_table = _config.get("group_rules_table")
+ self.groups_table = _config.get("groups_table")
+ self.stacks_table = _config.get("stacks_table")
+ self.resources_table = _config.get("resources_table")
+ self.stack_id_map_table = _config.get("stack_id_map_table")
+
+ self.requests = {}
+ self.results = {}
+ self.group_rules = {}
+ self.groups = {}
+ self.stacks = {}
+ self.resources = {}
+ self.stack_id_map = {}
+
+ def read_all_rows(self, keyspace, table):
+ rows = {"result": {}}
+
+ if table == self.requests_table:
+ for k, v in self.requests.iteritems():
+ rows["result"][k] = copy.deepcopy(v)
+ elif table == self.results_table:
+ for k, v in self.results.iteritems():
+ rows["result"][k] = copy.deepcopy(v)
+ elif table == self.group_rules_table:
+ for k, v in self.group_rules.iteritems():
+ rows["result"][k] = copy.deepcopy(v)
+ elif table == self.groups_table:
+ for k, v in self.groups.iteritems():
+ rows["result"][k] = copy.deepcopy(v)
+
+ return rows
+
+ def insert_atom(self, keyspace, table, data, name=None, value=None):
+ if table == self.requests_table:
+ self.requests[data['request_id']] = data
+ elif table == self.results_table:
+ self.results[data['request_id']] = data
+ elif table == self.group_rules_table:
+ self.group_rules[data['id']] = data
+ elif table == self.groups_table:
+ self.groups[data['id']] = data
+ elif table == self.resources_table:
+ self.resources[data['id']] = data
+ elif table == self.stacks_table:
+ self.stacks[data['id']] = data
+ elif table == self.stack_id_map_table:
+ self.stack_id_map[data['request_id']] = data
+
+ def delete_atom(self, keyspace, table, pk_name, pk_value):
+ if table == self.requests_table:
+ if pk_value in self.requests.keys():
+ del self.requests[pk_value]
+ elif table == self.groups_table:
+ if pk_value in self.groups.keys():
+ del self.groups[pk_value]
+ elif table == self.results_table:
+ if pk_value in self.results.keys():
+ del self.results[pk_value]
+
+ def read_row(self, keyspace, table, pk_name, pk_value):
+ row = {"result": {}}
+
+ if table == self.requests_table:
+ if pk_value in self.requests.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.requests[pk_value])
+ elif table == self.results_table:
+ if pk_value in self.results.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.results[pk_value])
+ elif table == self.resources_table:
+ if pk_value in self.resources.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.resources[pk_value])
+ elif table == self.group_rules_table:
+ if pk_value in self.group_rules.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.group_rules[pk_value])
+ elif table == self.stack_id_map_table:
+ if pk_value in self.stack_id_map.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.stack_id_map[pk_value])
+ elif table == self.stacks_table:
+ if pk_value in self.stacks.keys():
+ row["result"]["row 0"] = copy.deepcopy(self.stacks[pk_value])
+
+ return row
+
+ def create_lock(self, _key):
+ return "$x--0000000000"
+
+ def delete_lock(self, _key):
+ pass
diff --git a/engine/src/valet/engine/db_connect/db_apis/music.py b/engine/src/valet/engine/db_connect/db_apis/music.py
new file mode 100644
index 0000000..6135718
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/db_apis/music.py
@@ -0,0 +1,406 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+# -*- encoding: utf-8 -*-
+#
+# Copyright (c) 2016 AT&T
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import base64
+import json
+import requests
+
+from valet.utils.decryption import decrypt
+
+
+class REST(object):
+ """Helper class for REST operations."""
+
+ def __init__(self, hosts, port, path, timeout, retries,
+ userid, password, ns, logger):
+ """Initializer. Accepts target host list, port, and path."""
+
+ self.hosts = hosts # List of IP or FQDNs
+ self.port = port # Port Number
+ self.path = path # Path starting with /
+ self.timeout = float(timeout) # REST request timeout in seconds
+ self.retries = retries # Retires before failing over to next Music server.
+ self.userid = userid
+ self.password = password
+ self.ns = ns
+ self.logger = logger # For logging
+
+ self.urls = []
+ for host in self.hosts:
+ # Must end without a slash
+ self.urls.append('http://%(host)s:%(port)s%(path)s' % {
+ 'host': host,
+ 'port': self.port,
+ 'path': self.path,
+ })
+
+ def __headers(self, content_type='application/json'):
+ """Returns HTTP request headers."""
+
+ headers = {
+ 'ns': self.ns,
+ 'accept': content_type,
+ 'content-type': content_type,
+ 'authorization': 'Basic %s' % base64.b64encode(self.userid + ':' + self.password)
+ }
+
+ return headers
+
+ def request(self, method='get', content_type='application/json', path='/',
+ data=None, raise400=True):
+ """ Performs HTTP request """
+
+ if method not in ('post', 'get', 'put', 'delete'):
+ raise KeyError("Method must be: post, get, put, or delete.")
+
+ method_fn = getattr(requests, method)
+
+ if data:
+ data_json = json.dumps(data)
+ else:
+ data_json = None
+
+ response = None
+ timeout = False
+ err_message = ""
+ full_url = ""
+ for url in self.urls:
+ # Try each url in turn. First one to succeed wins.
+ full_url = url + path
+
+ for attempt in range(self.retries):
+ # Ignore the previous exception.
+ try:
+ my_headers = self.__headers(content_type)
+ for header_key in my_headers:
+ if (type(my_headers[header_key]).__name__ == 'unicode'):
+ my_headers[header_key] = my_headers[header_key].encode('ascii', 'ignore')
+ response = method_fn(full_url, data=data_json,
+ headers=my_headers,
+ timeout=self.timeout)
+ if raise400 or not response.status_code == 400:
+ response.raise_for_status()
+ return response
+
+ except requests.exceptions.Timeout as err:
+ err_message = err.message
+ response = requests.Response()
+ response.url = full_url
+ if not timeout:
+ self.logger.warning("Music: %s Timeout" % url, errorCode='availability')
+ timeout = True
+
+ except requests.exceptions.RequestException as err:
+ err_message = err.message
+ self.logger.debug("Music: %s Request Exception" % url)
+ self.logger.debug(" method = %s" % method)
+ self.logger.debug(" timeout = %s" % self.timeout)
+ self.logger.debug(" err = %s" % err)
+ self.logger.debug(" full url = %s" % full_url)
+ self.logger.debug(" request data = %s" % data_json)
+ self.logger.debug(" request headers = %s" % my_headers)
+ self.logger.debug(" status code = %s" % response.status_code)
+ self.logger.debug(" response = %s" % response.text)
+ self.logger.debug(" response headers = %s" % response.headers)
+
+ # If we get here, an exception was raised for every url,
+ # but we passed so we could try each endpoint. Raise status
+ # for the last attempt (for now) so that we report something.
+ if response is not None:
+ self.logger.debug("Music: Full Url: %s", full_url)
+ self.logger.debug("Music: %s ", err_message)
+ response.raise_for_status()
+
+
+class Music(object):
+ """Wrapper for Music API"""
+
+ def __init__(self, _config, _logger):
+ """Initializer. Accepts a lock_timeout for atomic operations."""
+
+ self.logger = _logger
+
+ pw = decrypt(_config["engine"]["ek"],
+ _config["logging"]["lk"],
+ _config["db"]["dk"],
+ _config["music"]["password"])
+
+ kwargs = {
+ 'hosts': _config["music"]["hosts"],
+ 'port': _config["music"]["port"],
+ 'path': _config["music"]["path"],
+ 'timeout': _config["music"]["timeout"],
+ 'retries': _config["music"]["retries"],
+ 'userid': _config["music"]["userid"],
+ 'password': pw,
+ 'ns': _config["music"]["namespace"],
+ 'logger': _logger,
+ }
+ self.rest = REST(**kwargs)
+
+ self.lock_names = []
+ self.lock_timeout = _config["music"]["lock_timeout"]
+
+ self.replication_factor = _config["music"]["replication_factor"]
+
+ @staticmethod
+ def __row_url_path(keyspace, table, pk_name=None, pk_value=None):
+ """Returns a Music-compliant row URL path."""
+
+ path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
+ 'keyspace': keyspace,
+ 'table': table,
+ }
+
+ if pk_name and pk_value:
+ path += '?%s=%s' % (pk_name, pk_value)
+
+ return path
+
+ def create_keyspace(self, keyspace):
+ """Creates a keyspace."""
+
+ data = {
+ 'replicationInfo': {
+ # 'class': 'NetworkTopologyStrategy',
+ # 'dc1': self.replication_factor,
+ 'class': 'SimpleStrategy',
+ 'replication_factor': self.replication_factor,
+ },
+ 'durabilityOfWrites': True,
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+
+ path = '/keyspaces/%s' % keyspace
+ response = self.rest.request(method='post', path=path, data=data)
+
+ return response.ok
+
+ def drop_keyspace(self, keyspace):
+ """Drops a keyspace."""
+
+ data = {
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+
+ path = '/keyspaces/%s' % keyspace
+ response = self.rest.request(method='delete', path=path, data=data)
+
+ return response.ok
+
+ def create_table(self, keyspace, table, schema):
+ """Creates a table."""
+
+ data = {
+ 'fields': schema,
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+ self.logger.debug(data)
+
+ path = '/keyspaces/%(keyspace)s/tables/%(table)s' % {
+ 'keyspace': keyspace,
+ 'table': table,
+ }
+
+ response = self.rest.request(method='post', path=path, data=data)
+
+ return response.ok
+
+ def create_index(self, keyspace, table, index_field, index_name=None):
+ """Creates an index for the referenced table."""
+
+ data = None
+ if index_name:
+ data = {
+ 'index_name': index_name,
+ }
+
+ pstr = '/keyspaces/%(keyspace)s/tables/%(table)s/index/%(index_field)s'
+ path = pstr % {
+ 'keyspace': keyspace,
+ 'table': table,
+ 'index_field': index_field,
+ }
+
+ response = self.rest.request(method='post', path=path, data=data)
+
+ return response.ok
+
+ def version(self):
+ """Returns version string."""
+
+ path = '/version'
+ response = self.rest.request(method='get', content_type='text/plain', path=path)
+
+ return response.text
+
+ def create_lock(self, lock_name):
+ """Returns the lock id. Use for acquiring and releasing."""
+
+ path = '/locks/create/%s' % lock_name
+ response = self.rest.request(method='post', path=path)
+
+ return json.loads(response.text)["lock"]["lock"]
+
+ def acquire_lock(self, lock_id):
+ """Acquire a lock."""
+
+ path = '/locks/acquire/%s' % lock_id
+ response = self.rest.request(method='get', path=path, raise400=False)
+
+ return json.loads(response.text)["status"] == "SUCCESS"
+
+ def release_lock(self, lock_id):
+ """Release a lock."""
+
+ path = '/locks/release/%s' % lock_id
+ response = self.rest.request(method='delete', path=path)
+
+ return response.ok
+
+ def delete_lock(self, lock_name):
+ """Deletes a lock by name."""
+
+ path = '/locks/delete/%s' % lock_name
+ response = self.rest.request(method='delete', path=path, raise400=False)
+
+ return response.ok
+
+ def delete_all_locks(self):
+ """Delete all locks created during the lifetime of this object."""
+
+ # TODO(JD): Shouldn't this really be part of internal cleanup?
+ # FIXME: It can be several API calls. Any way to do in one fell swoop?
+ for lock_name in self.lock_names:
+ self.delete_lock(lock_name)
+
+ def create_row(self, keyspace, table, values):
+ """Create a row."""
+
+ # self.logger.debug("MUSIC: create_row "+ table)
+
+ data = {
+ 'values': values,
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+
+ path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
+ 'keyspace': keyspace,
+ 'table': table,
+ }
+ response = self.rest.request(method='post', path=path, data=data)
+
+ return response.ok
+
+ def insert_atom(self, keyspace, table, values, name=None, value=None):
+ """Atomic create/update row."""
+
+ data = {
+ 'values': values,
+ 'consistencyInfo': {
+ 'type': 'atomic',
+ }
+ }
+
+ path = self.__row_url_path(keyspace, table, name, value)
+ method = 'post'
+
+ # self.logger.debug("MUSIC: Method: %s ", (method.upper()))
+ # self.logger.debug("MUSIC: Path: %s", (path))
+ # self.logger.debug("MUSIC: Data: %s", (data))
+
+ self.rest.request(method=method, path=path, data=data)
+
+ def update_row_eventually(self, keyspace, table, values):
+ """Update a row. Not atomic."""
+
+ data = {
+ 'values': values,
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+
+ path = self.__row_url_path(keyspace, table)
+ response = self.rest.request(method='post', path=path, data=data)
+
+ return response.ok
+
+ def delete_row_eventually(self, keyspace, table, pk_name, pk_value):
+ """Delete a row. Not atomic."""
+
+ data = {
+ 'consistencyInfo': {
+ 'type': 'eventual',
+ },
+ }
+
+ path = self.__row_url_path(keyspace, table, pk_name, pk_value)
+ response = self.rest.request(method='delete', path=path, data=data)
+
+ return response.ok
+
+ def delete_atom(self, keyspace, table, pk_name, pk_value):
+ """Atomic delete row."""
+
+ data = {
+ 'consistencyInfo': {
+ 'type': 'atomic',
+ }
+ }
+ path = self.__row_url_path(keyspace, table, pk_name, pk_value)
+ self.rest.request(method='delete', path=path, data=data)
+
+ def read_row(self, keyspace, table, pk_name, pk_value):
+ """Read one row based on a primary key name/value."""
+
+ path = self.__row_url_path(keyspace, table, pk_name, pk_value)
+ response = self.rest.request(path=path)
+ return response.json()
+
+ def read_all_rows(self, keyspace, table):
+ """Read all rows."""
+
+ return self.read_row(keyspace, table, pk_name=None, pk_value=None)
diff --git a/engine/src/valet/engine/db_connect/db_handler.py b/engine/src/valet/engine/db_connect/db_handler.py
new file mode 100644
index 0000000..2165b8a
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/db_handler.py
@@ -0,0 +1,533 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import json
+import operator
+
+from valet.engine.db_connect.locks import Locks, now
+
+
+class DBHandler(object):
+
+ def __init__(self, _db, _config, _logger):
+ self.keyspace = _config.get("keyspace")
+ self.requests_table = _config.get("requests_table")
+ self.results_table = _config.get("results_table")
+ self.group_rules_table = _config.get("group_rules_table")
+ self.groups_table = _config.get("groups_table")
+ self.stacks_table = _config.get("stacks_table")
+ self.resources_table = _config.get("resources_table")
+ self.stack_id_map_table = _config.get("stack_id_map_table")
+ self.regions_table = _config.get("regions_table")
+
+ self.db = _db
+
+ self.logger = _logger
+
+ def get_requests(self):
+ """Get requests from valet-api."""
+
+ request_list = []
+
+ try:
+ rows = self.db.read_all_rows(self.keyspace, self.requests_table)
+ except Exception as e:
+ self.logger.error("DB: while reading requests: " + str(e))
+ return []
+
+ if rows is not None and len(rows) > 0:
+ for key, row in rows.iteritems():
+ if key == "status":
+ if row == "FAILURE":
+ self.logger.error("DB: Failure in " + self.requests_table)
+ return []
+ continue
+ elif key == "error":
+ continue
+ elif key == "result":
+ for _, dbrow in row.iteritems():
+ request_list.append(dbrow)
+
+ if len(request_list) > 0:
+ # NOTE(Gueyoung): Sort by timestamp if timestamp is based UDT.
+ # Currently, ping's timestamp is always -1, while all others 0.
+ # This is to provide the priority to ping request.
+ request_list.sort(key=operator.itemgetter("timestamp"))
+
+ return request_list
+
+ def return_request(self, _req_id, _status, _result):
+ """Finalize the request by
+
+ Create result in results table and delete handled request from requests table.
+ """
+
+ # TODO(Gueyoung): avoid duplicated results.
+
+ # Wait randomly with unique seed (Valet instance ID).
+ # random.seed(_seed)
+ # r = random.randint(1, 100)
+ # delay = float(r) / 100.0
+ # time.sleep(delay)
+
+ if not self._create_result(_req_id, _status, _result):
+ return False
+
+ if not self._delete_request(_req_id):
+ return False
+
+ return True
+
+ def get_results(self):
+ """Get results."""
+
+ result_list = []
+
+ try:
+ rows = self.db.read_all_rows(self.keyspace, self.results_table)
+ except Exception as e:
+ self.logger.error("DB: while reading results: " + str(e))
+ return None
+
+ if rows is not None and len(rows) > 0:
+ for key, row in rows.iteritems():
+ if key == "status":
+ continue
+ elif key == "error":
+ continue
+ elif key == "result":
+ for _, dbrow in row.iteritems():
+ result_list.append(dbrow)
+
+ return result_list
+
+ def _create_result(self, _req_id, _status, _result):
+ """Return result of request by putting it into results table."""
+
+ data = {
+ 'request_id': _req_id,
+ 'status': json.dumps(_status),
+ 'result': json.dumps(_result),
+ 'timestamp': now()
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.results_table, data)
+ except Exception as e:
+ self.logger.error("DB: while putting placement result: " + str(e))
+ return False
+
+ return True
+
+ def _delete_request(self, _req_id):
+ """Delete finished request."""
+
+ try:
+ self.db.delete_atom(self.keyspace, self.requests_table,
+ 'request_id', _req_id)
+ except Exception as e:
+ self.logger.error("DB: while deleting handled request: " + str(e))
+ return False
+
+ return True
+
+ def clean_expired_regions(self):
+ """Delete regions from the regions table that have expired.
+
+ Return the list of locked regions."""
+
+ locked_regions = []
+
+ try:
+ result = self.db.read_row(self.keyspace, self.regions_table, None, None)["result"]
+ except Exception as e:
+ self.logger.error("DB: while reading locked regions: " + str(e))
+ return None
+
+ for _, data in sorted(result.iteritems()):
+ if int(data["expire_time"]) < now():
+
+ self.logger.warning("lock on %s has timed out and is revoked" % data["region_id"])
+
+ Locks.unlock(self, data["region_id"])
+
+ if not self.delete_region(data["region_id"]):
+ return None
+ else:
+ locked_regions.append(data["region_id"])
+
+ return locked_regions
+
+ def delete_region(self, region_id):
+ """Delete from regions table."""
+
+ try:
+ self.db.delete_atom(self.keyspace, self.regions_table,
+ 'region_id', region_id)
+ except Exception as e:
+ self.logger.error("DB: while deleting expired lock: " + str(e))
+ return False
+
+ return True
+
+ def add_region(self, region_id, expire_time, update=False):
+ """Add/update locking info into region table."""
+
+ data = {
+ "region_id": region_id,
+ "locked_by": "hostname",
+ "expire_time": expire_time
+ }
+
+ name = value = None
+ if update:
+ name = "region_id"
+ value = region_id
+
+ try:
+ self.db.insert_atom(self.keyspace, self.regions_table, data, name, value)
+ except Exception as e:
+ self.logger.error("DB: while adding locked region: " + str(e))
+ return False
+
+ return True
+
+ def create_stack_id_map(self, _req_id, _stack_id):
+ """Create request map entry."""
+
+ data = {
+ 'request_id': _req_id,
+ 'stack_id': _stack_id
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.stack_id_map_table, data)
+ except Exception as e:
+ self.logger.error("DB: while creating request map: " + str(e))
+ return False
+
+ return True
+
+ def get_stack_id_map(self, _req_id):
+ """Get stack id."""
+
+ try:
+ row = self.db.read_row(self.keyspace, self.stack_id_map_table,
+ "request_id", _req_id)
+ except Exception as e:
+ self.logger.error("DB: while reading stack_id: " + str(e))
+ return None
+
+ if len(row) > 0:
+ if "result" in row.keys():
+ if len(row["result"]) > 0:
+ return row["result"][row["result"].keys()[0]]
+ else:
+ return {}
+ else:
+ return {}
+ else:
+ return {}
+
+ def delete_stack_id_map(self, _req_id):
+ """Delete map of confirmed or rollbacked request."""
+
+ try:
+ self.db.delete_atom(self.keyspace, self.stack_id_map_table,
+ 'request_id', _req_id)
+ except Exception as e:
+ self.logger.error("DB: while deleting request id map: " + str(e))
+ return False
+
+ return True
+
+ def get_group_rules(self):
+ """Get all valet group rules."""
+
+ rule_list = []
+
+ try:
+ rows = self.db.read_all_rows(self.keyspace, self.group_rules_table)
+ except Exception as e:
+ self.logger.error("DB: while reading group rules: " + str(e))
+ return None
+
+ if len(rows) > 0:
+ for key, row in rows.iteritems():
+ if key == "result":
+ for _, dbrow in row.iteritems():
+ rule_list.append(dbrow)
+
+ return rule_list
+
+ def get_group_rule(self, _id):
+ """Get valet group rule."""
+
+ try:
+ row = self.db.read_row(self.keyspace, self.group_rules_table, "id", _id)
+ except Exception as e:
+ self.logger.error("DB: while reading group rule: " + str(e))
+ return None
+
+ if len(row) > 0:
+ if "result" in row.keys():
+ if len(row["result"]) > 0:
+ return row["result"][row["result"].keys()[0]]
+ else:
+ return {}
+ else:
+ return {}
+ else:
+ return {}
+
+ def create_group_rule(self, _name, _scope, _type, _level, _members, _desc):
+ """Create a group rule."""
+
+ data = {
+ 'id': _name,
+ 'app_scope': _scope,
+ 'type': _type,
+ 'level': _level,
+ 'members': json.dumps(_members),
+ 'description': _desc,
+ 'groups': json.dumps([]),
+ 'status': "enabled"
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.group_rules_table, data)
+ except Exception as e:
+ self.logger.error("DB: while creating a group rule: " + str(e))
+ return False
+
+ return True
+
+ def get_valet_groups(self):
+ """Get all valet groups."""
+
+ group_list = []
+
+ try:
+ rows = self.db.read_all_rows(self.keyspace, self.groups_table)
+ except Exception as e:
+ self.logger.error("DB: while reading groups: " + str(e))
+ return None
+
+ if len(rows) > 0:
+ for key, row in rows.iteritems():
+ if key == "result":
+ for _, dbrow in row.iteritems():
+ group_list.append(dbrow)
+
+ return group_list
+
+ def create_valet_group(self, _id, _g_info):
+ """Create a group."""
+
+ data = {
+ 'id': _id,
+ 'uuid': _g_info.get("uuid"),
+ 'type': _g_info.get("group_type"),
+ 'level': _g_info.get("level"),
+ 'factory': _g_info.get("factory"),
+ 'rule_id': _g_info.get("rule_id"),
+ 'metadata': json.dumps(_g_info.get("metadata")),
+ 'server_list': json.dumps(_g_info.get("server_list")),
+ 'member_hosts': json.dumps(_g_info.get("member_hosts")),
+ 'status': _g_info.get("status")
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.groups_table, data)
+ except Exception as e:
+ self.logger.error("DB: while creating a group: " + str(e))
+ return False
+
+ return True
+
+ def update_valet_group(self, _id, _g_info):
+ """Update group."""
+
+ data = {
+ 'id': _id,
+ 'uuid': _g_info.get("uuid"),
+ 'type': _g_info.get("group_type"),
+ 'level': _g_info.get("level"),
+ 'factory': _g_info.get("factory"),
+ 'rule_id': _g_info.get("rule_id"),
+ 'metadata': json.dumps(_g_info.get("metadata")),
+ 'server_list': json.dumps(_g_info.get("server_list")),
+ 'member_hosts': json.dumps(_g_info.get("member_hosts")),
+ 'status': _g_info.get("status")
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.groups_table, data,
+ name='id', value=_id)
+ except Exception as e:
+ self.logger.error("DB: while updating group: " + str(e))
+ return False
+
+ return True
+
+ def delete_valet_group(self, _id):
+ """Delete finished request."""
+
+ try:
+ self.db.delete_atom(self.keyspace, self.groups_table, 'id', _id)
+ except Exception as e:
+ self.logger.error("DB: while deleting valet group: " + str(e))
+ return False
+
+ return True
+
+ def get_resource(self, _dc_id):
+ """Get datacenter's resource."""
+
+ try:
+ row = self.db.read_row(self.keyspace, self.resources_table, "id", _dc_id)
+ except Exception as e:
+ self.logger.error("DB: while reading datacenter resource: " + str(e))
+ return None
+
+ if len(row) > 0:
+ if "result" in row.keys():
+ if len(row["result"]) > 0:
+ return row["result"][row["result"].keys()[0]]
+ else:
+ return {}
+ else:
+ return {}
+ else:
+ return {}
+
+ def create_resource(self, _k, _url, _requests, _resource):
+ """Create a new resource status."""
+
+ data = {
+ 'id': _k,
+ 'url': _url,
+ 'requests': json.dumps(_requests),
+ 'resource': json.dumps(_resource)
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.resources_table, data)
+ except Exception as e:
+ self.logger.error("DB: while inserting resource status: " + str(e))
+ return False
+
+ return True
+
+ def update_resource(self, _k, _url, _requests, _resource):
+ """Update resource status."""
+
+ data = {
+ 'id': _k,
+ 'url': _url,
+ 'requests': json.dumps(_requests),
+ 'resource': json.dumps(_resource)
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.resources_table, data,
+ name='id', value=_k)
+ except Exception as e:
+ self.logger.error("DB: while updating resource status: " + str(e))
+ return False
+
+ return True
+
+ def get_stack(self, _id):
+ """Get stack info."""
+
+ try:
+ row = self.db.read_row(self.keyspace, self.stacks_table, 'id', _id)
+ except Exception as e:
+ self.logger.error("DB: while getting stack info: " + str(e))
+ return None
+
+ if len(row) > 0:
+ if "result" in row.keys():
+ if len(row["result"]) > 0:
+ return row["result"][row["result"].keys()[0]]
+ else:
+ return {}
+ else:
+ return {}
+ else:
+ return {}
+
+ def create_stack(self, _id, _status, _dc, _name, _uuid,
+ _tenant_id, _metadata,
+ _servers, _old_servers,
+ _state, _old_state):
+ """Store new stack info."""
+
+ data = {
+ 'id': _id,
+ 'last_status': _status,
+ 'datacenter': _dc,
+ 'stack_name': _name,
+ 'uuid': _uuid,
+ 'tenant_id': _tenant_id,
+ 'metadata': json.dumps(_metadata),
+ 'servers': json.dumps(_servers),
+ 'prior_servers': json.dumps(_old_servers),
+ 'state': _state,
+ 'prior_state': _old_state
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.stacks_table, data)
+ except Exception as e:
+ self.logger.error("DB: while storing app: " + str(e))
+ return False
+
+ return True
+
+ def delete_stack(self, _id):
+ """Delete stack."""
+
+ try:
+ self.db.delete_atom(self.keyspace, self.stacks_table, 'id', _id)
+ except Exception as e:
+ self.logger.error("DB: while deleting app: " + str(e))
+ return False
+
+ return True
+
+ def update_stack(self, _id, _status, _dc, _name, _uuid,
+ _tenant_id, _metadata,
+ _servers, _old_servers,
+ _state, _old_state):
+ """Store updated stack info."""
+
+ data = {
+ 'id': _id,
+ 'last_status': _status,
+ 'datacenter': _dc,
+ 'stack_name': _name,
+ 'uuid': _uuid,
+ 'tenant_id': _tenant_id,
+ 'metadata': json.dumps(_metadata),
+ 'servers': json.dumps(_servers),
+ 'prior_servers': json.dumps(_old_servers),
+ 'state': _state,
+ 'prior_state': _old_state
+ }
+ try:
+ self.db.insert_atom(self.keyspace, self.stacks_table, data,
+ name='id', value=_id)
+ except Exception as e:
+ self.logger.error("DB: while updating stack: " + str(e))
+ return False
+
+ return True
diff --git a/engine/src/valet/engine/db_connect/locks.py b/engine/src/valet/engine/db_connect/locks.py
new file mode 100644
index 0000000..b367e8b
--- /dev/null
+++ b/engine/src/valet/engine/db_connect/locks.py
@@ -0,0 +1,152 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import re
+import time
+
+from valet.utils.logger import Logger
+
+
+def now():
+ return int(round(time.time() * 1000))
+
+
+def later(minutes=0, seconds=0):
+ # Consider 20 sec as a lead time.
+ seconds -= 20
+ return int(round(time.time() * 1000)) + (minutes * 60 + seconds) * 1000
+
+
+class Locks(object):
+ """Manage locking as a semaphore.
+
+ A region lock that manages locks and the region table to
+ lock the entire time while working on a region.
+ """
+
+ Lockspace = "engine"
+
+ def __init__(self, dbh, timeout=None):
+ self.dbh = dbh
+ self.db = dbh.db
+ self.timeout = timeout
+
+ self.locked_regions = []
+ self.expired = 0
+ self.locked = False
+ self.region = None
+ self.key = None
+
+ def set_regions(self):
+ """Set locked regions."""
+
+ lr = self.dbh.clean_expired_regions()
+ if lr is None:
+ return False
+
+ self.locked_regions = lr
+
+ return True
+
+ def _add_region(self, region):
+ """Set when to expire and update/add to region table."""
+
+ self.expired = later(seconds=self.timeout)
+ self.region = region
+
+ if not self.dbh.add_region(self.region, self.expired):
+ return None
+
+ return "yes"
+
+ def is_my_turn(self, region):
+ """Try for a lock, unless you know its already locked.
+
+ If you already have the lock, just update the expire time."""
+
+ if self.expired < now():
+ self.locked = False
+
+ if self.locked:
+ if not self.region == region:
+ return "no"
+
+ return self._add_region(region)
+
+ if region in self.locked_regions:
+ return "no"
+
+ self.db.logger.debug("try lock region: " + region)
+
+ if self._add_region(region) is None:
+ return None
+
+ status = self.got_lock(region)
+ if status is None:
+ return None
+
+ if status == "fail":
+ self.locked = False
+ return "no"
+
+ self.locked = True
+
+ return "yes"
+
+ def got_lock(self, key):
+ """I got lock if I get the first (0) lock"""
+
+ self.key = '%s.%s.%s' % (self.dbh.keyspace, Locks.Lockspace, key)
+
+ try:
+ lock_id = self.db.create_lock(self.key)
+ except Exception as e:
+ Logger.get_logger('debug').error("DB: while creating lock: " + str(e))
+ return None
+
+ if 0 == int(re.search('-(\d+)$', lock_id).group(1)):
+ return "ok"
+ else:
+ return "fail"
+
+ def done_with_my_turn(self):
+ """Release lock and clear from region table."""
+
+ if not self.locked:
+ return "ok"
+
+ try:
+ self.db.delete_lock(self.key)
+ except Exception as e:
+ Logger.get_logger('debug').error("DB: while deleting lock: " + str(e))
+ return None
+
+ if not self.dbh.delete_region(self.region):
+ return None
+
+ self.locked = False
+ self.region = None
+
+ return "ok"
+
+ @staticmethod
+ def unlock(dbh, key):
+ """Removes the lock for a key."""
+
+ key = '%s.%s.%s' % (dbh.keyspace, Locks.Lockspace, key)
+ dbh.db.delete_lock(key)