diff options
author | Arthur Martella <arthur.martella.1@att.com> | 2019-03-15 12:14:57 -0400 |
---|---|---|
committer | Arthur Martella <arthur.martella.1@att.com> | 2019-03-15 12:14:57 -0400 |
commit | 0c4d7726ea2169fb47765bdec576f05c6d70aeb4 (patch) | |
tree | 68c0d4b917de4866b55e73e0acae10fa46901752 /engine/src | |
parent | 9b71f559689602bf3ff0d424e54afe52986cb958 (diff) |
Initial upload of F-GPS seed code 3/21
Includes:
Engine source: main, rules, solver, utils
Change-Id: I04cfa4802cd7a8588c9208319663efa2e3418ffa
Issue-ID: OPTFRA-440
Signed-off-by: arthur.martella.1@att.com
Diffstat (limited to 'engine/src')
-rw-r--r-- | engine/src/valet/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/bootstrapper.py | 121 | ||||
-rwxr-xr-x | engine/src/valet/rules/VNF_Rack_Diversity_RDN.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/VNF_Rack_Quorum_RDN.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/VNF_host_diversity_RDN.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_affinity_rule.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_diveristy_rule0.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_diveristy_rule1.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_diveristy_rule2.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_exclusivity.json | 8 | ||||
-rwxr-xr-x | engine/src/valet/rules/test_host_exclusivity2.json | 8 | ||||
-rw-r--r-- | engine/src/valet/solver/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/solver/ostro.py | 529 | ||||
-rw-r--r-- | engine/src/valet/utils/__init__.py | 18 | ||||
-rw-r--r-- | engine/src/valet/utils/decryption.py | 44 | ||||
-rw-r--r-- | engine/src/valet/utils/logger.py | 349 | ||||
-rw-r--r-- | engine/src/valet/valet_main.py | 88 |
17 files changed, 1257 insertions, 0 deletions
diff --git a/engine/src/valet/__init__.py b/engine/src/valet/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/bootstrapper.py b/engine/src/valet/bootstrapper.py new file mode 100644 index 0000000..08bc41b --- /dev/null +++ b/engine/src/valet/bootstrapper.py @@ -0,0 +1,121 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import glob +import json +import os +import sys + +from valet.engine.app_manager.app_handler import AppHandler +from valet.engine.db_connect.db_apis.music import Music +from valet.engine.db_connect.db_handler import DBHandler +from valet.engine.db_connect.locks import Locks +from valet.engine.resource_manager.compute_manager import ComputeManager +from valet.engine.resource_manager.metadata_manager import MetadataManager +from valet.engine.resource_manager.naming import Naming +from valet.engine.resource_manager.nova_compute import NovaCompute +from valet.engine.resource_manager.resource_handler import ResourceHandler +from valet.engine.resource_manager.topology_manager import TopologyManager +from valet.engine.search.optimizer import Optimizer + + +class Bootstrapper(object): + """Bootstrap valet-engine. + + Instantiate and configure all valet-engine sub-systems. + """ + + def __init__(self, _config, _logger): + self.config = _config + self.logger = _logger + + self.valet_id = None + + self.dbh = None + self.ah = None + self.rh = None + self.compute = None + self.topology = None + self.metadata = None + self.optimizer = None + + self.lock = None + + def config_valet(self): + """Set all required modules and configure them.""" + + self.valet_id = self.config["engine"]["id"] + + # Set DB connection. + db_config = self.config.get("db") + self.logger.info("launch engine -- keyspace: %s" % db_config.get("keyspace")) + db = Music(self.config, self.logger) + + self.dbh = DBHandler(db, db_config, self.logger) + + # Set lock to deal with datacenters in parallel. + self.lock = Locks(self.dbh, self.config["engine"]["timeout"]) + + # Set backend platform connection. + compute_config = self.config.get("compute") + compute_source = NovaCompute(self.config, + self.logger) + + topology_config = self.config.get("topology") + topology_source = Naming(self.config.get("naming"), self.logger) + + self.compute = ComputeManager(compute_source, self.logger) + self.topology = TopologyManager(topology_source, self.logger) + self.metadata = MetadataManager(compute_source, self.logger) + + # Set resource handler. + self.rh = ResourceHandler("ResourceHandler", + self.dbh, + self.compute, + self.metadata, + self.topology, + compute_config, + self.logger) + + dha = self.config["engine"]["dha"] + use_dha = True + if dha == "false" or not dha: + use_dha = False + + # Set application handler. + self.ah = AppHandler(self.dbh, use_dha, self.logger) + + # Set optimizer for placement decisions. + self.optimizer = Optimizer(self.logger) + + # Read initial Valet Group rules and create in DB. + root = os.path.dirname(os.path.dirname(os.path.realpath(sys.argv[0]))) + for rule_file in glob.glob(root + "/valet/rules/" + "*.json"): + rule = json.loads(open(rule_file).read()) + self.dbh.create_group_rule( + rule["name"], + rule["app_scope"], + rule["type"], + rule["level"], + rule["members"], + rule["description"] + ) + + self.logger.debug("rule (" + rule["name"] + ") created") + + return True diff --git a/engine/src/valet/rules/VNF_Rack_Diversity_RDN.json b/engine/src/valet/rules/VNF_Rack_Diversity_RDN.json new file mode 100755 index 0000000..02c6afc --- /dev/null +++ b/engine/src/valet/rules/VNF_Rack_Diversity_RDN.json @@ -0,0 +1,8 @@ +{ + "name": "VNF_Rack_Diversity_RDN", + "type": "diversity", + "level": "rack", + "app_scope": "vnf", + "members": [], + "description": "Rack level diversity for vUSP RDN VMs" +} diff --git a/engine/src/valet/rules/VNF_Rack_Quorum_RDN.json b/engine/src/valet/rules/VNF_Rack_Quorum_RDN.json new file mode 100755 index 0000000..c977ed1 --- /dev/null +++ b/engine/src/valet/rules/VNF_Rack_Quorum_RDN.json @@ -0,0 +1,8 @@ +{ + "name": "VNF_Rack_Quorum_RDN", + "type": "quorum-diversity", + "level": "rack", + "app_scope": "vnf", + "members": [], + "description": "Rack level quorum for vUSP RDN VMs" +} diff --git a/engine/src/valet/rules/VNF_host_diversity_RDN.json b/engine/src/valet/rules/VNF_host_diversity_RDN.json new file mode 100755 index 0000000..5da2b0a --- /dev/null +++ b/engine/src/valet/rules/VNF_host_diversity_RDN.json @@ -0,0 +1,8 @@ +{ + "name": "VNF_host_diversity_RDN", + "type": "diversity", + "level": "host", + "app_scope": "vnf", + "members": [], + "description": "Host level diversity for vUSP RDN VMs" +} diff --git a/engine/src/valet/rules/test_host_affinity_rule.json b/engine/src/valet/rules/test_host_affinity_rule.json new file mode 100755 index 0000000..bd01da0 --- /dev/null +++ b/engine/src/valet/rules/test_host_affinity_rule.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_AFFINITY_RULE", + "type": "affinity", + "level": "host", + "app_scope": "vnf", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/rules/test_host_diveristy_rule0.json b/engine/src/valet/rules/test_host_diveristy_rule0.json new file mode 100755 index 0000000..fbbc41b --- /dev/null +++ b/engine/src/valet/rules/test_host_diveristy_rule0.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_DIVERSITY_RULE_0", + "type": "diversity", + "level": "host", + "app_scope": "lcp", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/rules/test_host_diveristy_rule1.json b/engine/src/valet/rules/test_host_diveristy_rule1.json new file mode 100755 index 0000000..4a55515 --- /dev/null +++ b/engine/src/valet/rules/test_host_diveristy_rule1.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_DIVERSITY_RULE_1", + "type": "diversity", + "level": "host", + "app_scope": "lcp", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/rules/test_host_diveristy_rule2.json b/engine/src/valet/rules/test_host_diveristy_rule2.json new file mode 100755 index 0000000..7ad81af --- /dev/null +++ b/engine/src/valet/rules/test_host_diveristy_rule2.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_DIVERSITY_RULE_2", + "type": "diversity", + "level": "host", + "app_scope": "lcp", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/rules/test_host_exclusivity.json b/engine/src/valet/rules/test_host_exclusivity.json new file mode 100755 index 0000000..d777b82 --- /dev/null +++ b/engine/src/valet/rules/test_host_exclusivity.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_EXCLUSIVITY", + "type": "exclusivity", + "level": "host", + "app_scope": "lcp", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/rules/test_host_exclusivity2.json b/engine/src/valet/rules/test_host_exclusivity2.json new file mode 100755 index 0000000..2eff2a3 --- /dev/null +++ b/engine/src/valet/rules/test_host_exclusivity2.json @@ -0,0 +1,8 @@ +{ + "name": "VALET_HOST_EXCLUSIVITY2", + "type": "exclusivity", + "level": "host", + "app_scope": "lcp", + "members": [], + "description": "for test" +} diff --git a/engine/src/valet/solver/__init__.py b/engine/src/valet/solver/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/solver/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/solver/ostro.py b/engine/src/valet/solver/ostro.py new file mode 100644 index 0000000..67ba5df --- /dev/null +++ b/engine/src/valet/solver/ostro.py @@ -0,0 +1,529 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + + +import json +import traceback +from datetime import datetime + +from valet.engine.db_connect.locks import * + + +# noinspection PyBroadException +class Ostro(object): + """Main class for scheduling and query.""" + + def __init__(self, _bootstrapper): + self.valet_id = _bootstrapper.valet_id + + self.dbh = _bootstrapper.dbh + + self.rh = _bootstrapper.rh + self.ahandler = _bootstrapper.ah + + self.optimizer = _bootstrapper.optimizer + + self.logger = _bootstrapper.logger + + # To lock valet-engine per datacenter. + self.lock = _bootstrapper.lock + + self.end_of_process = False + + def run_ostro(self): + """Run main valet-engine loop.""" + + self.logger.info("*** start valet-engine main loop") + + # TODO(Gueyoung): Run resource handler thread. + + try: + # NOTE(Gueyoung): if DB causes any error, Valet-Engine exits. + + while self.end_of_process is False: + + if not self.lock.set_regions(): + break + + request_list = self.dbh.get_requests() + if len(request_list) > 0: + rc = self._handle_requests(request_list) + Logger.set_req_id(None) + if not rc: + break + + time.sleep(1) + except KeyboardInterrupt: + self.logger.error("keyboard interrupt") + except Exception: + self.logger.error(traceback.format_exc()) + + self.lock.done_with_my_turn() + + self.logger.info("*** exit valet-engine") + + def plan(self): + """Handle planning requests. + + This is only for capacity planning. + """ + + self.logger.info("*** start planning......") + + request_list = self.dbh.get_requests() + + if len(request_list) > 0: + if not self._handle_requests(request_list): + self.logger.error("while planning") + return False + else: + self.logger.error("while reading plan") + return False + + return True + + def _handle_requests(self, _req_list): + """Deal with all requests. + + Request types (operations) are + Group rule management: 'group_query', 'group_create'. + Placement management: 'create', 'delete', 'update', 'confirm', 'rollback'. + Engine management: 'ping'. + """ + + for req in _req_list: + req_id_elements = req["request_id"].split("-", 1) + opt = req_id_elements[0] + req_id = req_id_elements[1] + Logger.set_req_id(req_id) + begin_time = datetime.now() + + req_body = json.loads(req["request"]) + + self.logger.debug("input request_type = " + opt) + self.logger.debug("request = " + json.dumps(req_body, indent=4)) + + # Check if the same request with prior request. + (status, result) = self.ahandler.check_history(req["request_id"]) + + if result is None: + if opt in ("create", "delete", "update", "confirm", "rollback"): + app = self._handle_app(opt, req_id, req_body) + + if app is None: + errstr = "valet-engine exits due to " + opt + " error" + Logger.get_logger('audit').error(errstr, beginTimestamp=begin_time, elapsedTime=datetime.now() - begin_time, statusCode=False) + self.logger.error(errstr) + return False + + if app.status == "locked": + errstr = "datacenter is being serviced by another valet" + Logger.get_logger('audit').error(errstr, beginTimestamp=begin_time, elapsedTime=datetime.now() - begin_time, statusCode=False) + self.logger.info(errstr) + continue + + (status, result) = self._get_json_result(app) + + elif opt in ("group_query", "group_create"): + # TODO(Gueyoung): group_delete and group_update + + (status, result) = self._handle_rule(opt, req_body) + + if result is None: + errstr = "valet-engine exits due to " + opt + " error" + Logger.get_logger('audit').error(errstr, beginTimestamp=begin_time, elapsedTime=datetime.now() - begin_time, statusCode=False) + self.logger.info(errstr) + return False + + if status["status"] == "locked": + errstr = "datacenter is locked by the other valet" + Logger.get_logger('audit').error(errstr, beginTimestamp=begin_time, elapsedTime=datetime.now() - begin_time, statusCode=False) + self.logger.info(errstr) + continue + + elif opt == "ping": + # To check if the local valet-engine is alive. + + if req_body["id"] == self.valet_id: + self.logger.debug("got ping") + + status = {"status": "ok", "message": ""} + result = {} + else: + continue + + else: + status = {"status": "failed", "message": "unknown operation = " + opt} + result = {} + + self.logger.error(status["message"]) + + else: + self.logger.info("decision already made") + + # Store final result in memory cache. + if status["message"] != "timeout": + self.ahandler.record_history(req["request_id"], status, result) + + # Return result + if not self.dbh.return_request(req["request_id"], status, result): + return False + + self.logger.debug("output status = " + json.dumps(status, indent=4)) + self.logger.debug(" result = " + json.dumps(result, indent=4)) + + Logger.get_logger('audit').info("done request = " + req["request_id"], beginTimestamp=begin_time, elapsedTime=datetime.now() - begin_time) + self.logger.info("done request = " + req["request_id"] + ' ----') + + # this should be handled by exceptions so we can log the audit correctly + if self.lock.done_with_my_turn() is None: + return False + + return True + + def _handle_app(self, _opt, _req_id, _req_body): + """Deal with placement request. + + Placement management: 'create', 'delete', 'update', 'confirm', 'rollback'. + + Validate the request, extract info, search placements, and store results. + """ + + resource = None + app = None + + # Validate request. + if _opt == "create": + app = self.ahandler.validate_for_create(_req_id, _req_body) + elif _opt == "update": + app = self.ahandler.validate_for_update(_req_id, _req_body) + elif _opt == "delete": + app = self.ahandler.validate_for_delete(_req_id, _req_body) + elif _opt == "confirm": + app = self.ahandler.validate_for_confirm(_req_id, _req_body) + elif _opt == "rollback": + app = self.ahandler.validate_for_rollback(_req_id, _req_body) + + if app is None: + return None + elif app.status != "ok": + return app + + # Check if datacenter is locked. + # Set the expired time of current lock. + lock_status = self.lock.is_my_turn(app.datacenter_id) + if lock_status is None: + return None + elif lock_status == "no": + app.status = "locked" + return app + + # Load valet rules. + if self.rh.load_group_rules_from_db() is None: + return None + + if _opt == "create": + # Make placement decisions for newly created servers in stack. + + # Load resource (hosts, racks, metadata, groups) from DB. + if not self.rh.load_resource(_req_body.get("datacenter")): + return None + + resource = self.rh.resource_list[0] + + # Sync rsource status with platform (OpenStack Nova). + if not resource.sync_with_platform(): + self.logger.error("fail to sync resource status") + app.status = "fail to sync resource status" + return app + + app.set_resource(resource) + + self.ahandler.set_for_create(app) + if app is None: + return None + elif app.status != "ok": + return app + + self.optimizer.place(app) + if app.status != "ok": + return app + + elif _opt == "update": + # TODO(Gueyoung): assume only image update and + # Valet does not deal with this update. + + self.ahandler.set_for_update(app) + if app is None: + return None + elif app.status != "ok": + return app + + return app + + elif _opt == "delete": + # Mark delete state in stack and servers. + + # Load resource (hosts, racks, metadata, groups) from DB + if not self.rh.load_resource(_req_body.get("datacenter")): + return None + + resource = self.rh.resource_list[0] + + # Sync rsource status with platform + if not resource.sync_with_platform(): + self.logger.error("fail to sync resource status") + app.status = "fail to sync resource status" + return app + + app.set_resource(resource) + + self.optimizer.update(app) + if app.status != "ok": + return app + + elif _opt == "confirm": + # Confirm prior create, delete, or update request. + + datacenter_info = {"id": app.datacenter_id, "url": "none"} + + # Load resource (hosts, racks, metadata, groups) from DB + # No sync with platform. + if not self.rh.load_resource(datacenter_info): + return None + + resource = self.rh.resource_list[0] + + app.set_resource(resource) + + self.optimizer.confirm(app) + if app.status != "ok": + return app + + elif _opt == "rollback": + # Rollback prior create, delete, or update request. + + datacenter_info = {"id": app.datacenter_id, "url": "none"} + + # Load resource (hosts, racks, metadata, groups) from DB + # No sync with platform. + if not self.rh.load_resource(datacenter_info): + return None + + resource = self.rh.resource_list[0] + + app.set_resource(resource) + + self.optimizer.rollback(app) + if app.status != "ok": + return app + + # Check timeout before store data. + if self.lock.expired < now(): + app.status = "timeout" + return app + + # Store app info into DB. + if not self.ahandler.store_app(app): + return None + self.logger.info("requested app(" + app.app_name + ") is stored") + + # Store resource into DB. + if not resource.store_resource(opt=_opt, req_id=_req_id): + return None + self.logger.info("resource status(" + resource.datacenter_id + ") is stored") + + # TODO(Gueyoung): if timeout happened at this moment, + # Rollback data change. + + return app + + def _handle_rule(self, _opt, _req_body): + """Deal with valet rule and groups request. + + Group rule management: 'group_query', 'group_create'. + """ + + status = {} + + result = None + + if _opt == "group_query": + # Query valet group rules and server placements under rules. + + rule_name = _req_body.get("name", None) + datacenter_id = _req_body.get("datacenter_id", None) + + if rule_name is None or rule_name == "": + # Return basic info of all rules. + + # Load valet rules. + if self.rh.load_group_rules_from_db() is None: + status["status"] = "failed" + status["message"] = "DB error" + return status, [] + + result = self.rh.get_rules() + if result is None: + status["status"] = "failed" + status["message"] = "DB error" + return status, [] + + else: + # Return rule info with server placements under this rule. + + if datacenter_id is None: + status["status"] = "failed" + status["message"] = "no region id given" + return status, {} + + # Check if datacenter is locked. + lock_status = self.lock.is_my_turn(datacenter_id) + if lock_status is None: + status["status"] = "failed" + status["message"] = "DB error" + return status, [] + elif lock_status == "no": + status["status"] = "locked" + status["message"] = "" + return status, {} + + message = self.rh.load_group_rule_from_db(rule_name) + if message is None: + status["status"] = "failed" + status["message"] = "DB error while loading rule" + return status, {} + elif message != "ok": + status["status"] = "failed" + status["message"] = message + self.logger.error(status["message"]) + return status, {} + + datacenter_info = {"id": datacenter_id, "url": "none"} + + # Load resource data from DB. + message = self.rh.load_resource_with_rule(datacenter_info) + if message is None: + status["status"] = "failed" + status["message"] = "DB error while loading resource" + return status, {} + elif message != "ok": + status["status"] = "failed" + status["message"] = message + self.logger.error(status["message"]) + return status, {} + + resource = self.rh.resource_list[0] + + # Sync rsource status with platform + if not resource.sync_with_platform(): + status["status"] = "failed" + status["message"] = "Platform delay" + return status, {} + + result = self.rh.get_placements_under_rule(rule_name, resource) + + # Check timeout before store data. + if self.lock.expired < now(): + status["status"] = "failed" + status["message"] = "timeout" + return status, {} + + # Store resource into DB. + if not resource.store_resource(): + status["status"] = "failed" + status["message"] = "DB error while storing resource" + return status, {} + self.logger.info("resource status(" + datacenter_id + ") is stored") + + # TODO(Gueyoung): If timeout happened here, Rollback stored data. + + elif _opt == "group_create": + result = {} + + rule_name = _req_body.get("name") + app_scope = _req_body.get("app_scope") + rule_type = _req_body.get("type") + level = _req_body.get("level") + members = _req_body.get("members", []) + desc = _req_body.get("desc", "none") + + message = self.rh.create_group_rule(rule_name, app_scope, + rule_type, level, + members, desc) + if message is None: + status["status"] = "failed" + status["message"] = "DB error while creating rule" + return status, {} + elif message != "ok": + status["status"] = "failed" + status["message"] = message + return status, result + + elif _opt == "group_delete": + pass + elif _opt == "group_update": + pass + + status["status"] = "ok" + status["message"] = "" + + return status, result + + def _get_json_result(self, _app): + """Set request result format as JSON.""" + + status = {"status": "ok", "message": ""} + + result = {} + + if _app.status != "ok": + if _app.status.startswith("na:"): + status_elements = _app.status.split(':') + if status_elements[1].strip() != "update": + status["message"] = status_elements[1].strip() + + return status, {} + else: + status["status"] = "failed" + status["message"] = _app.status + return status, {} + + if _app.state == "create": + for sk, s in _app.servers.iteritems(): + if s.host_assignment_inx == -1: + result[s.host_assignment_variable] = '::' + s.host + else: + p = '::' + s.host + + if s.host_assignment_variable not in result.keys(): + result[s.host_assignment_variable] = [] + result[s.host_assignment_variable].insert(s.host_assignment_inx, p) + elif _app.state == "update": + for sk, s in _app.servers.iteritems(): + if s.host_assignment_inx == -1: + result[s.host_assignment_variable] = "" + else: + p = "" + + if s.host_assignment_variable not in result.keys(): + result[s.host_assignment_variable] = [] + result[s.host_assignment_variable].insert(s.host_assignment_inx, p) + + return status, result diff --git a/engine/src/valet/utils/__init__.py b/engine/src/valet/utils/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/utils/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/utils/decryption.py b/engine/src/valet/utils/decryption.py new file mode 100644 index 0000000..c523edc --- /dev/null +++ b/engine/src/valet/utils/decryption.py @@ -0,0 +1,44 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from base64 import b64decode +from Crypto.Cipher import AES +from hashlib import md5 + + +UNPAD = lambda s: s[:-ord(s[len(s) - 1:])] + + +def decrypt(_k1, _k2, _k3, _pw): + code_list = ['g', 'E', 't', 'a', 'W', 'i', 'Y', 'H', '2', 'L'] + + code = int(_k1) + int(_k2) * int(_k3) + str_code = str(code) + + key = "" + for i in range(0, len(str_code)): + c_code = code_list[int(str_code[i])] + key += c_code + + enc_key = md5(key.encode('utf8')).hexdigest() + + enc = b64decode(_pw) + iv = enc[:16] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + + return UNPAD(cipher.decrypt(enc[16:])).decode('utf8') diff --git a/engine/src/valet/utils/logger.py b/engine/src/valet/utils/logger.py new file mode 100644 index 0000000..9a5fca0 --- /dev/null +++ b/engine/src/valet/utils/logger.py @@ -0,0 +1,349 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + + +"""Setup logging. +from valet.utils.logger import Logger + Logger.get_logger('metric').info('bootstrap STUFF') +""" + +import json +import logging +import socket +from datetime import datetime +from logging.handlers import RotatingFileHandler + + +class Logger(object): + logs = None + + def __init__(self, _config=None, console=False): + if _config is None: + Logger.logs = {"console": Console()} + + if Logger.logs is None: + Logger.logs = {"audit": Audit(_config), "metric": Metric(_config), "debug": Debug(_config)} + Logger.logs["error"] = Error(_config, Logger.logs["debug"]) + if console: + Logger.logs["console"] = Console(Logger.logs["debug"]) + + @classmethod + def get_logger(cls, name): + return cls.logs[name].adapter + + @classmethod + def set_req_id(cls, uuid): + EcompLogger._set_request_id(uuid) + + +class EcompLogger(object): + """Parent class for all logs.""" + logging.getLogger().setLevel(logging.DEBUG) # set root + _lvl = logging.INFO + _size = 10000000 + datefmt = '%d/%m/%Y %H:%M:%S' + + _requestID = None + + def __init__(self): + self.fh = None + self.logger = None + + def set_fh(self, name, fmt, _config, lvl=_lvl, size=_size): + logfile = _config.get("path") + name + ".log" + self.fh = RotatingFileHandler(logfile, mode='a', maxBytes=size, backupCount=2, encoding=None, delay=0) + self.fh.setLevel(lvl) + self.fh.setFormatter(fmt) + + self.logger = logging.getLogger(name) + self.logger.addHandler(self.fh) + self.fh.addFilter(LoggerFilter()) + + def add_filter(self, fltr): + self.fh.addFilter(fltr()) + + @classmethod + def get_request_id(cls): return EcompLogger._requestID + + @classmethod + def _set_request_id(cls, uuid): EcompLogger._requestID = uuid + + @staticmethod + def format_str(fmt, sep="|"): + fmt = sep + sep.join(map(lambda x: '' if x.startswith('X') else '%(' + str(x) + ')s', fmt)) + sep + return fmt.replace('%(asctime)s', '%(asctime)s.%(msecs)03d') + + +class LoggerFilter(logging.Filter): + def filter(self, record): + record.requestId = EcompLogger.get_request_id() or '' + return True + + +class Audit(EcompLogger): + """A summary view of the processing of a requests. + It captures activity requests and includes time initiated, finished, and the API who invoked it + """ + fmt = ['beginTimestamp', 'asctime', 'requestId', 'XserviceInstanceID', 'XthreadId', 'vmName', 'XserviceName', 'XpartnerName', 'statusCode', 'responseCode', 'responseDescription', 'XinstanceUUID', 'levelname', 'Xseverity', 'XserverIP', 'elapsedTime', 'server', 'XclientIP', 'XclassName', 'Xunused', 'XprocessKey', 'message', 'XcustomField2', 'XcustomField3', 'XcustomField4', 'XdetailMessage'] + + def __init__(self, _config): + EcompLogger.__init__(self) + fmt = logging.Formatter(self.format_str(Audit.fmt), EcompLogger.datefmt) + self.set_fh("audit", fmt, _config) + self.add_filter(AuditFilter) + + # use value from kwargs in adapter process or the default given here + instantiation = { + 'beginTimestamp' : '', + 'statusCode' : True, + 'responseCode' : '900', + 'responseDescription' : '', + 'elapsedTime' : '', + } + self.adapter = AuditAdapter(self.logger, instantiation) + + +# noinspection PyProtectedMember +class AuditFilter(logging.Filter): + vmName = socket.gethostname() + vmFqdn = socket.getfqdn() + responseDecode = { + 'permission' : 100, + 'availabilty' : 200, # Availability/Timeouts + 'data' : 300, + 'schema' : 400, + 'process' : 500 # Business process errors + } # 900 # unknown + + def filter(self, record): + record.beginTimestamp = AuditAdapter._beginTimestamp.strftime(EcompLogger.datefmt + ".%f")[:-3] if AuditAdapter._beginTimestamp else "" + record.vmName = AuditFilter.vmName + record.statusCode = "ERROR" if AuditAdapter._statusCode is False else "COMPLETE" + record.responseCode = AuditFilter.responseDecode.get(AuditAdapter._responseCode, AuditAdapter._responseCode) + record.responseDescription = AuditAdapter._responseDescription + record.elapsedTime = AuditAdapter._elapsedTime + record.server = AuditFilter.vmFqdn + return True + + +class AuditAdapter(logging.LoggerAdapter): + _beginTimestamp = None + _elapsedTime = None + _responseDescription = None + _statusCode = None + _responseCode = '' + + def process(self, msg, kwargs): + AuditAdapter._beginTimestamp = kwargs.pop('beginTimestamp', self.extra['beginTimestamp']) + AuditAdapter._elapsedTime = kwargs.pop('elapsedTime', self.extra['elapsedTime']) + AuditAdapter._responseCode = kwargs.pop('responseCode', self.extra['responseCode']) + AuditAdapter._responseDescription = kwargs.pop('responseDescription', self.extra['responseDescription']) + AuditAdapter._statusCode = kwargs.pop('statusCode', self.extra['statusCode']) + return msg, kwargs + + +class Metric(EcompLogger): + """A detailed view into the processing of a transaction. + It captures the start and end of calls/interactions with other entities + """ + fmt = ['beginTimestamp', 'targetEntity', 'asctime', 'requestId', 'XserviceInstanceID', 'XthreadId', 'vmName', 'XserviceName', 'XpartnerName', 'statusCode', 'XresponseCode', 'XresponseDescription', 'XinstanceUUID', 'levelname', 'Xseverity', 'XserverIP', 'elapsedTime', 'server', 'XclientIP', 'XclassName', 'Xunused', 'XprocessKey', 'message', 'XcustomField2', 'XcustomField3', 'XcustomField4', 'XdetailMessage'] + + def __init__(self, _config): + EcompLogger.__init__(self) + fmt = logging.Formatter(self.format_str(Metric.fmt), EcompLogger.datefmt) + self.set_fh("metric", fmt, _config) + self.add_filter(MetricFilter) + + # use value from kwargs in adapter process or the default given here + instantiation = { + 'beginTimestamp' : '', + 'targetEntity' : '', + 'statusCode' : True, + 'elapsedTime' : '', + } + self.adapter = MetricAdapter(self.logger, instantiation) + + +# noinspection PyProtectedMember +class MetricFilter(logging.Filter): + vmName = socket.gethostname() + vmFqdn = socket.getfqdn() + + def filter(self, record): + record.beginTimestamp = MetricAdapter._beginTimestamp.strftime(EcompLogger.datefmt + ".%f")[:-3] if MetricAdapter._beginTimestamp else "" + record.targetEntity = MetricAdapter._targetEntity + record.vmName = MetricFilter.vmName + record.statusCode = "ERROR" if MetricAdapter._statusCode is False else "COMPLETE" + record.elapsedTime = MetricAdapter._elapsedTime + record.server = MetricFilter.vmFqdn + return True + + +class MetricAdapter(logging.LoggerAdapter): + _beginTimestamp = None + _elapsedTime = None + _targetEntity = None + _statusCode = None + + def process(self, msg, kwargs): + MetricAdapter._beginTimestamp = kwargs.pop('beginTimestamp', self.extra['beginTimestamp']) + MetricAdapter._targetEntity = kwargs.pop('targetEntity', self.extra['targetEntity']) + MetricAdapter._elapsedTime = kwargs.pop('elapsedTime', self.extra['elapsedTime']) + MetricAdapter._statusCode = kwargs.pop('statusCode', self.extra['statusCode']) + return msg, kwargs + + +class Error(EcompLogger): + """capture info, warn, error and fatal conditions""" + fmt = ['asctime', 'requestId', 'XthreadId', 'XserviceName', 'XpartnerName', 'targetEntity', 'targetServiceName', 'levelname', 'errorCode', 'errorDescription', 'filename)s:%(lineno)s - %(message'] + + def __init__(self, _config, logdebug): + EcompLogger.__init__(self) + fmt = logging.Formatter(self.format_str(Error.fmt) + '^', EcompLogger.datefmt) + self.set_fh("error", fmt, _config, lvl=logging.WARN) + # add my handler to the debug logger + logdebug.logger.addHandler(self.fh) + self.add_filter(ErrorFilter) + + +# noinspection PyProtectedMember +class ErrorFilter(logging.Filter): + errorDecode = { + 'permission' : 100, + 'availabilty' : 200, # Availability/Timeouts + 'data' : 300, + 'schema' : 400, + 'process' : 500 # Business process errors + } # 900 # unknown + + def filter(self, record): + record.targetEntity = DebugAdapter._targetEntity + record.targetServiceName = DebugAdapter._targetServiceName + record.errorCode = ErrorFilter.errorDecode.get(DebugAdapter._errorCode, DebugAdapter._errorCode) + record.errorDescription = DebugAdapter._errorDescription + return True + + +class Debug(EcompLogger): + """capture whatever data may be needed to debug and correct abnormal conditions""" + fmt = ['asctime', 'requestId', 'levelname', 'filename)s:%(lineno)s - %(message'] + + # use value from kwargs in adapter process or the default given here + instantiation = { + 'targetEntity' : '', + 'targetServiceName' : '', + 'errorCode' : '900', + 'errorDescription' : '' + } + + def __init__(self, _config): + EcompLogger.__init__(self) + fmt = logging.Formatter(self.format_str(Debug.fmt) + '^', EcompLogger.datefmt) + self.set_fh("debug", fmt, _config, lvl=logging.DEBUG) + + self.adapter = DebugAdapter(self.logger, Debug.instantiation) + + +class DebugAdapter(logging.LoggerAdapter): + _targetEntity = '' + _targetServiceName = '' + _errorCode = '' + _errorDescription = '' + + def process(self, msg, kwargs): + DebugAdapter._targetEntity = kwargs.pop('targetEntity', self.extra['targetEntity']) + DebugAdapter._targetServiceName = kwargs.pop('targetServiceName', self.extra['targetServiceName']) + DebugAdapter._errorCode = kwargs.pop('errorCode', self.extra['errorCode']) + DebugAdapter._errorDescription = kwargs.pop('errorDescription', self.extra['errorDescription']) + return msg, kwargs + + +class Console(EcompLogger): + """ set logger to point to stderr.""" + fmt = ['asctime', 'levelname', 'filename)s:%(lineno)s - %(message'] + + def __init__(self, logdebug=None): + EcompLogger.__init__(self) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + fmt = logging.Formatter(self.format_str(Console.fmt, sep=" "), EcompLogger.datefmt) + ch.setFormatter(fmt) + + # console can be written to when Debug is, or... + if logdebug is not None: + logdebug.logger.addHandler(ch) + return + + # ...console is written to as a stand alone (ex. for tools using valet libs) + self.logger = logging.getLogger('console') + self.adapter = DebugAdapter(self.logger, Debug.instantiation) + self.logger.addHandler(ch) + ch.addFilter(LoggerFilter()) + + +def every_log(name): + log = Logger.get_logger(name) + log.info("so this happened") + log.debug("check out what happened") + log.warning("something bad happened") + log.error("something bad happened to me") + + +""" MAIN """ +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Test Logging', add_help=False) # <<<2 + parser.add_argument('-pc', action='store_true', help='where to write log files') + parser.add_argument("-?", "--help", action="help", help="show this help message and exit") + opts = parser.parse_args() + + path = "/opt/pc/" if opts.pc else "/tmp/" + config = json.loads('{ "path": "' + path + '" }') + + now = datetime.now() + then = now.replace(hour=now.hour + 1) + differ = now - then + + # write to console + logger = Logger().get_logger('console') + logger.info('Log files are written to ' + path) + Logger.logs = None # this reset is only needed cuz of console test, never for prod + + # create all loggers and save an instance of debug logger + logger = Logger(config, console=True).get_logger('debug') + + metric = Logger.get_logger('metric') + metric.info('METRIC STUFF') + metric = Logger.get_logger('metric') + Logger.set_req_id('1235-123-1234-1234') + metric.info(' -- METRIC NOW THEN -- ', beginTimestamp=then, elapsedTime=differ, statusCode=False) + every_log('metric') + every_log('debug') + Logger.get_logger('audit').info('AUDIT STUFF', responseCode=100, responseDescription="you shoulda seen it", elapsedTime=differ, statusCode=False, beginTimestamp=now) + every_log('audit') + logger.error("--------------------------------") + logger.error('EC:100 TE:OS', errorCode='100', targetEntity='target entity') + logger.error('EC:schema TSN:IDK', errorCode='schema', targetServiceName='target service name') + logger.error('EC:393 ED:bt', errorCode='393', errorDescription='this is an error') + logger.error("--------------------------------") + try: + assert False # ("Now test logging an exception") + except AssertionError: + logger.exception('This is a log of an exception', errorDescription='EXMAN') diff --git a/engine/src/valet/valet_main.py b/engine/src/valet/valet_main.py new file mode 100644 index 0000000..ba91f97 --- /dev/null +++ b/engine/src/valet/valet_main.py @@ -0,0 +1,88 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/usr/bin/env python2.7 + + +import argparse +import json +import os.path +import sys +import traceback + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from valet.bootstrapper import Bootstrapper +from valet.solver.ostro import Ostro +from valet.utils.logger import Logger + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Test Logging', add_help=False) # <<<2 + parser.add_argument('config', help='config file path (required)') + parser.add_argument('-db', metavar='keyspace_string', help='override keyspace with typed in value') + parser.add_argument('-stdout', action='store_true', help='also print debugging log to stdout') + parser.add_argument("-?", "--help", action="help", help="show this help message and exit") + opts = parser.parse_args() + + # Prepare configuration and logging. + # noinspection PyBroadException + try: + config_file_d = open(opts.config, 'r') + config_file = config_file_d.read() + config = json.loads(config_file) + config_file_d.close() + + logger_config = config.get("logging") + + if not os.path.exists(logger_config.get("path")): + os.makedirs(logger_config.get("path")) + + # create all loggers and save an instance of the debug logger + logger = Logger(logger_config, console=opts.stdout).get_logger('debug') + except Exception: + print("error while configuring: " + traceback.format_exc()) + sys.exit(2) + + try: + config_file_dir = os.path.dirname(opts.config) + version_file_name = config_file_dir + "/version.json" + version_file_d = open(version_file_name, 'r') + version_json = json.dumps(json.loads(version_file_d.read())) + logger.info("Starting Valet with version: " + version_json) + version_file_d.close() + except Exception: + logger.warning("Warning! Error while printing version: " + traceback.format_exc()) + + + # Boostrap all components and configure them. + # noinspection PyBroadException + try: + if opts.db: + config['db']['keyspace'] = opts.db + + bootstrapper = Bootstrapper(config, logger) + if not bootstrapper.config_valet(): + print("error while configurating") + except Exception: + print("error while bootstrapping: " + traceback.format_exc()) + sys.exit(2) + + # Start valet-engine (aka. Ostro). + ostro = Ostro(bootstrapper) + ostro.run_ostro() |