diff options
Diffstat (limited to 'oti/event-handler/otihandler')
22 files changed, 5235 insertions, 0 deletions
diff --git a/oti/event-handler/otihandler/__init__.py b/oti/event-handler/otihandler/__init__.py new file mode 100644 index 0000000..87cf002 --- /dev/null +++ b/oti/event-handler/otihandler/__init__.py @@ -0,0 +1,15 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/__main__.py b/oti/event-handler/otihandler/__main__.py new file mode 100644 index 0000000..59a7087 --- /dev/null +++ b/oti/event-handler/otihandler/__main__.py @@ -0,0 +1,74 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""run as server: python -m otihandler""" + +import logging +import os +import sys + +from otihandler.config import Config +from otihandler.onap.audit import Audit +from otihandler.web_server import DTIWeb +from otihandler.dbclient import DaoBase + + +class LogWriter(object): + """redirect the standard out + err to the logger""" + + def __init__(self, logger_func): + self.logger_func = logger_func + + def write(self, log_line): + """actual writer to be used in place of stdout or stderr""" + + log_line = log_line.rstrip() + if log_line: + self.logger_func(log_line) + + def flush(self): + """no real flushing of the buffer""" + + pass + + +def run_event_handler(): + """main run function for event_handler""" + + Config.load_from_file() + # Config.discover() + + logger = logging.getLogger("event_handler") + sys.stdout = LogWriter(logger.info) + sys.stderr = LogWriter(logger.error) + + logger.info("========== run_event_handler ==========") + app_version = os.getenv("APP_VER") + logger.info("app_version %s", app_version) + Audit.init(Config.get_system_name(), app_version, Config.LOGGER_CONFIG_FILE_PATH) + + logger.info("starting event_handler with config:") + logger.info(Audit.log_json_dumps(Config.config)) + + audit = Audit(req_message="start event_handler") + + audit = Audit(req_message="DB init start") + DaoBase.init_db(os.environ.get("DB_CONN_URL")) + + DTIWeb.run_forever(audit) + +if __name__ == "__main__": + run_event_handler() diff --git a/oti/event-handler/otihandler/cbs_rest.py b/oti/event-handler/otihandler/cbs_rest.py new file mode 100644 index 0000000..2e3e7de --- /dev/null +++ b/oti/event-handler/otihandler/cbs_rest.py @@ -0,0 +1,202 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""REST for high-level information retrievals from Consul KVs""" + +import copy +import logging + +from otihandler.consul_client import ConsulClient + + +class CBSRest(object): + _logger = logging.getLogger("oti_handler.cbs_rest") + + @staticmethod + def get_value(key, default=None): + """Wrap ConsulClient.get_value() to ignore exceptions.""" + + data = default + try: + data = ConsulClient.get_value(key) + except Exception as e: + pass + + return data + + @staticmethod + def get_kvs(key): + """Wrap ConsulClient.get_kvs() to ignore exceptions.""" + + data = {} + try: + data = ConsulClient.get_kvs(key, trim_prefix=True) + except Exception as e: + data = {} + + if not data: + data = {} + return data + + @staticmethod + def get_service_component(service_name): + """Get the fully-bound config for a service_name.""" + + return ConsulClient.get_service_component(service_name) + + @staticmethod + def get_service_component_all(service_name, service_location=None, policies_as_list=False): + """Get all Consul objects for a service_name.""" + + r_dict = ConsulClient.get_service_component_all(service_name, policies_as_list=policies_as_list) + if r_dict and r_dict.get('oti'): + r_dict['oti'] = CBSRest.get_oti(service_name, service_location=service_location) + return r_dict + + @staticmethod + def get_oti(service_name=None, vnf_type=None, vnf_id=None, service_location=None): + """ + Get DTI events. + + Parameters + ---------- + service_name : string + optional. The service component name assigned by dockerplugin to the component that is unique to the cloudify node instance and used in its Consul key(s). + vnf_type : string + optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). + vnf_id : string + optional. Requires vnf_type also. Gets DTI event for this vnf_id. + service_location : string + optional, allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + Dictionary of DTI event(s). + If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. + If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. + Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. + + """ + + lc_vnf_type = vnf_type + if vnf_type: + lc_vnf_type = vnf_type.lower() + + r_dict = {} + + want_locs = [] + if service_location: + want_locs = service_location.split(',') + + give_types = [] + if service_name: + if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node + try: + node_name = ConsulClient.lookup_service(service_name)[0].get("Node") + if node_name: + services = ConsulClient.lookup_node(node_name).get("Services") + if services: + for node_svc in list(services.keys()): + if "-component-dockerhost-" in node_svc or "_component_kubernetes_master" in node_svc: + want_locs = services[node_svc].get("Tags", []) + break + except: + pass + supported_types = ConsulClient.get_value(service_name + ":oti") + if supported_types: + supported_types = [type.lower() for type in list(supported_types.keys())] + if supported_types: + if lc_vnf_type: # If user specifies vnf_type(s), constrain to supported ones + for type in lc_vnf_type.split(','): + if type in supported_types: + give_types.append(type) + else: + give_types = supported_types + if not give_types or (len(give_types) == 1 and give_types[0] == ''): + return r_dict + elif lc_vnf_type: + give_types = lc_vnf_type.split(',') + + + # If they specified only one vnf_type ... + if lc_vnf_type and ',' not in lc_vnf_type: + type = give_types[0] + + # ... and vnf_id + if vnf_id: + # get just one vnf_id + t_dict = CBSRest.get_value("oti_events/" + type + "/" + vnf_id, default=None) + if t_dict: + event_loc = t_dict.get('dcae_service_location') + if not event_loc or not want_locs or event_loc in want_locs: + r_dict = copy.deepcopy(t_dict) + + # ... and not vnf_id + else: + # get all DTI events of just one type, indexed by vnf_id + t_dict = CBSRest.get_kvs("oti_events/" + type + "/") + if t_dict: + if not want_locs: + r_dict = copy.deepcopy(t_dict) + else: + for id in t_dict: + event_loc = t_dict[id].get('dcae_service_location') + if not event_loc or event_loc in want_locs: + r_dict[id] = copy.deepcopy(t_dict[id]) + + # If they did not specify either service_name or vnf_type (the only way give_types=[]) + elif not give_types: + # get all DTI events, indexed by vnf_type then vnf_id + t_dict = CBSRest.get_kvs("oti_events/") + if t_dict: + for type in t_dict: + for id in t_dict[type]: + if not vnf_id or vnf_id == id: + if want_locs: + event_loc = t_dict[type][id].get('dcae_service_location') + if not want_locs or not event_loc or event_loc in want_locs: + if type not in r_dict: + r_dict[type] = {} + r_dict[type][id] = copy.deepcopy(t_dict[type][id]) + + # If they speclfied multiple vnf_types + else: + # get all DTI events of give_types, indexed by vnf_type then vnf_id + for type in give_types: + t_dict = CBSRest.get_kvs("oti_events/" + type + "/") + if t_dict: + for id in t_dict: + if not vnf_id or vnf_id == id: + if want_locs: + event_loc = t_dict[id].get('dcae_service_location') + if not want_locs or not event_loc or event_loc in want_locs: + if type not in r_dict: + r_dict[type] = {} + r_dict[type][id] = copy.deepcopy(t_dict[id]) + + return r_dict + + @staticmethod + def get_policies(service_name, policy_id=None): + """Get one or all policies for a service_name.""" + + if policy_id: + return ConsulClient.get_value(service_name + ":policies/items/" + policy_id) + else: + return ConsulClient.get_kvs(service_name + ":policies/items/", trim_prefix=True) diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py new file mode 100644 index 0000000..c823340 --- /dev/null +++ b/oti/event-handler/otihandler/cfy_client.py @@ -0,0 +1,638 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""Our client interface to Cloudify""" +import base64 +import copy +import json +import logging +import os + +import requests + +from otihandler.consul_client import ConsulClient + + +class CfyClientConsulError(RuntimeError): + pass + + +class CloudifyClient(object): + """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants""" + + def __init__(self, **kwargs): + self._protocol = kwargs.get('protocol', 'http') + self._host = kwargs.get('host') + self._port = kwargs.get('port') + self._headers = kwargs.get('headers') + + self.node_instances = self + + def list(self, **kwargs): + url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port) + # s = Session() + # req = Request('GET', url_mask, headers=self._headers) + # prepped = req.prepare() + # response = s.send(prepped,verify=False,timeout=30) + response = requests.get(url_mask, headers=self._headers, timeout=30) + obj = response.json() + tenants = [x["name"] for x in obj["items"]] + tenants_with_containers = [x for x in tenants if 'DCAE' in x] + + size = 1000 + url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format( + self._protocol, self._host, self._port, size, "{}") + if kwargs: + for (key,val) in kwargs.items(): + if isinstance(val, str): + url_mask = url_mask + '&{}={}'.format(key, val) + elif isinstance(val, list): + url_mask = url_mask + '&{}={}'.format(key, ','.join(val)) + + for tenant in tenants_with_containers: + self._headers_with_tenant = copy.deepcopy(self._headers) + self._headers_with_tenant['Tenant'] = tenant + + offset = 0 + total = 1 + while offset < total: + # s = Session() + # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant) + # prepped = req.prepare() + # response = s.send(prepped, verify=False, timeout=30) + response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30) + response.raise_for_status() + obj = response.json() + offset = offset + len(obj["items"]) + total = obj["metadata"]["pagination"]["total"] + for item in obj["items"]: + yield NodeInstance(item) + + def update_node_instance(self, node_instance_id, body, **kwargs): + headers = copy.deepcopy(self._headers_with_tenant) + headers['Content-Type'] = "application/json" + url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format( + self._protocol, self._host, self._port, node_instance_id) + response = requests.patch(url_mask, json=body, headers=headers, timeout=30) + obj = response.json() + return obj + + +class NodeInstance(object): + """quick replacement for cloudify_rest_client""" + + def __init__(self, instance): + self.id = instance.get("id") + self.deployment_id = instance.get("deployment_id") + self.host_id = instance.get("host_id") + self.runtime_properties = instance.get("runtime_properties") + self.relationships = instance.get("relationships") + self.state = instance.get("state") + self.version = instance.get("version") + self.node_id = instance.get("node_id") + self.scaling_groups = instance.get("scaling_groups") + + +class CfyClient(object): + _logger = logging.getLogger("oti_handler.cfy_client") + _client = None + + + @staticmethod + def __set_cloudify_manager_client(): + """Create connection to Cloudify_Manager.""" + + if CfyClient._client: + return + + host = None + port = None + obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify") + source = "CLOUDIFY environment variable" + if not obj: + CM_KEY = 'cloudify_manager' + source = "Consul key '{}'".format(CM_KEY) + + try: + results = ConsulClient.lookup_service(CM_KEY) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + result = results[0] + host = result['ServiceAddress'] + port = result['ServicePort'] + + try: + obj = ConsulClient.get_value(CM_KEY) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + if not obj: + raise CfyClientConsulError("{} value is empty or invalid".format(source)) + + obj = obj.get('cloudify') + + if not obj: + raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source)) + + host = obj.get('address', host) + if not host: + raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source)) + + port = obj.get('port', port) + if not port: + raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source)) + + protocol = obj.get('protocol') + if not protocol: + raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source)) + username = obj.get('user') + if not username: + raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source)) + password = obj.get('password') + if not password: + raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source)) + + b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8") + headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')} + #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')} + + CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers) + + + @staticmethod + def query_k8_components(in_cluster_fqdn): + """ + Iterate components that belong to a cluster fqdn. + + Parameters + ---------- + in_cluster_fqdn : string + k8s cluster FQDN + + Returns + ------- + A generator of tuples of component information + [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ] + """ + + cnt_found = 0 + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + scn_port = None + cluster_fqdn = None + proxy_fqdn = None + dti_info = rtp.get('dti_info') + if dti_info: + env_items = dti_info.get('env') + for env in env_items: + if env.get("name") == 'KUBE_CLUSTER_FQDN': + cluster_fqdn = env.get("value") + if env.get("name") == 'KUBE_PROXY_FQDN': + proxy_fqdn = env.get("value") + ports = dti_info.get('ports') + if ports: + scn_port = ports[0].split(':')[0] + else: + continue + + if in_cluster_fqdn != cluster_fqdn: + continue + + controller_type = rtp.get('k8s_controller_type') + if not controller_type: + CfyClient._logger.debug("controller type is missing") + continue + elif controller_type != "statefulset": + CfyClient._logger.debug("not a stateful set") + continue + + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format( + node_instance.deployment_id, node_instance.id)) + continue + + try: + namespace = container_id.get('namespace') + except: + namespace = '' + pass + + replicas = 1 + try: + replicas = rtp.get('replicas') + except: + pass + + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug( + "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, + node_instance.id)) + continue + + cnt_found += 1 + yield (proxy_fqdn, namespace, scn, replicas, scn_port) + continue + + msg = "Found {} components (collectors) for cluster={}" \ + .format(cnt_found, in_cluster_fqdn) + CfyClient._logger.debug(msg) + + + @staticmethod + def iter_components(dcae_target_type, dcae_service_location='', component_type=''): + """ + Iterate components that handle a given dcae_target_type. + + Parameters + ---------- + dcae_target_type : string + VNF Type + dcae_service_location : string + Location of the component (optional) + component_type : string + Type of the component (optional) + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + or + [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] + + """ + + cnt_found = 0 + + # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) + dockerhosts = [] + k8s_svcs_tagged_with_clli = [] + if dcae_service_location: + try: + dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + try: + k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + docker_host = '' + svc_with_my_clli_tags = '' + if not container_id: + container_type = "k8s" + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) + continue + docker_config = rtp.get('docker_config') + if not docker_config: + CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id)) + continue + dti_reconfig_script = "" + if container_type == "docker": + dti_reconfig_script = rtp.get('dti_reconfig_script') + if not dti_reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti') + if not dti_reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id)) + continue + + scn = rtp.get('service_component_name') + scn_address = None + scn_port = None + if not scn: + CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) + continue + if container_type == "docker": + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + try: + srvcCatalogItem = ConsulClient.lookup_service(scn)[0] + scn_address = srvcCatalogItem.get("ServiceAddress") + except: + CfyClient._logger.debug( + "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id, + node_instance.id)) + continue + svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags') + # e.g., scn="s908d92e232ed43..." + if not svc_with_my_clli_tags: + # We should not incur this burden. k8splugin should store this into runtime properties. + try: + node_name = srvcCatalogItem.get("Node") + if node_name: + # e.g., node_name="zldcdyh1adce3kpma00" + services = ConsulClient.lookup_node(node_name).get("Services") + if services: + for node_svc in list(services.keys()): + if "_component_kubernetes_master" in node_svc: + # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master" + svc_with_my_clli_tags = node_svc + break + except: + pass + # ... cache results we find into runtime properties to avoid searching again + if svc_with_my_clli_tags: + CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format( + node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags)) + rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags + body = { + "runtime_properties": rtp, + "state": node_instance.state, + "version": 1 + int(node_instance.version) + } + try: + CfyClient._client.update_node_instance(node_instance.id, body) + except: + pass + + if not svc_with_my_clli_tags: + CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id)) + continue + + # get the nodeport for statefulset sidecar service + dti_info = rtp.get('dti_info') + if dti_info: + ports = dti_info.get('ports') + if ports: + scn_port = ports[0].split(':')[1] + docker_host = rtp.get('configuration',{}).get('file_content') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) + continue + + # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG + if dcae_service_location: + if container_type == "docker" and docker_host not in dockerhosts: + CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location)) + continue + elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli: + CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location)) + continue + + # If DTI Event specifies component_type, then collector's service_component_type must match + if component_type: + c_component_type = rtp.get('service_component_type') + if component_type != c_component_type: + CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) + continue + + # Check if the collector supports this VNF Type + # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':dti' + try: + obj = ConsulClient.get_value(dti_key) + except Exception as e: + CfyClient._logger.error( + "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" + .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) + ) + continue + if not obj: + CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key)) + continue + obj_types = set(k.lower() for k in obj) + if dcae_target_type.lower() in obj_types: + CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port ) + continue + else: + CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key)) + continue + + msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\ + .format(cnt_found, dcae_target_type, dcae_service_location, component_type) + CfyClient._logger.debug(msg) + + @staticmethod + def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''): + """ + Iterate components that handle a given dcae_target_type to find the components of docker type + + Parameters + ---------- + dcae_target_type : string + VNF Type + dcae_service_location : string + Location of the component (optional) + component_type : string + Type of the component (optional) + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + + """ + + cnt_found = 0 + # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) + dockerhosts = [] + + if dcae_service_location: + try: + dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format( + type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + if not container_id: + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id".format( + node_instance.deployment_id, node_instance.id)) + continue + docker_config = rtp.get('docker_config') + if not docker_config: + CfyClient._logger.debug( + "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, + node_instance.id)) + continue + dti_reconfig_script = "" + dti_reconfig_script = rtp.get('dti_reconfig_script') + if not dti_reconfig_script: + CfyClient._logger.debug( + "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, + node_instance.id)) + continue + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug( + "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, + node_instance.id)) + continue + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format( + node_instance.deployment_id, node_instance.id)) + continue + + # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG + if dcae_service_location: + if docker_host not in dockerhosts: + CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, docker_host, + dcae_service_location)) + continue + + # If DTI Event specifies component_type, then collector's service_component_type must match + if component_type: + c_component_type = rtp.get('service_component_type') + if component_type != c_component_type: + CfyClient._logger.debug( + "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) + continue + + # Check if the collector supports this VNF Type + # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':dti' + try: + obj = ConsulClient.get_value(dti_key) + except Exception as e: + CfyClient._logger.error( + "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" + .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) + ) + continue + if not obj: + CfyClient._logger.debug( + "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, + dti_key)) + continue + obj_types = set(k.lower() for k in obj) + if dcae_target_type.lower() in obj_types: + CfyClient._logger.debug( + "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, + dcae_target_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, + node_instance.state, docker_host, dti_reconfig_script, container_type, '', '') + continue + else: + CfyClient._logger.debug( + "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, + dcae_target_type, dti_key)) + continue + + msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \ + .format(cnt_found, dcae_target_type, dcae_service_location, component_type) + CfyClient._logger.debug(msg) + + + @staticmethod + def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"): + """ + Iterate components of a specific deployment_id. + + Parameters + ---------- + deployment_id : string + Cloudify deployment ID that created the component(s). + node_id : string + Cloudify node ID that created the component. + reconfig_type : string + "app" + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + or + [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] + + """ + + cnt_found = 0 + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list( + deployment_id=deployment_id, + _include=['id','node_id','deployment_id','state','runtime_properties'] + ): + if node_id and node_instance.node_id != node_id: + continue + + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + if not container_id: + container_type = "k8s" + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) + continue + reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type) + if not reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type)) + continue + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) + continue + if container_type == "docker": + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + docker_host = rtp.get('configuration',{}).get('file_content') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) + continue + + CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type) + continue + + msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type) + CfyClient._logger.debug(msg) diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py new file mode 100644 index 0000000..d5149cc --- /dev/null +++ b/oti/event-handler/otihandler/config.py @@ -0,0 +1,179 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""read and use the config""" + +import copy +import json +import logging +import logging.config +import os + +from otihandler.consul_client import ConsulClient + +logging.basicConfig( + filename='logs/oti_handler.log', \ + format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \ + '%(threadName)s %(name)s.%(funcName)s: %(message)s', \ + datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) + +class Config(object): + """main config of the application""" + + CONFIG_FILE_PATH = "etc/config.json" + LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" + SERVICE_NAME = "oti_handler" + + FIELD_SYSTEM = "system" + FIELD_WSERVICE_PORT = "wservice_port" + FIELD_TLS = "tls" + + _logger = logging.getLogger("oti_handler.config") + config = None + + cloudify_proto = None + cloudify_addr = None + cloudify_port = None + cloudify_user = None + cloudify_pass = None + cloudify = None + consul_url = "http://consul:8500" + tls_cacert_file = None + tls_server_cert_file = None + tls_private_key_file = None + tls_server_ca_chain_file = None + wservice_port = 9443 + + @staticmethod + def _get_tls_file_path(tls_config, cert_directory, tls_name): + """calc file path and verify its existance""" + + file_name = tls_config.get(tls_name) + if not file_name: + return None + tls_file_path = os.path.join(cert_directory, file_name) + if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK): + Config._logger.error("invalid %s: %s", tls_name, tls_file_path) + return None + return tls_file_path + + @staticmethod + def _set_tls_config(tls_config): + """verify and set tls certs in config""" + + try: + Config.tls_cacert_file = None + Config.tls_server_cert_file = None + Config.tls_private_key_file = None + Config.tls_server_ca_chain_file = None + + if not (tls_config and isinstance(tls_config, dict)): + Config._logger.info("no tls in config: %s", json.dumps(tls_config)) + return + + cert_directory = tls_config.get("cert_directory") + + if not (cert_directory and isinstance(cert_directory, str)): + Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory) + return + + cert_directory = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))), str(cert_directory)) + if not (cert_directory and os.path.isdir(cert_directory)): + Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory) + return + + Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert") + Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory, + "server_cert") + Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory, + "private_key") + Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory, + "server_ca_chain") + + finally: + Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file) + Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file) + Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file) + Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) + + @staticmethod + def merge(new_config): + """merge the new_config into current config - override the values""" + + if not new_config: + return + + if not Config.config: + Config.config = new_config + return + + new_config = copy.deepcopy(new_config) + Config.config.update(new_config) + + @staticmethod + def get_system_name(): + """find the name of the oti_handler system + to be used as the key in consul-kv for config of oti_handler + """ + + return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME) + + @staticmethod + def discover(): + """bring and merge the config settings from Consul""" + + system_key = Config.get_system_name() + new_config = ConsulClient.get_value(system_key) + + if not new_config or not isinstance(new_config, dict): + Config._logger.warn("unexpected config from Consul: %s", new_config) + return + + Config._logger.debug("loaded config from Consul(%s): %s", + system_key, json.dumps(new_config)) + Config._logger.debug("config before merge from Consul: %s", json.dumps(Config.config)) + Config.merge(new_config.get(Config.SERVICE_NAME)) + Config._logger.debug("merged config from Consul: %s", json.dumps(Config.config)) + + @staticmethod + def load_from_file(file_path=None): + """read and store the config from config file""" + + if not file_path: + file_path = Config.CONFIG_FILE_PATH + + loaded_config = None + if os.access(file_path, os.R_OK): + with open(file_path, 'r') as config_json: + loaded_config = json.load(config_json) + + if not loaded_config: + Config._logger.info("config not loaded from file: %s", file_path) + return + + Config._logger.info("config loaded from file: %s", file_path) + logging_config = loaded_config.get("logging") + if logging_config: + logging.config.dictConfig(logging_config) + + Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) + + local_config = loaded_config.get(Config.SERVICE_NAME, {}) + Config._set_tls_config(local_config.get(Config.FIELD_TLS)) + + Config.merge(loaded_config.get(Config.SERVICE_NAME)) + return True diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py new file mode 100644 index 0000000..d26d3a1 --- /dev/null +++ b/oti/event-handler/otihandler/consul_client.py @@ -0,0 +1,617 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""client to talk to consul at consul port 8500""" + +import base64 +import copy +import json +import logging +import os +import re +import socket + +import requests + + +class ConsulClientError(RuntimeError): + pass + +class ConsulClientConnectionError(RuntimeError): + pass + +class ConsulClientServiceNotFoundError(RuntimeError): + pass + +class ConsulClientNodeNotFoundError(RuntimeError): + pass + +class ConsulClientKVEntryNotFoundError(RuntimeError): + pass + + +class ConsulClient(object): + """talking to consul""" + + CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}" + CONSUL_KV_MASK = "{}/v1/kv/{}" + CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true" + CONSUL_TRANSACTION_URL = "{}/v1/txn" + _logger = logging.getLogger("oti_handler.consul_client") + + MAX_OPS_PER_TXN = 64 + # MAX_VALUE_LEN = 512 * 1000 + + OPERATION_SET = "set" + OPERATION_DELETE = "delete" + OPERATION_DELETE_FOLDER = "delete-tree" + + + #----- Methods for Consul services + + @staticmethod + def lookup_service(service_name): + """find the service record in consul""" + + service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) + + ConsulClient._logger.info("lookup_service(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + # except requests.exceptions.HTTPError as e: + # except requests.exceptions.ConnectionError as e: + # except requests.exceptions.Timeout as e: + except requests.exceptions.RequestException as e: + msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_list = response.json() + # except ValueError as e: + except Exception as e: + msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not return_list: + msg = "lookup_service({}) got empty or no value from requests.get({})".format( + service_name, service_path) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + return return_list + + + @staticmethod + def get_all_services(): + """List all services from consul""" + + service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/")) + + ConsulClient._logger.info("get_all_services(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format( + service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_dict = response.json() + except Exception as e: + msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not return_dict: + msg = "get_all_services() got empty or no value from requests.get({})".format( + service_path) + ConsulClient._logger.info(msg) + # raise ConsulClientServiceNotFoundError(msg) + + return return_dict + + + @staticmethod + def _find_matching_services(services, name_search, tags): + """Find matching services given search criteria""" + sub_tags = tags[0][4:6] + tags.append(sub_tags) + + def is_match(service): + srv_name, srv_tags = service + return name_search in srv_name and \ + any([tag in srv_tags for tag in tags]) + + return [ srv[0] for srv in list(services.items()) if is_match(srv) ] + + + @staticmethod + def search_services(name_search, tags): + """ + Search for services that match criteria + + Args: + ----- + name_search: (string) Name to search for as a substring + tags: (list) List of strings that are tags. A service must match **ANY OF** the + tags in the list. + + Returns: + -------- + List of names of services that matched + """ + + matches = [] + + # srvs is dict where key is service name and value is list of tags + srvs = ConsulClient.get_all_services() + + if srvs: + matches = ConsulClient._find_matching_services(srvs, name_search, tags) + + return matches + + + @staticmethod + def get_service_fqdn_port(service_name, node_meta=False): + """find the service record in consul""" + + service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) + + ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + service = response.json() + except Exception as e: + msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not service: + msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format( + service_name, service_path) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + try: + service = service[0] # arbitrarily choose the first one + port = service["ServicePort"] + + # HTTPS certificate validation requires FQDN not IP address + fqdn = "" + if node_meta: + meta = service.get("NodeMeta") + if meta: + fqdn = meta.get("fqdn") + if not fqdn: + fqdn = socket.getfqdn(str(service["ServiceAddress"])) + except Exception as e: + msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + return (fqdn, port) + + + #----- Methods for Consul nodes + + @staticmethod + def lookup_node(node_name): + """find the node record in consul""" + + node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name) + + ConsulClient._logger.info("lookup_node(%s)", node_path) + + try: + response = requests.get(node_path, timeout=30) + response.raise_for_status() + # except requests.exceptions.HTTPError as e: + # except requests.exceptions.ConnectionError as e: + # except requests.exceptions.Timeout as e: + except requests.exceptions.RequestException as e: + msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format( + node_name, node_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_dict = response.json() + # except ValueError as e: + except Exception as e: + msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + node_name, node_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientNodeNotFoundError(msg) + + if not return_dict: + msg = "lookup_node({}) got empty or no value from requests.get({})".format( + node_name, node_path) + ConsulClient._logger.error(msg) + raise ConsulClientNodeNotFoundError(msg) + + return return_dict + + + #----- Methods for Consul key-values + + @staticmethod + def put_value(key, data, cas=None): + """put the value for key into consul-kv""" + + # ConsulClient._logger.info("put_value(%s)", str(key)) + + URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) + if cas is not None: + URL = '{}?cas={}'.format(URL, cas) + + try: + response = requests.put(URL, data=json.dumps(data), timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + updated = response.json() + except Exception as e: + msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + return updated + + + @staticmethod + def get_value(key, get_index=False): + """get the value for key from consul-kv""" + + URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) + + try: + response = requests.get(URL, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + data = response.json() + except Exception as e: + msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + if not data: + msg = "get_value({}) got empty or no value from requests.get({})".format( + key, URL) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + try: + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + value_dict = json.loads(value) + except Exception as e: + msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) + + if get_index: + return data[0]["ModifyIndex"], value_dict + + return value_dict + + + @staticmethod + def get_kvs(prefix, nest=True, trim_prefix=False): + """get key-values for keys beginning with prefix from consul-kv""" + + URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix) + + try: + response = requests.get(URL, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format( + prefix, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + data = response.json() + except Exception as e: + msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + prefix, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + if not data: + msg = "get_kvs({}) got empty or no value from requests.get({})".format( + prefix, URL) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + def put_level_value(level_keys, value, level_dict={}): + if level_keys: + key = level_keys.pop(0) + level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {})) + return level_dict + else: + return value + + rdict = {} + for item in data: + v = base64.b64decode(item["Value"]).decode("utf-8") + try: + value = json.loads(v) + except Exception as e: + value = v + key = item['Key'] + if trim_prefix: + key = key[len(prefix):] + if nest: + level_keys = key.split('/') + rdict = put_level_value(level_keys, value, rdict) + else: + rdict[key] = value + + ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s", + prefix, json.dumps(rdict), json.dumps(data)) + return rdict + + + @staticmethod + def _gen_txn_operation(verb, key, value=None): + """returns the properly formatted operation to be used inside transaction""" + + # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key + if value: + return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}} + return {"KV": {"Verb": verb, "Key": key}} + + + @staticmethod + def _run_transaction(operation_name, txn): + """run a single transaction of several operations at consul /txn""" + + if not txn: + return + + txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/")) + response = None + try: + response = requests.put(txn_url, json=txn, timeout=30) + except requests.exceptions.RequestException as e: + ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}" + .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn))) + return + + if response.status_code != requests.codes.ok: + ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}" + .format(operation_name, txn_url, response.status_code, + response.text, json.dumps(txn), + json.dumps(dict(list(response.request.headers.items()))))) + return + + ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}" + .format(operation_name, txn_url, response.status_code, + response.text, json.dumps(txn), + json.dumps(dict(list(response.request.headers.items()))))) + + return True + + + @staticmethod + def store_kvs(kvs): + """put kvs into consul-kv""" + + if not kvs: + ConsulClient._logger.warn("kvs not supplied to store_kvs()") + return + + store_kvs = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET, + key, json.dumps(value)) + for key, value in kvs.items() + ] + txn = [] + idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn) + for idx in range(0, len(store_kvs), idx_step): + txn += store_kvs[idx : idx + idx_step] + if not ConsulClient._run_transaction("store_kvs", txn): + return False + txn = [] + + return ConsulClient._run_transaction("store_kvs", txn) + + + @staticmethod + def delete_key(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_key()") + return + + delete_key = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key) + ] + return ConsulClient._run_transaction("delete_key", delete_key) + + + @staticmethod + def delete_kvs(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_kvs()") + return + + delete_kvs = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key) + ] + return ConsulClient._run_transaction("delete_kvs", delete_kvs) + + + #----- Methods for Config Binding Service + + @staticmethod + def get_service_component(scn): + config = json.dumps(ConsulClient.get_value(scn)) + + try: + dmaap = ConsulClient.get_value(scn + ":dmaap") + except Exception as e: + dmaap = None + if dmaap: + for key in list(dmaap.keys()): + config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config) + + try: + rel = ConsulClient.get_value(scn + ":rel") + except Exception as e: + rel = None + if rel: + for key in list(rel.keys()): + config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config) + + return json.loads(config) + + + @staticmethod + def get_service_component_all(scn, policies_as_list=True): + t_scn = scn + ":" + t_len = len(t_scn) + a_dict = ConsulClient.get_kvs(scn) + b_dict = {} + for key in a_dict: + b_key = None + if key == scn: + b_dict["config"] = ConsulClient.get_service_component(scn) + elif key == scn + ":dmaap": + continue + elif key[0:t_len] == t_scn: + b_key = key[t_len:] + # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys + if policies_as_list and b_key == "policies": # convert items from KVs to a values list + b_dict[b_key] = {} + for sub_key in a_dict[key]: + if sub_key == "items": + b_dict[b_key][sub_key] = [] + d_dict = a_dict[key][sub_key] + for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate + b_dict[b_key][sub_key].append(d_dict[item]) + else: + b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key]) + else: + b_dict[b_key] = copy.deepcopy(a_dict[key]) + return b_dict + + + @staticmethod + def add_vnf_id(scn, vnf_type, vnf_id, dti_dict): + """ + Add VNF instance to Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Extend the dict by adding a dti_dict for vnf_id under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. + """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + + @staticmethod + def delete_vnf_id(scn, vnf_type, vnf_id): + """ + Delete VNF instance from Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Modify the dict by deleting the vnf_id key entry from under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. + """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + if vnf_id not in lc_v[lc_vnf_type]: + return lc_v + del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + +if __name__ == "__main__": + value = None + + if value: + print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': '))) diff --git a/oti/event-handler/otihandler/dbclient/__init__.py b/oti/event-handler/otihandler/dbclient/__init__.py new file mode 100644 index 0000000..ee3ec3e --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/__init__.py @@ -0,0 +1,19 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from .models import Event +from .models import EventAck +from .db_dao import DaoBase diff --git a/oti/event-handler/otihandler/dbclient/apis/__init__.py b/oti/event-handler/otihandler/dbclient/apis/__init__.py new file mode 100644 index 0000000..05e3800 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/apis/__init__.py @@ -0,0 +1,18 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from .db_access import DbAccess +from .event_db_access import EventDbAccess diff --git a/oti/event-handler/otihandler/dbclient/apis/db_access.py b/oti/event-handler/otihandler/dbclient/apis/db_access.py new file mode 100644 index 0000000..f064b30 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/apis/db_access.py @@ -0,0 +1,50 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" +Base class for APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver +""" + +from sqlalchemy.orm import sessionmaker +from ..db_dao import DaoBase +import psycopg2 +from psycopg2.extras import execute_values +import os +import logging + + +class DbAccess(object): + logger = logging.getLogger("dti_handler.DbAccess") + engine = None + session = None + + def __init__(self): + self.engine = DaoBase.getDbEngine() + # create a configured "Session" class + Session = sessionmaker(bind=self.engine) + + # create a Session + self.session = Session() + + def saveDomainObject(self, obj): + self.session.add(obj) + self.session.commit() + self.session.close() + + def deleteDomainObject(self,obj): + self.session.delete(obj) + self.session.commit() + self.session.close() diff --git a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py new file mode 100644 index 0000000..898ee8e --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py @@ -0,0 +1,154 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" DB APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver""" + +from sqlalchemy import and_ +from sqlalchemy.orm.exc import NoResultFound + +from .db_access import DbAccess +from ..models import Event, EventAck + + +class EventDbAccess(DbAccess): + + def __init__(self): + DbAccess.__init__(self) + + def query_event_item(self, target_type, target_name): + try: + query = self.session.query(Event).filter(Event.target_type == target_type).\ + filter(Event.target_name == target_name) + evt = query.one() + except NoResultFound: + return None + else: + return evt + + def query_event_data(self, target_type, target_name): + try: + query = self.session.query(Event).filter(Event.target_type == target_type).\ + filter(Event.target_name == target_name) + evt = query.one() + except NoResultFound: + return [] + else: + try: + ack_result = self.session.query(EventAck).filter(EventAck.event == evt).all() + except NoResultFound: + return [] + else: + return ack_result + + def query_event_data_k8s(self, target_type, target_name): + try: + query = self.session.query(Event).filter(Event.target_type == target_type).\ + filter(Event.target_name == target_name) + evt = query.one() + except NoResultFound: + return [] + else: + try: + ack_result = self.session.query(EventAck).filter(EventAck.event == evt).\ + filter(EventAck.container_type != 'docker').all() + except NoResultFound: + return [] + else: + return ack_result + + def query_event_info_docker(self, prim_evt, service_component, deployment_id, container_id): + try: + query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter( + and_(EventAck.service_component == service_component, + EventAck.deployment_id == deployment_id, + EventAck.container_id == container_id, + EventAck.container_type == 'docker')) + evt = query.one() + except NoResultFound as nrf: + raise nrf + else: + return evt + + def update_event_item(self, dti_event, target_type, target_name): + self.session.query(Event).filter(Event.target_type == target_type). \ + filter(Event.target_name == target_name).update({Event.event:dti_event}) + self.session.commit() + + def query_raw_k8_events(self, cluster, pod, namespace): + """ + run an inner JOIN query to dtih_event and dtih_event_ack tables using supplied query predicates + + :param cluster: + :param pod: + :param namespace: + :return: + Set of event objects related to k8s pods + """ + try: + return self.session.query(Event).filter(Event.dtih_event_id.in_( + self.session.query(EventAck.dtih_event_id).filter(and_(EventAck.k8s_cluster_fqdn == cluster, + EventAck.k8s_pod_id == pod, + EventAck.k8s_namespace == namespace)))).all() + except NoResultFound: + print("invalid query or no data") + return () + + def query_raw_docker_events(self, target_types, locations): + """ + run a query to dtih_event table using supplied query predicates + + :param target_types: required + :param locations: optional + :return: + set of event objects related to docker container + """ + try: + if not locations or (len(locations) == 1 and locations[0] == ''): + return self.session.query(Event).filter(Event.target_type.in_(target_types)).all() + else: + return self.session.query(Event).filter(Event.target_type.in_(target_types)).filter( + Event.location_clli.in_(locations)).all() + except NoResultFound: + print("invalid query or no data") + return () + + def query_pod_info2(self, cluster): + try: + return self.session.query(EventAck).filter(EventAck.k8s_cluster_fqdn == cluster).all() + except NoResultFound: + print("invalid query or no data") + return () + + def query_pod_info(self, cluster): + try: + return self.session.query(EventAck.k8s_pod_id, EventAck.k8s_namespace, + EventAck.k8s_proxy_fqdn, EventAck.k8s_service_name, + EventAck.k8s_service_port)\ + .filter(EventAck.k8s_cluster_fqdn == cluster) \ + .distinct().order_by(EventAck.k8s_cluster_fqdn).all() + except NoResultFound: + print("invalid query or no data") + return () + + def query_event_data_k8s_pod(self, prim_evt, scn): + try: + query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter( + and_(EventAck.service_component == scn)) + event_info = query.one() + except NoResultFound: + return None + else: + return event_info diff --git a/oti/event-handler/otihandler/dbclient/db_dao.py b/oti/event-handler/otihandler/dbclient/db_dao.py new file mode 100644 index 0000000..78fa058 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/db_dao.py @@ -0,0 +1,33 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" SqlAlchemy ORM engine for postgresSql dti database """ + +from sqlalchemy import create_engine + +class DaoBase: + _engine = None + + @staticmethod + def init_db(dbConStr): + if DaoBase._engine: + return + DaoBase._engine = create_engine(dbConStr) + + @staticmethod + def getDbEngine(): + return DaoBase._engine + diff --git a/oti/event-handler/otihandler/dbclient/models/__init__.py b/oti/event-handler/otihandler/dbclient/models/__init__.py new file mode 100644 index 0000000..bc802f5 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/models/__init__.py @@ -0,0 +1,19 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + + +from .event import Event +from .event_ack import EventAck diff --git a/oti/event-handler/otihandler/dbclient/models/event.py b/oti/event-handler/otihandler/dbclient/models/event.py new file mode 100644 index 0000000..553bec2 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/models/event.py @@ -0,0 +1,40 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" ORM - mapping class for dtih_event table """ + +from sqlalchemy import Column, String, Integer, ForeignKey, func +from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP +from sqlalchemy.ext.declarative import declarative_base +import datetime + + +Base = declarative_base() + +class Event(Base): + __tablename__ = 'dtih_event' + __table_args__ = {'schema': 'dti'} + dtih_event_id = Column(Integer, primary_key=True) + event = Column(JSONB) + create_ts = Column(TIMESTAMP(timezone=True), default=func.now()) + last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now()) + target_name = Column(String) + target_type = Column(String) + location_clli = Column(String) + # def __repr__(self): + # return "<Event(event_id='%s', target_type='%s', target_name='%s')" % ( + # self.event_id, self.target_type, self.target_name + # ) diff --git a/oti/event-handler/otihandler/dbclient/models/event_ack.py b/oti/event-handler/otihandler/dbclient/models/event_ack.py new file mode 100644 index 0000000..2b19316 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/models/event_ack.py @@ -0,0 +1,51 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" ORM - mapping class for dtih_event_ack table """ + +import datetime +from sqlalchemy import Column, String, Integer, ForeignKey, func +from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP +from sqlalchemy.orm import relationship +from sqlalchemy.ext.declarative import declarative_base +from ..models import Event + +Base = declarative_base() + +class EventAck(Base): + __tablename__ = 'dtih_event_ack' + __table_args__ = {'schema': 'dti'} + dtih_event_ack_id = Column(Integer, primary_key=True) + create_ts = Column(TIMESTAMP(timezone=True), default=func.now()) + last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now()) + action = Column(String) + k8s_namespace = Column(String) + k8s_service_name = Column(String) + k8s_service_port = Column(String) + k8s_cluster_fqdn = Column(String) + k8s_proxy_fqdn = Column(String) + k8s_pod_id = Column(String) + service_component = Column(String) + deployment_id = Column(String) + container_type = Column(String) + docker_host = Column(String) + container_id = Column(String) + reconfig_script = Column(String) + dtih_event_id = Column(Integer, ForeignKey(Event.dtih_event_id)) + event = relationship(Event) + + def update_action(self, action): + setattr(self, 'action', action) diff --git a/oti/event-handler/otihandler/docker_client.py b/oti/event-handler/otihandler/docker_client.py new file mode 100644 index 0000000..621a1ec --- /dev/null +++ b/oti/event-handler/otihandler/docker_client.py @@ -0,0 +1,175 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""client interface to docker""" + +import docker +import json +import logging +import time + +from otihandler.config import Config +from otihandler.consul_client import ConsulClient +from otihandler.utils import decrypt + + +# class DockerClientError(RuntimeError): +# pass + +class DockerClientConnectionError(RuntimeError): + pass + + +class DockerClient(object): + """ + All Docker logins are in Consul's key-value store under + "docker_plugin/docker_logins" as a list of json objects where + each object is a single login: + + [{ "username": "XXXX", "password": "yyyy", + "registry": "hostname.domain:18443" }] + """ + + _logger = logging.getLogger("oti_handler.docker_client") + + def __init__(self, docker_host, reauth=False): + """Create Docker client + + Args: + ----- + reauth: (boolean) Forces reauthentication, e.g., Docker login + """ + + (fqdn, port) = ConsulClient.get_service_fqdn_port(docker_host, node_meta=True) + base_url = "https://{}:{}".format(fqdn, port) + + try: + tls_config = docker.tls.TLSConfig( + client_cert=( + Config.tls_server_ca_chain_file, + Config.tls_private_key_file + ) + ) + self._client = docker.APIClient(base_url=base_url, tls=tls_config, version='auto', timeout=60) + + for dcl in ConsulClient.get_value("docker_plugin/docker_logins"): + dcl['password'] = decrypt(dcl['password']) + dcl["reauth"] = reauth + self._client.login(**dcl) + + # except requests.exceptions.RequestException as e: + except Exception as e: + msg = "DockerClient.__init__({}) attempt to {} with TLS got exception {}: {!s}".format( + docker_host, base_url, type(e).__name__, e) + + # Then try connecting to dockerhost without TLS + try: + base_url = "tcp://{}:{}".format(fqdn, port) + self._client = docker.APIClient(base_url=base_url, tls=False, version='auto', timeout=60) + + for dcl in ConsulClient.get_value("docker_plugin/docker_logins"): + dcl['password'] = decrypt(dcl['password']) + dcl["reauth"] = reauth + self._client.login(**dcl) + + # except requests.exceptions.RequestException as e: + except Exception as e: + msg = "{}\nDockerClient.__init__({}) attempt to {} without TLS got exception {}: {!s}".format( + msg, docker_host, base_url, type(e).__name__, e) + DockerClient._logger.error(msg) + raise DockerClientConnectionError(msg) + + @staticmethod + def build_cmd(script_path, use_sh=True, msg_type="dti", **kwargs): + """Build command to execute""" + + data = json.dumps(kwargs or {}) + + if use_sh: + return ['/bin/sh', script_path, msg_type, data] + else: + return [script_path, msg_type, data] + + def notify_for_reconfiguration(self, container_id, cmd): + """Notify Docker container that reconfiguration occurred + + Notify the Docker container by doing Docker exec of passed-in command + + Args: + ----- + container_id: (string) + cmd: (list) of strings each entry being part of the command + """ + + for attempts_remaining in range(11,-1,-1): + try: + result = self._client.exec_create(container=container_id, cmd=cmd) + except docker.errors.APIError as e: + # e # 500 Server Error: Internal Server Error ("{"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}") + DockerClient._logger.debug("exec_create() returned APIError: {!s}".format(e)) + + # e.message # 500 Server Error: Internal Server Error + # DockerClient._logger.debug("e.message: {}".format(e.message)) + # e.response.status_code # 500 + # DockerClient._logger.debug("e.response.status_code: {}".format(e.response.status_code)) + # e.response.reason # Internal Server Error + # DockerClient._logger.debug("e.response.reason: {}".format(e.response.reason)) + # e.explanation # {"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"} + # DockerClient._logger.debug("e.explanation: {}".format(e.explanation)) + + # docker container restarting can wait + if e.explanation and 'is restarting' in e.explanation.lower(): + DockerClient._logger.debug("notification exec_create() experienced: {!s}".format(e)) + if attempts_remaining == 0: + result = None + break + time.sleep(10) + # elif e.explanation and 'no such container' in e.explanation.lower(): + # elif e.explanation and 'is not running' in e.explanation.lower(): + else: + DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(type(e).__name__, e)) + return str(e) # don't raise or CM will retry usually forever + # raise DockerClientError(e) + except Exception as e: + DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format( + type(e).__name__, e)) + return str(e) # don't raise or CM will retry usually forever + # raise DockerClientError(e) + else: + break + if not result: + DockerClient._logger.warn("aborting notification exec_create() because docker exec failed") + return "notification unsuccessful" # failed to get an exec_id, perhaps trying multiple times, so don't raise or CM will retry usually forever + DockerClient._logger.debug("notification exec_create() succeeded") + + for attempts_remaining in range(11,-1,-1): + try: + result = self._client.exec_start(exec_id=result['Id']) + except Exception as e: + DockerClient._logger.debug("notification exec_start() got exception {}: {!s}".format(type(e).__name__, e)) + if attempts_remaining == 0: + DockerClient._logger.warn("aborting notification exec_start() because exception {}: {!s}".format(type(e).__name__, e)) + return str(e) # don't raise or CM will retry usually forever + # raise DockerClientError(e) + time.sleep(10) + else: + break + DockerClient._logger.debug("notification exec_start() succeeded") + + DockerClient._logger.info("Pass to docker exec {} {} {}".format( + container_id, cmd, result)) + + return result diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py new file mode 100644 index 0000000..970e020 --- /dev/null +++ b/oti/event-handler/otihandler/dti_processor.py @@ -0,0 +1,815 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""OTI Event processor for handling all the event types""" + +import copy +import json +import logging +from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock + +import requests + +from otihandler import utils +from otihandler.cfy_client import CfyClient +from otihandler.consul_client import ConsulClient +from otihandler.dbclient.apis import EventDbAccess +from otihandler.dbclient.models import Event, EventAck +from otihandler.docker_client import DockerClient + +notify_response_arr = [] +lock = Lock() +K8S_CLUSTER_PROXY_NODE_PORT = '30132' + + +def notify_docker(args_tuple): + """ + event notification executor inside a process pool to communicate with docker container + interacts with docker client library + """ + (dti_event, db_access, ack_item) = args_tuple + try: + dcae_service_action = dti_event.get('dcae_service_action') + component_scn = ack_item.service_component + deployment_id = ack_item.deployment_id + container_id = ack_item.container_id + docker_host = ack_item.docker_host + reconfig_script = ack_item.reconfig_script + container_type = 'docker' + except Exception as e: + return ( + "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e)) + what = "" + try: + what = "{} in {} container {} on {} that was deployed by {}".format( + reconfig_script, container_type, container_id, docker_host, deployment_id) + if dcae_service_action == 'add': + add_action = {"dcae_service_action": "deploy"} + dti_event.update(add_action) + + if dcae_service_action == 'delete': + add_action = {"dcae_service_action": "undeploy"} + dti_event.update(add_action) + + # dkr = DockerClient(docker_host, reauth=False) + result = '' + # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ]) + if dti_event.get('dcae_service_action') == 'undeploy': + # delete from dti_event_ack table + try: + db_access.deleteDomainObject(ack_item) + except Exception as e: + msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + else: + return (component_scn, "ran {}, got: {!s}".format(what, result)) + + except Exception as e: + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + + +def notify_svc(args_tuple): + """ + add/update/delete event handler + event notification executor inside a process pool to communicate with docker container and k8s services + interacts with docker client library + interacts with k8s node port services using REST client + """ + (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple + dti_event = copy.deepcopy(orig_dti_event) + try: + dcae_service_action = dti_event.get('dcae_service_action').lower() + + component_scn = res_tuple[0] + deployment_id = res_tuple[1] + container_id = res_tuple[2] + node_id = res_tuple[3] + docker_host = res_tuple[6] + reconfig_script = res_tuple[7] + container_type = res_tuple[8] + except Exception as e: + return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e)) + + what = "" + if container_type == "docker": + # exec reconfigure.sh in docker container + try: + what = "{} in {} container {} on {} that was deployed by {} node {}".format( + reconfig_script, container_type, container_id, docker_host, deployment_id, node_id) + if dcae_service_action == 'add': + add_action = {"dcae_service_action": "deploy"} + dti_event.update(add_action) + + if dcae_service_action == 'delete': + add_action = {"dcae_service_action": "undeploy"} + dti_event.update(add_action) + + dkr = DockerClient(docker_host, reauth=False) + result = '' + if dti_event.get('dcae_service_action') == 'update': + # undeploy + deploy + DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what)) + dti_event.update({"dcae_service_action": "undeploy"}) + result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) + DTIProcessor.logger.debug("update 2 - running deploy {}".format(what)) + dti_event.update({"dcae_service_action": "deploy"}) + result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) + try: + upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id, + container_id) + upd_evt_ack.update_action('update') + db_access.saveDomainObject(upd_evt_ack) + except Exception as e: + msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + else: + DTIProcessor.logger.debug("running {}".format(what)) + result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)]) + if dti_event.get('dcae_service_action') == 'deploy': + # add into dti_event_ack table + try: + add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id, + container_type='docker', docker_host=docker_host, + container_id=container_id, reconfig_script=reconfig_script, + event=curr_evt, + action='add') + db_access.saveDomainObject(add_evt_ack) + except Exception as e: + msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + else: + # remove from dtih_event_ack if present + if curr_evt is not None: + try: + del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id, + container_id) + db_access.deleteDomainObject(del_evt_ack) + except Exception as e: + msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + except Exception as e: + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + + return (component_scn, "ran {}, got: {!s}".format(what, result)) + elif container_type == "k8s": + DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component") + # if action is 'update', check if k8s pod info exists already for this event in app db + if dcae_service_action == 'add': + DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action") + return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) + elif dcae_service_action == 'update': + # handle update for pods being tracked and handle add for new pods + k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn) + if k8s_scn_result is not None: + # update + DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action") + return notify_k8s_pod((dti_event, db_access, k8s_scn_result)) + else: + # add + DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ") + add_action = {"dcae_service_action": "add"} + dti_event.update(add_action) + return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) + + +def notify_k8s(args_tuple): + """ + add event handler + event notification executor inside a process pool to communicate with k8s statefulset nodeport service + uses REST API client to call k8s services + """ + (dti_event, db_access, curr_evt, res_tuple) = args_tuple + component_scn = res_tuple[0] + deployment_id = res_tuple[1] + node_id = res_tuple[3] + container_type = res_tuple[8] + service_address = res_tuple[9] + service_port = res_tuple[10] + what = "{} in {} deployment {} that was deployed by {} node {}".format( + "add", container_type, "statefulset", deployment_id, node_id) + # call scn node port service REST API + svc_nodeport_url = "https://{}:{}".format(service_address, service_port) + try: + DTIProcessor.logger.debug("running {}".format(what)) + response = requests.put(svc_nodeport_url, json=dti_event, timeout=50) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "collector nodeport service({}) threw exception {}: {!s}".format( + svc_nodeport_url, type(e).__name__, e) + DTIProcessor.logger.error(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + try: + event_ack_info = response.json() + except Exception as e: + msg = "collector nodeport service({}) threw exception {}: {!s}".format( + svc_nodeport_url, type(e).__name__, e) + DTIProcessor.logger.error(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + + if not event_ack_info: + msg = "collector nodeport service returned bad data" + DTIProcessor.logger.error(msg) + return (component_scn, "collector nodeport service returned bad data") + + namespace = event_ack_info.get("KubeNamespace") + svc_name = event_ack_info.get("KubeServiceName") + svc_port = event_ack_info.get("KubeServicePort") + proxy_fqdn = event_ack_info.get("KubeProxyFqdn") + cluster_fqdn = event_ack_info.get("KubeClusterFqdn") + pod_name = event_ack_info.get("KubePod") + statefulset = pod_name[0:pod_name.rindex('-')] + + what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format( + "add", container_type, statefulset, namespace, deployment_id, node_id) + try: + add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id, + k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn, + k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s', + service_component=component_scn) + db_access.saveDomainObject(add_evt_ack) + return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info)) + except Exception as e: + msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + + +def notify_pods(args_tuple): + """ + notify event handler + event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service + uses REST API client to call k8s services + """ + event_ack_info = '' + (dti_event, res_tuple) = args_tuple + try: + cluster = res_tuple[0] + port = K8S_CLUSTER_PROXY_NODE_PORT + namespace = res_tuple[1] + svc_name = res_tuple[2] + svc_port = res_tuple[4] + replicas = res_tuple[3] + + for replica in range(replicas): + pod_id = "sts-{}-{}".format(svc_name, replica) + item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace, + pod_id, svc_name, + svc_port) + what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace) + try: + DTIProcessor.logger.debug("running {}".format(what)) + response = requests.put(item_pod_url, json=dti_event, timeout=50) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "stateful set proxy service({}) threw exception {}: {!s}".format( + item_pod_url, type(e).__name__, e) + DTIProcessor.logger.error(msg) + with lock: + notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what))) + else: + try: + event_ack_info = response.json() + except Exception as e: + msg = "stateful set proxy service({}) threw exception {}: {!s}".format( + item_pod_url, type(e).__name__, e) + DTIProcessor.logger.error(msg) + with lock: + notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what))) + + if not event_ack_info: + msg = "stateful set proxy service returned bad data" + DTIProcessor.logger.error(msg) + with lock: + notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what))) + + with lock: + notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info))) + except Exception as e: + with lock: + notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e))) + +def notify_k8s_pod(args_tuple): + """ + update event handler + event notification executor inside a process pool to communicate with k8s DTIH proxy service + uses REST API client to call k8s services + """ + item_pod_url = '' + component_scn = '' + (dti_event, db_access, ack_item) = args_tuple + # call ingress proxy to dispatch delete event + + action = dti_event.get('dcae_service_action') + what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format( + action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn) + try: + DTIProcessor.logger.debug("running {}".format(what)) + item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format( + ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace, + ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port) + component_scn = ack_item.service_component + response = requests.put(item_pod_url, json=dti_event, timeout=50) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format( + item_pod_url, type(e).__name__, e) + DTIProcessor.logger.error(msg) + return (component_scn, "ran {}, got: {!s}".format(what, msg)) + else: + if action == 'delete': + try: + db_access.deleteDomainObject(ack_item) + except Exception as e: + msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + else: + try: + ack_item.update_action('update') + db_access.saveDomainObject(ack_item) + except Exception as e: + msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.warn(msg) + return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) + + return (component_scn, "ran {}, got: {!s}".format(what, response.json())) + + +class DTIProcessor(object): + """ + Main event processing class that encapsulates all the logic of this handler application! + An instance of this class is created per incoming client request. + + Generates input data by querying platform services - cloudify, consul, postgresSql + + It creates a pool of worker processes using a multiprocessing Pool class instance. + Tasks are offloaded to the worker processes that exist in the pool. + The input data is distributed across processes of the Pool object to enable parallel execution of + event notification function across multiple input values (data parallelism). + """ + + logger = logging.getLogger("oti_handler.dti_processor") + K8S_CLUSTER_PROXY_NODE_PORT = '30132' + db_access = None + docker_pool = None + k8s_pool = None + + def __init__(self, dti_event, send_notification=True): + self._result = {} + self.event = dti_event + self.is_notify = send_notification + self.action = dti_event.get('dcae_service_action').lower() + self.target_name = dti_event.get('dcae_target_name') + self.target_type = dti_event.get('dcae_target_type', '').lower() + self.event_clli = dti_event.get('dcae_service_location') + res_dict = None + try: + self.docker_pool = ThreadPool(8) + self.k8s_pool = ThreadPool(8) + except Exception as e: + msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + raise e + else: + self.db_access = EventDbAccess() + self.prim_db_event = None + try: + res_dict = self.dispatcher() + except: + raise + finally: + try: + self.docker_pool.close() + self.k8s_pool.close() + except Exception as e: + msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__, + e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + try: + self.docker_pool.join() + self.k8s_pool.join() + except Exception as e: + msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__, + e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + + # if not send_notification: + # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components") + # return + + if res_dict: + try: + utils.update_dict(self._result, res_dict) + except Exception as e: + msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format( + type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + + DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components") + + def dispatcher(self): + """ dispatch method to execute specific method based on event type """ + + arg = str(self.action) + method = getattr(self, arg, lambda: "Invalid action") + return method() + + def undeploy(self): + """ + delete event from consul KV store, this functionality will be retired as events are stored + in postgresSql oti database + """ + global key + try: + # update Consul KV store with DTI Event - storing them in a folder for all components + key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name) + result = ConsulClient.delete_key(key) + except Exception as e: + msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + else: + if not result: + msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + + def deploy(self): + """ + add event to consul KV store, this functionality will be retired as events are stored + in postgresSql oti database + """ + dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name) + try: + # update Consul KV store with DTI Event - storing them in a folder for all components + result = ConsulClient.store_kvs({dep_key: self.event}) + except Exception as e: + msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + + def add(self): + """ + process DTI event that contains a new VNF instance that has to be configured in the collector microservices + """ + res_dict = None + try: + msg = "processing add event for {}/{}".format(self.target_type, self.target_name) + DTIProcessor.logger.debug(msg) + # insert add event into dtih_event table + self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type, + location_clli=self.event_clli) + self.db_access.saveDomainObject(self.prim_db_event) + except Exception as e: + msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args) + DTIProcessor.logger.warn(msg) + self._result['ERROR'] = msg + raise Exception(msg) + else: + if self.is_notify: + try: + # force the action to add, to avoid bad things later + add_action = {"dcae_service_action": "add"} + self.event.update(add_action) + # mock up data + mock_tp11 = ( + "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1", + "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s", + "dcae-d1.idns.cip.corp.com", "30996") + mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1", + "docker_node_instance_id1", + "node_instance_state", "docker_host", "dti_reconfig_script", "docker", + "dcae-d1.idns.cip.corp.com", "30996") + # tpl_arr = [] + # tpl_arr.append(mock_tp11) + # tpl_arr.append(mock_tp12) + # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr)))) + res_dict = dict(self.docker_pool.map(notify_svc, + ((self.event, self.db_access, self.prim_db_event, tp) for tp in + CfyClient().iter_components(self.target_type, + dcae_service_location=self.event_clli)) + )) + except Exception as e: + msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__, + e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + return res_dict + + def add_replay(self): + """ + convert an update event flow and replay as an add event type since the event acknowledgement is missing + from application database + """ + res_dict = None + try: + # force the action to add, to avoid bad things later + add_action = {"dcae_service_action": "add"} + self.event.update(add_action) + # mock up data + mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1", + "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s", + "dcae-d1.idns.cip.corp.com", "30996") + mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1", + "docker_node_instance_id1", + "node_instance_state", "docker_host", "dti_reconfig_script", "docker", + "dcae-d1.idns.cip.corp.com", "30996") + # tpl_arr = [] + # tpl_arr.append(mock_tp11) + # tpl_arr.append(mock_tp12) + # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr)))) + res_dict = dict(self.docker_pool.map(notify_svc, + ((self.event, self.db_access, self.prim_db_event, tp) for tp in + CfyClient().iter_components(self.target_type, + dcae_service_location=self.event_clli)) + )) + except Exception as e: + msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + return res_dict + + def delete(self): + """ + process DTI event that indicates a VNF instance has to be removed from the collector microservices + """ + res_dict = {} + res_dict_k8s = {} + res_dict_docker = {} + try: + self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) + if self.is_notify: + try: + msg = "processing delete event for {}/{} to relate with any docker hosts".format( + self.target_type, self.target_name) + DTIProcessor.logger.warn(msg) + res_dict_docker = dict(self.docker_pool.map(notify_svc, + ((self.event, self.db_access, self.prim_db_event, tp) + for tp + in CfyClient().iter_components_for_docker( + self.target_type, + dcae_service_location=self.event_clli)) + )) + except Exception as e: + msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__, + e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + + try: + msg = "processing delete event for {}/{} to relate with any k8s hosts".format( + self.target_type, self.target_name) + DTIProcessor.logger.warn(msg) + if self.prim_db_event is not None: + result = self.db_access.query_event_data_k8s(self.target_type, self.target_name) + res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, ( + ((self.event, self.db_access, ack_item) for ack_item in result)))) + except Exception as e: + msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + + try: + if self.prim_db_event is not None: + self.db_access.deleteDomainObject(self.prim_db_event) + except Exception as e: + msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args) + DTIProcessor.logger.warn(msg) + self._result['ERROR'] = msg + except Exception as e: + msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args) + DTIProcessor.logger.warn(msg) + self._result['ERROR'] = msg + + if res_dict_k8s is not None: + utils.update_dict(res_dict, res_dict_k8s) + + if res_dict_docker is not None: + utils.update_dict(res_dict, res_dict_docker) + + return res_dict + + def update(self): + """ + process DTI event that indicates VNF instance has to be updated in the collector microservices + """ + res_dict = {} + res_dict_k8s = {} + res_dict_docker = {} + + if self.is_notify: + try: + self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) + if self.prim_db_event is not None: + self.db_access.update_event_item(self.event, self.target_type, self.target_name) + result = self.db_access.query_event_data(self.target_type, self.target_name) + if len(result) == 0: + msg = "processing update event for {}/{}, but event distribution info is not found in database, " \ + "replaying this event to cluster if required". \ + format(self.target_type, self.target_name) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + res_dict = self.add_replay() + else: + msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \ + "identify new vs update cases".format(self.target_type, self.target_name) + DTIProcessor.logger.debug(msg) + try: + tpl_arr = CfyClient().iter_components(self.target_type, + dcae_service_location=self.event_clli) + res_dict_docker = dict(self.docker_pool.map(notify_svc, + (( + self.event, self.db_access, + self.prim_db_event, + tp) + for tp in tpl_arr))) + except Exception as e: + msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format( + type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + else: + # event is new for the handler + msg = "processing update event for {}/{}, but current event info is not found in database, " \ + "executing add event".format(self.target_type, self.target_name) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + res_dict = self.add() + except Exception as e: + msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e) + DTIProcessor.logger.error(msg) + self._result['ERROR'] = msg + + if res_dict_k8s is not None: + utils.update_dict(res_dict, res_dict_k8s) + + if res_dict_docker is not None: + utils.update_dict(res_dict, res_dict_docker) + + return res_dict + + def notify(self): + """ + event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event + This notification is meant for the cluster failover. + """ + res_dict = {} + try: + self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name) + if self.prim_db_event is not None: + self.db_access.update_event_item(self.event, self.target_type, self.target_name) + else: + self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type, + location_clli=self.event_clli) + self.db_access.saveDomainObject(self.prim_db_event) + except Exception as e: + msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args) + DTIProcessor.logger.warn(msg) + self._result['ERROR'] = msg + + try: + self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in + CfyClient().query_k8_components(self.target_name))) + for k, v in notify_response_arr: + res_dict[k] = v + except Exception as e: + msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args) + DTIProcessor.logger.warn(msg) + self._result['WARNING'] = msg + + return res_dict + + def get_result(self): + return self._result + + @classmethod + def get_k8_raw_events(cls, pod, cluster, namespace): + """ + Get DTI events for a k8 stateful set pod container + + :param pod: required + k8s stateful set pod ID that was configured with a specific set of DTI Events + :param cluster: required + k8s cluster FQDN where the mS was deployed + :param namespace: required + k8s namespace where the stateful set was deployed in that namespace + :return: + Dictionary of DTI event(s). + DTI events will be keyed by vnf_type, sub-keyed by vnf_id. + """ + db_access = EventDbAccess() + results = db_access.query_raw_k8_events(cluster, pod, namespace) + + target_types = set([]) + outer_dict = {} + + for evnt_item in results: + target_types.add(evnt_item.target_type) + + for targ_type in target_types: + inner_name_evt_dict = {} + for evnt in results: + if targ_type == evnt.target_type: + inner_name_evt_dict[evnt.target_name] = evnt.event + + outer_dict[targ_type] = inner_name_evt_dict + + return outer_dict + + @classmethod + def get_docker_raw_events(cls, service_name, service_location): + """ + Get DTI events for docker container. + + Parameters + ---------- + service_name : string + required. The service component name assigned by dockerplugin to the component that is unique to the + cloudify node instance and used in its Consul key(s). + service_location : string + optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location + in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul + TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + Dictionary of DTI event(s). + DTI events will be keyed by vnf_type, sub-keyed by vnf_id. + + """ + + r_dict = {} + + want_locs = [] + if service_location: + want_locs = service_location.split(',') + + give_types = [] + if service_name: + if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node + try: + node_name = ConsulClient.lookup_service(service_name)[0].get("Node") + if node_name: + services = ConsulClient.lookup_node(node_name).get("Services") + if services: + for node_svc in list(services.keys()): + if "-component-dockerhost-" in node_svc: + want_locs = services[node_svc].get("Tags", []) + break + except: + pass + + try: + supported_types = ConsulClient.get_value(service_name + ":dti") + except: + return r_dict + else: + if supported_types: + supported_types = [t_type.lower() for t_type in list(supported_types.keys())] + give_types = supported_types + if not give_types or (len(give_types) == 1 and give_types[0] == ''): + return r_dict + + db_access = EventDbAccess() + results = db_access.query_raw_docker_events(give_types, want_locs) + + target_types = set([]) + outer_dict = {} + + for evnt_item in results: + target_types.add(evnt_item.target_type) + + for targ_type in target_types: + inner_name_evt_dict = {} + for evnt in results: + if targ_type == evnt.target_type: + inner_name_evt_dict[evnt.target_name] = evnt.event + + outer_dict[targ_type] = inner_name_evt_dict + + return outer_dict diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py new file mode 100644 index 0000000..644534d --- /dev/null +++ b/oti/event-handler/otihandler/onap/CommonLogger.py @@ -0,0 +1,958 @@ +#!/usr/bin/python +# -*- indent-tabs-mode: nil -*- vi: set expandtab: + +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +""" Common Logging library in Python. + +CommonLogger.py + +Original Written by: Terry Schmalzried +Date written: October 1, 2015 +Last updated: December 1, 2016 + +version 0.8 +""" + +import os +import logging +import logging.handlers +import re +import socket +import sys +import threading +import time + + +class CommonLogger: + """ Common Logging object. + + Public methods: + __init__ + setFields + debug + info + warn + error + fatal + """ + + UnknownFile = -1 + ErrorFile = 0 + DebugFile = 1 + AuditFile = 2 + MetricsFile = 3 + DateFmt = '%Y-%m-%dT%H:%M:%S' + verbose = False + + def __init__(self, configFile, logKey, **kwargs): + """Construct a Common Logger for one Log File. + + Arguments: + configFile -- configuration filename. + logKey -- the keyword in configFile that identifies the log filename. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages, + one of CommonLogger.ErrorFile, CommonLogger.DebugFile, + CommonLogger.AuditFile and CommonLogger.MetricsFile, or + one of the strings "error", "debug", "audit" or "metrics". + May also be set in the config file using a field named + <logKey>Style (where <logKey> is the value of the logKey + parameter). The keyword value overrides the value in the + config file. + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._monitorFlag = False + + # Get configuration parameters + self._logKey = str(logKey) + self._configFile = str(configFile) + self._rotateMethod = 'time' + self._timeRotateIntervalType = 'midnight' + self._timeRotateInterval = 1 + self._sizeMaxBytes = 0 + self._sizeRotateMode = 'a' + self._socketHost = None + self._socketPort = 0 + self._typeLogger = 'filelogger' + self._backupCount = 6 + self._logLevelThreshold = self._intLogLevel('') + self._logFile = None + self._begTime = None + self._begMsec = 0 + self._fields = {} + self._fields["style"] = CommonLogger.UnknownFile + try: + self._configFileModified = os.path.getmtime(self._configFile) + for line in open(self._configFile): + line = line.split('#',1)[0] # remove comments + if '=' in line: + key, value = [x.strip() for x in line.split('=',1)] + if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']: + self._rotateMethod = value.lower() + elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: + self._timeRotateIntervalType = value + elif key == 'timeRotateInterval' and int( value ) > 0: + self._timeRotateInterval = int( value ) + elif key == 'sizeMaxBytes' and int( value ) >= 0: + self._sizeMaxBytes = int( value ) + elif key == 'sizeRotateMode' and value in ['a']: + self._sizeRotateMode = value + elif key == 'backupCount' and int( value ) >= 0: + self._backupCount = int( value ) + elif key == self._logKey + 'SocketHost': + self._socketHost = value + elif key == self._logKey + 'SocketPort' and int( value ) == 0: + self._socketPort = int( value ) + elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: + self._typeLogger = value.lower() + elif key == self._logKey + 'LogLevel': + self._logLevelThreshold = self._intLogLevel(value.upper()) + elif key == self._logKey + 'Style': + self._fields["style"] = value + elif key == self._logKey: + self._logFile = value + except Exception as x: + print("exception reading '%s' configuration file: %s" %(self._configFile, str(x))) + sys.exit(2) + except: + print("exception reading '%s' configuration file" %(self._configFile)) + sys.exit(2) + + if self._logFile is None: + print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey)) + sys.exit(2) + + + # initialize default log fields + # timestamp will automatically be generated + for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ + 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ + 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ + 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ + 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ + 'errorDescription' ]: + if key in kwargs and kwargs[key] != None: + self._fields[key] = kwargs[key] + + self._resetStyleField() + + # Set up logger + self._logLock = threading.Lock() + with self._logLock: + self._logger = logging.getLogger(self._logKey) + self._logger.propagate = False + self._createLogger() + + self._defaultServerInfo() + + # spawn a thread to monitor configFile for logLevel and logFile changes + self._monitorFlag = True + self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=()) + self._monitorThread.daemon = True + self._monitorThread.start() + + + def _createLogger(self): + if self._typeLogger == 'filelogger': + self._mkdir_p(self._logFile) + if self._rotateMethod == 'time': + self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \ + when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \ + backupCount=self._backupCount, encoding=None, delay=False, utc=True) + elif self._rotateMethod == 'size': + self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \ + mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \ + backupCount=self._backupCount, encoding=None, delay=False) + + else: + self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \ + mode=self._sizeRotateMode, \ + encoding=None, delay=False) + elif self._typeLogger == 'stderrlogger': + self._logHandler = logging.handlers.StreamHandler(sys.stderr) + elif self._typeLogger == 'stdoutlogger': + self._logHandler = logging.handlers.StreamHandler(sys.stdout) + elif self._typeLogger == 'socketlogger': + self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort) + elif self._typeLogger == 'nulllogger': + self._logHandler = logging.handlers.NullHandler() + + if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile: + self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt) + else: + self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S') + self._logFormatter.converter = time.gmtime + self._logHandler.setFormatter(self._logFormatter) + self._logger.addHandler(self._logHandler) + + def _resetStyleField(self): + styleFields = ["error", "debug", "audit", "metrics"] + if self._fields['style'] in styleFields: + self._fields['style'] = styleFields.index(self._fields['style']) + + def __del__(self): + if not self._monitorFlag: + return + + self._monitorFlag = False + + if self._monitorThread is not None and self._monitorThread.is_alive(): + self._monitorThread.join() + + self._monitorThread = None + + + def _defaultServerInfo(self): + + # If not set or purposely set = None, then set default + if self._fields.get('server') is None: + try: + self._fields['server'] = socket.getfqdn() + except Exception as err: + try: + self._fields['server'] = socket.gethostname() + except Exception as err: + self._fields['server'] = "" + + # If not set or purposely set = None, then set default + if self._fields.get('serverIPAddress') is None: + try: + self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server']) + except Exception as err: + self._fields['serverIPAddress'] = "" + + + def _monitorConfigFile(self): + while self._monitorFlag: + try: + fileTime = os.path.getmtime(self._configFile) + if fileTime > self._configFileModified: + self._configFileModified = fileTime + ReopenLogFile = False + logFile = self._logFile + with open(self._configFile) as fp: + for line in fp: + line = line.split('#',1)[0] # remove comments + if '=' in line: + key, value = [x.strip() for x in line.split('=',1)] + if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value: + self._rotateMethod = value.lower() + ReopenLogFile = True + elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: + self._timeRotateIntervalType = value + ReopenLogFile = True + elif key == 'timeRotateInterval' and int( value ) > 0: + self._timeRotateInterval = int( value ) + ReopenLogFile = True + elif key == 'sizeMaxBytes' and int( value ) >= 0: + self._sizeMaxBytes = int( value ) + ReopenLogFile = True + elif key == 'sizeRotateMode' and value in ['a']: + self._sizeRotateMode = value + ReopenLogFile = True + elif key == 'backupCount' and int( value ) >= 0: + self._backupCount = int( value ) + ReopenLogFile = True + elif key == self._logKey + 'SocketHost' and self._socketHost != value: + self._socketHost = value + ReopenLogFile = True + elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ): + self._socketPort = int( value ) + ReopenLogFile = True + elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ): + self._logLevelThreshold = self._intLogLevel(value.upper()) + elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: + self._typeLogger = value.lower() + ReopenLogFile = True + elif key == self._logKey + 'Style': + self._fields["style"] = value + self._resetStyleField() + elif key == self._logKey and self._logFile != value: + logFile = value + ReopenLogFile = True + if ReopenLogFile: + with self._logLock: + self._logger.removeHandler(self._logHandler) + self._logFile = logFile + self._createLogger() + except Exception as err: + pass + + time.sleep(5) + + + def setFields(self, **kwargs): + """Set default values for log fields. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ + 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ + 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ + 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ + 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ + 'errorDescription' ]: + if key in kwargs: + if kwargs[key] != None: + self._fields[key] = kwargs[key] + elif key in self._fields: + del self._fields[key] + + self._defaultServerInfo() + + + def debug(self, message, **kwargs): + """Write a DEBUG level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs) + + def info(self, message, **kwargs): + """Write an INFO level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('INFO', message, errorCategory = 'INFO', **kwargs) + + def warn(self, message, **kwargs): + """Write a WARN level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('WARN', message, errorCategory = 'WARN', **kwargs) + + def error(self, message, **kwargs): + """Write an ERROR level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('ERROR', message, errorCategory = 'ERROR', **kwargs) + + def fatal(self, message, **kwargs): + """Write a FATAL level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('FATAL', message, errorCategory = 'FATAL', **kwargs) + + def _log(self, logLevel, message, **kwargs): + """Write a message to the log file. + + Arguments: + logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record. + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + # timestamp will automatically be inserted + style = int(self._getVal('style', '', **kwargs)) + requestID = self._getVal('requestID', '', **kwargs) + serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs) + threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs) + serverName = self._getVal('serverName', '', **kwargs) + serviceName = self._getVal('serviceName', '', **kwargs) + instanceUUID = self._getVal('instanceUUID', '', **kwargs) + upperLogLevel = self._noSep(logLevel.upper()) + severity = self._getVal('severity', '', **kwargs) + serverIPAddress = self._getVal('serverIPAddress', '', **kwargs) + server = self._getVal('server', '', **kwargs) + IPAddress = self._getVal('IPAddress', '', **kwargs) + className = self._getVal('className', '', **kwargs) + timer = self._getVal('timer', '', **kwargs) + partnerName = self._getVal('partnerName', '', **kwargs) + targetEntity = self._getVal('targetEntity', '', **kwargs) + targetServiceName = self._getVal('targetServiceName', '', **kwargs) + statusCode = self._getVal('statusCode', '', **kwargs) + responseCode = self._getVal('responseCode', '', **kwargs) + responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs)) + processKey = self._getVal('processKey', '', **kwargs) + targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs) + customField1 = self._getVal('customField1', '', **kwargs) + customField2 = self._getVal('customField2', '', **kwargs) + customField3 = self._getVal('customField3', '', **kwargs) + customField4 = self._getVal('customField4', '', **kwargs) + errorCategory = self._getVal('errorCategory', '', **kwargs) + errorCode = self._getVal('errorCode', '', **kwargs) + errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs)) + nbegTime = self._getArg('begTime', {}, **kwargs) + + detailMessage = self._noSep(message) + if bool(re.match(r" *$", detailMessage)): + return # don't log empty messages + + useLevel = self._intLogLevel(upperLogLevel) + if CommonLogger.verbose: print("logger STYLE=%s" % style) + if useLevel < self._logLevelThreshold: + if CommonLogger.verbose: print("skipping because of level") + pass + else: + with self._logLock: + if style == CommonLogger.ErrorFile: + if CommonLogger.verbose: print("using CommonLogger.ErrorFile") + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName, + errorCategory, errorCode, errorDescription, detailMessage)) + elif style == CommonLogger.DebugFile: + if CommonLogger.verbose: print("using CommonLogger.DebugFile") + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel, + severity, serverIPAddress, server, IPAddress, className, timer, detailMessage)) + elif style == CommonLogger.AuditFile: + if CommonLogger.verbose: print("using CommonLogger.AuditFile") + endAuditTime, endAuditMsec = self._getTime() + if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: + d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec } + elif self._begTime is not None: + d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec } + else: + d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec } + self._begTime = None + unused = "" + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, + statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel, + severity, serverIPAddress, timer, server, IPAddress, className, unused, + processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d) + elif style == CommonLogger.MetricsFile: + if CommonLogger.verbose: print("using CommonLogger.MetricsFile") + endMetricsTime, endMetricsMsec = self._getTime() + if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: + d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } + elif self._begTime is not None: + d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } + else: + d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec } + self._begTime = None + unused = "" + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, + targetEntity, targetServiceName, statusCode, responseCode, responseDescription, + instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress, + className, unused, processKey, targetVirtualEntity, customField1, customField2, + customField3, customField4, detailMessage), extra=d) + else: + print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"]) + + def _getTime(self): + ct = time.time() + lt = time.localtime(ct) + return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000) + + def setStartRecordEvent(self): + """ + Set the start time to be saved for both audit and metrics records + """ + self._begTime, self._begMsec = self._getTime() + + def getStartRecordEvent(self): + """ + Retrieve the start time to be used for either audit and metrics records + """ + begTime, begMsec = self._getTime() + return {'begTime':begTime, 'begMsec':begMsec} + + def _getVal(self, key, default, **kwargs): + val = self._fields.get(key) + if key in kwargs: val = kwargs[key] + if val is None: val = default + return self._noSep(val) + + def _getArg(self, key, default, **kwargs): + val = None + if key in kwargs: val = kwargs[key] + if val is None: val = default + return val + + def _noSep(self, message): + if message is None: return '' + return re.sub(r'[\|\n]', ' ', str(message)) + + def _intLogLevel(self, logLevel): + if logLevel == 'FATAL': useLevel = 50 + elif logLevel == 'ERROR': useLevel = 40 + elif logLevel == 'WARN': useLevel = 30 + elif logLevel == 'INFO': useLevel = 20 + elif logLevel == 'DEBUG': useLevel = 10 + else: useLevel = 0 + return useLevel + + def _mkdir_p(self, filename): + """Create missing directories from a full filename path like mkdir -p""" + + if filename is None: + return + + folder=os.path.dirname(filename) + + if folder == "": + return + + if not os.path.exists(folder): + try: + os.makedirs(folder) + except OSError as err: + print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror)) + sys.exit(2) + except Exception as err: + print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err))) + sys.exit(2) + +if __name__ == "__main__": + + def __checkOneTime(line): + format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]' + m = re.match(format, line) + if not m: + print("ERROR: time string did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + return 0 + + def __checkTwoTimes(line, different): + format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]' + m = re.match(format, line) + if not m: + print("ERROR: time strings did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + second1 = int(m.group(1)) + msec1 = int(m.group(2)) + second2 = int(m.group(3)) + msec2 = int(m.group(4)) + if second1 > second2: second2 += 60 + t1 = second1 * 1000 + msec1 + t2 = second2 * 1000 + msec2 + diff = t2 - t1 + # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff)) + if different: + if diff < 500: + print("ERROR: times did not differ enough: %s" % line) + return 1 + else: + if diff > 10: + print("ERROR: times were too far apart: %s" % line) + return 1 + return 0 + + def __checkBegTime(line): + format = "begTime should be ([-0-9T:]+)" + # print("checkBegTime(%s)" % line) + strt = 'begTime should be ' + i = line.index(strt) + rest = line[i+len(strt):].rstrip() + if not line.startswith(rest + ","): + print("ERROR: line %s should start with %s" % (line,rest)) + return 1 + return 0 + + def __checkLog(logfile, numLines, numFields): + lineCount = 0 + errorCount = 0 + with open(logfile, "r") as fp: + for line in fp: + # print("saw line %s" % line) + lineCount += 1 + c = line.count('|') + if c != numFields: + print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line)) + errorCount += 1 + if re.search("should not appear", line): + print("ERROR: a line appeared that should not have appeared, %s" % line) + errorCount += 1 + elif re.search("single time", line): + errorCount += __checkOneTime(line) + elif re.search("time should be the same", line): + errorCount += __checkTwoTimes(line, different=False) + elif re.search("time should be ", line): + errorCount += __checkTwoTimes(line, different=True) + elif re.search("begTime should be ", line): + errorCount += __checkBegTime(line) + else: + print("ERROR: an unknown message appeared, %s" % line) + errorCount += 1 + + if lineCount != numLines: + print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount)) + errorCount += 1 + return errorCount + + import os, argparse + parser = argparse.ArgumentParser(description="test the CommonLogger functions") + parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true") + parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true") + args = parser.parse_args() + + spid = str(os.getpid()) + if args.keeplogs: + spid = "" + logcfg = "/tmp/cl.log" + spid + ".cfg" + errorLog = "/tmp/cl.error" + spid + ".log" + metricsLog = "/tmp/cl.metrics" + spid + ".log" + auditLog = "/tmp/cl.audit" + spid + ".log" + debugLog = "/tmp/cl.debug" + spid + ".log" + if args.verbose: CommonLogger.verbose = True + + import atexit + def cleanupTmps(): + for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]: + try: + os.remove(f) + except: + pass + if not args.keeplogs: + atexit.register(cleanupTmps) + + with open(logcfg, "w") as o: + o.write("error = " + errorLog + "\n" + + "errorLogLevel = WARN\n" + + "metrics = " + metricsLog + "\n" + + "metricsLogLevel = INFO\n" + + "audit = " + auditLog + "\n" + + "auditLogLevel = INFO\n" + + "debug = " + debugLog + "\n" + + "debugLogLevel = DEBUG\n") + + import uuid + instanceUUID = uuid.uuid1() + serviceName = "testharness" + errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName) + debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName) + auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName) + metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName) + + testsRun = 0 + errorCount = 0 + errorLogger.debug("error calling debug (should not appear)") + errorLogger.info("error calling info (should not appear)") + errorLogger.warn("error calling warn (single time)") + errorLogger.error("error calling error (single time)") + errorLogger.setStartRecordEvent() + time.sleep(1) + errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)") + testsRun += 6 + errorCount += __checkLog(errorLog, 3, 10) + + auditLogger.debug("audit calling debug (should not appear)") + auditLogger.info("audit calling info (time should be the same)") + auditLogger.warn("audit calling warn (time should be the same)") + auditLogger.error("audit calling error (time should be the same)") + bt = auditLogger.getStartRecordEvent() + # print("bt=%s" % bt) + time.sleep(1) + auditLogger.setStartRecordEvent() + time.sleep(1) + auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)") + time.sleep(1) + auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 7 + errorCount += __checkLog(auditLog, 5, 25) + + debugLogger.debug("debug calling debug (single time)") + debugLogger.info("debug calling info (single time)") + debugLogger.warn("debug calling warn (single time)") + debugLogger.setStartRecordEvent() + time.sleep(1) + debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)") + debugLogger.fatal("debug calling fatal (single time)") + errorCount += __checkLog(debugLog, 5, 13) + testsRun += 6 + + metricsLogger.debug("metrics calling debug (should not appear)") + metricsLogger.info("metrics calling info (time should be the same)") + metricsLogger.warn("metrics calling warn (time should be the same)") + bt = metricsLogger.getStartRecordEvent() + time.sleep(1) + metricsLogger.setStartRecordEvent() + time.sleep(1) + metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different") + metricsLogger.fatal("metrics calling fatal (time should be the same)") + time.sleep(1) + metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 6 + errorCount += __checkLog(metricsLog, 5, 28) + + print("%d tests run, %d errors found" % (testsRun, errorCount)) diff --git a/oti/event-handler/otihandler/onap/__init__.py b/oti/event-handler/otihandler/onap/__init__.py new file mode 100644 index 0000000..87cf002 --- /dev/null +++ b/oti/event-handler/otihandler/onap/__init__.py @@ -0,0 +1,15 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/onap/audit.py b/oti/event-handler/otihandler/onap/audit.py new file mode 100644 index 0000000..8cd16cf --- /dev/null +++ b/oti/event-handler/otihandler/onap/audit.py @@ -0,0 +1,375 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""generic class to keep track of request handling + from receiving it through reponse and log all the activities + + call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests + + start each outside request with creation of the Audit object + audit = Audit(request_id=None, headers=None, msg=None) +""" + +import os +import sys +import json +import uuid +import time +import copy +from datetime import datetime +from threading import Lock +from enum import Enum + +from .CommonLogger import CommonLogger +from .health import Health + +REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" +REQUEST_REMOTE_ADDR = "Remote-Addr" +REQUEST_HOST = "Host" +HOSTNAME = "HOSTNAME" + +AUDIT_REQUESTID = 'requestID' +AUDIT_IPADDRESS = 'IPAddress' +AUDIT_SERVER = 'server' +AUDIT_TARGET_ENTITY = 'targetEntity' + +HEADER_CLIENTAUTH = "clientauth" +HEADER_AUTHORIZATION = "authorization" + +class AuditHttpCode(Enum): + """audit http codes""" + HTTP_OK = 200 + PERMISSION_UNAUTHORIZED_ERROR = 401 + PERMISSION_FORBIDDEN_ERROR = 403 + RESPONSE_ERROR = 400 + DATA_NOT_FOUND_ERROR = 404 + SERVER_INTERNAL_ERROR = 500 + SERVICE_UNAVAILABLE_ERROR = 503 + DATA_ERROR = 1030 + SCHEMA_ERROR = 1040 + +class AuditResponseCode(Enum): + """audit response codes""" + SUCCESS = 0 + PERMISSION_ERROR = 100 + AVAILABILITY_ERROR = 200 + DATA_ERROR = 300 + SCHEMA_ERROR = 400 + BUSINESS_PROCESS_ERROR = 500 + UNKNOWN_ERROR = 900 + + @staticmethod + def get_response_code(http_status_code): + """calculates the response_code from max_http_status_code""" + response_code = AuditResponseCode.UNKNOWN_ERROR + if http_status_code <= AuditHttpCode.HTTP_OK.value: + response_code = AuditResponseCode.SUCCESS + + elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, + AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: + response_code = AuditResponseCode.PERMISSION_ERROR + elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + response_code = AuditResponseCode.AVAILABILITY_ERROR + elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: + response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR + elif http_status_code in [AuditHttpCode.DATA_ERROR.value, + AuditHttpCode.RESPONSE_ERROR.value, + AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + response_code = AuditResponseCode.DATA_ERROR + elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: + response_code = AuditResponseCode.SCHEMA_ERROR + + return response_code + + @staticmethod + def get_human_text(response_code): + """convert enum name into human readable text""" + if not response_code: + return "unknown" + return response_code.name.lower().replace("_", " ") + +class Audit(object): + """put the audit object on stack per each initiating request in the system + + :request_id: is the X-ECOMP-RequestID for tracing + + :req_message: is the request message string for logging + + :aud_parent: is the parent request - used for sub-query metrics to other systems + + :kwargs: - put any request related params into kwargs + """ + _service_name = "" + _service_version = "" + _service_instance_uuid = str(uuid.uuid4()) + _started = datetime.now() + _logger_debug = None + _logger_error = None + _logger_metrics = None + _logger_audit = None + _health = Health() + _py_ver = sys.version.replace("\n", "") + + @staticmethod + def init(service_name, service_version, config_file_path): + """init static invariants and loggers""" + Audit._service_name = service_name + Audit._service_version = service_version + Audit._logger_debug = CommonLogger(config_file_path, "debug", \ + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) + Audit._logger_error = CommonLogger(config_file_path, "error", \ + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) + Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) + Audit._logger_audit = CommonLogger(config_file_path, "audit", \ + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) + + @staticmethod + def health(): + """returns json for health check""" + now = datetime.now() + return { + "service_name" : Audit._service_name, + "service_version" : Audit._service_version, + "service_instance_UUID" : Audit._service_instance_uuid, + "python" : Audit._py_ver, + "started" : str(Audit._started), + "now" : str(now), + "uptime" : str(now - Audit._started), + "stats" : Audit._health.dump(), + "packages" : "N/A" # Audit._packages + } + + def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): + """create audit object per each request in the system + + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + self.request_id = request_id + self.req_message = req_message or "" + self.aud_parent = aud_parent + self.kwargs = kwargs or {} + + self.retry_get_config = False + self.max_http_status_code = 0 + self._lock = Lock() + + if self.aud_parent: + if not self.request_id: + self.request_id = self.aud_parent.request_id + if not self.req_message: + self.req_message = self.aud_parent.req_message + self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) + else: + headers = self.kwargs.get("headers", {}) + if headers: + if not self.request_id: + self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + if AUDIT_IPADDRESS not in self.kwargs: + self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) + + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) + + created_req = "" + if not self.request_id: + created_req = " with new" + self.request_id = str(uuid.uuid4()) + + self.kwargs[AUDIT_REQUESTID] = self.request_id + + self._started = time.time() + self._start_event = Audit._logger_audit.getStartRecordEvent() + self.metrics_start() + + if not self.aud_parent: + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + + def merge_all_kwargs(self, **kwargs): + """returns the merge of copy of self.kwargs with the param kwargs""" + all_kwargs = self.kwargs.copy() + if kwargs: + all_kwargs.update(kwargs) + return all_kwargs + + def set_http_status_code(self, http_status_code): + """accumulate the highest(worst) http status code""" + self._lock.acquire() + if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: + self.max_http_status_code = max(http_status_code, self.max_http_status_code) + self._lock.release() + + def get_max_http_status_code(self): + """returns the highest(worst) http status code""" + self._lock.acquire() + max_http_status_code = self.max_http_status_code + self._lock.release() + return max_http_status_code + + @staticmethod + def get_status_code(success): + """COMPLETE versus ERROR""" + if success: + return 'COMPLETE' + return 'ERROR' + + @staticmethod + def hide_secrets(obj): + """hides the known secret field values of the dictionary""" + if not isinstance(obj, dict): + return obj + + for key in obj: + if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: + obj[key] = "*" + elif isinstance(obj[key], dict): + obj[key] = Audit.hide_secrets(obj[key]) + + return obj + + @staticmethod + def log_json_dumps(obj, **kwargs): + """hide the known secret field values of the dictionary and return json.dumps""" + if not isinstance(obj, dict): + return json.dumps(obj, **kwargs) + + return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) + + def is_serious_error(self, status_code): + """returns whether the response_code is success and a human text for response code""" + return AuditResponseCode.PERMISSION_ERROR.value \ + == AuditResponseCode.get_response_code(status_code).value \ + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value + + def _get_response_status(self): + """calculates the response status fields from max_http_status_code""" + max_http_status_code = self.get_max_http_status_code() + response_code = AuditResponseCode.get_response_code(max_http_status_code) + success = (response_code.value == AuditResponseCode.SUCCESS.value) + response_description = AuditResponseCode.get_human_text(response_code) + return success, max_http_status_code, response_code, response_description + + def is_success(self): + """returns whether the response_code is success and a human text for response code""" + success, _, _, _ = self._get_response_status() + return success + + def debug(self, log_line, **kwargs): + """debug - the debug=lowest level of logging""" + Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + + def info(self, log_line, **kwargs): + """debug - the info level of logging""" + Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + + def info_requested(self, result=None, **kwargs): + """info "requested ..." - the info level of logging""" + self.info("requested {0} {1}".format(self.req_message, result or ""), \ + **self.merge_all_kwargs(**kwargs)) + + def warn(self, log_line, **kwargs): + """debug+error - the warn level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + Audit._logger_debug.warn(log_line, **all_kwargs) + Audit._logger_error.warn(log_line, **all_kwargs) + + def error(self, log_line, **kwargs): + """debug+error - the error level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + Audit._logger_debug.error(log_line, **all_kwargs) + Audit._logger_error.error(log_line, **all_kwargs) + + def fatal(self, log_line, **kwargs): + """debug+error - the fatal level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + Audit._logger_debug.fatal(log_line, **all_kwargs) + Audit._logger_error.fatal(log_line, **all_kwargs) + + @staticmethod + def get_elapsed_time(started): + """returns the elapsed time since started in milliseconds""" + return int(round(1000 * (time.time() - started))) + + def metrics_start(self, log_line=None, **kwargs): + """reset metrics timing""" + self._metrics_started = time.time() + self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() + if log_line: + self.info(log_line, **self.merge_all_kwargs(**kwargs)) + + def metrics(self, log_line, **kwargs): + """debug+metrics - the metrics=sub-audit level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() + metrics_func = None + timer = Audit.get_elapsed_time(self._metrics_started) + if success: + log_line = "done: {0}".format(log_line) + self.info(log_line, **all_kwargs) + metrics_func = Audit._logger_metrics.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + else: + log_line = "failed: {0}".format(log_line) + self.error(log_line, errorCode=response_code.value, \ + errorDescription=response_description, **all_kwargs) + metrics_func = Audit._logger_metrics.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + + metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, + statusCode=Audit.get_status_code(success), responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) + + self.metrics_start() + return (success, max_http_status_code, response_description) + + def audit_done(self, result=None, **kwargs): + """debug+audit - the audit=top level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() + log_line = "{0} {1}".format(self.req_message, result or "").strip() + audit_func = None + timer = Audit.get_elapsed_time(self._started) + if success: + log_line = "done: {0}".format(log_line) + self.info(log_line, **all_kwargs) + audit_func = Audit._logger_audit.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + else: + log_line = "failed: {0}".format(log_line) + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + audit_func = Audit._logger_audit.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) + + return (success, max_http_status_code, response_description) + # this line added to test diff --git a/oti/event-handler/otihandler/onap/health.py b/oti/event-handler/otihandler/onap/health.py new file mode 100644 index 0000000..39edc0d --- /dev/null +++ b/oti/event-handler/otihandler/onap/health.py @@ -0,0 +1,102 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""generic class to keep track of app health""" + +import uuid +from datetime import datetime +from threading import Lock + + +class HealthStats(object): + """keep track of stats for calls""" + def __init__(self, name): + """keep track of stats for metrics calls""" + self._name = name or "stats_" + str(uuid.uuid4()) + self._lock = Lock() + self._call_count = 0 + self._error_count = 0 + self._longest_timer = 0 + self._total_timer = 0 + self._last_success = None + self._last_error = None + + def dump(self): + """returns dict of stats""" + dump = None + with self._lock: + dump = { + "call_count" : self._call_count, + "error_count" : self._error_count, + "last_success" : str(self._last_success), + "last_error" : str(self._last_error), + "longest_timer_millisecs" : self._longest_timer, + "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \ + if self._call_count else 0) + } + return dump + + def success(self, timer): + """records the successful execution""" + with self._lock: + self._call_count += 1 + self._last_success = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + + def error(self, timer): + """records the errored execution""" + with self._lock: + self._call_count += 1 + self._error_count += 1 + self._last_error = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + +class Health(object): + """Health stats for multiple requests""" + def __init__(self): + """Health stats for application""" + self._all_stats = {} + self._lock = Lock() + + def _add_or_get_stats(self, stats_name): + """add to or get from the ever growing dict of HealthStats""" + stats = None + with self._lock: + stats = self._all_stats.get(stats_name) + if not stats: + self._all_stats[stats_name] = stats = HealthStats(stats_name) + return stats + + def success(self, stats_name, timer): + """records the successful execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.success(timer) + + def error(self, stats_name, timer): + """records the error execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.error(timer) + + def dump(self): + """returns dict of stats""" + with self._lock: + stats = dict((k, v.dump()) for (k, v) in self._all_stats.items()) + + return stats diff --git a/oti/event-handler/otihandler/utils.py b/oti/event-handler/otihandler/utils.py new file mode 100644 index 0000000..4f9dbda --- /dev/null +++ b/oti/event-handler/otihandler/utils.py @@ -0,0 +1,83 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +import base64 +import collections +import copy +import os + +from Crypto import Random +from Crypto.Cipher import PKCS1_v1_5 +from Crypto.Hash import SHA +from Crypto.PublicKey import RSA + + +def update_dict(d, u): + """Recursively updates dict + + Update dict d with dict u + """ + for k, v in u.items(): + if isinstance(v, collections.Mapping): + r = update_dict(d.get(k, {}), v) + d[k] = r + else: + d[k] = u[k] + return d + +def replace_token(configure_content): + try: + with open("/opt/app/config-map/dcae-k8s-cluster-token",'r') as fh: + dcae_token = fh.readline().rstrip('\n') + + new_config = copy.deepcopy(configure_content) + + # override the default-user token + ix=0 + for user in new_config['users'][:]: + if user['name'] == "default-user": + new_config['users'][ix] = { + "name": "default-user", + "user": { + "token": dcae_token + } + } + ix += 1 + + return new_config + + except Exception as e: + return configure_content + +def decrypt(b64_ciphertext): + """returns decrypted b64_ciphertext that was encoded like this: + + echo "cleartext" | openssl pkeyutl -encrypt -pubin -inkey rsa.pub | base64 --wrap=0 + + requires private key in environment variable EOMUSER_PRIVATE + """ + + if len(b64_ciphertext) <= 30: # For transition, assume short values are not encrypted + return b64_ciphertext + + try: + ciphertext = base64.b64decode(b64_ciphertext) + key = RSA.importKey(os.getenv('EOMUSER_PRIVATE')) + cleartext = PKCS1_v1_5.new(key).decrypt(ciphertext, Random.new().read(15+SHA.digest_size)) + except Exception as e: + return b64_ciphertext + + return cleartext diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py new file mode 100644 index 0000000..f3eb071 --- /dev/null +++ b/oti/event-handler/otihandler/web_server.py @@ -0,0 +1,603 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""web-service for oti_handler""" + +import json +import logging +import os +import time +from datetime import datetime + +import cherrypy + +from otihandler.cbs_rest import CBSRest +from otihandler.config import Config +from otihandler.dti_processor import DTIProcessor +from otihandler.onap.audit import Audit + + +class DTIWeb(object): + """run REST API of OTI Handler""" + + logger = logging.getLogger("oti_handler.web_server") + HOST_INADDR_ANY = ".".join("0"*4) + + @staticmethod + def run_forever(audit): + """run the web-server of OTI Handler forever""" + + cherrypy.config.update({"server.socket_host": DTIWeb.HOST_INADDR_ANY, + "server.socket_port": Config.wservice_port}) + + protocol = "http" + tls_info = "" + if Config.tls_server_cert_file and Config.tls_private_key_file: + tm_cert = os.path.getmtime(Config.tls_server_cert_file) + tm_key = os.path.getmtime(Config.tls_private_key_file) + #cherrypy.server.ssl_module = 'builtin' + cherrypy.server.ssl_module = 'pyOpenSSL' + cherrypy.server.ssl_certificate = Config.tls_server_cert_file + cherrypy.server.ssl_private_key = Config.tls_private_key_file + if Config.tls_server_ca_chain_file: + cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file + protocol = "https" + tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file, + Config.tls_private_key_file, + Config.tls_server_ca_chain_file) + + cherrypy.tree.mount(_DTIWeb(), '/') + + DTIWeb.logger.info( + "%s with config: %s", audit.info("running oti_handler as {}://{}:{} {}".format( + protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)), + json.dumps(cherrypy.config)) + cherrypy.engine.start() + + # If HTTPS server certificate changes, exit to let kubernetes restart us + if Config.tls_server_cert_file and Config.tls_private_key_file: + while True: + time.sleep(600) + c_tm_cert = os.path.getmtime(Config.tls_server_cert_file) + c_tm_key = os.path.getmtime(Config.tls_private_key_file) + if c_tm_cert > tm_cert or c_tm_key > tm_key: + DTIWeb.logger.info("cert or key file updated") + cherrypy.engine.stop() + cherrypy.engine.exit() + break + + +class _DTIWeb(object): + """REST API of DTI Handler""" + + VALID_EVENT_TYPES = ['deploy', 'undeploy', 'add', 'delete', 'update', 'notify'] + + @staticmethod + def _get_request_info(request): + """Returns info about the http request.""" + + return "{} {}{}".format(request.method, request.script_name, request.path_info) + + + #----- Common endpoint methods + + @cherrypy.expose + @cherrypy.tools.json_out() + def healthcheck(self): + """Returns healthcheck results.""" + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + DTIWeb.logger.info("%s", req_info) + + result = Audit.health() + + DTIWeb.logger.info("healthcheck %s: result=%s", req_info, json.dumps(result)) + + audit.audit_done(result=json.dumps(result)) + return result + + @cherrypy.expose + def shutdown(self): + """Shutdown the web server.""" + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + DTIWeb.logger.info("%s: --- stopping REST API of DTI Handler ---", req_info) + + cherrypy.engine.exit() + + health = json.dumps(Audit.health()) + audit.info("oti_handler health: {}".format(health)) + DTIWeb.logger.info("oti_handler health: %s", health) + DTIWeb.logger.info("%s: --------- the end -----------", req_info) + result = str(datetime.now()) + audit.info_requested(result) + return "goodbye! shutdown requested {}".format(result) + + # ----- DTI Handler mock endpoint methods + @cherrypy.expose + @cherrypy.tools.json_out() + @cherrypy.tools.json_in() + def mockevents(self): + + result = {"KubeNamespace":"com-my-dcae-test", "KubePod":"pod-0", "KubeServiceName":"pod-0.service.local", "KubeServicePort":"8880", "KubeClusterFqdn":"fqdn-1"} + + return result + + #----- DTI Handler endpoint methods + + @cherrypy.expose + @cherrypy.tools.json_out() + @cherrypy.tools.json_in() + def events(self, notify="y"): + """ + Run dti reconfig script in service component instances configured to accept the DTI Event. + + POST /events < <dcae_event> + + POST /events?ndtify="n" < <dcae_event> + + where <dcae_event> is the entire DTI Event passed as a JSON object and contains at least these top-level keys: + dcae_service_action : string + required, 'deploy' or 'undeploy' + dcae_target_name : string + required, VNF Instance ID + dcae_target_type : string + required, VNF Type of the VNF Instance + dcae_service_location : string + optional, CLLI location. Not provided or '' infers all locations. + + Parameters + ---------- + notify : string + optional, default "y", any of these will not notify components: [ "f", "false", "False", "FALSE", "n", "no" ] + When "n" will **not** notify components of this DTI Event update to Consul. + + Returns + ------- + dict + JSON object containing success or error of executing the dti reconfig script on + each component instance's docker container, keyed by service_component_name. + + """ + + if cherrypy.request.method != "POST": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + dti_event = cherrypy.request.json or {} + str_dti_event = json.dumps(dti_event) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message="{}: {}".format(req_info, str_dti_event), \ + headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: dti_event=%s headers=%s", \ + req_info, str_dti_event, json.dumps(cherrypy.request.headers)) + + dcae_service_action = dti_event.get('dcae_service_action') + if not dcae_service_action: + msg = 'dcae_service_action is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES: + msg = 'dcae_service_action is invalid' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400,msg) + + dcae_target_name = dti_event.get('dcae_target_name') + if not dcae_target_name: + msg = 'dcae_target_name is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + + dcae_target_type = dti_event.get('dcae_target_type', '') + if not dcae_target_type: + msg = 'dcae_target_type is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + + send_notification = True + if (isinstance(notify, bool) and not notify) or \ + (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]): + send_notification = False + + prc = DTIProcessor(dti_event, send_notification=send_notification) + result = prc.get_result() + + DTIWeb.logger.info("%s: dti_event=%s result=%s", \ + req_info, str_dti_event, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + def get_docker_events(self, request, service, location): + """ + common routine for dti_docker_events and oti_docker_events + + :param request: HTTP GET request + :param service: HTTP request query parameter for service name + :param location: HTTP request query parameter for location CLLI + :return: + """ + + if request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) + + req_info = _DTIWeb._get_request_info(request) + audit = Audit(req_message=req_info, headers=request.headers) + + return DTIProcessor.get_docker_raw_events(service, location) + + def get_k8s_events(self, request, **params): + """ + common routine for dti_k8s_events and oti_k8s_events + + :param request: HTTP GET request + :param params: HTTP request query parameters + :return: + """ + if request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) + + req_info = _DTIWeb._get_request_info(request) + audit = Audit(req_message=req_info, headers=request.headers) + + pod = request.params['pod'] + namespace = request.params['namespace'] + cluster = request.params['cluster'] + + return DTIProcessor.get_k8_raw_events(pod, cluster, namespace) + + @cherrypy.expose + @cherrypy.tools.json_out() + def oti_k8s_events(self, **params): + """ + Retrieve raw JSON events from application events database + + GET /oti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> + + Parameters + ---------- + pod ID : string + POD ID of the stateful set POD + namespace: string + kubernetes namespace + cluster: string + kubernetes cluster FQDN + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_k8s_events(cherrypy.request, params) + + @cherrypy.expose + @cherrypy.tools.json_out() + def dti_k8s_events(self, **params): + """ + Retrieve raw JSON events from application events database + + GET /dti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> + + Parameters + ---------- + pod ID : string + POD ID of the stateful set POD + namespace: string + kubernetes namespace + cluster: string + kubernetes cluster FQDN + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_k8s_events(cherrypy.request, params) + + @cherrypy.expose + @cherrypy.tools.json_out() + def oti_docker_events(self, service, location=None): + """ + Retrieve raw JSON events from application events database related to docker deployments + + GET /oti_docker_events?service=<svc>&location=<location> + + Parameters + ---------- + service : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + location : string + optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_docker_events(cherrypy.request, service, location) + + @cherrypy.expose + @cherrypy.tools.json_out() + def dti_docker_events(self, service, location=None): + """ + Retrieve raw JSON events from application events database related to docker deployments + + GET /dti_docker_events?service=<svc>&location=<location> + + Parameters + ---------- + service : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + location : string + optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_docker_events(cherrypy.request, service, location) + + #----- Config Binding Service (CBS) endpoint methods + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def service_component(self, service_name): + """ + Retrieve fully-bound configuration for service_name from Consul KVs. + + GET /service_component/<service_name> + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_service_component(service_name) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def service_component_all(self, service_name, service_location=None, policy_ids="y"): + """ + Retrieve all information for service_name (config, dti, dti_events, and policies) from Consul KVs. + + GET /service_component_all/<service_name> + + GET /service_component_all/<service_name>?service_location=<service_location> + + GET /service_component_all/<service_name>?service_location=<service_location>;policy_ids=n + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + service_location : string + optional, allows multiple values separated by commas. + Filters DTI events with dcae_service_location in service_location. + policy_ids : string + optional, default "y", any of these will unset: [ "f", "false", "False", "FALSE", "n", "no" ] + When unset, formats policies items as a list (without policy_ids) rather than as an object indexed by policy_id. + + Returns + ------- + dict + JSON object containing all information for component service_name. + The top-level keys may include the following: + config : dict + The cloudify node's application_config property from when the start workflow was executed. + dti : dict + Keys are VNF Types that the component currently is assigned to monitor. Policy can change them. + dti_events : dict + The latest deploy DTI events, keyed by VNF Type and sub-keyed by VNF Instance ID. + policies : dict + event : dict + Contains information about when the policies folder was last written. + items : dict + Contains all policy bodies for the service_name component, keyed by policy_id. + + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + policies_as_list = False + if (isinstance(policy_ids, bool) and not policy_ids) or \ + (isinstance(policy_ids, str) and policy_ids.lower() in [ "f", "false", "n", "no" ]): + policies_as_list = True + try: + result = CBSRest.get_service_component_all(service_name, service_location=service_location, policies_as_list=policies_as_list) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def dti(self, service_name=None, vnf_type=None, vnf_id=None, service_location=None): + """ + Retrieve current (latest, not undeployed) DTI events from Consul KVs. + + GET /dti/<service_name> + + GET /dti/<service_name>?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> + + GET /dti + + GET /dti?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> + + Parameters + ---------- + service_name : string + optional. The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + vnf_type : string + optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). + vnf_id : string + optional. Requires vnf_type also. Gets DTI event for this vnf_id. + service_location : string + optional, allows multiple values separated by commas. + Filters DTI events with dcae_service_location in service_location. + + Returns + ------- + dict + Dictionary of DTI event(s). + If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. + If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. + Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. + + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_dti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def policies(self, service_name, policy_id=None): + """ + Retrieve policies for service_name from Consul KVs. + + GET /policies/<service_name> + + GET /policies/<service_name>?policy_id=<policy_id> + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + policy_id : string + optional. Limits returned policy to this policy_id. + + Returns + ------- + dict + JSON object containing policy bodies for the service_name component. + If policy_id is specified, then object returned will be just the one policy body. + If policy_id is not specified, then object will contain all policy bodies, keyed by policy_id. + + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_policies(service_name, policy_id=policy_id) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result |