summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler
diff options
context:
space:
mode:
Diffstat (limited to 'oti/event-handler/otihandler')
-rw-r--r--oti/event-handler/otihandler/cfy_client.py10
-rw-r--r--oti/event-handler/otihandler/config.py1
-rw-r--r--oti/event-handler/otihandler/consul_client.py20
-rw-r--r--oti/event-handler/otihandler/dti_processor.py142
-rw-r--r--oti/event-handler/otihandler/onap/CommonLogger.py2
-rw-r--r--oti/event-handler/otihandler/web_server.py42
6 files changed, 110 insertions, 107 deletions
diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py
index c823340..4e3de87 100644
--- a/oti/event-handler/otihandler/cfy_client.py
+++ b/oti/event-handler/otihandler/cfy_client.py
@@ -168,7 +168,7 @@ class CfyClient(object):
if not password:
raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
- b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8")
+ b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8")
headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
#headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
@@ -416,8 +416,8 @@ class CfyClient(object):
continue
# Check if the collector supports this VNF Type
- # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
- dti_key = scn + ':dti'
+ # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':oti'
try:
obj = ConsulClient.get_value(dti_key)
except Exception as e:
@@ -531,8 +531,8 @@ class CfyClient(object):
continue
# Check if the collector supports this VNF Type
- # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
- dti_key = scn + ':dti'
+ # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':oti'
try:
obj = ConsulClient.get_value(dti_key)
except Exception as e:
diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py
index d5149cc..5c87f43 100644
--- a/oti/event-handler/otihandler/config.py
+++ b/oti/event-handler/otihandler/config.py
@@ -24,6 +24,7 @@ import os
from otihandler.consul_client import ConsulClient
+os.makedirs('logs', exist_ok=True)
logging.basicConfig(
filename='logs/oti_handler.log', \
format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py
index d26d3a1..1b25f3e 100644
--- a/oti/event-handler/otihandler/consul_client.py
+++ b/oti/event-handler/otihandler/consul_client.py
@@ -410,7 +410,7 @@ class ConsulClient(object):
# key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key
if value:
- return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}}
+ return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}}
return {"KV": {"Verb": verb, "Key": key}}
@@ -450,7 +450,7 @@ class ConsulClient(object):
"""put kvs into consul-kv"""
if not kvs:
- ConsulClient._logger.warn("kvs not supplied to store_kvs()")
+ ConsulClient._logger.warning("kvs not supplied to store_kvs()")
return
store_kvs = [
@@ -474,7 +474,7 @@ class ConsulClient(object):
"""delete key from consul-kv"""
if not key:
- ConsulClient._logger.warn("key not supplied to delete_key()")
+ ConsulClient._logger.warning("key not supplied to delete_key()")
return
delete_key = [
@@ -488,7 +488,7 @@ class ConsulClient(object):
"""delete key from consul-kv"""
if not key:
- ConsulClient._logger.warn("key not supplied to delete_kvs()")
+ ConsulClient._logger.warning("key not supplied to delete_kvs()")
return
delete_kvs = [
@@ -555,16 +555,16 @@ class ConsulClient(object):
@staticmethod
def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
"""
- Add VNF instance to Consul scn:dti key.
+ Add VNF instance to Consul scn:oti key.
Treat its value as a JSON string representing a dict.
Extend the dict by adding a dti_dict for vnf_id under vnf_type.
Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:dti key.
+ Store the string back into Consul under scn:oti key.
Watch out for conflicting concurrent updates.
"""
- key = scn + ':dti'
+ key = scn + ':oti'
lc_vnf_type = vnf_type.lower()
while True: # do until update succeeds
(mod_index, v) = ConsulClient.get_value(key, get_index=True)
@@ -583,16 +583,16 @@ class ConsulClient(object):
@staticmethod
def delete_vnf_id(scn, vnf_type, vnf_id):
"""
- Delete VNF instance from Consul scn:dti key.
+ Delete VNF instance from Consul scn:oti key.
Treat its value as a JSON string representing a dict.
Modify the dict by deleting the vnf_id key entry from under vnf_type.
Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:dti key.
+ Store the string back into Consul under scn:oti key.
Watch out for conflicting concurrent updates.
"""
- key = scn + ':dti'
+ key = scn + ':oti'
lc_vnf_type = vnf_type.lower()
while True: # do until update succeeds
(mod_index, v) = ConsulClient.get_value(key, get_index=True)
diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py
index 970e020..eb0eb99 100644
--- a/oti/event-handler/otihandler/dti_processor.py
+++ b/oti/event-handler/otihandler/dti_processor.py
@@ -36,51 +36,51 @@ lock = Lock()
K8S_CLUSTER_PROXY_NODE_PORT = '30132'
-def notify_docker(args_tuple):
- """
- event notification executor inside a process pool to communicate with docker container
- interacts with docker client library
- """
- (dti_event, db_access, ack_item) = args_tuple
- try:
- dcae_service_action = dti_event.get('dcae_service_action')
- component_scn = ack_item.service_component
- deployment_id = ack_item.deployment_id
- container_id = ack_item.container_id
- docker_host = ack_item.docker_host
- reconfig_script = ack_item.reconfig_script
- container_type = 'docker'
- except Exception as e:
- return (
- "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e))
- what = ""
- try:
- what = "{} in {} container {} on {} that was deployed by {}".format(
- reconfig_script, container_type, container_id, docker_host, deployment_id)
- if dcae_service_action == 'add':
- add_action = {"dcae_service_action": "deploy"}
- dti_event.update(add_action)
-
- if dcae_service_action == 'delete':
- add_action = {"dcae_service_action": "undeploy"}
- dti_event.update(add_action)
-
- # dkr = DockerClient(docker_host, reauth=False)
- result = ''
- # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
- if dti_event.get('dcae_service_action') == 'undeploy':
- # delete from dti_event_ack table
- try:
- db_access.deleteDomainObject(ack_item)
- except Exception as e:
- msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- return (component_scn, "ran {}, got: {!s}".format(what, result))
-
- except Exception as e:
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+# def notify_docker(args_tuple):
+# """
+# event notification executor inside a process pool to communicate with docker container
+# interacts with docker client library
+# """
+# (dti_event, db_access, ack_item) = args_tuple
+# try:
+# dcae_service_action = dti_event.get('dcae_service_action')
+# component_scn = ack_item.service_component
+# deployment_id = ack_item.deployment_id
+# container_id = ack_item.container_id
+# docker_host = ack_item.docker_host
+# reconfig_script = ack_item.reconfig_script
+# container_type = 'docker'
+# except Exception as e:
+# return (
+# "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
+# what = ""
+# try:
+# what = "{} in {} container {} on {} that was deployed by {}".format(
+# reconfig_script, container_type, container_id, docker_host, deployment_id)
+# if dcae_service_action == 'add':
+# add_action = {"dcae_service_action": "deploy"}
+# dti_event.update(add_action)
+#
+# if dcae_service_action == 'delete':
+# add_action = {"dcae_service_action": "undeploy"}
+# dti_event.update(add_action)
+#
+# # dkr = DockerClient(docker_host, reauth=False)
+# result = ''
+# # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
+# if dti_event.get('dcae_service_action') == 'undeploy':
+# # delete from dti_event_ack table
+# try:
+# db_access.deleteDomainObject(ack_item)
+# except Exception as e:
+# msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+# DTIProcessor.logger.warning(msg)
+# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+# else:
+# return (component_scn, "ran {}, got: {!s}".format(what, result))
+#
+# except Exception as e:
+# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
def notify_svc(args_tuple):
@@ -103,7 +103,7 @@ def notify_svc(args_tuple):
reconfig_script = res_tuple[7]
container_type = res_tuple[8]
except Exception as e:
- return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e))
+ return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
what = ""
if container_type == "docker":
@@ -136,7 +136,7 @@ def notify_svc(args_tuple):
db_access.saveDomainObject(upd_evt_ack)
except Exception as e:
msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
else:
DTIProcessor.logger.debug("running {}".format(what))
@@ -152,7 +152,7 @@ def notify_svc(args_tuple):
db_access.saveDomainObject(add_evt_ack)
except Exception as e:
msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
else:
# remove from dtih_event_ack if present
@@ -163,28 +163,28 @@ def notify_svc(args_tuple):
db_access.deleteDomainObject(del_evt_ack)
except Exception as e:
msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
except Exception as e:
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
return (component_scn, "ran {}, got: {!s}".format(what, result))
elif container_type == "k8s":
- DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component")
+ DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
# if action is 'update', check if k8s pod info exists already for this event in app db
if dcae_service_action == 'add':
- DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action")
+ DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
elif dcae_service_action == 'update':
# handle update for pods being tracked and handle add for new pods
k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
if k8s_scn_result is not None:
# update
- DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action")
+ DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
else:
# add
- DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ")
+ DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
add_action = {"dcae_service_action": "add"}
dti_event.update(add_action)
return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
@@ -248,7 +248,7 @@ def notify_k8s(args_tuple):
return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
except Exception as e:
msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
@@ -304,7 +304,7 @@ def notify_pods(args_tuple):
notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
except Exception as e:
with lock:
- notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e)))
+ notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
def notify_k8s_pod(args_tuple):
"""
@@ -339,7 +339,7 @@ def notify_k8s_pod(args_tuple):
db_access.deleteDomainObject(ack_item)
except Exception as e:
msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
else:
try:
@@ -347,7 +347,7 @@ def notify_k8s_pod(args_tuple):
db_access.saveDomainObject(ack_item)
except Exception as e:
msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
@@ -448,12 +448,12 @@ class DTIProcessor(object):
result = ConsulClient.delete_key(key)
except Exception as e:
msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
else:
if not result:
msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
def deploy(self):
@@ -467,7 +467,7 @@ class DTIProcessor(object):
result = ConsulClient.store_kvs({dep_key: self.event})
except Exception as e:
msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
def add(self):
@@ -484,7 +484,7 @@ class DTIProcessor(object):
self.db_access.saveDomainObject(self.prim_db_event)
except Exception as e:
msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['ERROR'] = msg
raise Exception(msg)
else:
@@ -546,7 +546,7 @@ class DTIProcessor(object):
dcae_service_location=self.event_clli))
))
except Exception as e:
- msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+ msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
DTIProcessor.logger.error(msg)
self._result['ERROR'] = msg
return res_dict
@@ -564,7 +564,7 @@ class DTIProcessor(object):
try:
msg = "processing delete event for {}/{} to relate with any docker hosts".format(
self.target_type, self.target_name)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
res_dict_docker = dict(self.docker_pool.map(notify_svc,
((self.event, self.db_access, self.prim_db_event, tp)
for tp
@@ -581,7 +581,7 @@ class DTIProcessor(object):
try:
msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
self.target_type, self.target_name)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
if self.prim_db_event is not None:
result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
@@ -596,11 +596,11 @@ class DTIProcessor(object):
self.db_access.deleteDomainObject(self.prim_db_event)
except Exception as e:
msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['ERROR'] = msg
except Exception as e:
msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['ERROR'] = msg
if res_dict_k8s is not None:
@@ -629,7 +629,7 @@ class DTIProcessor(object):
msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
"replaying this event to cluster if required". \
format(self.target_type, self.target_name)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
res_dict = self.add_replay()
else:
@@ -654,7 +654,7 @@ class DTIProcessor(object):
# event is new for the handler
msg = "processing update event for {}/{}, but current event info is not found in database, " \
"executing add event".format(self.target_type, self.target_name)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
res_dict = self.add()
except Exception as e:
@@ -686,7 +686,7 @@ class DTIProcessor(object):
self.db_access.saveDomainObject(self.prim_db_event)
except Exception as e:
msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['ERROR'] = msg
try:
@@ -696,7 +696,7 @@ class DTIProcessor(object):
res_dict[k] = v
except Exception as e:
msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warn(msg)
+ DTIProcessor.logger.warning(msg)
self._result['WARNING'] = msg
return res_dict
@@ -785,7 +785,7 @@ class DTIProcessor(object):
pass
try:
- supported_types = ConsulClient.get_value(service_name + ":dti")
+ supported_types = ConsulClient.get_value(service_name + ":oti")
except:
return r_dict
else:
diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py
index 644534d..3b5b477 100644
--- a/oti/event-handler/otihandler/onap/CommonLogger.py
+++ b/oti/event-handler/otihandler/onap/CommonLogger.py
@@ -781,7 +781,7 @@ class CommonLogger:
print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)))
sys.exit(2)
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
def __checkOneTime(line):
format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py
index f3eb071..45c407f 100644
--- a/oti/event-handler/otihandler/web_server.py
+++ b/oti/event-handler/otihandler/web_server.py
@@ -180,6 +180,8 @@ class _DTIWeb(object):
if cherrypy.request.method != "POST":
raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+ msg = ""
+
dti_event = cherrypy.request.json or {}
str_dti_event = json.dumps(dti_event)
@@ -192,38 +194,38 @@ class _DTIWeb(object):
dcae_service_action = dti_event.get('dcae_service_action')
if not dcae_service_action:
msg = 'dcae_service_action is missing'
- DTIWeb.logger.error(msg)
- raise cherrypy.HTTPError(400, msg)
elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES:
msg = 'dcae_service_action is invalid'
- DTIWeb.logger.error(msg)
- raise cherrypy.HTTPError(400,msg)
dcae_target_name = dti_event.get('dcae_target_name')
- if not dcae_target_name:
+ if not msg and not dcae_target_name:
msg = 'dcae_target_name is missing'
- DTIWeb.logger.error(msg)
- raise cherrypy.HTTPError(400, msg)
dcae_target_type = dti_event.get('dcae_target_type', '')
- if not dcae_target_type:
+ if not msg and not dcae_target_type:
msg = 'dcae_target_type is missing'
- DTIWeb.logger.error(msg)
- raise cherrypy.HTTPError(400, msg)
- send_notification = True
- if (isinstance(notify, bool) and not notify) or \
- (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]):
- send_notification = False
+ if msg:
+ result = {"ERROR": msg}
+
+ DTIWeb.logger.error("%s: dti_event=%s result=%s", \
+ req_info, str_dti_event, json.dumps(result))
+ else:
+ send_notification = True
+ if (isinstance(notify, bool) and not notify) or \
+ (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]):
+ send_notification = False
- prc = DTIProcessor(dti_event, send_notification=send_notification)
- result = prc.get_result()
+ prc = DTIProcessor(dti_event, send_notification=send_notification)
+ result = prc.get_result()
- DTIWeb.logger.info("%s: dti_event=%s result=%s", \
- req_info, str_dti_event, json.dumps(result))
+ DTIWeb.logger.info("%s: dti_event=%s result=%s", \
+ req_info, str_dti_event, json.dumps(result))
success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
+ if msg:
+ cherrypy.response.status = "400 Bad Request"
+ elif not success:
cherrypy.response.status = http_status_code
return result
@@ -537,7 +539,7 @@ class _DTIWeb(object):
req_info, service_name, json.dumps(cherrypy.request.headers))
try:
- result = CBSRest.get_dti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location)
+ result = CBSRest.get_oti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location)
except Exception as e:
result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
audit.set_http_status_code(404)