diff options
author | vv770d <vv770d@att.com> | 2021-03-17 22:44:49 +0000 |
---|---|---|
committer | vv770d <vv770d@att.com> | 2021-03-17 22:51:01 +0000 |
commit | 15dd0a3541db1e7ac9f38680d9dbe83cf3d303ae (patch) | |
tree | bf908cb1e47da9eb8c34913f00b2f773b3d9b39e /oti/event-handler/otihandler | |
parent | 18512176d62ae211d97952027ca6a6dc59b50992 (diff) |
[DCAE] Remove OTI component
OTI has been deprecated
Change-Id: Ic2051f9262744081880d8471c31a2491d2e4451c
Signed-off-by: vv770d <vv770d@att.com>
Issue-ID: DCAEGEN2-2676
Signed-off-by: vv770d <vv770d@att.com>
Diffstat (limited to 'oti/event-handler/otihandler')
22 files changed, 0 insertions, 5238 deletions
diff --git a/oti/event-handler/otihandler/__init__.py b/oti/event-handler/otihandler/__init__.py deleted file mode 100644 index 87cf002..0000000 --- a/oti/event-handler/otihandler/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/__main__.py b/oti/event-handler/otihandler/__main__.py deleted file mode 100644 index 59a7087..0000000 --- a/oti/event-handler/otihandler/__main__.py +++ /dev/null @@ -1,74 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""run as server: python -m otihandler""" - -import logging -import os -import sys - -from otihandler.config import Config -from otihandler.onap.audit import Audit -from otihandler.web_server import DTIWeb -from otihandler.dbclient import DaoBase - - -class LogWriter(object): - """redirect the standard out + err to the logger""" - - def __init__(self, logger_func): - self.logger_func = logger_func - - def write(self, log_line): - """actual writer to be used in place of stdout or stderr""" - - log_line = log_line.rstrip() - if log_line: - self.logger_func(log_line) - - def flush(self): - """no real flushing of the buffer""" - - pass - - -def run_event_handler(): - """main run function for event_handler""" - - Config.load_from_file() - # Config.discover() - - logger = logging.getLogger("event_handler") - sys.stdout = LogWriter(logger.info) - sys.stderr = LogWriter(logger.error) - - logger.info("========== run_event_handler ==========") - app_version = os.getenv("APP_VER") - logger.info("app_version %s", app_version) - Audit.init(Config.get_system_name(), app_version, Config.LOGGER_CONFIG_FILE_PATH) - - logger.info("starting event_handler with config:") - logger.info(Audit.log_json_dumps(Config.config)) - - audit = Audit(req_message="start event_handler") - - audit = Audit(req_message="DB init start") - DaoBase.init_db(os.environ.get("DB_CONN_URL")) - - DTIWeb.run_forever(audit) - -if __name__ == "__main__": - run_event_handler() diff --git a/oti/event-handler/otihandler/cbs_rest.py b/oti/event-handler/otihandler/cbs_rest.py deleted file mode 100644 index 2e3e7de..0000000 --- a/oti/event-handler/otihandler/cbs_rest.py +++ /dev/null @@ -1,202 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""REST for high-level information retrievals from Consul KVs""" - -import copy -import logging - -from otihandler.consul_client import ConsulClient - - -class CBSRest(object): - _logger = logging.getLogger("oti_handler.cbs_rest") - - @staticmethod - def get_value(key, default=None): - """Wrap ConsulClient.get_value() to ignore exceptions.""" - - data = default - try: - data = ConsulClient.get_value(key) - except Exception as e: - pass - - return data - - @staticmethod - def get_kvs(key): - """Wrap ConsulClient.get_kvs() to ignore exceptions.""" - - data = {} - try: - data = ConsulClient.get_kvs(key, trim_prefix=True) - except Exception as e: - data = {} - - if not data: - data = {} - return data - - @staticmethod - def get_service_component(service_name): - """Get the fully-bound config for a service_name.""" - - return ConsulClient.get_service_component(service_name) - - @staticmethod - def get_service_component_all(service_name, service_location=None, policies_as_list=False): - """Get all Consul objects for a service_name.""" - - r_dict = ConsulClient.get_service_component_all(service_name, policies_as_list=policies_as_list) - if r_dict and r_dict.get('oti'): - r_dict['oti'] = CBSRest.get_oti(service_name, service_location=service_location) - return r_dict - - @staticmethod - def get_oti(service_name=None, vnf_type=None, vnf_id=None, service_location=None): - """ - Get DTI events. - - Parameters - ---------- - service_name : string - optional. The service component name assigned by dockerplugin to the component that is unique to the cloudify node instance and used in its Consul key(s). - vnf_type : string - optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). - vnf_id : string - optional. Requires vnf_type also. Gets DTI event for this vnf_id. - service_location : string - optional, allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. - If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, - otherwise results are not location filtered. - - Returns - ------- - dict - Dictionary of DTI event(s). - If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. - If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. - Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. - - """ - - lc_vnf_type = vnf_type - if vnf_type: - lc_vnf_type = vnf_type.lower() - - r_dict = {} - - want_locs = [] - if service_location: - want_locs = service_location.split(',') - - give_types = [] - if service_name: - if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node - try: - node_name = ConsulClient.lookup_service(service_name)[0].get("Node") - if node_name: - services = ConsulClient.lookup_node(node_name).get("Services") - if services: - for node_svc in list(services.keys()): - if "-component-dockerhost-" in node_svc or "_component_kubernetes_master" in node_svc: - want_locs = services[node_svc].get("Tags", []) - break - except: - pass - supported_types = ConsulClient.get_value(service_name + ":oti") - if supported_types: - supported_types = [type.lower() for type in list(supported_types.keys())] - if supported_types: - if lc_vnf_type: # If user specifies vnf_type(s), constrain to supported ones - for type in lc_vnf_type.split(','): - if type in supported_types: - give_types.append(type) - else: - give_types = supported_types - if not give_types or (len(give_types) == 1 and give_types[0] == ''): - return r_dict - elif lc_vnf_type: - give_types = lc_vnf_type.split(',') - - - # If they specified only one vnf_type ... - if lc_vnf_type and ',' not in lc_vnf_type: - type = give_types[0] - - # ... and vnf_id - if vnf_id: - # get just one vnf_id - t_dict = CBSRest.get_value("oti_events/" + type + "/" + vnf_id, default=None) - if t_dict: - event_loc = t_dict.get('dcae_service_location') - if not event_loc or not want_locs or event_loc in want_locs: - r_dict = copy.deepcopy(t_dict) - - # ... and not vnf_id - else: - # get all DTI events of just one type, indexed by vnf_id - t_dict = CBSRest.get_kvs("oti_events/" + type + "/") - if t_dict: - if not want_locs: - r_dict = copy.deepcopy(t_dict) - else: - for id in t_dict: - event_loc = t_dict[id].get('dcae_service_location') - if not event_loc or event_loc in want_locs: - r_dict[id] = copy.deepcopy(t_dict[id]) - - # If they did not specify either service_name or vnf_type (the only way give_types=[]) - elif not give_types: - # get all DTI events, indexed by vnf_type then vnf_id - t_dict = CBSRest.get_kvs("oti_events/") - if t_dict: - for type in t_dict: - for id in t_dict[type]: - if not vnf_id or vnf_id == id: - if want_locs: - event_loc = t_dict[type][id].get('dcae_service_location') - if not want_locs or not event_loc or event_loc in want_locs: - if type not in r_dict: - r_dict[type] = {} - r_dict[type][id] = copy.deepcopy(t_dict[type][id]) - - # If they speclfied multiple vnf_types - else: - # get all DTI events of give_types, indexed by vnf_type then vnf_id - for type in give_types: - t_dict = CBSRest.get_kvs("oti_events/" + type + "/") - if t_dict: - for id in t_dict: - if not vnf_id or vnf_id == id: - if want_locs: - event_loc = t_dict[id].get('dcae_service_location') - if not want_locs or not event_loc or event_loc in want_locs: - if type not in r_dict: - r_dict[type] = {} - r_dict[type][id] = copy.deepcopy(t_dict[id]) - - return r_dict - - @staticmethod - def get_policies(service_name, policy_id=None): - """Get one or all policies for a service_name.""" - - if policy_id: - return ConsulClient.get_value(service_name + ":policies/items/" + policy_id) - else: - return ConsulClient.get_kvs(service_name + ":policies/items/", trim_prefix=True) diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py deleted file mode 100644 index 4e3de87..0000000 --- a/oti/event-handler/otihandler/cfy_client.py +++ /dev/null @@ -1,638 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""Our client interface to Cloudify""" -import base64 -import copy -import json -import logging -import os - -import requests - -from otihandler.consul_client import ConsulClient - - -class CfyClientConsulError(RuntimeError): - pass - - -class CloudifyClient(object): - """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants""" - - def __init__(self, **kwargs): - self._protocol = kwargs.get('protocol', 'http') - self._host = kwargs.get('host') - self._port = kwargs.get('port') - self._headers = kwargs.get('headers') - - self.node_instances = self - - def list(self, **kwargs): - url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port) - # s = Session() - # req = Request('GET', url_mask, headers=self._headers) - # prepped = req.prepare() - # response = s.send(prepped,verify=False,timeout=30) - response = requests.get(url_mask, headers=self._headers, timeout=30) - obj = response.json() - tenants = [x["name"] for x in obj["items"]] - tenants_with_containers = [x for x in tenants if 'DCAE' in x] - - size = 1000 - url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format( - self._protocol, self._host, self._port, size, "{}") - if kwargs: - for (key,val) in kwargs.items(): - if isinstance(val, str): - url_mask = url_mask + '&{}={}'.format(key, val) - elif isinstance(val, list): - url_mask = url_mask + '&{}={}'.format(key, ','.join(val)) - - for tenant in tenants_with_containers: - self._headers_with_tenant = copy.deepcopy(self._headers) - self._headers_with_tenant['Tenant'] = tenant - - offset = 0 - total = 1 - while offset < total: - # s = Session() - # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant) - # prepped = req.prepare() - # response = s.send(prepped, verify=False, timeout=30) - response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30) - response.raise_for_status() - obj = response.json() - offset = offset + len(obj["items"]) - total = obj["metadata"]["pagination"]["total"] - for item in obj["items"]: - yield NodeInstance(item) - - def update_node_instance(self, node_instance_id, body, **kwargs): - headers = copy.deepcopy(self._headers_with_tenant) - headers['Content-Type'] = "application/json" - url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format( - self._protocol, self._host, self._port, node_instance_id) - response = requests.patch(url_mask, json=body, headers=headers, timeout=30) - obj = response.json() - return obj - - -class NodeInstance(object): - """quick replacement for cloudify_rest_client""" - - def __init__(self, instance): - self.id = instance.get("id") - self.deployment_id = instance.get("deployment_id") - self.host_id = instance.get("host_id") - self.runtime_properties = instance.get("runtime_properties") - self.relationships = instance.get("relationships") - self.state = instance.get("state") - self.version = instance.get("version") - self.node_id = instance.get("node_id") - self.scaling_groups = instance.get("scaling_groups") - - -class CfyClient(object): - _logger = logging.getLogger("oti_handler.cfy_client") - _client = None - - - @staticmethod - def __set_cloudify_manager_client(): - """Create connection to Cloudify_Manager.""" - - if CfyClient._client: - return - - host = None - port = None - obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify") - source = "CLOUDIFY environment variable" - if not obj: - CM_KEY = 'cloudify_manager' - source = "Consul key '{}'".format(CM_KEY) - - try: - results = ConsulClient.lookup_service(CM_KEY) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - result = results[0] - host = result['ServiceAddress'] - port = result['ServicePort'] - - try: - obj = ConsulClient.get_value(CM_KEY) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - if not obj: - raise CfyClientConsulError("{} value is empty or invalid".format(source)) - - obj = obj.get('cloudify') - - if not obj: - raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source)) - - host = obj.get('address', host) - if not host: - raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source)) - - port = obj.get('port', port) - if not port: - raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source)) - - protocol = obj.get('protocol') - if not protocol: - raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source)) - username = obj.get('user') - if not username: - raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source)) - password = obj.get('password') - if not password: - raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source)) - - b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8") - headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')} - #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')} - - CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers) - - - @staticmethod - def query_k8_components(in_cluster_fqdn): - """ - Iterate components that belong to a cluster fqdn. - - Parameters - ---------- - in_cluster_fqdn : string - k8s cluster FQDN - - Returns - ------- - A generator of tuples of component information - [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ] - """ - - cnt_found = 0 - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - scn_port = None - cluster_fqdn = None - proxy_fqdn = None - dti_info = rtp.get('dti_info') - if dti_info: - env_items = dti_info.get('env') - for env in env_items: - if env.get("name") == 'KUBE_CLUSTER_FQDN': - cluster_fqdn = env.get("value") - if env.get("name") == 'KUBE_PROXY_FQDN': - proxy_fqdn = env.get("value") - ports = dti_info.get('ports') - if ports: - scn_port = ports[0].split(':')[0] - else: - continue - - if in_cluster_fqdn != cluster_fqdn: - continue - - controller_type = rtp.get('k8s_controller_type') - if not controller_type: - CfyClient._logger.debug("controller type is missing") - continue - elif controller_type != "statefulset": - CfyClient._logger.debug("not a stateful set") - continue - - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format( - node_instance.deployment_id, node_instance.id)) - continue - - try: - namespace = container_id.get('namespace') - except: - namespace = '' - pass - - replicas = 1 - try: - replicas = rtp.get('replicas') - except: - pass - - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug( - "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, - node_instance.id)) - continue - - cnt_found += 1 - yield (proxy_fqdn, namespace, scn, replicas, scn_port) - continue - - msg = "Found {} components (collectors) for cluster={}" \ - .format(cnt_found, in_cluster_fqdn) - CfyClient._logger.debug(msg) - - - @staticmethod - def iter_components(dcae_target_type, dcae_service_location='', component_type=''): - """ - Iterate components that handle a given dcae_target_type. - - Parameters - ---------- - dcae_target_type : string - VNF Type - dcae_service_location : string - Location of the component (optional) - component_type : string - Type of the component (optional) - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - or - [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] - - """ - - cnt_found = 0 - - # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) - dockerhosts = [] - k8s_svcs_tagged_with_clli = [] - if dcae_service_location: - try: - dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - try: - k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - docker_host = '' - svc_with_my_clli_tags = '' - if not container_id: - container_type = "k8s" - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) - continue - docker_config = rtp.get('docker_config') - if not docker_config: - CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id)) - continue - dti_reconfig_script = "" - if container_type == "docker": - dti_reconfig_script = rtp.get('dti_reconfig_script') - if not dti_reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti') - if not dti_reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id)) - continue - - scn = rtp.get('service_component_name') - scn_address = None - scn_port = None - if not scn: - CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) - continue - if container_type == "docker": - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - try: - srvcCatalogItem = ConsulClient.lookup_service(scn)[0] - scn_address = srvcCatalogItem.get("ServiceAddress") - except: - CfyClient._logger.debug( - "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id, - node_instance.id)) - continue - svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags') - # e.g., scn="s908d92e232ed43..." - if not svc_with_my_clli_tags: - # We should not incur this burden. k8splugin should store this into runtime properties. - try: - node_name = srvcCatalogItem.get("Node") - if node_name: - # e.g., node_name="zldcdyh1adce3kpma00" - services = ConsulClient.lookup_node(node_name).get("Services") - if services: - for node_svc in list(services.keys()): - if "_component_kubernetes_master" in node_svc: - # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master" - svc_with_my_clli_tags = node_svc - break - except: - pass - # ... cache results we find into runtime properties to avoid searching again - if svc_with_my_clli_tags: - CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format( - node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags)) - rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags - body = { - "runtime_properties": rtp, - "state": node_instance.state, - "version": 1 + int(node_instance.version) - } - try: - CfyClient._client.update_node_instance(node_instance.id, body) - except: - pass - - if not svc_with_my_clli_tags: - CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id)) - continue - - # get the nodeport for statefulset sidecar service - dti_info = rtp.get('dti_info') - if dti_info: - ports = dti_info.get('ports') - if ports: - scn_port = ports[0].split(':')[1] - docker_host = rtp.get('configuration',{}).get('file_content') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) - continue - - # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG - if dcae_service_location: - if container_type == "docker" and docker_host not in dockerhosts: - CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location)) - continue - elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli: - CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location)) - continue - - # If DTI Event specifies component_type, then collector's service_component_type must match - if component_type: - c_component_type = rtp.get('service_component_type') - if component_type != c_component_type: - CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) - continue - - # Check if the collector supports this VNF Type - # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':oti' - try: - obj = ConsulClient.get_value(dti_key) - except Exception as e: - CfyClient._logger.error( - "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" - .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) - ) - continue - if not obj: - CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key)) - continue - obj_types = set(k.lower() for k in obj) - if dcae_target_type.lower() in obj_types: - CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port ) - continue - else: - CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key)) - continue - - msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\ - .format(cnt_found, dcae_target_type, dcae_service_location, component_type) - CfyClient._logger.debug(msg) - - @staticmethod - def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''): - """ - Iterate components that handle a given dcae_target_type to find the components of docker type - - Parameters - ---------- - dcae_target_type : string - VNF Type - dcae_service_location : string - Location of the component (optional) - component_type : string - Type of the component (optional) - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - - """ - - cnt_found = 0 - # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) - dockerhosts = [] - - if dcae_service_location: - try: - dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format( - type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - if not container_id: - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id".format( - node_instance.deployment_id, node_instance.id)) - continue - docker_config = rtp.get('docker_config') - if not docker_config: - CfyClient._logger.debug( - "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, - node_instance.id)) - continue - dti_reconfig_script = "" - dti_reconfig_script = rtp.get('dti_reconfig_script') - if not dti_reconfig_script: - CfyClient._logger.debug( - "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, - node_instance.id)) - continue - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug( - "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, - node_instance.id)) - continue - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format( - node_instance.deployment_id, node_instance.id)) - continue - - # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG - if dcae_service_location: - if docker_host not in dockerhosts: - CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, docker_host, - dcae_service_location)) - continue - - # If DTI Event specifies component_type, then collector's service_component_type must match - if component_type: - c_component_type = rtp.get('service_component_type') - if component_type != c_component_type: - CfyClient._logger.debug( - "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) - continue - - # Check if the collector supports this VNF Type - # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':oti' - try: - obj = ConsulClient.get_value(dti_key) - except Exception as e: - CfyClient._logger.error( - "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" - .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) - ) - continue - if not obj: - CfyClient._logger.debug( - "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, - dti_key)) - continue - obj_types = set(k.lower() for k in obj) - if dcae_target_type.lower() in obj_types: - CfyClient._logger.debug( - "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, - dcae_target_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, - node_instance.state, docker_host, dti_reconfig_script, container_type, '', '') - continue - else: - CfyClient._logger.debug( - "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, - dcae_target_type, dti_key)) - continue - - msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \ - .format(cnt_found, dcae_target_type, dcae_service_location, component_type) - CfyClient._logger.debug(msg) - - - @staticmethod - def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"): - """ - Iterate components of a specific deployment_id. - - Parameters - ---------- - deployment_id : string - Cloudify deployment ID that created the component(s). - node_id : string - Cloudify node ID that created the component. - reconfig_type : string - "app" - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - or - [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] - - """ - - cnt_found = 0 - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list( - deployment_id=deployment_id, - _include=['id','node_id','deployment_id','state','runtime_properties'] - ): - if node_id and node_instance.node_id != node_id: - continue - - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - if not container_id: - container_type = "k8s" - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) - continue - reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type) - if not reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type)) - continue - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) - continue - if container_type == "docker": - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - docker_host = rtp.get('configuration',{}).get('file_content') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) - continue - - CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type) - continue - - msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type) - CfyClient._logger.debug(msg) diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py deleted file mode 100644 index 5c87f43..0000000 --- a/oti/event-handler/otihandler/config.py +++ /dev/null @@ -1,180 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""read and use the config""" - -import copy -import json -import logging -import logging.config -import os - -from otihandler.consul_client import ConsulClient - -os.makedirs('logs', exist_ok=True) -logging.basicConfig( - filename='logs/oti_handler.log', \ - format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \ - '%(threadName)s %(name)s.%(funcName)s: %(message)s', \ - datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) - -class Config(object): - """main config of the application""" - - CONFIG_FILE_PATH = "etc/config.json" - LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" - SERVICE_NAME = "oti_handler" - - FIELD_SYSTEM = "system" - FIELD_WSERVICE_PORT = "wservice_port" - FIELD_TLS = "tls" - - _logger = logging.getLogger("oti_handler.config") - config = None - - cloudify_proto = None - cloudify_addr = None - cloudify_port = None - cloudify_user = None - cloudify_pass = None - cloudify = None - consul_url = "http://consul:8500" - tls_cacert_file = None - tls_server_cert_file = None - tls_private_key_file = None - tls_server_ca_chain_file = None - wservice_port = 9443 - - @staticmethod - def _get_tls_file_path(tls_config, cert_directory, tls_name): - """calc file path and verify its existance""" - - file_name = tls_config.get(tls_name) - if not file_name: - return None - tls_file_path = os.path.join(cert_directory, file_name) - if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK): - Config._logger.error("invalid %s: %s", tls_name, tls_file_path) - return None - return tls_file_path - - @staticmethod - def _set_tls_config(tls_config): - """verify and set tls certs in config""" - - try: - Config.tls_cacert_file = None - Config.tls_server_cert_file = None - Config.tls_private_key_file = None - Config.tls_server_ca_chain_file = None - - if not (tls_config and isinstance(tls_config, dict)): - Config._logger.info("no tls in config: %s", json.dumps(tls_config)) - return - - cert_directory = tls_config.get("cert_directory") - - if not (cert_directory and isinstance(cert_directory, str)): - Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory) - return - - cert_directory = os.path.join( - os.path.dirname(os.path.dirname(os.path.realpath(__file__))), str(cert_directory)) - if not (cert_directory and os.path.isdir(cert_directory)): - Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory) - return - - Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert") - Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory, - "server_cert") - Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory, - "private_key") - Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory, - "server_ca_chain") - - finally: - Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file) - Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file) - Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file) - Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) - - @staticmethod - def merge(new_config): - """merge the new_config into current config - override the values""" - - if not new_config: - return - - if not Config.config: - Config.config = new_config - return - - new_config = copy.deepcopy(new_config) - Config.config.update(new_config) - - @staticmethod - def get_system_name(): - """find the name of the oti_handler system - to be used as the key in consul-kv for config of oti_handler - """ - - return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME) - - @staticmethod - def discover(): - """bring and merge the config settings from Consul""" - - system_key = Config.get_system_name() - new_config = ConsulClient.get_value(system_key) - - if not new_config or not isinstance(new_config, dict): - Config._logger.warn("unexpected config from Consul: %s", new_config) - return - - Config._logger.debug("loaded config from Consul(%s): %s", - system_key, json.dumps(new_config)) - Config._logger.debug("config before merge from Consul: %s", json.dumps(Config.config)) - Config.merge(new_config.get(Config.SERVICE_NAME)) - Config._logger.debug("merged config from Consul: %s", json.dumps(Config.config)) - - @staticmethod - def load_from_file(file_path=None): - """read and store the config from config file""" - - if not file_path: - file_path = Config.CONFIG_FILE_PATH - - loaded_config = None - if os.access(file_path, os.R_OK): - with open(file_path, 'r') as config_json: - loaded_config = json.load(config_json) - - if not loaded_config: - Config._logger.info("config not loaded from file: %s", file_path) - return - - Config._logger.info("config loaded from file: %s", file_path) - logging_config = loaded_config.get("logging") - if logging_config: - logging.config.dictConfig(logging_config) - - Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) - - local_config = loaded_config.get(Config.SERVICE_NAME, {}) - Config._set_tls_config(local_config.get(Config.FIELD_TLS)) - - Config.merge(loaded_config.get(Config.SERVICE_NAME)) - return True diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py deleted file mode 100644 index 1b25f3e..0000000 --- a/oti/event-handler/otihandler/consul_client.py +++ /dev/null @@ -1,617 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""client to talk to consul at consul port 8500""" - -import base64 -import copy -import json -import logging -import os -import re -import socket - -import requests - - -class ConsulClientError(RuntimeError): - pass - -class ConsulClientConnectionError(RuntimeError): - pass - -class ConsulClientServiceNotFoundError(RuntimeError): - pass - -class ConsulClientNodeNotFoundError(RuntimeError): - pass - -class ConsulClientKVEntryNotFoundError(RuntimeError): - pass - - -class ConsulClient(object): - """talking to consul""" - - CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}" - CONSUL_KV_MASK = "{}/v1/kv/{}" - CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true" - CONSUL_TRANSACTION_URL = "{}/v1/txn" - _logger = logging.getLogger("oti_handler.consul_client") - - MAX_OPS_PER_TXN = 64 - # MAX_VALUE_LEN = 512 * 1000 - - OPERATION_SET = "set" - OPERATION_DELETE = "delete" - OPERATION_DELETE_FOLDER = "delete-tree" - - - #----- Methods for Consul services - - @staticmethod - def lookup_service(service_name): - """find the service record in consul""" - - service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) - - ConsulClient._logger.info("lookup_service(%s)", service_path) - - try: - response = requests.get(service_path, timeout=30) - response.raise_for_status() - # except requests.exceptions.HTTPError as e: - # except requests.exceptions.ConnectionError as e: - # except requests.exceptions.Timeout as e: - except requests.exceptions.RequestException as e: - msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format( - service_name, service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - return_list = response.json() - # except ValueError as e: - except Exception as e: - msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( - service_name, service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - if not return_list: - msg = "lookup_service({}) got empty or no value from requests.get({})".format( - service_name, service_path) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - return return_list - - - @staticmethod - def get_all_services(): - """List all services from consul""" - - service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/")) - - ConsulClient._logger.info("get_all_services(%s)", service_path) - - try: - response = requests.get(service_path, timeout=30) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format( - service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - return_dict = response.json() - except Exception as e: - msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format( - service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - if not return_dict: - msg = "get_all_services() got empty or no value from requests.get({})".format( - service_path) - ConsulClient._logger.info(msg) - # raise ConsulClientServiceNotFoundError(msg) - - return return_dict - - - @staticmethod - def _find_matching_services(services, name_search, tags): - """Find matching services given search criteria""" - sub_tags = tags[0][4:6] - tags.append(sub_tags) - - def is_match(service): - srv_name, srv_tags = service - return name_search in srv_name and \ - any([tag in srv_tags for tag in tags]) - - return [ srv[0] for srv in list(services.items()) if is_match(srv) ] - - - @staticmethod - def search_services(name_search, tags): - """ - Search for services that match criteria - - Args: - ----- - name_search: (string) Name to search for as a substring - tags: (list) List of strings that are tags. A service must match **ANY OF** the - tags in the list. - - Returns: - -------- - List of names of services that matched - """ - - matches = [] - - # srvs is dict where key is service name and value is list of tags - srvs = ConsulClient.get_all_services() - - if srvs: - matches = ConsulClient._find_matching_services(srvs, name_search, tags) - - return matches - - - @staticmethod - def get_service_fqdn_port(service_name, node_meta=False): - """find the service record in consul""" - - service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) - - ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path) - - try: - response = requests.get(service_path, timeout=30) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format( - service_name, service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - service = response.json() - except Exception as e: - msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( - service_name, service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - if not service: - msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format( - service_name, service_path) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - try: - service = service[0] # arbitrarily choose the first one - port = service["ServicePort"] - - # HTTPS certificate validation requires FQDN not IP address - fqdn = "" - if node_meta: - meta = service.get("NodeMeta") - if meta: - fqdn = meta.get("fqdn") - if not fqdn: - fqdn = socket.getfqdn(str(service["ServiceAddress"])) - except Exception as e: - msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format( - service_name, service_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientServiceNotFoundError(msg) - - return (fqdn, port) - - - #----- Methods for Consul nodes - - @staticmethod - def lookup_node(node_name): - """find the node record in consul""" - - node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name) - - ConsulClient._logger.info("lookup_node(%s)", node_path) - - try: - response = requests.get(node_path, timeout=30) - response.raise_for_status() - # except requests.exceptions.HTTPError as e: - # except requests.exceptions.ConnectionError as e: - # except requests.exceptions.Timeout as e: - except requests.exceptions.RequestException as e: - msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format( - node_name, node_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - return_dict = response.json() - # except ValueError as e: - except Exception as e: - msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( - node_name, node_path, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientNodeNotFoundError(msg) - - if not return_dict: - msg = "lookup_node({}) got empty or no value from requests.get({})".format( - node_name, node_path) - ConsulClient._logger.error(msg) - raise ConsulClientNodeNotFoundError(msg) - - return return_dict - - - #----- Methods for Consul key-values - - @staticmethod - def put_value(key, data, cas=None): - """put the value for key into consul-kv""" - - # ConsulClient._logger.info("put_value(%s)", str(key)) - - URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) - if cas is not None: - URL = '{}?cas={}'.format(URL, cas) - - try: - response = requests.put(URL, data=json.dumps(data), timeout=30) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format( - key, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - updated = response.json() - except Exception as e: - msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format( - key, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - return updated - - - @staticmethod - def get_value(key, get_index=False): - """get the value for key from consul-kv""" - - URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) - - try: - response = requests.get(URL, timeout=30) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format( - key, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - data = response.json() - except Exception as e: - msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( - key, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - if not data: - msg = "get_value({}) got empty or no value from requests.get({})".format( - key, URL) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - try: - value = base64.b64decode(data[0]["Value"]).decode("utf-8") - value_dict = json.loads(value) - except Exception as e: - msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format( - key, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s", - key, value, json.dumps(data)) - - if get_index: - return data[0]["ModifyIndex"], value_dict - - return value_dict - - - @staticmethod - def get_kvs(prefix, nest=True, trim_prefix=False): - """get key-values for keys beginning with prefix from consul-kv""" - - URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix) - - try: - response = requests.get(URL, timeout=30) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format( - prefix, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientConnectionError(msg) - - try: - data = response.json() - except Exception as e: - msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( - prefix, URL, type(e).__name__, e) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - if not data: - msg = "get_kvs({}) got empty or no value from requests.get({})".format( - prefix, URL) - ConsulClient._logger.error(msg) - raise ConsulClientKVEntryNotFoundError(msg) - - def put_level_value(level_keys, value, level_dict={}): - if level_keys: - key = level_keys.pop(0) - level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {})) - return level_dict - else: - return value - - rdict = {} - for item in data: - v = base64.b64decode(item["Value"]).decode("utf-8") - try: - value = json.loads(v) - except Exception as e: - value = v - key = item['Key'] - if trim_prefix: - key = key[len(prefix):] - if nest: - level_keys = key.split('/') - rdict = put_level_value(level_keys, value, rdict) - else: - rdict[key] = value - - ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s", - prefix, json.dumps(rdict), json.dumps(data)) - return rdict - - - @staticmethod - def _gen_txn_operation(verb, key, value=None): - """returns the properly formatted operation to be used inside transaction""" - - # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key - if value: - return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}} - return {"KV": {"Verb": verb, "Key": key}} - - - @staticmethod - def _run_transaction(operation_name, txn): - """run a single transaction of several operations at consul /txn""" - - if not txn: - return - - txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/")) - response = None - try: - response = requests.put(txn_url, json=txn, timeout=30) - except requests.exceptions.RequestException as e: - ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}" - .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn))) - return - - if response.status_code != requests.codes.ok: - ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}" - .format(operation_name, txn_url, response.status_code, - response.text, json.dumps(txn), - json.dumps(dict(list(response.request.headers.items()))))) - return - - ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}" - .format(operation_name, txn_url, response.status_code, - response.text, json.dumps(txn), - json.dumps(dict(list(response.request.headers.items()))))) - - return True - - - @staticmethod - def store_kvs(kvs): - """put kvs into consul-kv""" - - if not kvs: - ConsulClient._logger.warning("kvs not supplied to store_kvs()") - return - - store_kvs = [ - ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET, - key, json.dumps(value)) - for key, value in kvs.items() - ] - txn = [] - idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn) - for idx in range(0, len(store_kvs), idx_step): - txn += store_kvs[idx : idx + idx_step] - if not ConsulClient._run_transaction("store_kvs", txn): - return False - txn = [] - - return ConsulClient._run_transaction("store_kvs", txn) - - - @staticmethod - def delete_key(key): - """delete key from consul-kv""" - - if not key: - ConsulClient._logger.warning("key not supplied to delete_key()") - return - - delete_key = [ - ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key) - ] - return ConsulClient._run_transaction("delete_key", delete_key) - - - @staticmethod - def delete_kvs(key): - """delete key from consul-kv""" - - if not key: - ConsulClient._logger.warning("key not supplied to delete_kvs()") - return - - delete_kvs = [ - ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key) - ] - return ConsulClient._run_transaction("delete_kvs", delete_kvs) - - - #----- Methods for Config Binding Service - - @staticmethod - def get_service_component(scn): - config = json.dumps(ConsulClient.get_value(scn)) - - try: - dmaap = ConsulClient.get_value(scn + ":dmaap") - except Exception as e: - dmaap = None - if dmaap: - for key in list(dmaap.keys()): - config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config) - - try: - rel = ConsulClient.get_value(scn + ":rel") - except Exception as e: - rel = None - if rel: - for key in list(rel.keys()): - config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config) - - return json.loads(config) - - - @staticmethod - def get_service_component_all(scn, policies_as_list=True): - t_scn = scn + ":" - t_len = len(t_scn) - a_dict = ConsulClient.get_kvs(scn) - b_dict = {} - for key in a_dict: - b_key = None - if key == scn: - b_dict["config"] = ConsulClient.get_service_component(scn) - elif key == scn + ":dmaap": - continue - elif key[0:t_len] == t_scn: - b_key = key[t_len:] - # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys - if policies_as_list and b_key == "policies": # convert items from KVs to a values list - b_dict[b_key] = {} - for sub_key in a_dict[key]: - if sub_key == "items": - b_dict[b_key][sub_key] = [] - d_dict = a_dict[key][sub_key] - for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate - b_dict[b_key][sub_key].append(d_dict[item]) - else: - b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key]) - else: - b_dict[b_key] = copy.deepcopy(a_dict[key]) - return b_dict - - - @staticmethod - def add_vnf_id(scn, vnf_type, vnf_id, dti_dict): - """ - Add VNF instance to Consul scn:oti key. - - Treat its value as a JSON string representing a dict. - Extend the dict by adding a dti_dict for vnf_id under vnf_type. - Turn the resulting extended dict into a JSON string. - Store the string back into Consul under scn:oti key. - Watch out for conflicting concurrent updates. - """ - - key = scn + ':oti' - lc_vnf_type = vnf_type.lower() - while True: # do until update succeeds - (mod_index, v) = ConsulClient.get_value(key, get_index=True) - lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case - # but DCAE-C doesn't create such keys - - if lc_vnf_type not in lc_v: - return # That VNF type is not supported by this component - lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance - - updated = ConsulClient.put_value(key, lc_v, cas=mod_index) - if updated: - return lc_v - - - @staticmethod - def delete_vnf_id(scn, vnf_type, vnf_id): - """ - Delete VNF instance from Consul scn:oti key. - - Treat its value as a JSON string representing a dict. - Modify the dict by deleting the vnf_id key entry from under vnf_type. - Turn the resulting extended dict into a JSON string. - Store the string back into Consul under scn:oti key. - Watch out for conflicting concurrent updates. - """ - - key = scn + ':oti' - lc_vnf_type = vnf_type.lower() - while True: # do until update succeeds - (mod_index, v) = ConsulClient.get_value(key, get_index=True) - lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case - # but DCAE-C doesn't create such keys - - if lc_vnf_type not in lc_v: - return # That VNF type is not supported by this component - if vnf_id not in lc_v[lc_vnf_type]: - return lc_v - del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance - - updated = ConsulClient.put_value(key, lc_v, cas=mod_index) - if updated: - return lc_v - - -if __name__ == "__main__": - value = None - - if value: - print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': '))) diff --git a/oti/event-handler/otihandler/dbclient/__init__.py b/oti/event-handler/otihandler/dbclient/__init__.py deleted file mode 100644 index ee3ec3e..0000000 --- a/oti/event-handler/otihandler/dbclient/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -from .models import Event -from .models import EventAck -from .db_dao import DaoBase diff --git a/oti/event-handler/otihandler/dbclient/apis/__init__.py b/oti/event-handler/otihandler/dbclient/apis/__init__.py deleted file mode 100644 index 05e3800..0000000 --- a/oti/event-handler/otihandler/dbclient/apis/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -from .db_access import DbAccess -from .event_db_access import EventDbAccess diff --git a/oti/event-handler/otihandler/dbclient/apis/db_access.py b/oti/event-handler/otihandler/dbclient/apis/db_access.py deleted file mode 100644 index f064b30..0000000 --- a/oti/event-handler/otihandler/dbclient/apis/db_access.py +++ /dev/null @@ -1,50 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" -Base class for APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver -""" - -from sqlalchemy.orm import sessionmaker -from ..db_dao import DaoBase -import psycopg2 -from psycopg2.extras import execute_values -import os -import logging - - -class DbAccess(object): - logger = logging.getLogger("dti_handler.DbAccess") - engine = None - session = None - - def __init__(self): - self.engine = DaoBase.getDbEngine() - # create a configured "Session" class - Session = sessionmaker(bind=self.engine) - - # create a Session - self.session = Session() - - def saveDomainObject(self, obj): - self.session.add(obj) - self.session.commit() - self.session.close() - - def deleteDomainObject(self,obj): - self.session.delete(obj) - self.session.commit() - self.session.close() diff --git a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py deleted file mode 100644 index 898ee8e..0000000 --- a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py +++ /dev/null @@ -1,154 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" DB APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver""" - -from sqlalchemy import and_ -from sqlalchemy.orm.exc import NoResultFound - -from .db_access import DbAccess -from ..models import Event, EventAck - - -class EventDbAccess(DbAccess): - - def __init__(self): - DbAccess.__init__(self) - - def query_event_item(self, target_type, target_name): - try: - query = self.session.query(Event).filter(Event.target_type == target_type).\ - filter(Event.target_name == target_name) - evt = query.one() - except NoResultFound: - return None - else: - return evt - - def query_event_data(self, target_type, target_name): - try: - query = self.session.query(Event).filter(Event.target_type == target_type).\ - filter(Event.target_name == target_name) - evt = query.one() - except NoResultFound: - return [] - else: - try: - ack_result = self.session.query(EventAck).filter(EventAck.event == evt).all() - except NoResultFound: - return [] - else: - return ack_result - - def query_event_data_k8s(self, target_type, target_name): - try: - query = self.session.query(Event).filter(Event.target_type == target_type).\ - filter(Event.target_name == target_name) - evt = query.one() - except NoResultFound: - return [] - else: - try: - ack_result = self.session.query(EventAck).filter(EventAck.event == evt).\ - filter(EventAck.container_type != 'docker').all() - except NoResultFound: - return [] - else: - return ack_result - - def query_event_info_docker(self, prim_evt, service_component, deployment_id, container_id): - try: - query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter( - and_(EventAck.service_component == service_component, - EventAck.deployment_id == deployment_id, - EventAck.container_id == container_id, - EventAck.container_type == 'docker')) - evt = query.one() - except NoResultFound as nrf: - raise nrf - else: - return evt - - def update_event_item(self, dti_event, target_type, target_name): - self.session.query(Event).filter(Event.target_type == target_type). \ - filter(Event.target_name == target_name).update({Event.event:dti_event}) - self.session.commit() - - def query_raw_k8_events(self, cluster, pod, namespace): - """ - run an inner JOIN query to dtih_event and dtih_event_ack tables using supplied query predicates - - :param cluster: - :param pod: - :param namespace: - :return: - Set of event objects related to k8s pods - """ - try: - return self.session.query(Event).filter(Event.dtih_event_id.in_( - self.session.query(EventAck.dtih_event_id).filter(and_(EventAck.k8s_cluster_fqdn == cluster, - EventAck.k8s_pod_id == pod, - EventAck.k8s_namespace == namespace)))).all() - except NoResultFound: - print("invalid query or no data") - return () - - def query_raw_docker_events(self, target_types, locations): - """ - run a query to dtih_event table using supplied query predicates - - :param target_types: required - :param locations: optional - :return: - set of event objects related to docker container - """ - try: - if not locations or (len(locations) == 1 and locations[0] == ''): - return self.session.query(Event).filter(Event.target_type.in_(target_types)).all() - else: - return self.session.query(Event).filter(Event.target_type.in_(target_types)).filter( - Event.location_clli.in_(locations)).all() - except NoResultFound: - print("invalid query or no data") - return () - - def query_pod_info2(self, cluster): - try: - return self.session.query(EventAck).filter(EventAck.k8s_cluster_fqdn == cluster).all() - except NoResultFound: - print("invalid query or no data") - return () - - def query_pod_info(self, cluster): - try: - return self.session.query(EventAck.k8s_pod_id, EventAck.k8s_namespace, - EventAck.k8s_proxy_fqdn, EventAck.k8s_service_name, - EventAck.k8s_service_port)\ - .filter(EventAck.k8s_cluster_fqdn == cluster) \ - .distinct().order_by(EventAck.k8s_cluster_fqdn).all() - except NoResultFound: - print("invalid query or no data") - return () - - def query_event_data_k8s_pod(self, prim_evt, scn): - try: - query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter( - and_(EventAck.service_component == scn)) - event_info = query.one() - except NoResultFound: - return None - else: - return event_info diff --git a/oti/event-handler/otihandler/dbclient/db_dao.py b/oti/event-handler/otihandler/dbclient/db_dao.py deleted file mode 100644 index 78fa058..0000000 --- a/oti/event-handler/otihandler/dbclient/db_dao.py +++ /dev/null @@ -1,33 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" SqlAlchemy ORM engine for postgresSql dti database """ - -from sqlalchemy import create_engine - -class DaoBase: - _engine = None - - @staticmethod - def init_db(dbConStr): - if DaoBase._engine: - return - DaoBase._engine = create_engine(dbConStr) - - @staticmethod - def getDbEngine(): - return DaoBase._engine - diff --git a/oti/event-handler/otihandler/dbclient/models/__init__.py b/oti/event-handler/otihandler/dbclient/models/__init__.py deleted file mode 100644 index bc802f5..0000000 --- a/oti/event-handler/otihandler/dbclient/models/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - - -from .event import Event -from .event_ack import EventAck diff --git a/oti/event-handler/otihandler/dbclient/models/event.py b/oti/event-handler/otihandler/dbclient/models/event.py deleted file mode 100644 index 553bec2..0000000 --- a/oti/event-handler/otihandler/dbclient/models/event.py +++ /dev/null @@ -1,40 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" ORM - mapping class for dtih_event table """ - -from sqlalchemy import Column, String, Integer, ForeignKey, func -from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP -from sqlalchemy.ext.declarative import declarative_base -import datetime - - -Base = declarative_base() - -class Event(Base): - __tablename__ = 'dtih_event' - __table_args__ = {'schema': 'dti'} - dtih_event_id = Column(Integer, primary_key=True) - event = Column(JSONB) - create_ts = Column(TIMESTAMP(timezone=True), default=func.now()) - last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now()) - target_name = Column(String) - target_type = Column(String) - location_clli = Column(String) - # def __repr__(self): - # return "<Event(event_id='%s', target_type='%s', target_name='%s')" % ( - # self.event_id, self.target_type, self.target_name - # ) diff --git a/oti/event-handler/otihandler/dbclient/models/event_ack.py b/oti/event-handler/otihandler/dbclient/models/event_ack.py deleted file mode 100644 index 2b19316..0000000 --- a/oti/event-handler/otihandler/dbclient/models/event_ack.py +++ /dev/null @@ -1,51 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" ORM - mapping class for dtih_event_ack table """ - -import datetime -from sqlalchemy import Column, String, Integer, ForeignKey, func -from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP -from sqlalchemy.orm import relationship -from sqlalchemy.ext.declarative import declarative_base -from ..models import Event - -Base = declarative_base() - -class EventAck(Base): - __tablename__ = 'dtih_event_ack' - __table_args__ = {'schema': 'dti'} - dtih_event_ack_id = Column(Integer, primary_key=True) - create_ts = Column(TIMESTAMP(timezone=True), default=func.now()) - last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now()) - action = Column(String) - k8s_namespace = Column(String) - k8s_service_name = Column(String) - k8s_service_port = Column(String) - k8s_cluster_fqdn = Column(String) - k8s_proxy_fqdn = Column(String) - k8s_pod_id = Column(String) - service_component = Column(String) - deployment_id = Column(String) - container_type = Column(String) - docker_host = Column(String) - container_id = Column(String) - reconfig_script = Column(String) - dtih_event_id = Column(Integer, ForeignKey(Event.dtih_event_id)) - event = relationship(Event) - - def update_action(self, action): - setattr(self, 'action', action) diff --git a/oti/event-handler/otihandler/docker_client.py b/oti/event-handler/otihandler/docker_client.py deleted file mode 100644 index 621a1ec..0000000 --- a/oti/event-handler/otihandler/docker_client.py +++ /dev/null @@ -1,175 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""client interface to docker""" - -import docker -import json -import logging -import time - -from otihandler.config import Config -from otihandler.consul_client import ConsulClient -from otihandler.utils import decrypt - - -# class DockerClientError(RuntimeError): -# pass - -class DockerClientConnectionError(RuntimeError): - pass - - -class DockerClient(object): - """ - All Docker logins are in Consul's key-value store under - "docker_plugin/docker_logins" as a list of json objects where - each object is a single login: - - [{ "username": "XXXX", "password": "yyyy", - "registry": "hostname.domain:18443" }] - """ - - _logger = logging.getLogger("oti_handler.docker_client") - - def __init__(self, docker_host, reauth=False): - """Create Docker client - - Args: - ----- - reauth: (boolean) Forces reauthentication, e.g., Docker login - """ - - (fqdn, port) = ConsulClient.get_service_fqdn_port(docker_host, node_meta=True) - base_url = "https://{}:{}".format(fqdn, port) - - try: - tls_config = docker.tls.TLSConfig( - client_cert=( - Config.tls_server_ca_chain_file, - Config.tls_private_key_file - ) - ) - self._client = docker.APIClient(base_url=base_url, tls=tls_config, version='auto', timeout=60) - - for dcl in ConsulClient.get_value("docker_plugin/docker_logins"): - dcl['password'] = decrypt(dcl['password']) - dcl["reauth"] = reauth - self._client.login(**dcl) - - # except requests.exceptions.RequestException as e: - except Exception as e: - msg = "DockerClient.__init__({}) attempt to {} with TLS got exception {}: {!s}".format( - docker_host, base_url, type(e).__name__, e) - - # Then try connecting to dockerhost without TLS - try: - base_url = "tcp://{}:{}".format(fqdn, port) - self._client = docker.APIClient(base_url=base_url, tls=False, version='auto', timeout=60) - - for dcl in ConsulClient.get_value("docker_plugin/docker_logins"): - dcl['password'] = decrypt(dcl['password']) - dcl["reauth"] = reauth - self._client.login(**dcl) - - # except requests.exceptions.RequestException as e: - except Exception as e: - msg = "{}\nDockerClient.__init__({}) attempt to {} without TLS got exception {}: {!s}".format( - msg, docker_host, base_url, type(e).__name__, e) - DockerClient._logger.error(msg) - raise DockerClientConnectionError(msg) - - @staticmethod - def build_cmd(script_path, use_sh=True, msg_type="dti", **kwargs): - """Build command to execute""" - - data = json.dumps(kwargs or {}) - - if use_sh: - return ['/bin/sh', script_path, msg_type, data] - else: - return [script_path, msg_type, data] - - def notify_for_reconfiguration(self, container_id, cmd): - """Notify Docker container that reconfiguration occurred - - Notify the Docker container by doing Docker exec of passed-in command - - Args: - ----- - container_id: (string) - cmd: (list) of strings each entry being part of the command - """ - - for attempts_remaining in range(11,-1,-1): - try: - result = self._client.exec_create(container=container_id, cmd=cmd) - except docker.errors.APIError as e: - # e # 500 Server Error: Internal Server Error ("{"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}") - DockerClient._logger.debug("exec_create() returned APIError: {!s}".format(e)) - - # e.message # 500 Server Error: Internal Server Error - # DockerClient._logger.debug("e.message: {}".format(e.message)) - # e.response.status_code # 500 - # DockerClient._logger.debug("e.response.status_code: {}".format(e.response.status_code)) - # e.response.reason # Internal Server Error - # DockerClient._logger.debug("e.response.reason: {}".format(e.response.reason)) - # e.explanation # {"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"} - # DockerClient._logger.debug("e.explanation: {}".format(e.explanation)) - - # docker container restarting can wait - if e.explanation and 'is restarting' in e.explanation.lower(): - DockerClient._logger.debug("notification exec_create() experienced: {!s}".format(e)) - if attempts_remaining == 0: - result = None - break - time.sleep(10) - # elif e.explanation and 'no such container' in e.explanation.lower(): - # elif e.explanation and 'is not running' in e.explanation.lower(): - else: - DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(type(e).__name__, e)) - return str(e) # don't raise or CM will retry usually forever - # raise DockerClientError(e) - except Exception as e: - DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format( - type(e).__name__, e)) - return str(e) # don't raise or CM will retry usually forever - # raise DockerClientError(e) - else: - break - if not result: - DockerClient._logger.warn("aborting notification exec_create() because docker exec failed") - return "notification unsuccessful" # failed to get an exec_id, perhaps trying multiple times, so don't raise or CM will retry usually forever - DockerClient._logger.debug("notification exec_create() succeeded") - - for attempts_remaining in range(11,-1,-1): - try: - result = self._client.exec_start(exec_id=result['Id']) - except Exception as e: - DockerClient._logger.debug("notification exec_start() got exception {}: {!s}".format(type(e).__name__, e)) - if attempts_remaining == 0: - DockerClient._logger.warn("aborting notification exec_start() because exception {}: {!s}".format(type(e).__name__, e)) - return str(e) # don't raise or CM will retry usually forever - # raise DockerClientError(e) - time.sleep(10) - else: - break - DockerClient._logger.debug("notification exec_start() succeeded") - - DockerClient._logger.info("Pass to docker exec {} {} {}".format( - container_id, cmd, result)) - - return result diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py deleted file mode 100644 index 5802233..0000000 --- a/oti/event-handler/otihandler/dti_processor.py +++ /dev/null @@ -1,815 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""OTI Event processor for handling all the event types""" - -import copy -import json -import logging -from multiprocessing.dummy import Pool as ThreadPool -from threading import Lock - -import requests - -from otihandler import utils -from otihandler.cfy_client import CfyClient -from otihandler.consul_client import ConsulClient -from otihandler.dbclient.apis import EventDbAccess -from otihandler.dbclient.models import Event, EventAck -from otihandler.docker_client import DockerClient - -notify_response_arr = [] -lock = Lock() -K8S_CLUSTER_PROXY_NODE_PORT = '30132' - - -# def notify_docker(args_tuple): -# """ -# event notification executor inside a process pool to communicate with docker container -# interacts with docker client library -# """ -# (dti_event, db_access, ack_item) = args_tuple -# try: -# dcae_service_action = dti_event.get('dcae_service_action') -# component_scn = ack_item.service_component -# deployment_id = ack_item.deployment_id -# container_id = ack_item.container_id -# docker_host = ack_item.docker_host -# reconfig_script = ack_item.reconfig_script -# container_type = 'docker' -# except Exception as e: -# return ( -# "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e)) -# what = "" -# try: -# what = "{} in {} container {} on {} that was deployed by {}".format( -# reconfig_script, container_type, container_id, docker_host, deployment_id) -# if dcae_service_action == 'add': -# add_action = {"dcae_service_action": "deploy"} -# dti_event.update(add_action) -# -# if dcae_service_action == 'delete': -# add_action = {"dcae_service_action": "undeploy"} -# dti_event.update(add_action) -# -# # dkr = DockerClient(docker_host, reauth=False) -# result = '' -# # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ]) -# if dti_event.get('dcae_service_action') == 'undeploy': -# # delete from dti_event_ack table -# try: -# db_access.deleteDomainObject(ack_item) -# except Exception as e: -# msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) -# DTIProcessor.logger.warning(msg) -# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) -# else: -# return (component_scn, "ran {}, got: {!s}".format(what, result)) -# -# except Exception as e: -# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - - -def notify_svc(args_tuple): - """ - add/update/delete event handler - event notification executor inside a process pool to communicate with docker container and k8s services - interacts with docker client library - interacts with k8s node port services using REST client - """ - (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple - dti_event = copy.deepcopy(orig_dti_event) - try: - dcae_service_action = dti_event.get('dcae_service_action').lower() - - component_scn = res_tuple[0] - deployment_id = res_tuple[1] - container_id = res_tuple[2] - node_id = res_tuple[3] - docker_host = res_tuple[6] - reconfig_script = res_tuple[7] - container_type = res_tuple[8] - except Exception as e: - return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e)) - - what = "" - if container_type == "docker": - # exec reconfigure.sh in docker container - try: - what = "{} in {} container {} on {} that was deployed by {} node {}".format( - reconfig_script, container_type, container_id, docker_host, deployment_id, node_id) - if dcae_service_action == 'add': - add_action = {"dcae_service_action": "deploy"} - dti_event.update(add_action) - - if dcae_service_action == 'delete': - add_action = {"dcae_service_action": "undeploy"} - dti_event.update(add_action) - - dkr = DockerClient(docker_host, reauth=False) - result = '' - if dti_event.get('dcae_service_action') == 'update': - # undeploy + deploy - DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what)) - dti_event.update({"dcae_service_action": "undeploy"}) - result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) - DTIProcessor.logger.debug("update 2 - running deploy {}".format(what)) - dti_event.update({"dcae_service_action": "deploy"}) - result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) - try: - upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id, - container_id) - upd_evt_ack.update_action('update') - db_access.saveDomainObject(upd_evt_ack) - except Exception as e: - msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - else: - DTIProcessor.logger.debug("running {}".format(what)) - result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) - if dti_event.get('dcae_service_action') == 'deploy': - # add into dti_event_ack table - try: - add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id, - container_type='docker', docker_host=docker_host, - container_id=container_id, reconfig_script=reconfig_script, - event=curr_evt, - action='add') - db_access.saveDomainObject(add_evt_ack) - except Exception as e: - msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - else: - # remove from dtih_event_ack if present - if curr_evt: - try: - del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id, - container_id) - db_access.deleteDomainObject(del_evt_ack) - except Exception as e: - msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - except Exception as e: - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - - return (component_scn, "ran {}, got: {!s}".format(what, result)) - elif container_type == "k8s": - DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component") - # if action is 'update', check if k8s pod info exists already for this event in app db - if dcae_service_action == 'add': - DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action") - return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) - elif dcae_service_action == 'update': - # handle update for pods being tracked and handle add for new pods - k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn) - if k8s_scn_result: - # update - DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action") - return notify_k8s_pod((dti_event, db_access, k8s_scn_result)) - else: - # add - DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ") - add_action = {"dcae_service_action": "add"} - dti_event.update(add_action) - return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) - - -def notify_k8s(args_tuple): - """ - add event handler - event notification executor inside a process pool to communicate with k8s statefulset nodeport service - uses REST API client to call k8s services - """ - (dti_event, db_access, curr_evt, res_tuple) = args_tuple - component_scn = res_tuple[0] - deployment_id = res_tuple[1] - node_id = res_tuple[3] - container_type = res_tuple[8] - service_address = res_tuple[9] - service_port = res_tuple[10] - what = "{} in {} deployment {} that was deployed by {} node {}".format( - "add", container_type, "statefulset", deployment_id, node_id) - # call scn node port service REST API - svc_nodeport_url = "https://{}:{}".format(service_address, service_port) - try: - DTIProcessor.logger.debug("running {}".format(what)) - response = requests.put(svc_nodeport_url, json=dti_event, timeout=50) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "collector nodeport service({}) threw exception {}: {!s}".format( - svc_nodeport_url, type(e).__name__, e) - DTIProcessor.logger.error(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - try: - event_ack_info = response.json() - except Exception as e: - msg = "collector nodeport service({}) threw exception {}: {!s}".format( - svc_nodeport_url, type(e).__name__, e) - DTIProcessor.logger.error(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - - if not event_ack_info: - msg = "collector nodeport service returned bad data" - DTIProcessor.logger.error(msg) - return (component_scn, "collector nodeport service returned bad data") - - namespace = event_ack_info.get("KubeNamespace") - svc_name = event_ack_info.get("KubeServiceName") - svc_port = event_ack_info.get("KubeServicePort") - proxy_fqdn = event_ack_info.get("KubeProxyFqdn") - cluster_fqdn = event_ack_info.get("KubeClusterFqdn") - pod_name = event_ack_info.get("KubePod") - statefulset = pod_name[0:pod_name.rindex('-')] - - what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format( - "add", container_type, statefulset, namespace, deployment_id, node_id) - try: - add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id, - k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn, - k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s', - service_component=component_scn) - db_access.saveDomainObject(add_evt_ack) - return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info)) - except Exception as e: - msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - - -def notify_pods(args_tuple): - """ - notify event handler - event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service - uses REST API client to call k8s services - """ - event_ack_info = '' - (dti_event, res_tuple) = args_tuple - try: - cluster = res_tuple[0] - port = K8S_CLUSTER_PROXY_NODE_PORT - namespace = res_tuple[1] - svc_name = res_tuple[2] - svc_port = res_tuple[4] - replicas = res_tuple[3] - - for replica in range(replicas): - pod_id = "sts-{}-{}".format(svc_name, replica) - item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace, - pod_id, svc_name, - svc_port) - what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace) - try: - DTIProcessor.logger.debug("running {}".format(what)) - response = requests.put(item_pod_url, json=dti_event, timeout=50) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "stateful set proxy service({}) threw exception {}: {!s}".format( - item_pod_url, type(e).__name__, e) - DTIProcessor.logger.error(msg) - with lock: - notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what))) - else: - try: - event_ack_info = response.json() - except Exception as e: - msg = "stateful set proxy service({}) threw exception {}: {!s}".format( - item_pod_url, type(e).__name__, e) - DTIProcessor.logger.error(msg) - with lock: - notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what))) - - if not event_ack_info: - msg = "stateful set proxy service returned bad data" - DTIProcessor.logger.error(msg) - with lock: - notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what))) - - with lock: - notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info))) - except Exception as e: - with lock: - notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e))) - -def notify_k8s_pod(args_tuple): - """ - update event handler - event notification executor inside a process pool to communicate with k8s DTIH proxy service - uses REST API client to call k8s services - """ - item_pod_url = '' - component_scn = '' - (dti_event, db_access, ack_item) = args_tuple - # call ingress proxy to dispatch delete event - - action = dti_event.get('dcae_service_action') - what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format( - action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn) - try: - DTIProcessor.logger.debug("running {}".format(what)) - item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format( - ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace, - ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port) - component_scn = ack_item.service_component - response = requests.put(item_pod_url, json=dti_event, timeout=50) - response.raise_for_status() - except requests.exceptions.RequestException as e: - msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format( - item_pod_url, type(e).__name__, e) - DTIProcessor.logger.error(msg) - return (component_scn, "ran {}, got: {!s}".format(what, msg)) - else: - if action == 'delete': - try: - db_access.deleteDomainObject(ack_item) - except Exception as e: - msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - else: - try: - ack_item.update_action('update') - db_access.saveDomainObject(ack_item) - except Exception as e: - msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warning(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - - return (component_scn, "ran {}, got: {!s}".format(what, response.json())) - - -class DTIProcessor(object): - """ - Main event processing class that encapsulates all the logic of this handler application! - An instance of this class is created per incoming client request. - - Generates input data by querying platform services - cloudify, consul, postgresSql - - It creates a pool of worker processes using a multiprocessing Pool class instance. - Tasks are offloaded to the worker processes that exist in the pool. - The input data is distributed across processes of the Pool object to enable parallel execution of - event notification function across multiple input values (data parallelism). - """ - - logger = logging.getLogger("oti_handler.dti_processor") - K8S_CLUSTER_PROXY_NODE_PORT = '30132' - db_access = None - docker_pool = None - k8s_pool = None - - def __init__(self, dti_event, send_notification=True): - self._result = {} - self.event = dti_event - self.is_notify = send_notification - self.action = dti_event.get('dcae_service_action').lower() - self.target_name = dti_event.get('dcae_target_name') - self.target_type = dti_event.get('dcae_target_type', '').lower() - self.event_clli = dti_event.get('dcae_service_location') - res_dict = None - try: - self.docker_pool = ThreadPool(8) - self.k8s_pool = ThreadPool(8) - except Exception as e: - msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - raise e - else: - self.db_access = EventDbAccess() - self.prim_db_event = None - try: - res_dict = self.dispatcher() - except: - raise - finally: - try: - self.docker_pool.close() - self.k8s_pool.close() - except Exception as e: - msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__, - e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - try: - self.docker_pool.join() - self.k8s_pool.join() - except Exception as e: - msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__, - e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - - # if not send_notification: - # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components") - # return - - if res_dict: - try: - utils.update_dict(self._result, res_dict) - except Exception as e: - msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format( - type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - - DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components") - - def dispatcher(self): - """ dispatch method to execute specific method based on event type """ - - arg = str(self.action) - method = getattr(self, arg, lambda: "Invalid action") - return method() - - def undeploy(self): - """ - delete event from consul KV store, this functionality will be retired as events are stored - in postgresSql oti database - """ - global key - try: - # update Consul KV store with DTI Event - storing them in a folder for all components - key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name) - result = ConsulClient.delete_key(key) - except Exception as e: - msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - else: - if not result: - msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - - def deploy(self): - """ - add event to consul KV store, this functionality will be retired as events are stored - in postgresSql oti database - """ - dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name) - try: - # update Consul KV store with DTI Event - storing them in a folder for all components - result = ConsulClient.store_kvs({dep_key: self.event}) - except Exception as e: - msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - - def add(self): - """ - process DTI event that contains a new VNF instance that has to be configured in the collector microservices - """ - res_dict = None - try: - msg = "processing add event for {}/{}".format(self.target_type, self.target_name) - DTIProcessor.logger.debug(msg) - # insert add event into dtih_event table - self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type, - location_clli=self.event_clli) - self.db_access.saveDomainObject(self.prim_db_event) - except Exception as e: - msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warning(msg) - self._result['ERROR'] = msg - raise Exception(msg) - else: - if self.is_notify: - try: - # force the action to add, to avoid bad things later - add_action = {"dcae_service_action": "add"} - self.event.update(add_action) - # mock up data - mock_tp11 = ( - "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1", - "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s", - "dcae-d1.idns.cip.corp.com", "30996") - mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1", - "docker_node_instance_id1", - "node_instance_state", "docker_host", "dti_reconfig_script", "docker", - "dcae-d1.idns.cip.corp.com", "30996") - # tpl_arr = [] - # tpl_arr.append(mock_tp11) - # tpl_arr.append(mock_tp12) - # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr)))) - res_dict = dict(self.docker_pool.map(notify_svc, - ((self.event, self.db_access, self.prim_db_event, tp) for tp in - CfyClient().iter_components(self.target_type, - dcae_service_location=self.event_clli)) - )) - except Exception as e: - msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__, - e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - return res_dict - - def add_replay(self): - """ - convert an update event flow and replay as an add event type since the event acknowledgement is missing - from application database - """ - res_dict = None - try: - # force the action to add, to avoid bad things later - add_action = {"dcae_service_action": "add"} - self.event.update(add_action) - # mock up data - mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1", - "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s", - "dcae-d1.idns.cip.corp.com", "30996") - mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1", - "docker_node_instance_id1", - "node_instance_state", "docker_host", "dti_reconfig_script", "docker", - "dcae-d1.idns.cip.corp.com", "30996") - # tpl_arr = [] - # tpl_arr.append(mock_tp11) - # tpl_arr.append(mock_tp12) - # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr)))) - res_dict = dict(self.docker_pool.map(notify_svc, - ((self.event, self.db_access, self.prim_db_event, tp) for tp in - CfyClient().iter_components(self.target_type, - dcae_service_location=self.event_clli)) - )) - except Exception as e: - msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - return res_dict - - def delete(self): - """ - process DTI event that indicates a VNF instance has to be removed from the collector microservices - """ - res_dict = {} - res_dict_k8s = {} - res_dict_docker = {} - try: - self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) - if self.is_notify: - try: - msg = "processing delete event for {}/{} to relate with any docker hosts".format( - self.target_type, self.target_name) - DTIProcessor.logger.warning(msg) - res_dict_docker = dict(self.docker_pool.map(notify_svc, - ((self.event, self.db_access, self.prim_db_event, tp) - for tp - in CfyClient().iter_components_for_docker( - self.target_type, - dcae_service_location=self.event_clli)) - )) - except Exception as e: - msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__, - e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - - try: - msg = "processing delete event for {}/{} to relate with any k8s hosts".format( - self.target_type, self.target_name) - DTIProcessor.logger.warning(msg) - if self.prim_db_event: - result = self.db_access.query_event_data_k8s(self.target_type, self.target_name) - res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, ( - ((self.event, self.db_access, ack_item) for ack_item in result)))) - except Exception as e: - msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - - try: - if self.prim_db_event: - self.db_access.deleteDomainObject(self.prim_db_event) - except Exception as e: - msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warning(msg) - self._result['ERROR'] = msg - except Exception as e: - msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warning(msg) - self._result['ERROR'] = msg - - if res_dict_k8s: - utils.update_dict(res_dict, res_dict_k8s) - - if res_dict_docker: - utils.update_dict(res_dict, res_dict_docker) - - return res_dict - - def update(self): - """ - process DTI event that indicates VNF instance has to be updated in the collector microservices - """ - res_dict = {} - res_dict_k8s = {} - res_dict_docker = {} - - if self.is_notify: - try: - self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) - if self.prim_db_event: - self.db_access.update_event_item(self.event, self.target_type, self.target_name) - result = self.db_access.query_event_data(self.target_type, self.target_name) - if len(result) == 0: - msg = "processing update event for {}/{}, but event distribution info is not found in database, " \ - "replaying this event to cluster if required". \ - format(self.target_type, self.target_name) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - res_dict = self.add_replay() - else: - msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \ - "identify new vs update cases".format(self.target_type, self.target_name) - DTIProcessor.logger.debug(msg) - try: - tpl_arr = CfyClient().iter_components(self.target_type, - dcae_service_location=self.event_clli) - res_dict_docker = dict(self.docker_pool.map(notify_svc, - (( - self.event, self.db_access, - self.prim_db_event, - tp) - for tp in tpl_arr))) - except Exception as e: - msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format( - type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - else: - # event is new for the handler - msg = "processing update event for {}/{}, but current event info is not found in database, " \ - "executing add event".format(self.target_type, self.target_name) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - res_dict = self.add() - except Exception as e: - msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.error(msg) - self._result['ERROR'] = msg - - if res_dict_k8s: - utils.update_dict(res_dict, res_dict_k8s) - - if res_dict_docker: - utils.update_dict(res_dict, res_dict_docker) - - return res_dict - - def notify(self): - """ - event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event - This notification is meant for the cluster failover. - """ - res_dict = {} - try: - self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) - if self.prim_db_event: - self.db_access.update_event_item(self.event, self.target_type, self.target_name) - else: - self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type, - location_clli=self.event_clli) - self.db_access.saveDomainObject(self.prim_db_event) - except Exception as e: - msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warning(msg) - self._result['ERROR'] = msg - - try: - self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in - CfyClient().query_k8_components(self.target_name))) - for k, v in notify_response_arr: - res_dict[k] = v - except Exception as e: - msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warning(msg) - self._result['WARNING'] = msg - - return res_dict - - def get_result(self): - return self._result - - @classmethod - def get_k8_raw_events(cls, pod, cluster, namespace): - """ - Get DTI events for a k8 stateful set pod container - - :param pod: required - k8s stateful set pod ID that was configured with a specific set of DTI Events - :param cluster: required - k8s cluster FQDN where the mS was deployed - :param namespace: required - k8s namespace where the stateful set was deployed in that namespace - :return: - Dictionary of DTI event(s). - DTI events will be keyed by vnf_type, sub-keyed by vnf_id. - """ - db_access = EventDbAccess() - results = db_access.query_raw_k8_events(cluster, pod, namespace) - - target_types = set([]) - outer_dict = {} - - for evnt_item in results: - target_types.add(evnt_item.target_type) - - for targ_type in target_types: - inner_name_evt_dict = {} - for evnt in results: - if targ_type == evnt.target_type: - inner_name_evt_dict[evnt.target_name] = evnt.event - - outer_dict[targ_type] = inner_name_evt_dict - - return outer_dict - - @classmethod - def get_docker_raw_events(cls, service_name, service_location): - """ - Get DTI events for docker container. - - Parameters - ---------- - service_name : string - required. The service component name assigned by dockerplugin to the component that is unique to the - cloudify node instance and used in its Consul key(s). - service_location : string - optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location - in service_location. - If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul - TAGs if service_name is provided, - otherwise results are not location filtered. - - Returns - ------- - dict - Dictionary of DTI event(s). - DTI events will be keyed by vnf_type, sub-keyed by vnf_id. - - """ - - r_dict = {} - - want_locs = [] - if service_location: - want_locs = service_location.split(',') - - give_types = [] - if service_name: - if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node - try: - node_name = ConsulClient.lookup_service(service_name)[0].get("Node") - if node_name: - services = ConsulClient.lookup_node(node_name).get("Services") - if services: - for node_svc in list(services.keys()): - if "-component-dockerhost-" in node_svc: - want_locs = services[node_svc].get("Tags", []) - break - except: - pass - - try: - supported_types = ConsulClient.get_value(service_name + ":oti") - except: - return r_dict - else: - if supported_types: - supported_types = [t_type.lower() for t_type in list(supported_types.keys())] - give_types = supported_types - if not give_types or (len(give_types) == 1 and give_types[0] == ''): - return r_dict - - db_access = EventDbAccess() - results = db_access.query_raw_docker_events(give_types, want_locs) - - target_types = set([]) - outer_dict = {} - - for evnt_item in results: - target_types.add(evnt_item.target_type) - - for targ_type in target_types: - inner_name_evt_dict = {} - for evnt in results: - if targ_type == evnt.target_type: - inner_name_evt_dict[evnt.target_name] = evnt.event - - outer_dict[targ_type] = inner_name_evt_dict - - return outer_dict diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py deleted file mode 100644 index 3b5b477..0000000 --- a/oti/event-handler/otihandler/onap/CommonLogger.py +++ /dev/null @@ -1,958 +0,0 @@ -#!/usr/bin/python -# -*- indent-tabs-mode: nil -*- vi: set expandtab: - -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -""" Common Logging library in Python. - -CommonLogger.py - -Original Written by: Terry Schmalzried -Date written: October 1, 2015 -Last updated: December 1, 2016 - -version 0.8 -""" - -import os -import logging -import logging.handlers -import re -import socket -import sys -import threading -import time - - -class CommonLogger: - """ Common Logging object. - - Public methods: - __init__ - setFields - debug - info - warn - error - fatal - """ - - UnknownFile = -1 - ErrorFile = 0 - DebugFile = 1 - AuditFile = 2 - MetricsFile = 3 - DateFmt = '%Y-%m-%dT%H:%M:%S' - verbose = False - - def __init__(self, configFile, logKey, **kwargs): - """Construct a Common Logger for one Log File. - - Arguments: - configFile -- configuration filename. - logKey -- the keyword in configFile that identifies the log filename. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages, - one of CommonLogger.ErrorFile, CommonLogger.DebugFile, - CommonLogger.AuditFile and CommonLogger.MetricsFile, or - one of the strings "error", "debug", "audit" or "metrics". - May also be set in the config file using a field named - <logKey>Style (where <logKey> is the value of the logKey - parameter). The keyword value overrides the value in the - config file. - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._monitorFlag = False - - # Get configuration parameters - self._logKey = str(logKey) - self._configFile = str(configFile) - self._rotateMethod = 'time' - self._timeRotateIntervalType = 'midnight' - self._timeRotateInterval = 1 - self._sizeMaxBytes = 0 - self._sizeRotateMode = 'a' - self._socketHost = None - self._socketPort = 0 - self._typeLogger = 'filelogger' - self._backupCount = 6 - self._logLevelThreshold = self._intLogLevel('') - self._logFile = None - self._begTime = None - self._begMsec = 0 - self._fields = {} - self._fields["style"] = CommonLogger.UnknownFile - try: - self._configFileModified = os.path.getmtime(self._configFile) - for line in open(self._configFile): - line = line.split('#',1)[0] # remove comments - if '=' in line: - key, value = [x.strip() for x in line.split('=',1)] - if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']: - self._rotateMethod = value.lower() - elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: - self._timeRotateIntervalType = value - elif key == 'timeRotateInterval' and int( value ) > 0: - self._timeRotateInterval = int( value ) - elif key == 'sizeMaxBytes' and int( value ) >= 0: - self._sizeMaxBytes = int( value ) - elif key == 'sizeRotateMode' and value in ['a']: - self._sizeRotateMode = value - elif key == 'backupCount' and int( value ) >= 0: - self._backupCount = int( value ) - elif key == self._logKey + 'SocketHost': - self._socketHost = value - elif key == self._logKey + 'SocketPort' and int( value ) == 0: - self._socketPort = int( value ) - elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: - self._typeLogger = value.lower() - elif key == self._logKey + 'LogLevel': - self._logLevelThreshold = self._intLogLevel(value.upper()) - elif key == self._logKey + 'Style': - self._fields["style"] = value - elif key == self._logKey: - self._logFile = value - except Exception as x: - print("exception reading '%s' configuration file: %s" %(self._configFile, str(x))) - sys.exit(2) - except: - print("exception reading '%s' configuration file" %(self._configFile)) - sys.exit(2) - - if self._logFile is None: - print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey)) - sys.exit(2) - - - # initialize default log fields - # timestamp will automatically be generated - for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ - 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ - 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ - 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ - 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ - 'errorDescription' ]: - if key in kwargs and kwargs[key] != None: - self._fields[key] = kwargs[key] - - self._resetStyleField() - - # Set up logger - self._logLock = threading.Lock() - with self._logLock: - self._logger = logging.getLogger(self._logKey) - self._logger.propagate = False - self._createLogger() - - self._defaultServerInfo() - - # spawn a thread to monitor configFile for logLevel and logFile changes - self._monitorFlag = True - self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=()) - self._monitorThread.daemon = True - self._monitorThread.start() - - - def _createLogger(self): - if self._typeLogger == 'filelogger': - self._mkdir_p(self._logFile) - if self._rotateMethod == 'time': - self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \ - when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \ - backupCount=self._backupCount, encoding=None, delay=False, utc=True) - elif self._rotateMethod == 'size': - self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \ - mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \ - backupCount=self._backupCount, encoding=None, delay=False) - - else: - self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \ - mode=self._sizeRotateMode, \ - encoding=None, delay=False) - elif self._typeLogger == 'stderrlogger': - self._logHandler = logging.handlers.StreamHandler(sys.stderr) - elif self._typeLogger == 'stdoutlogger': - self._logHandler = logging.handlers.StreamHandler(sys.stdout) - elif self._typeLogger == 'socketlogger': - self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort) - elif self._typeLogger == 'nulllogger': - self._logHandler = logging.handlers.NullHandler() - - if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile: - self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt) - else: - self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S') - self._logFormatter.converter = time.gmtime - self._logHandler.setFormatter(self._logFormatter) - self._logger.addHandler(self._logHandler) - - def _resetStyleField(self): - styleFields = ["error", "debug", "audit", "metrics"] - if self._fields['style'] in styleFields: - self._fields['style'] = styleFields.index(self._fields['style']) - - def __del__(self): - if not self._monitorFlag: - return - - self._monitorFlag = False - - if self._monitorThread is not None and self._monitorThread.is_alive(): - self._monitorThread.join() - - self._monitorThread = None - - - def _defaultServerInfo(self): - - # If not set or purposely set = None, then set default - if self._fields.get('server') is None: - try: - self._fields['server'] = socket.getfqdn() - except Exception as err: - try: - self._fields['server'] = socket.gethostname() - except Exception as err: - self._fields['server'] = "" - - # If not set or purposely set = None, then set default - if self._fields.get('serverIPAddress') is None: - try: - self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server']) - except Exception as err: - self._fields['serverIPAddress'] = "" - - - def _monitorConfigFile(self): - while self._monitorFlag: - try: - fileTime = os.path.getmtime(self._configFile) - if fileTime > self._configFileModified: - self._configFileModified = fileTime - ReopenLogFile = False - logFile = self._logFile - with open(self._configFile) as fp: - for line in fp: - line = line.split('#',1)[0] # remove comments - if '=' in line: - key, value = [x.strip() for x in line.split('=',1)] - if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value: - self._rotateMethod = value.lower() - ReopenLogFile = True - elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: - self._timeRotateIntervalType = value - ReopenLogFile = True - elif key == 'timeRotateInterval' and int( value ) > 0: - self._timeRotateInterval = int( value ) - ReopenLogFile = True - elif key == 'sizeMaxBytes' and int( value ) >= 0: - self._sizeMaxBytes = int( value ) - ReopenLogFile = True - elif key == 'sizeRotateMode' and value in ['a']: - self._sizeRotateMode = value - ReopenLogFile = True - elif key == 'backupCount' and int( value ) >= 0: - self._backupCount = int( value ) - ReopenLogFile = True - elif key == self._logKey + 'SocketHost' and self._socketHost != value: - self._socketHost = value - ReopenLogFile = True - elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ): - self._socketPort = int( value ) - ReopenLogFile = True - elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ): - self._logLevelThreshold = self._intLogLevel(value.upper()) - elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: - self._typeLogger = value.lower() - ReopenLogFile = True - elif key == self._logKey + 'Style': - self._fields["style"] = value - self._resetStyleField() - elif key == self._logKey and self._logFile != value: - logFile = value - ReopenLogFile = True - if ReopenLogFile: - with self._logLock: - self._logger.removeHandler(self._logHandler) - self._logFile = logFile - self._createLogger() - except Exception as err: - pass - - time.sleep(5) - - - def setFields(self, **kwargs): - """Set default values for log fields. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ - 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ - 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ - 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ - 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ - 'errorDescription' ]: - if key in kwargs: - if kwargs[key] != None: - self._fields[key] = kwargs[key] - elif key in self._fields: - del self._fields[key] - - self._defaultServerInfo() - - - def debug(self, message, **kwargs): - """Write a DEBUG level message to the log file. - - Arguments: - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs) - - def info(self, message, **kwargs): - """Write an INFO level message to the log file. - - Arguments: - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._log('INFO', message, errorCategory = 'INFO', **kwargs) - - def warn(self, message, **kwargs): - """Write a WARN level message to the log file. - - Arguments: - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._log('WARN', message, errorCategory = 'WARN', **kwargs) - - def error(self, message, **kwargs): - """Write an ERROR level message to the log file. - - Arguments: - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._log('ERROR', message, errorCategory = 'ERROR', **kwargs) - - def fatal(self, message, **kwargs): - """Write a FATAL level message to the log file. - - Arguments: - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - self._log('FATAL', message, errorCategory = 'FATAL', **kwargs) - - def _log(self, logLevel, message, **kwargs): - """Write a message to the log file. - - Arguments: - logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record. - message -- value for the last log record field. - - Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error - style -- the log file format (style) to use when writing log messages - requestID (dame) -- optional default value for this log record field. - serviceInstanceID (am) -- optional default value for this log record field. - threadID (am) -- optional default value for this log record field. - serverName (am) -- optional default value for this log record field. - serviceName (am) -- optional default value for this log record field. - instanceUUID (am) -- optional default value for this log record field. - severity (am) -- optional default value for this log record field. - serverIPAddress (am) -- optional default value for this log record field. - server (am) -- optional default value for this log record field. - IPAddress (am) -- optional default value for this log record field. - className (am) -- optional default value for this log record field. - timer (am) -- (ElapsedTime) optional default value for this log record field. - partnerName (ame) -- optional default value for this log record field. - targetEntity (me) -- optional default value for this log record field. - targetServiceName (me) -- optional default value for this log record field. - statusCode (am) -- optional default value for this log record field. - responseCode (am) -- optional default value for this log record field. - responseDescription (am) -- optional default value for this log record field. - processKey (am) -- optional default value for this log record field. - targetVirtualEntity (m) -- optional default value for this log record field. - customField1 (am) -- optional default value for this log record field. - customField2 (am) -- optional default value for this log record field. - customField3 (am) -- optional default value for this log record field. - customField4 (am) -- optional default value for this log record field. - errorCategory (e) -- optional default value for this log record field. - errorCode (e) -- optional default value for this log record field. - errorDescription (e) -- optional default value for this log record field. - begTime (am) -- optional starting time for this audit/metrics log record. - - Note: the pipe '|' character is not allowed in any log record field. - """ - - # timestamp will automatically be inserted - style = int(self._getVal('style', '', **kwargs)) - requestID = self._getVal('requestID', '', **kwargs) - serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs) - threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs) - serverName = self._getVal('serverName', '', **kwargs) - serviceName = self._getVal('serviceName', '', **kwargs) - instanceUUID = self._getVal('instanceUUID', '', **kwargs) - upperLogLevel = self._noSep(logLevel.upper()) - severity = self._getVal('severity', '', **kwargs) - serverIPAddress = self._getVal('serverIPAddress', '', **kwargs) - server = self._getVal('server', '', **kwargs) - IPAddress = self._getVal('IPAddress', '', **kwargs) - className = self._getVal('className', '', **kwargs) - timer = self._getVal('timer', '', **kwargs) - partnerName = self._getVal('partnerName', '', **kwargs) - targetEntity = self._getVal('targetEntity', '', **kwargs) - targetServiceName = self._getVal('targetServiceName', '', **kwargs) - statusCode = self._getVal('statusCode', '', **kwargs) - responseCode = self._getVal('responseCode', '', **kwargs) - responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs)) - processKey = self._getVal('processKey', '', **kwargs) - targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs) - customField1 = self._getVal('customField1', '', **kwargs) - customField2 = self._getVal('customField2', '', **kwargs) - customField3 = self._getVal('customField3', '', **kwargs) - customField4 = self._getVal('customField4', '', **kwargs) - errorCategory = self._getVal('errorCategory', '', **kwargs) - errorCode = self._getVal('errorCode', '', **kwargs) - errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs)) - nbegTime = self._getArg('begTime', {}, **kwargs) - - detailMessage = self._noSep(message) - if bool(re.match(r" *$", detailMessage)): - return # don't log empty messages - - useLevel = self._intLogLevel(upperLogLevel) - if CommonLogger.verbose: print("logger STYLE=%s" % style) - if useLevel < self._logLevelThreshold: - if CommonLogger.verbose: print("skipping because of level") - pass - else: - with self._logLock: - if style == CommonLogger.ErrorFile: - if CommonLogger.verbose: print("using CommonLogger.ErrorFile") - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName, - errorCategory, errorCode, errorDescription, detailMessage)) - elif style == CommonLogger.DebugFile: - if CommonLogger.verbose: print("using CommonLogger.DebugFile") - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel, - severity, serverIPAddress, server, IPAddress, className, timer, detailMessage)) - elif style == CommonLogger.AuditFile: - if CommonLogger.verbose: print("using CommonLogger.AuditFile") - endAuditTime, endAuditMsec = self._getTime() - if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: - d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec } - elif self._begTime is not None: - d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec } - else: - d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec } - self._begTime = None - unused = "" - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, - statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel, - severity, serverIPAddress, timer, server, IPAddress, className, unused, - processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d) - elif style == CommonLogger.MetricsFile: - if CommonLogger.verbose: print("using CommonLogger.MetricsFile") - endMetricsTime, endMetricsMsec = self._getTime() - if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: - d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } - elif self._begTime is not None: - d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } - else: - d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } - self._begTime = None - unused = "" - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, - targetEntity, targetServiceName, statusCode, responseCode, responseDescription, - instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress, - className, unused, processKey, targetVirtualEntity, customField1, customField2, - customField3, customField4, detailMessage), extra=d) - else: - print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"]) - - def _getTime(self): - ct = time.time() - lt = time.localtime(ct) - return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000) - - def setStartRecordEvent(self): - """ - Set the start time to be saved for both audit and metrics records - """ - self._begTime, self._begMsec = self._getTime() - - def getStartRecordEvent(self): - """ - Retrieve the start time to be used for either audit and metrics records - """ - begTime, begMsec = self._getTime() - return {'begTime':begTime, 'begMsec':begMsec} - - def _getVal(self, key, default, **kwargs): - val = self._fields.get(key) - if key in kwargs: val = kwargs[key] - if val is None: val = default - return self._noSep(val) - - def _getArg(self, key, default, **kwargs): - val = None - if key in kwargs: val = kwargs[key] - if val is None: val = default - return val - - def _noSep(self, message): - if message is None: return '' - return re.sub(r'[\|\n]', ' ', str(message)) - - def _intLogLevel(self, logLevel): - if logLevel == 'FATAL': useLevel = 50 - elif logLevel == 'ERROR': useLevel = 40 - elif logLevel == 'WARN': useLevel = 30 - elif logLevel == 'INFO': useLevel = 20 - elif logLevel == 'DEBUG': useLevel = 10 - else: useLevel = 0 - return useLevel - - def _mkdir_p(self, filename): - """Create missing directories from a full filename path like mkdir -p""" - - if filename is None: - return - - folder=os.path.dirname(filename) - - if folder == "": - return - - if not os.path.exists(folder): - try: - os.makedirs(folder) - except OSError as err: - print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror)) - sys.exit(2) - except Exception as err: - print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err))) - sys.exit(2) - -if __name__ == "__main__": # pragma: no cover - - def __checkOneTime(line): - format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]' - m = re.match(format, line) - if not m: - print("ERROR: time string did not match proper time format, %s" %line) - print("\t: format=%s" % format) - return 1 - return 0 - - def __checkTwoTimes(line, different): - format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]' - m = re.match(format, line) - if not m: - print("ERROR: time strings did not match proper time format, %s" %line) - print("\t: format=%s" % format) - return 1 - second1 = int(m.group(1)) - msec1 = int(m.group(2)) - second2 = int(m.group(3)) - msec2 = int(m.group(4)) - if second1 > second2: second2 += 60 - t1 = second1 * 1000 + msec1 - t2 = second2 * 1000 + msec2 - diff = t2 - t1 - # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff)) - if different: - if diff < 500: - print("ERROR: times did not differ enough: %s" % line) - return 1 - else: - if diff > 10: - print("ERROR: times were too far apart: %s" % line) - return 1 - return 0 - - def __checkBegTime(line): - format = "begTime should be ([-0-9T:]+)" - # print("checkBegTime(%s)" % line) - strt = 'begTime should be ' - i = line.index(strt) - rest = line[i+len(strt):].rstrip() - if not line.startswith(rest + ","): - print("ERROR: line %s should start with %s" % (line,rest)) - return 1 - return 0 - - def __checkLog(logfile, numLines, numFields): - lineCount = 0 - errorCount = 0 - with open(logfile, "r") as fp: - for line in fp: - # print("saw line %s" % line) - lineCount += 1 - c = line.count('|') - if c != numFields: - print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line)) - errorCount += 1 - if re.search("should not appear", line): - print("ERROR: a line appeared that should not have appeared, %s" % line) - errorCount += 1 - elif re.search("single time", line): - errorCount += __checkOneTime(line) - elif re.search("time should be the same", line): - errorCount += __checkTwoTimes(line, different=False) - elif re.search("time should be ", line): - errorCount += __checkTwoTimes(line, different=True) - elif re.search("begTime should be ", line): - errorCount += __checkBegTime(line) - else: - print("ERROR: an unknown message appeared, %s" % line) - errorCount += 1 - - if lineCount != numLines: - print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount)) - errorCount += 1 - return errorCount - - import os, argparse - parser = argparse.ArgumentParser(description="test the CommonLogger functions") - parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true") - parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true") - args = parser.parse_args() - - spid = str(os.getpid()) - if args.keeplogs: - spid = "" - logcfg = "/tmp/cl.log" + spid + ".cfg" - errorLog = "/tmp/cl.error" + spid + ".log" - metricsLog = "/tmp/cl.metrics" + spid + ".log" - auditLog = "/tmp/cl.audit" + spid + ".log" - debugLog = "/tmp/cl.debug" + spid + ".log" - if args.verbose: CommonLogger.verbose = True - - import atexit - def cleanupTmps(): - for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]: - try: - os.remove(f) - except: - pass - if not args.keeplogs: - atexit.register(cleanupTmps) - - with open(logcfg, "w") as o: - o.write("error = " + errorLog + "\n" + - "errorLogLevel = WARN\n" + - "metrics = " + metricsLog + "\n" + - "metricsLogLevel = INFO\n" + - "audit = " + auditLog + "\n" + - "auditLogLevel = INFO\n" + - "debug = " + debugLog + "\n" + - "debugLogLevel = DEBUG\n") - - import uuid - instanceUUID = uuid.uuid1() - serviceName = "testharness" - errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName) - debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName) - auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName) - metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName) - - testsRun = 0 - errorCount = 0 - errorLogger.debug("error calling debug (should not appear)") - errorLogger.info("error calling info (should not appear)") - errorLogger.warn("error calling warn (single time)") - errorLogger.error("error calling error (single time)") - errorLogger.setStartRecordEvent() - time.sleep(1) - errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)") - testsRun += 6 - errorCount += __checkLog(errorLog, 3, 10) - - auditLogger.debug("audit calling debug (should not appear)") - auditLogger.info("audit calling info (time should be the same)") - auditLogger.warn("audit calling warn (time should be the same)") - auditLogger.error("audit calling error (time should be the same)") - bt = auditLogger.getStartRecordEvent() - # print("bt=%s" % bt) - time.sleep(1) - auditLogger.setStartRecordEvent() - time.sleep(1) - auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)") - time.sleep(1) - auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) - testsRun += 7 - errorCount += __checkLog(auditLog, 5, 25) - - debugLogger.debug("debug calling debug (single time)") - debugLogger.info("debug calling info (single time)") - debugLogger.warn("debug calling warn (single time)") - debugLogger.setStartRecordEvent() - time.sleep(1) - debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)") - debugLogger.fatal("debug calling fatal (single time)") - errorCount += __checkLog(debugLog, 5, 13) - testsRun += 6 - - metricsLogger.debug("metrics calling debug (should not appear)") - metricsLogger.info("metrics calling info (time should be the same)") - metricsLogger.warn("metrics calling warn (time should be the same)") - bt = metricsLogger.getStartRecordEvent() - time.sleep(1) - metricsLogger.setStartRecordEvent() - time.sleep(1) - metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different") - metricsLogger.fatal("metrics calling fatal (time should be the same)") - time.sleep(1) - metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) - testsRun += 6 - errorCount += __checkLog(metricsLog, 5, 28) - - print("%d tests run, %d errors found" % (testsRun, errorCount)) diff --git a/oti/event-handler/otihandler/onap/__init__.py b/oti/event-handler/otihandler/onap/__init__.py deleted file mode 100644 index 87cf002..0000000 --- a/oti/event-handler/otihandler/onap/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/onap/audit.py b/oti/event-handler/otihandler/onap/audit.py deleted file mode 100644 index 8cd16cf..0000000 --- a/oti/event-handler/otihandler/onap/audit.py +++ /dev/null @@ -1,375 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""generic class to keep track of request handling - from receiving it through reponse and log all the activities - - call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests - - start each outside request with creation of the Audit object - audit = Audit(request_id=None, headers=None, msg=None) -""" - -import os -import sys -import json -import uuid -import time -import copy -from datetime import datetime -from threading import Lock -from enum import Enum - -from .CommonLogger import CommonLogger -from .health import Health - -REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" -REQUEST_REMOTE_ADDR = "Remote-Addr" -REQUEST_HOST = "Host" -HOSTNAME = "HOSTNAME" - -AUDIT_REQUESTID = 'requestID' -AUDIT_IPADDRESS = 'IPAddress' -AUDIT_SERVER = 'server' -AUDIT_TARGET_ENTITY = 'targetEntity' - -HEADER_CLIENTAUTH = "clientauth" -HEADER_AUTHORIZATION = "authorization" - -class AuditHttpCode(Enum): - """audit http codes""" - HTTP_OK = 200 - PERMISSION_UNAUTHORIZED_ERROR = 401 - PERMISSION_FORBIDDEN_ERROR = 403 - RESPONSE_ERROR = 400 - DATA_NOT_FOUND_ERROR = 404 - SERVER_INTERNAL_ERROR = 500 - SERVICE_UNAVAILABLE_ERROR = 503 - DATA_ERROR = 1030 - SCHEMA_ERROR = 1040 - -class AuditResponseCode(Enum): - """audit response codes""" - SUCCESS = 0 - PERMISSION_ERROR = 100 - AVAILABILITY_ERROR = 200 - DATA_ERROR = 300 - SCHEMA_ERROR = 400 - BUSINESS_PROCESS_ERROR = 500 - UNKNOWN_ERROR = 900 - - @staticmethod - def get_response_code(http_status_code): - """calculates the response_code from max_http_status_code""" - response_code = AuditResponseCode.UNKNOWN_ERROR - if http_status_code <= AuditHttpCode.HTTP_OK.value: - response_code = AuditResponseCode.SUCCESS - - elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, - AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: - response_code = AuditResponseCode.PERMISSION_ERROR - elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: - response_code = AuditResponseCode.AVAILABILITY_ERROR - elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: - response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR - elif http_status_code in [AuditHttpCode.DATA_ERROR.value, - AuditHttpCode.RESPONSE_ERROR.value, - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: - response_code = AuditResponseCode.DATA_ERROR - elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: - response_code = AuditResponseCode.SCHEMA_ERROR - - return response_code - - @staticmethod - def get_human_text(response_code): - """convert enum name into human readable text""" - if not response_code: - return "unknown" - return response_code.name.lower().replace("_", " ") - -class Audit(object): - """put the audit object on stack per each initiating request in the system - - :request_id: is the X-ECOMP-RequestID for tracing - - :req_message: is the request message string for logging - - :aud_parent: is the parent request - used for sub-query metrics to other systems - - :kwargs: - put any request related params into kwargs - """ - _service_name = "" - _service_version = "" - _service_instance_uuid = str(uuid.uuid4()) - _started = datetime.now() - _logger_debug = None - _logger_error = None - _logger_metrics = None - _logger_audit = None - _health = Health() - _py_ver = sys.version.replace("\n", "") - - @staticmethod - def init(service_name, service_version, config_file_path): - """init static invariants and loggers""" - Audit._service_name = service_name - Audit._service_version = service_version - Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - - @staticmethod - def health(): - """returns json for health check""" - now = datetime.now() - return { - "service_name" : Audit._service_name, - "service_version" : Audit._service_version, - "service_instance_UUID" : Audit._service_instance_uuid, - "python" : Audit._py_ver, - "started" : str(Audit._started), - "now" : str(now), - "uptime" : str(now - Audit._started), - "stats" : Audit._health.dump(), - "packages" : "N/A" # Audit._packages - } - - def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): - """create audit object per each request in the system - - :request_id: is the X-ECOMP-RequestID for tracing - :req_message: is the request message string for logging - :aud_parent: is the parent Audit - used for sub-query metrics to other systems - :kwargs: - put any request related params into kwargs - """ - self.request_id = request_id - self.req_message = req_message or "" - self.aud_parent = aud_parent - self.kwargs = kwargs or {} - - self.retry_get_config = False - self.max_http_status_code = 0 - self._lock = Lock() - - if self.aud_parent: - if not self.request_id: - self.request_id = self.aud_parent.request_id - if not self.req_message: - self.req_message = self.aud_parent.req_message - self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) - else: - headers = self.kwargs.get("headers", {}) - if headers: - if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) - if AUDIT_IPADDRESS not in self.kwargs: - self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) - - created_req = "" - if not self.request_id: - created_req = " with new" - self.request_id = str(uuid.uuid4()) - - self.kwargs[AUDIT_REQUESTID] = self.request_id - - self._started = time.time() - self._start_event = Audit._logger_audit.getStartRecordEvent() - self.metrics_start() - - if not self.aud_parent: - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) - - def merge_all_kwargs(self, **kwargs): - """returns the merge of copy of self.kwargs with the param kwargs""" - all_kwargs = self.kwargs.copy() - if kwargs: - all_kwargs.update(kwargs) - return all_kwargs - - def set_http_status_code(self, http_status_code): - """accumulate the highest(worst) http status code""" - self._lock.acquire() - if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: - self.max_http_status_code = max(http_status_code, self.max_http_status_code) - self._lock.release() - - def get_max_http_status_code(self): - """returns the highest(worst) http status code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() - return max_http_status_code - - @staticmethod - def get_status_code(success): - """COMPLETE versus ERROR""" - if success: - return 'COMPLETE' - return 'ERROR' - - @staticmethod - def hide_secrets(obj): - """hides the known secret field values of the dictionary""" - if not isinstance(obj, dict): - return obj - - for key in obj: - if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: - obj[key] = "*" - elif isinstance(obj[key], dict): - obj[key] = Audit.hide_secrets(obj[key]) - - return obj - - @staticmethod - def log_json_dumps(obj, **kwargs): - """hide the known secret field values of the dictionary and return json.dumps""" - if not isinstance(obj, dict): - return json.dumps(obj, **kwargs) - - return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - - def is_serious_error(self, status_code): - """returns whether the response_code is success and a human text for response code""" - return AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value \ - or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value - - def _get_response_status(self): - """calculates the response status fields from max_http_status_code""" - max_http_status_code = self.get_max_http_status_code() - response_code = AuditResponseCode.get_response_code(max_http_status_code) - success = (response_code.value == AuditResponseCode.SUCCESS.value) - response_description = AuditResponseCode.get_human_text(response_code) - return success, max_http_status_code, response_code, response_description - - def is_success(self): - """returns whether the response_code is success and a human text for response code""" - success, _, _, _ = self._get_response_status() - return success - - def debug(self, log_line, **kwargs): - """debug - the debug=lowest level of logging""" - Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) - - def info(self, log_line, **kwargs): - """debug - the info level of logging""" - Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) - - def info_requested(self, result=None, **kwargs): - """info "requested ..." - the info level of logging""" - self.info("requested {0} {1}".format(self.req_message, result or ""), \ - **self.merge_all_kwargs(**kwargs)) - - def warn(self, log_line, **kwargs): - """debug+error - the warn level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.warn(log_line, **all_kwargs) - Audit._logger_error.warn(log_line, **all_kwargs) - - def error(self, log_line, **kwargs): - """debug+error - the error level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.error(log_line, **all_kwargs) - Audit._logger_error.error(log_line, **all_kwargs) - - def fatal(self, log_line, **kwargs): - """debug+error - the fatal level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.fatal(log_line, **all_kwargs) - Audit._logger_error.fatal(log_line, **all_kwargs) - - @staticmethod - def get_elapsed_time(started): - """returns the elapsed time since started in milliseconds""" - return int(round(1000 * (time.time() - started))) - - def metrics_start(self, log_line=None, **kwargs): - """reset metrics timing""" - self._metrics_started = time.time() - self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() - if log_line: - self.info(log_line, **self.merge_all_kwargs(**kwargs)) - - def metrics(self, log_line, **kwargs): - """debug+metrics - the metrics=sub-audit level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - success, max_http_status_code, response_code, response_description = \ - self._get_response_status() - metrics_func = None - timer = Audit.get_elapsed_time(self._metrics_started) - if success: - log_line = "done: {0}".format(log_line) - self.info(log_line, **all_kwargs) - metrics_func = Audit._logger_metrics.info - Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - else: - log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=response_description, **all_kwargs) - metrics_func = Audit._logger_metrics.error - Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - - metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, - statusCode=Audit.get_status_code(success), responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) - - self.metrics_start() - return (success, max_http_status_code, response_description) - - def audit_done(self, result=None, **kwargs): - """debug+audit - the audit=top level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - success, max_http_status_code, response_code, response_description = \ - self._get_response_status() - log_line = "{0} {1}".format(self.req_message, result or "").strip() - audit_func = None - timer = Audit.get_elapsed_time(self._started) - if success: - log_line = "done: {0}".format(log_line) - self.info(log_line, **all_kwargs) - audit_func = Audit._logger_audit.info - Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - else: - log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, - errorDescription=response_description, **all_kwargs) - audit_func = Audit._logger_audit.error - Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - - audit_func(log_line, begTime=self._start_event, timer=timer, - statusCode=Audit.get_status_code(success), - responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) - - return (success, max_http_status_code, response_description) - # this line added to test diff --git a/oti/event-handler/otihandler/onap/health.py b/oti/event-handler/otihandler/onap/health.py deleted file mode 100644 index 39edc0d..0000000 --- a/oti/event-handler/otihandler/onap/health.py +++ /dev/null @@ -1,102 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""generic class to keep track of app health""" - -import uuid -from datetime import datetime -from threading import Lock - - -class HealthStats(object): - """keep track of stats for calls""" - def __init__(self, name): - """keep track of stats for metrics calls""" - self._name = name or "stats_" + str(uuid.uuid4()) - self._lock = Lock() - self._call_count = 0 - self._error_count = 0 - self._longest_timer = 0 - self._total_timer = 0 - self._last_success = None - self._last_error = None - - def dump(self): - """returns dict of stats""" - dump = None - with self._lock: - dump = { - "call_count" : self._call_count, - "error_count" : self._error_count, - "last_success" : str(self._last_success), - "last_error" : str(self._last_error), - "longest_timer_millisecs" : self._longest_timer, - "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \ - if self._call_count else 0) - } - return dump - - def success(self, timer): - """records the successful execution""" - with self._lock: - self._call_count += 1 - self._last_success = datetime.now() - self._total_timer += timer - if not self._longest_timer or self._longest_timer < timer: - self._longest_timer = timer - - def error(self, timer): - """records the errored execution""" - with self._lock: - self._call_count += 1 - self._error_count += 1 - self._last_error = datetime.now() - self._total_timer += timer - if not self._longest_timer or self._longest_timer < timer: - self._longest_timer = timer - -class Health(object): - """Health stats for multiple requests""" - def __init__(self): - """Health stats for application""" - self._all_stats = {} - self._lock = Lock() - - def _add_or_get_stats(self, stats_name): - """add to or get from the ever growing dict of HealthStats""" - stats = None - with self._lock: - stats = self._all_stats.get(stats_name) - if not stats: - self._all_stats[stats_name] = stats = HealthStats(stats_name) - return stats - - def success(self, stats_name, timer): - """records the successful execution on stats_name""" - stats = self._add_or_get_stats(stats_name) - stats.success(timer) - - def error(self, stats_name, timer): - """records the error execution on stats_name""" - stats = self._add_or_get_stats(stats_name) - stats.error(timer) - - def dump(self): - """returns dict of stats""" - with self._lock: - stats = dict((k, v.dump()) for (k, v) in self._all_stats.items()) - - return stats diff --git a/oti/event-handler/otihandler/utils.py b/oti/event-handler/otihandler/utils.py deleted file mode 100644 index 4f9dbda..0000000 --- a/oti/event-handler/otihandler/utils.py +++ /dev/null @@ -1,83 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -import base64 -import collections -import copy -import os - -from Crypto import Random -from Crypto.Cipher import PKCS1_v1_5 -from Crypto.Hash import SHA -from Crypto.PublicKey import RSA - - -def update_dict(d, u): - """Recursively updates dict - - Update dict d with dict u - """ - for k, v in u.items(): - if isinstance(v, collections.Mapping): - r = update_dict(d.get(k, {}), v) - d[k] = r - else: - d[k] = u[k] - return d - -def replace_token(configure_content): - try: - with open("/opt/app/config-map/dcae-k8s-cluster-token",'r') as fh: - dcae_token = fh.readline().rstrip('\n') - - new_config = copy.deepcopy(configure_content) - - # override the default-user token - ix=0 - for user in new_config['users'][:]: - if user['name'] == "default-user": - new_config['users'][ix] = { - "name": "default-user", - "user": { - "token": dcae_token - } - } - ix += 1 - - return new_config - - except Exception as e: - return configure_content - -def decrypt(b64_ciphertext): - """returns decrypted b64_ciphertext that was encoded like this: - - echo "cleartext" | openssl pkeyutl -encrypt -pubin -inkey rsa.pub | base64 --wrap=0 - - requires private key in environment variable EOMUSER_PRIVATE - """ - - if len(b64_ciphertext) <= 30: # For transition, assume short values are not encrypted - return b64_ciphertext - - try: - ciphertext = base64.b64decode(b64_ciphertext) - key = RSA.importKey(os.getenv('EOMUSER_PRIVATE')) - cleartext = PKCS1_v1_5.new(key).decrypt(ciphertext, Random.new().read(15+SHA.digest_size)) - except Exception as e: - return b64_ciphertext - - return cleartext diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py deleted file mode 100644 index 45c407f..0000000 --- a/oti/event-handler/otihandler/web_server.py +++ /dev/null @@ -1,605 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""web-service for oti_handler""" - -import json -import logging -import os -import time -from datetime import datetime - -import cherrypy - -from otihandler.cbs_rest import CBSRest -from otihandler.config import Config -from otihandler.dti_processor import DTIProcessor -from otihandler.onap.audit import Audit - - -class DTIWeb(object): - """run REST API of OTI Handler""" - - logger = logging.getLogger("oti_handler.web_server") - HOST_INADDR_ANY = ".".join("0"*4) - - @staticmethod - def run_forever(audit): - """run the web-server of OTI Handler forever""" - - cherrypy.config.update({"server.socket_host": DTIWeb.HOST_INADDR_ANY, - "server.socket_port": Config.wservice_port}) - - protocol = "http" - tls_info = "" - if Config.tls_server_cert_file and Config.tls_private_key_file: - tm_cert = os.path.getmtime(Config.tls_server_cert_file) - tm_key = os.path.getmtime(Config.tls_private_key_file) - #cherrypy.server.ssl_module = 'builtin' - cherrypy.server.ssl_module = 'pyOpenSSL' - cherrypy.server.ssl_certificate = Config.tls_server_cert_file - cherrypy.server.ssl_private_key = Config.tls_private_key_file - if Config.tls_server_ca_chain_file: - cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file - protocol = "https" - tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file, - Config.tls_private_key_file, - Config.tls_server_ca_chain_file) - - cherrypy.tree.mount(_DTIWeb(), '/') - - DTIWeb.logger.info( - "%s with config: %s", audit.info("running oti_handler as {}://{}:{} {}".format( - protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)), - json.dumps(cherrypy.config)) - cherrypy.engine.start() - - # If HTTPS server certificate changes, exit to let kubernetes restart us - if Config.tls_server_cert_file and Config.tls_private_key_file: - while True: - time.sleep(600) - c_tm_cert = os.path.getmtime(Config.tls_server_cert_file) - c_tm_key = os.path.getmtime(Config.tls_private_key_file) - if c_tm_cert > tm_cert or c_tm_key > tm_key: - DTIWeb.logger.info("cert or key file updated") - cherrypy.engine.stop() - cherrypy.engine.exit() - break - - -class _DTIWeb(object): - """REST API of DTI Handler""" - - VALID_EVENT_TYPES = ['deploy', 'undeploy', 'add', 'delete', 'update', 'notify'] - - @staticmethod - def _get_request_info(request): - """Returns info about the http request.""" - - return "{} {}{}".format(request.method, request.script_name, request.path_info) - - - #----- Common endpoint methods - - @cherrypy.expose - @cherrypy.tools.json_out() - def healthcheck(self): - """Returns healthcheck results.""" - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - - DTIWeb.logger.info("%s", req_info) - - result = Audit.health() - - DTIWeb.logger.info("healthcheck %s: result=%s", req_info, json.dumps(result)) - - audit.audit_done(result=json.dumps(result)) - return result - - @cherrypy.expose - def shutdown(self): - """Shutdown the web server.""" - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - - DTIWeb.logger.info("%s: --- stopping REST API of DTI Handler ---", req_info) - - cherrypy.engine.exit() - - health = json.dumps(Audit.health()) - audit.info("oti_handler health: {}".format(health)) - DTIWeb.logger.info("oti_handler health: %s", health) - DTIWeb.logger.info("%s: --------- the end -----------", req_info) - result = str(datetime.now()) - audit.info_requested(result) - return "goodbye! shutdown requested {}".format(result) - - # ----- DTI Handler mock endpoint methods - @cherrypy.expose - @cherrypy.tools.json_out() - @cherrypy.tools.json_in() - def mockevents(self): - - result = {"KubeNamespace":"com-my-dcae-test", "KubePod":"pod-0", "KubeServiceName":"pod-0.service.local", "KubeServicePort":"8880", "KubeClusterFqdn":"fqdn-1"} - - return result - - #----- DTI Handler endpoint methods - - @cherrypy.expose - @cherrypy.tools.json_out() - @cherrypy.tools.json_in() - def events(self, notify="y"): - """ - Run dti reconfig script in service component instances configured to accept the DTI Event. - - POST /events < <dcae_event> - - POST /events?ndtify="n" < <dcae_event> - - where <dcae_event> is the entire DTI Event passed as a JSON object and contains at least these top-level keys: - dcae_service_action : string - required, 'deploy' or 'undeploy' - dcae_target_name : string - required, VNF Instance ID - dcae_target_type : string - required, VNF Type of the VNF Instance - dcae_service_location : string - optional, CLLI location. Not provided or '' infers all locations. - - Parameters - ---------- - notify : string - optional, default "y", any of these will not notify components: [ "f", "false", "False", "FALSE", "n", "no" ] - When "n" will **not** notify components of this DTI Event update to Consul. - - Returns - ------- - dict - JSON object containing success or error of executing the dti reconfig script on - each component instance's docker container, keyed by service_component_name. - - """ - - if cherrypy.request.method != "POST": - raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) - - msg = "" - - dti_event = cherrypy.request.json or {} - str_dti_event = json.dumps(dti_event) - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message="{}: {}".format(req_info, str_dti_event), \ - headers=cherrypy.request.headers) - DTIWeb.logger.info("%s: dti_event=%s headers=%s", \ - req_info, str_dti_event, json.dumps(cherrypy.request.headers)) - - dcae_service_action = dti_event.get('dcae_service_action') - if not dcae_service_action: - msg = 'dcae_service_action is missing' - elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES: - msg = 'dcae_service_action is invalid' - - dcae_target_name = dti_event.get('dcae_target_name') - if not msg and not dcae_target_name: - msg = 'dcae_target_name is missing' - - dcae_target_type = dti_event.get('dcae_target_type', '') - if not msg and not dcae_target_type: - msg = 'dcae_target_type is missing' - - if msg: - result = {"ERROR": msg} - - DTIWeb.logger.error("%s: dti_event=%s result=%s", \ - req_info, str_dti_event, json.dumps(result)) - else: - send_notification = True - if (isinstance(notify, bool) and not notify) or \ - (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]): - send_notification = False - - prc = DTIProcessor(dti_event, send_notification=send_notification) - result = prc.get_result() - - DTIWeb.logger.info("%s: dti_event=%s result=%s", \ - req_info, str_dti_event, json.dumps(result)) - - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if msg: - cherrypy.response.status = "400 Bad Request" - elif not success: - cherrypy.response.status = http_status_code - - return result - - def get_docker_events(self, request, service, location): - """ - common routine for dti_docker_events and oti_docker_events - - :param request: HTTP GET request - :param service: HTTP request query parameter for service name - :param location: HTTP request query parameter for location CLLI - :return: - """ - - if request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) - - req_info = _DTIWeb._get_request_info(request) - audit = Audit(req_message=req_info, headers=request.headers) - - return DTIProcessor.get_docker_raw_events(service, location) - - def get_k8s_events(self, request, **params): - """ - common routine for dti_k8s_events and oti_k8s_events - - :param request: HTTP GET request - :param params: HTTP request query parameters - :return: - """ - if request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) - - req_info = _DTIWeb._get_request_info(request) - audit = Audit(req_message=req_info, headers=request.headers) - - pod = request.params['pod'] - namespace = request.params['namespace'] - cluster = request.params['cluster'] - - return DTIProcessor.get_k8_raw_events(pod, cluster, namespace) - - @cherrypy.expose - @cherrypy.tools.json_out() - def oti_k8s_events(self, **params): - """ - Retrieve raw JSON events from application events database - - GET /oti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> - - Parameters - ---------- - pod ID : string - POD ID of the stateful set POD - namespace: string - kubernetes namespace - cluster: string - kubernetes cluster FQDN - - Returns - ------- - dict - JSON object containing the fully-bound configuration. - - """ - - return self.get_k8s_events(cherrypy.request, params) - - @cherrypy.expose - @cherrypy.tools.json_out() - def dti_k8s_events(self, **params): - """ - Retrieve raw JSON events from application events database - - GET /dti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> - - Parameters - ---------- - pod ID : string - POD ID of the stateful set POD - namespace: string - kubernetes namespace - cluster: string - kubernetes cluster FQDN - - Returns - ------- - dict - JSON object containing the fully-bound configuration. - - """ - - return self.get_k8s_events(cherrypy.request, params) - - @cherrypy.expose - @cherrypy.tools.json_out() - def oti_docker_events(self, service, location=None): - """ - Retrieve raw JSON events from application events database related to docker deployments - - GET /oti_docker_events?service=<svc>&location=<location> - - Parameters - ---------- - service : string - The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - location : string - optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. - If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, - otherwise results are not location filtered. - - Returns - ------- - dict - JSON object containing the fully-bound configuration. - - """ - - return self.get_docker_events(cherrypy.request, service, location) - - @cherrypy.expose - @cherrypy.tools.json_out() - def dti_docker_events(self, service, location=None): - """ - Retrieve raw JSON events from application events database related to docker deployments - - GET /dti_docker_events?service=<svc>&location=<location> - - Parameters - ---------- - service : string - The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - location : string - optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. - If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, - otherwise results are not location filtered. - - Returns - ------- - dict - JSON object containing the fully-bound configuration. - - """ - - return self.get_docker_events(cherrypy.request, service, location) - - #----- Config Binding Service (CBS) endpoint methods - - @cherrypy.expose - @cherrypy.popargs('service_name') - @cherrypy.tools.json_out() - def service_component(self, service_name): - """ - Retrieve fully-bound configuration for service_name from Consul KVs. - - GET /service_component/<service_name> - - Parameters - ---------- - service_name : string - The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - - Returns - ------- - dict - JSON object containing the fully-bound configuration. - - """ - - if cherrypy.request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - DTIWeb.logger.info("%s: service_name=%s headers=%s", \ - req_info, service_name, json.dumps(cherrypy.request.headers)) - - try: - result = CBSRest.get_service_component(service_name) - except Exception as e: - result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} - audit.set_http_status_code(404) - - DTIWeb.logger.info("%s: service_name=%s result=%s", \ - req_info, service_name, json.dumps(result)) - - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code - - return result - - @cherrypy.expose - @cherrypy.popargs('service_name') - @cherrypy.tools.json_out() - def service_component_all(self, service_name, service_location=None, policy_ids="y"): - """ - Retrieve all information for service_name (config, dti, dti_events, and policies) from Consul KVs. - - GET /service_component_all/<service_name> - - GET /service_component_all/<service_name>?service_location=<service_location> - - GET /service_component_all/<service_name>?service_location=<service_location>;policy_ids=n - - Parameters - ---------- - service_name : string - The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - service_location : string - optional, allows multiple values separated by commas. - Filters DTI events with dcae_service_location in service_location. - policy_ids : string - optional, default "y", any of these will unset: [ "f", "false", "False", "FALSE", "n", "no" ] - When unset, formats policies items as a list (without policy_ids) rather than as an object indexed by policy_id. - - Returns - ------- - dict - JSON object containing all information for component service_name. - The top-level keys may include the following: - config : dict - The cloudify node's application_config property from when the start workflow was executed. - dti : dict - Keys are VNF Types that the component currently is assigned to monitor. Policy can change them. - dti_events : dict - The latest deploy DTI events, keyed by VNF Type and sub-keyed by VNF Instance ID. - policies : dict - event : dict - Contains information about when the policies folder was last written. - items : dict - Contains all policy bodies for the service_name component, keyed by policy_id. - - """ - - if cherrypy.request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - DTIWeb.logger.info("%s: service_name=%s headers=%s", \ - req_info, service_name, json.dumps(cherrypy.request.headers)) - - policies_as_list = False - if (isinstance(policy_ids, bool) and not policy_ids) or \ - (isinstance(policy_ids, str) and policy_ids.lower() in [ "f", "false", "n", "no" ]): - policies_as_list = True - try: - result = CBSRest.get_service_component_all(service_name, service_location=service_location, policies_as_list=policies_as_list) - except Exception as e: - result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} - audit.set_http_status_code(404) - - DTIWeb.logger.info("%s: service_name=%s result=%s", \ - req_info, service_name, json.dumps(result)) - - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code - - return result - - @cherrypy.expose - @cherrypy.popargs('service_name') - @cherrypy.tools.json_out() - def dti(self, service_name=None, vnf_type=None, vnf_id=None, service_location=None): - """ - Retrieve current (latest, not undeployed) DTI events from Consul KVs. - - GET /dti/<service_name> - - GET /dti/<service_name>?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> - - GET /dti - - GET /dti?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> - - Parameters - ---------- - service_name : string - optional. The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - vnf_type : string - optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). - vnf_id : string - optional. Requires vnf_type also. Gets DTI event for this vnf_id. - service_location : string - optional, allows multiple values separated by commas. - Filters DTI events with dcae_service_location in service_location. - - Returns - ------- - dict - Dictionary of DTI event(s). - If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. - If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. - Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. - - """ - - if cherrypy.request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - DTIWeb.logger.info("%s: service_name=%s headers=%s", \ - req_info, service_name, json.dumps(cherrypy.request.headers)) - - try: - result = CBSRest.get_oti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location) - except Exception as e: - result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} - audit.set_http_status_code(404) - - DTIWeb.logger.info("%s: service_name=%s result=%s", \ - req_info, service_name, json.dumps(result)) - - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code - - return result - - @cherrypy.expose - @cherrypy.popargs('service_name') - @cherrypy.tools.json_out() - def policies(self, service_name, policy_id=None): - """ - Retrieve policies for service_name from Consul KVs. - - GET /policies/<service_name> - - GET /policies/<service_name>?policy_id=<policy_id> - - Parameters - ---------- - service_name : string - The service component name assigned by dockerplugin to the component - that is unique to the cloudify node instance and used in its Consul key(s). - policy_id : string - optional. Limits returned policy to this policy_id. - - Returns - ------- - dict - JSON object containing policy bodies for the service_name component. - If policy_id is specified, then object returned will be just the one policy body. - If policy_id is not specified, then object will contain all policy bodies, keyed by policy_id. - - """ - - if cherrypy.request.method != "GET": - raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) - - req_info = _DTIWeb._get_request_info(cherrypy.request) - audit = Audit(req_message=req_info, headers=cherrypy.request.headers) - DTIWeb.logger.info("%s: service_name=%s headers=%s", \ - req_info, service_name, json.dumps(cherrypy.request.headers)) - - try: - result = CBSRest.get_policies(service_name, policy_id=policy_id) - except Exception as e: - result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} - audit.set_http_status_code(404) - - DTIWeb.logger.info("%s: service_name=%s result=%s", \ - req_info, service_name, json.dumps(result)) - - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code - - return result |