diff options
Diffstat (limited to 'oti/event-handler/otihandler')
-rw-r--r-- | oti/event-handler/otihandler/cfy_client.py | 10 | ||||
-rw-r--r-- | oti/event-handler/otihandler/config.py | 1 | ||||
-rw-r--r-- | oti/event-handler/otihandler/consul_client.py | 20 | ||||
-rw-r--r-- | oti/event-handler/otihandler/dti_processor.py | 142 | ||||
-rw-r--r-- | oti/event-handler/otihandler/onap/CommonLogger.py | 2 | ||||
-rw-r--r-- | oti/event-handler/otihandler/web_server.py | 42 |
6 files changed, 110 insertions, 107 deletions
diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py index c823340..4e3de87 100644 --- a/oti/event-handler/otihandler/cfy_client.py +++ b/oti/event-handler/otihandler/cfy_client.py @@ -168,7 +168,7 @@ class CfyClient(object): if not password: raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source)) - b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8") + b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8") headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')} #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')} @@ -416,8 +416,8 @@ class CfyClient(object): continue # Check if the collector supports this VNF Type - # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':dti' + # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':oti' try: obj = ConsulClient.get_value(dti_key) except Exception as e: @@ -531,8 +531,8 @@ class CfyClient(object): continue # Check if the collector supports this VNF Type - # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':dti' + # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':oti' try: obj = ConsulClient.get_value(dti_key) except Exception as e: diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py index d5149cc..5c87f43 100644 --- a/oti/event-handler/otihandler/config.py +++ b/oti/event-handler/otihandler/config.py @@ -24,6 +24,7 @@ import os from otihandler.consul_client import ConsulClient +os.makedirs('logs', exist_ok=True) logging.basicConfig( filename='logs/oti_handler.log', \ format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \ diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py index d26d3a1..1b25f3e 100644 --- a/oti/event-handler/otihandler/consul_client.py +++ b/oti/event-handler/otihandler/consul_client.py @@ -410,7 +410,7 @@ class ConsulClient(object): # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key if value: - return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}} + return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}} return {"KV": {"Verb": verb, "Key": key}} @@ -450,7 +450,7 @@ class ConsulClient(object): """put kvs into consul-kv""" if not kvs: - ConsulClient._logger.warn("kvs not supplied to store_kvs()") + ConsulClient._logger.warning("kvs not supplied to store_kvs()") return store_kvs = [ @@ -474,7 +474,7 @@ class ConsulClient(object): """delete key from consul-kv""" if not key: - ConsulClient._logger.warn("key not supplied to delete_key()") + ConsulClient._logger.warning("key not supplied to delete_key()") return delete_key = [ @@ -488,7 +488,7 @@ class ConsulClient(object): """delete key from consul-kv""" if not key: - ConsulClient._logger.warn("key not supplied to delete_kvs()") + ConsulClient._logger.warning("key not supplied to delete_kvs()") return delete_kvs = [ @@ -555,16 +555,16 @@ class ConsulClient(object): @staticmethod def add_vnf_id(scn, vnf_type, vnf_id, dti_dict): """ - Add VNF instance to Consul scn:dti key. + Add VNF instance to Consul scn:oti key. Treat its value as a JSON string representing a dict. Extend the dict by adding a dti_dict for vnf_id under vnf_type. Turn the resulting extended dict into a JSON string. - Store the string back into Consul under scn:dti key. + Store the string back into Consul under scn:oti key. Watch out for conflicting concurrent updates. """ - key = scn + ':dti' + key = scn + ':oti' lc_vnf_type = vnf_type.lower() while True: # do until update succeeds (mod_index, v) = ConsulClient.get_value(key, get_index=True) @@ -583,16 +583,16 @@ class ConsulClient(object): @staticmethod def delete_vnf_id(scn, vnf_type, vnf_id): """ - Delete VNF instance from Consul scn:dti key. + Delete VNF instance from Consul scn:oti key. Treat its value as a JSON string representing a dict. Modify the dict by deleting the vnf_id key entry from under vnf_type. Turn the resulting extended dict into a JSON string. - Store the string back into Consul under scn:dti key. + Store the string back into Consul under scn:oti key. Watch out for conflicting concurrent updates. """ - key = scn + ':dti' + key = scn + ':oti' lc_vnf_type = vnf_type.lower() while True: # do until update succeeds (mod_index, v) = ConsulClient.get_value(key, get_index=True) diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py index 970e020..eb0eb99 100644 --- a/oti/event-handler/otihandler/dti_processor.py +++ b/oti/event-handler/otihandler/dti_processor.py @@ -36,51 +36,51 @@ lock = Lock() K8S_CLUSTER_PROXY_NODE_PORT = '30132' -def notify_docker(args_tuple): - """ - event notification executor inside a process pool to communicate with docker container - interacts with docker client library - """ - (dti_event, db_access, ack_item) = args_tuple - try: - dcae_service_action = dti_event.get('dcae_service_action') - component_scn = ack_item.service_component - deployment_id = ack_item.deployment_id - container_id = ack_item.container_id - docker_host = ack_item.docker_host - reconfig_script = ack_item.reconfig_script - container_type = 'docker' - except Exception as e: - return ( - "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e)) - what = "" - try: - what = "{} in {} container {} on {} that was deployed by {}".format( - reconfig_script, container_type, container_id, docker_host, deployment_id) - if dcae_service_action == 'add': - add_action = {"dcae_service_action": "deploy"} - dti_event.update(add_action) - - if dcae_service_action == 'delete': - add_action = {"dcae_service_action": "undeploy"} - dti_event.update(add_action) - - # dkr = DockerClient(docker_host, reauth=False) - result = '' - # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ]) - if dti_event.get('dcae_service_action') == 'undeploy': - # delete from dti_event_ack table - try: - db_access.deleteDomainObject(ack_item) - except Exception as e: - msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) - else: - return (component_scn, "ran {}, got: {!s}".format(what, result)) - - except Exception as e: - return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) +# def notify_docker(args_tuple): +# """ +# event notification executor inside a process pool to communicate with docker container +# interacts with docker client library +# """ +# (dti_event, db_access, ack_item) = args_tuple +# try: +# dcae_service_action = dti_event.get('dcae_service_action') +# component_scn = ack_item.service_component +# deployment_id = ack_item.deployment_id +# container_id = ack_item.container_id +# docker_host = ack_item.docker_host +# reconfig_script = ack_item.reconfig_script +# container_type = 'docker' +# except Exception as e: +# return ( +# "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e)) +# what = "" +# try: +# what = "{} in {} container {} on {} that was deployed by {}".format( +# reconfig_script, container_type, container_id, docker_host, deployment_id) +# if dcae_service_action == 'add': +# add_action = {"dcae_service_action": "deploy"} +# dti_event.update(add_action) +# +# if dcae_service_action == 'delete': +# add_action = {"dcae_service_action": "undeploy"} +# dti_event.update(add_action) +# +# # dkr = DockerClient(docker_host, reauth=False) +# result = '' +# # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ]) +# if dti_event.get('dcae_service_action') == 'undeploy': +# # delete from dti_event_ack table +# try: +# db_access.deleteDomainObject(ack_item) +# except Exception as e: +# msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) +# DTIProcessor.logger.warning(msg) +# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) +# else: +# return (component_scn, "ran {}, got: {!s}".format(what, result)) +# +# except Exception as e: +# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) def notify_svc(args_tuple): @@ -103,7 +103,7 @@ def notify_svc(args_tuple): reconfig_script = res_tuple[7] container_type = res_tuple[8] except Exception as e: - return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e)) + return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e)) what = "" if container_type == "docker": @@ -136,7 +136,7 @@ def notify_svc(args_tuple): db_access.saveDomainObject(upd_evt_ack) except Exception as e: msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) else: DTIProcessor.logger.debug("running {}".format(what)) @@ -152,7 +152,7 @@ def notify_svc(args_tuple): db_access.saveDomainObject(add_evt_ack) except Exception as e: msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) else: # remove from dtih_event_ack if present @@ -163,28 +163,28 @@ def notify_svc(args_tuple): db_access.deleteDomainObject(del_evt_ack) except Exception as e: msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) except Exception as e: return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) return (component_scn, "ran {}, got: {!s}".format(what, result)) elif container_type == "k8s": - DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component") + DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component") # if action is 'update', check if k8s pod info exists already for this event in app db if dcae_service_action == 'add': - DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action") + DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action") return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) elif dcae_service_action == 'update': # handle update for pods being tracked and handle add for new pods k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn) if k8s_scn_result is not None: # update - DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action") + DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action") return notify_k8s_pod((dti_event, db_access, k8s_scn_result)) else: # add - DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ") + DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ") add_action = {"dcae_service_action": "add"} dti_event.update(add_action) return notify_k8s((dti_event, db_access, curr_evt, res_tuple)) @@ -248,7 +248,7 @@ def notify_k8s(args_tuple): return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info)) except Exception as e: msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) @@ -304,7 +304,7 @@ def notify_pods(args_tuple): notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info))) except Exception as e: with lock: - notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e))) + notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e))) def notify_k8s_pod(args_tuple): """ @@ -339,7 +339,7 @@ def notify_k8s_pod(args_tuple): db_access.deleteDomainObject(ack_item) except Exception as e: msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) else: try: @@ -347,7 +347,7 @@ def notify_k8s_pod(args_tuple): db_access.saveDomainObject(ack_item) except Exception as e: msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what)) return (component_scn, "ran {}, got: {!s}".format(what, response.json())) @@ -448,12 +448,12 @@ class DTIProcessor(object): result = ConsulClient.delete_key(key) except Exception as e: msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg else: if not result: msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg def deploy(self): @@ -467,7 +467,7 @@ class DTIProcessor(object): result = ConsulClient.store_kvs({dep_key: self.event}) except Exception as e: msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg def add(self): @@ -484,7 +484,7 @@ class DTIProcessor(object): self.db_access.saveDomainObject(self.prim_db_event) except Exception as e: msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['ERROR'] = msg raise Exception(msg) else: @@ -546,7 +546,7 @@ class DTIProcessor(object): dcae_service_location=self.event_clli)) )) except Exception as e: - msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e) + msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e) DTIProcessor.logger.error(msg) self._result['ERROR'] = msg return res_dict @@ -564,7 +564,7 @@ class DTIProcessor(object): try: msg = "processing delete event for {}/{} to relate with any docker hosts".format( self.target_type, self.target_name) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) res_dict_docker = dict(self.docker_pool.map(notify_svc, ((self.event, self.db_access, self.prim_db_event, tp) for tp @@ -581,7 +581,7 @@ class DTIProcessor(object): try: msg = "processing delete event for {}/{} to relate with any k8s hosts".format( self.target_type, self.target_name) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) if self.prim_db_event is not None: result = self.db_access.query_event_data_k8s(self.target_type, self.target_name) res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, ( @@ -596,11 +596,11 @@ class DTIProcessor(object): self.db_access.deleteDomainObject(self.prim_db_event) except Exception as e: msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['ERROR'] = msg except Exception as e: msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['ERROR'] = msg if res_dict_k8s is not None: @@ -629,7 +629,7 @@ class DTIProcessor(object): msg = "processing update event for {}/{}, but event distribution info is not found in database, " \ "replaying this event to cluster if required". \ format(self.target_type, self.target_name) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg res_dict = self.add_replay() else: @@ -654,7 +654,7 @@ class DTIProcessor(object): # event is new for the handler msg = "processing update event for {}/{}, but current event info is not found in database, " \ "executing add event".format(self.target_type, self.target_name) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg res_dict = self.add() except Exception as e: @@ -686,7 +686,7 @@ class DTIProcessor(object): self.db_access.saveDomainObject(self.prim_db_event) except Exception as e: msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['ERROR'] = msg try: @@ -696,7 +696,7 @@ class DTIProcessor(object): res_dict[k] = v except Exception as e: msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args) - DTIProcessor.logger.warn(msg) + DTIProcessor.logger.warning(msg) self._result['WARNING'] = msg return res_dict @@ -785,7 +785,7 @@ class DTIProcessor(object): pass try: - supported_types = ConsulClient.get_value(service_name + ":dti") + supported_types = ConsulClient.get_value(service_name + ":oti") except: return r_dict else: diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py index 644534d..3b5b477 100644 --- a/oti/event-handler/otihandler/onap/CommonLogger.py +++ b/oti/event-handler/otihandler/onap/CommonLogger.py @@ -781,7 +781,7 @@ class CommonLogger: print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err))) sys.exit(2) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover def __checkOneTime(line): format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]' diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py index f3eb071..45c407f 100644 --- a/oti/event-handler/otihandler/web_server.py +++ b/oti/event-handler/otihandler/web_server.py @@ -180,6 +180,8 @@ class _DTIWeb(object): if cherrypy.request.method != "POST": raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + msg = "" + dti_event = cherrypy.request.json or {} str_dti_event = json.dumps(dti_event) @@ -192,38 +194,38 @@ class _DTIWeb(object): dcae_service_action = dti_event.get('dcae_service_action') if not dcae_service_action: msg = 'dcae_service_action is missing' - DTIWeb.logger.error(msg) - raise cherrypy.HTTPError(400, msg) elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES: msg = 'dcae_service_action is invalid' - DTIWeb.logger.error(msg) - raise cherrypy.HTTPError(400,msg) dcae_target_name = dti_event.get('dcae_target_name') - if not dcae_target_name: + if not msg and not dcae_target_name: msg = 'dcae_target_name is missing' - DTIWeb.logger.error(msg) - raise cherrypy.HTTPError(400, msg) dcae_target_type = dti_event.get('dcae_target_type', '') - if not dcae_target_type: + if not msg and not dcae_target_type: msg = 'dcae_target_type is missing' - DTIWeb.logger.error(msg) - raise cherrypy.HTTPError(400, msg) - send_notification = True - if (isinstance(notify, bool) and not notify) or \ - (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]): - send_notification = False + if msg: + result = {"ERROR": msg} + + DTIWeb.logger.error("%s: dti_event=%s result=%s", \ + req_info, str_dti_event, json.dumps(result)) + else: + send_notification = True + if (isinstance(notify, bool) and not notify) or \ + (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]): + send_notification = False - prc = DTIProcessor(dti_event, send_notification=send_notification) - result = prc.get_result() + prc = DTIProcessor(dti_event, send_notification=send_notification) + result = prc.get_result() - DTIWeb.logger.info("%s: dti_event=%s result=%s", \ - req_info, str_dti_event, json.dumps(result)) + DTIWeb.logger.info("%s: dti_event=%s result=%s", \ + req_info, str_dti_event, json.dumps(result)) success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: + if msg: + cherrypy.response.status = "400 Bad Request" + elif not success: cherrypy.response.status = http_status_code return result @@ -537,7 +539,7 @@ class _DTIWeb(object): req_info, service_name, json.dumps(cherrypy.request.headers)) try: - result = CBSRest.get_dti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location) + result = CBSRest.get_oti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location) except Exception as e: result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} audit.set_http_status_code(404) |