summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler/dti_processor.py
diff options
context:
space:
mode:
authorKotagiri, Ramprasad (rp5662) <rp5662@att.com>2019-12-19 17:41:16 -0500
committerKotagiri, Ramprasad (rp5662) <rp5662@att.com>2020-01-21 16:50:17 -0500
commit158b75abd6095a3155f5057832ec868bc99ced36 (patch)
treed374ba4adcfa6db9a036cb2bf018fe529c215eee /oti/event-handler/otihandler/dti_processor.py
parent77900bb3097491cd9fca964c111ea70724e53989 (diff)
Add OTI event-handler project
OTI event handler application in DCAEGEN2 platform Change-Id: Ie64f26f851e2045f00043f90279d503c2dc62948 Issue-ID: DCAEGEN2-1910 Signed-off-by: Kotagiri, Ramprasad (rp5662) <rp5662@att.com>
Diffstat (limited to 'oti/event-handler/otihandler/dti_processor.py')
-rw-r--r--oti/event-handler/otihandler/dti_processor.py815
1 files changed, 815 insertions, 0 deletions
diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py
new file mode 100644
index 0000000..970e020
--- /dev/null
+++ b/oti/event-handler/otihandler/dti_processor.py
@@ -0,0 +1,815 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""OTI Event processor for handling all the event types"""
+
+import copy
+import json
+import logging
+from multiprocessing.dummy import Pool as ThreadPool
+from threading import Lock
+
+import requests
+
+from otihandler import utils
+from otihandler.cfy_client import CfyClient
+from otihandler.consul_client import ConsulClient
+from otihandler.dbclient.apis import EventDbAccess
+from otihandler.dbclient.models import Event, EventAck
+from otihandler.docker_client import DockerClient
+
+notify_response_arr = []
+lock = Lock()
+K8S_CLUSTER_PROXY_NODE_PORT = '30132'
+
+
+def notify_docker(args_tuple):
+ """
+ event notification executor inside a process pool to communicate with docker container
+ interacts with docker client library
+ """
+ (dti_event, db_access, ack_item) = args_tuple
+ try:
+ dcae_service_action = dti_event.get('dcae_service_action')
+ component_scn = ack_item.service_component
+ deployment_id = ack_item.deployment_id
+ container_id = ack_item.container_id
+ docker_host = ack_item.docker_host
+ reconfig_script = ack_item.reconfig_script
+ container_type = 'docker'
+ except Exception as e:
+ return (
+ "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e))
+ what = ""
+ try:
+ what = "{} in {} container {} on {} that was deployed by {}".format(
+ reconfig_script, container_type, container_id, docker_host, deployment_id)
+ if dcae_service_action == 'add':
+ add_action = {"dcae_service_action": "deploy"}
+ dti_event.update(add_action)
+
+ if dcae_service_action == 'delete':
+ add_action = {"dcae_service_action": "undeploy"}
+ dti_event.update(add_action)
+
+ # dkr = DockerClient(docker_host, reauth=False)
+ result = ''
+ # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
+ if dti_event.get('dcae_service_action') == 'undeploy':
+ # delete from dti_event_ack table
+ try:
+ db_access.deleteDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ return (component_scn, "ran {}, got: {!s}".format(what, result))
+
+ except Exception as e:
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+
+def notify_svc(args_tuple):
+ """
+ add/update/delete event handler
+ event notification executor inside a process pool to communicate with docker container and k8s services
+ interacts with docker client library
+ interacts with k8s node port services using REST client
+ """
+ (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
+ dti_event = copy.deepcopy(orig_dti_event)
+ try:
+ dcae_service_action = dti_event.get('dcae_service_action').lower()
+
+ component_scn = res_tuple[0]
+ deployment_id = res_tuple[1]
+ container_id = res_tuple[2]
+ node_id = res_tuple[3]
+ docker_host = res_tuple[6]
+ reconfig_script = res_tuple[7]
+ container_type = res_tuple[8]
+ except Exception as e:
+ return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e))
+
+ what = ""
+ if container_type == "docker":
+ # exec reconfigure.sh in docker container
+ try:
+ what = "{} in {} container {} on {} that was deployed by {} node {}".format(
+ reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
+ if dcae_service_action == 'add':
+ add_action = {"dcae_service_action": "deploy"}
+ dti_event.update(add_action)
+
+ if dcae_service_action == 'delete':
+ add_action = {"dcae_service_action": "undeploy"}
+ dti_event.update(add_action)
+
+ dkr = DockerClient(docker_host, reauth=False)
+ result = ''
+ if dti_event.get('dcae_service_action') == 'update':
+ # undeploy + deploy
+ DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
+ dti_event.update({"dcae_service_action": "undeploy"})
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
+ dti_event.update({"dcae_service_action": "deploy"})
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ try:
+ upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
+ container_id)
+ upd_evt_ack.update_action('update')
+ db_access.saveDomainObject(upd_evt_ack)
+ except Exception as e:
+ msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ DTIProcessor.logger.debug("running {}".format(what))
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ if dti_event.get('dcae_service_action') == 'deploy':
+ # add into dti_event_ack table
+ try:
+ add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
+ container_type='docker', docker_host=docker_host,
+ container_id=container_id, reconfig_script=reconfig_script,
+ event=curr_evt,
+ action='add')
+ db_access.saveDomainObject(add_evt_ack)
+ except Exception as e:
+ msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ # remove from dtih_event_ack if present
+ if curr_evt is not None:
+ try:
+ del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
+ container_id)
+ db_access.deleteDomainObject(del_evt_ack)
+ except Exception as e:
+ msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ except Exception as e:
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ return (component_scn, "ran {}, got: {!s}".format(what, result))
+ elif container_type == "k8s":
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component")
+ # if action is 'update', check if k8s pod info exists already for this event in app db
+ if dcae_service_action == 'add':
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action")
+ return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
+ elif dcae_service_action == 'update':
+ # handle update for pods being tracked and handle add for new pods
+ k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
+ if k8s_scn_result is not None:
+ # update
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action")
+ return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
+ else:
+ # add
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ")
+ add_action = {"dcae_service_action": "add"}
+ dti_event.update(add_action)
+ return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
+
+
+def notify_k8s(args_tuple):
+ """
+ add event handler
+ event notification executor inside a process pool to communicate with k8s statefulset nodeport service
+ uses REST API client to call k8s services
+ """
+ (dti_event, db_access, curr_evt, res_tuple) = args_tuple
+ component_scn = res_tuple[0]
+ deployment_id = res_tuple[1]
+ node_id = res_tuple[3]
+ container_type = res_tuple[8]
+ service_address = res_tuple[9]
+ service_port = res_tuple[10]
+ what = "{} in {} deployment {} that was deployed by {} node {}".format(
+ "add", container_type, "statefulset", deployment_id, node_id)
+ # call scn node port service REST API
+ svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "collector nodeport service({}) threw exception {}: {!s}".format(
+ svc_nodeport_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ try:
+ event_ack_info = response.json()
+ except Exception as e:
+ msg = "collector nodeport service({}) threw exception {}: {!s}".format(
+ svc_nodeport_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ if not event_ack_info:
+ msg = "collector nodeport service returned bad data"
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "collector nodeport service returned bad data")
+
+ namespace = event_ack_info.get("KubeNamespace")
+ svc_name = event_ack_info.get("KubeServiceName")
+ svc_port = event_ack_info.get("KubeServicePort")
+ proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
+ cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
+ pod_name = event_ack_info.get("KubePod")
+ statefulset = pod_name[0:pod_name.rindex('-')]
+
+ what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
+ "add", container_type, statefulset, namespace, deployment_id, node_id)
+ try:
+ add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
+ k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
+ k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
+ service_component=component_scn)
+ db_access.saveDomainObject(add_evt_ack)
+ return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
+ except Exception as e:
+ msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+
+def notify_pods(args_tuple):
+ """
+ notify event handler
+ event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
+ uses REST API client to call k8s services
+ """
+ event_ack_info = ''
+ (dti_event, res_tuple) = args_tuple
+ try:
+ cluster = res_tuple[0]
+ port = K8S_CLUSTER_PROXY_NODE_PORT
+ namespace = res_tuple[1]
+ svc_name = res_tuple[2]
+ svc_port = res_tuple[4]
+ replicas = res_tuple[3]
+
+ for replica in range(replicas):
+ pod_id = "sts-{}-{}".format(svc_name, replica)
+ item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
+ pod_id, svc_name,
+ svc_port)
+ what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ response = requests.put(item_pod_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
+ else:
+ try:
+ event_ack_info = response.json()
+ except Exception as e:
+ msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
+
+ if not event_ack_info:
+ msg = "stateful set proxy service returned bad data"
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
+
+ with lock:
+ notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
+ except Exception as e:
+ with lock:
+ notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e)))
+
+def notify_k8s_pod(args_tuple):
+ """
+ update event handler
+ event notification executor inside a process pool to communicate with k8s DTIH proxy service
+ uses REST API client to call k8s services
+ """
+ item_pod_url = ''
+ component_scn = ''
+ (dti_event, db_access, ack_item) = args_tuple
+ # call ingress proxy to dispatch delete event
+
+ action = dti_event.get('dcae_service_action')
+ what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
+ action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
+ ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
+ ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
+ component_scn = ack_item.service_component
+ response = requests.put(item_pod_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "ran {}, got: {!s}".format(what, msg))
+ else:
+ if action == 'delete':
+ try:
+ db_access.deleteDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ try:
+ ack_item.update_action('update')
+ db_access.saveDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
+
+
+class DTIProcessor(object):
+ """
+ Main event processing class that encapsulates all the logic of this handler application!
+ An instance of this class is created per incoming client request.
+
+ Generates input data by querying platform services - cloudify, consul, postgresSql
+
+ It creates a pool of worker processes using a multiprocessing Pool class instance.
+ Tasks are offloaded to the worker processes that exist in the pool.
+ The input data is distributed across processes of the Pool object to enable parallel execution of
+ event notification function across multiple input values (data parallelism).
+ """
+
+ logger = logging.getLogger("oti_handler.dti_processor")
+ K8S_CLUSTER_PROXY_NODE_PORT = '30132'
+ db_access = None
+ docker_pool = None
+ k8s_pool = None
+
+ def __init__(self, dti_event, send_notification=True):
+ self._result = {}
+ self.event = dti_event
+ self.is_notify = send_notification
+ self.action = dti_event.get('dcae_service_action').lower()
+ self.target_name = dti_event.get('dcae_target_name')
+ self.target_type = dti_event.get('dcae_target_type', '').lower()
+ self.event_clli = dti_event.get('dcae_service_location')
+ res_dict = None
+ try:
+ self.docker_pool = ThreadPool(8)
+ self.k8s_pool = ThreadPool(8)
+ except Exception as e:
+ msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ raise e
+ else:
+ self.db_access = EventDbAccess()
+ self.prim_db_event = None
+ try:
+ res_dict = self.dispatcher()
+ except:
+ raise
+ finally:
+ try:
+ self.docker_pool.close()
+ self.k8s_pool.close()
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ try:
+ self.docker_pool.join()
+ self.k8s_pool.join()
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ # if not send_notification:
+ # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
+ # return
+
+ if res_dict:
+ try:
+ utils.update_dict(self._result, res_dict)
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
+ type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
+
+ def dispatcher(self):
+ """ dispatch method to execute specific method based on event type """
+
+ arg = str(self.action)
+ method = getattr(self, arg, lambda: "Invalid action")
+ return method()
+
+ def undeploy(self):
+ """
+ delete event from consul KV store, this functionality will be retired as events are stored
+ in postgresSql oti database
+ """
+ global key
+ try:
+ # update Consul KV store with DTI Event - storing them in a folder for all components
+ key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
+ result = ConsulClient.delete_key(key)
+ except Exception as e:
+ msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ else:
+ if not result:
+ msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ def deploy(self):
+ """
+ add event to consul KV store, this functionality will be retired as events are stored
+ in postgresSql oti database
+ """
+ dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
+ try:
+ # update Consul KV store with DTI Event - storing them in a folder for all components
+ result = ConsulClient.store_kvs({dep_key: self.event})
+ except Exception as e:
+ msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ def add(self):
+ """
+ process DTI event that contains a new VNF instance that has to be configured in the collector microservices
+ """
+ res_dict = None
+ try:
+ msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
+ DTIProcessor.logger.debug(msg)
+ # insert add event into dtih_event table
+ self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
+ location_clli=self.event_clli)
+ self.db_access.saveDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+ raise Exception(msg)
+ else:
+ if self.is_notify:
+ try:
+ # force the action to add, to avoid bad things later
+ add_action = {"dcae_service_action": "add"}
+ self.event.update(add_action)
+ # mock up data
+ mock_tp11 = (
+ "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
+ "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
+ "docker_node_instance_id1",
+ "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ # tpl_arr = []
+ # tpl_arr.append(mock_tp11)
+ # tpl_arr.append(mock_tp12)
+ # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
+ res_dict = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp) for tp in
+ CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ return res_dict
+
+ def add_replay(self):
+ """
+ convert an update event flow and replay as an add event type since the event acknowledgement is missing
+ from application database
+ """
+ res_dict = None
+ try:
+ # force the action to add, to avoid bad things later
+ add_action = {"dcae_service_action": "add"}
+ self.event.update(add_action)
+ # mock up data
+ mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
+ "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
+ "docker_node_instance_id1",
+ "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ # tpl_arr = []
+ # tpl_arr.append(mock_tp11)
+ # tpl_arr.append(mock_tp12)
+ # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
+ res_dict = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp) for tp in
+ CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ return res_dict
+
+ def delete(self):
+ """
+ process DTI event that indicates a VNF instance has to be removed from the collector microservices
+ """
+ res_dict = {}
+ res_dict_k8s = {}
+ res_dict_docker = {}
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.is_notify:
+ try:
+ msg = "processing delete event for {}/{} to relate with any docker hosts".format(
+ self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ res_dict_docker = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp)
+ for tp
+ in CfyClient().iter_components_for_docker(
+ self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
+ self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ if self.prim_db_event is not None:
+ result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
+ res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
+ ((self.event, self.db_access, ack_item) for ack_item in result))))
+ except Exception as e:
+ msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ if self.prim_db_event is not None:
+ self.db_access.deleteDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+ except Exception as e:
+ msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+
+ if res_dict_k8s is not None:
+ utils.update_dict(res_dict, res_dict_k8s)
+
+ if res_dict_docker is not None:
+ utils.update_dict(res_dict, res_dict_docker)
+
+ return res_dict
+
+ def update(self):
+ """
+ process DTI event that indicates VNF instance has to be updated in the collector microservices
+ """
+ res_dict = {}
+ res_dict_k8s = {}
+ res_dict_docker = {}
+
+ if self.is_notify:
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.prim_db_event is not None:
+ self.db_access.update_event_item(self.event, self.target_type, self.target_name)
+ result = self.db_access.query_event_data(self.target_type, self.target_name)
+ if len(result) == 0:
+ msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
+ "replaying this event to cluster if required". \
+ format(self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ res_dict = self.add_replay()
+ else:
+ msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
+ "identify new vs update cases".format(self.target_type, self.target_name)
+ DTIProcessor.logger.debug(msg)
+ try:
+ tpl_arr = CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli)
+ res_dict_docker = dict(self.docker_pool.map(notify_svc,
+ ((
+ self.event, self.db_access,
+ self.prim_db_event,
+ tp)
+ for tp in tpl_arr)))
+ except Exception as e:
+ msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
+ type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ else:
+ # event is new for the handler
+ msg = "processing update event for {}/{}, but current event info is not found in database, " \
+ "executing add event".format(self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ res_dict = self.add()
+ except Exception as e:
+ msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ if res_dict_k8s is not None:
+ utils.update_dict(res_dict, res_dict_k8s)
+
+ if res_dict_docker is not None:
+ utils.update_dict(res_dict, res_dict_docker)
+
+ return res_dict
+
+ def notify(self):
+ """
+ event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
+ This notification is meant for the cluster failover.
+ """
+ res_dict = {}
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.prim_db_event is not None:
+ self.db_access.update_event_item(self.event, self.target_type, self.target_name)
+ else:
+ self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
+ location_clli=self.event_clli)
+ self.db_access.saveDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
+ CfyClient().query_k8_components(self.target_name)))
+ for k, v in notify_response_arr:
+ res_dict[k] = v
+ except Exception as e:
+ msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ return res_dict
+
+ def get_result(self):
+ return self._result
+
+ @classmethod
+ def get_k8_raw_events(cls, pod, cluster, namespace):
+ """
+ Get DTI events for a k8 stateful set pod container
+
+ :param pod: required
+ k8s stateful set pod ID that was configured with a specific set of DTI Events
+ :param cluster: required
+ k8s cluster FQDN where the mS was deployed
+ :param namespace: required
+ k8s namespace where the stateful set was deployed in that namespace
+ :return:
+ Dictionary of DTI event(s).
+ DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+ """
+ db_access = EventDbAccess()
+ results = db_access.query_raw_k8_events(cluster, pod, namespace)
+
+ target_types = set([])
+ outer_dict = {}
+
+ for evnt_item in results:
+ target_types.add(evnt_item.target_type)
+
+ for targ_type in target_types:
+ inner_name_evt_dict = {}
+ for evnt in results:
+ if targ_type == evnt.target_type:
+ inner_name_evt_dict[evnt.target_name] = evnt.event
+
+ outer_dict[targ_type] = inner_name_evt_dict
+
+ return outer_dict
+
+ @classmethod
+ def get_docker_raw_events(cls, service_name, service_location):
+ """
+ Get DTI events for docker container.
+
+ Parameters
+ ----------
+ service_name : string
+ required. The service component name assigned by dockerplugin to the component that is unique to the
+ cloudify node instance and used in its Consul key(s).
+ service_location : string
+ optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
+ in service_location.
+ If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
+ TAGs if service_name is provided,
+ otherwise results are not location filtered.
+
+ Returns
+ -------
+ dict
+ Dictionary of DTI event(s).
+ DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+
+ """
+
+ r_dict = {}
+
+ want_locs = []
+ if service_location:
+ want_locs = service_location.split(',')
+
+ give_types = []
+ if service_name:
+ if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
+ try:
+ node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
+ if node_name:
+ services = ConsulClient.lookup_node(node_name).get("Services")
+ if services:
+ for node_svc in list(services.keys()):
+ if "-component-dockerhost-" in node_svc:
+ want_locs = services[node_svc].get("Tags", [])
+ break
+ except:
+ pass
+
+ try:
+ supported_types = ConsulClient.get_value(service_name + ":dti")
+ except:
+ return r_dict
+ else:
+ if supported_types:
+ supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
+ give_types = supported_types
+ if not give_types or (len(give_types) == 1 and give_types[0] == ''):
+ return r_dict
+
+ db_access = EventDbAccess()
+ results = db_access.query_raw_docker_events(give_types, want_locs)
+
+ target_types = set([])
+ outer_dict = {}
+
+ for evnt_item in results:
+ target_types.add(evnt_item.target_type)
+
+ for targ_type in target_types:
+ inner_name_evt_dict = {}
+ for evnt in results:
+ if targ_type == evnt.target_type:
+ inner_name_evt_dict[evnt.target_name] = evnt.event
+
+ outer_dict[targ_type] = inner_name_evt_dict
+
+ return outer_dict