summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler/dti_processor.py
diff options
context:
space:
mode:
Diffstat (limited to 'oti/event-handler/otihandler/dti_processor.py')
-rw-r--r--oti/event-handler/otihandler/dti_processor.py815
1 files changed, 0 insertions, 815 deletions
diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py
deleted file mode 100644
index 5802233..0000000
--- a/oti/event-handler/otihandler/dti_processor.py
+++ /dev/null
@@ -1,815 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""OTI Event processor for handling all the event types"""
-
-import copy
-import json
-import logging
-from multiprocessing.dummy import Pool as ThreadPool
-from threading import Lock
-
-import requests
-
-from otihandler import utils
-from otihandler.cfy_client import CfyClient
-from otihandler.consul_client import ConsulClient
-from otihandler.dbclient.apis import EventDbAccess
-from otihandler.dbclient.models import Event, EventAck
-from otihandler.docker_client import DockerClient
-
-notify_response_arr = []
-lock = Lock()
-K8S_CLUSTER_PROXY_NODE_PORT = '30132'
-
-
-# def notify_docker(args_tuple):
-# """
-# event notification executor inside a process pool to communicate with docker container
-# interacts with docker client library
-# """
-# (dti_event, db_access, ack_item) = args_tuple
-# try:
-# dcae_service_action = dti_event.get('dcae_service_action')
-# component_scn = ack_item.service_component
-# deployment_id = ack_item.deployment_id
-# container_id = ack_item.container_id
-# docker_host = ack_item.docker_host
-# reconfig_script = ack_item.reconfig_script
-# container_type = 'docker'
-# except Exception as e:
-# return (
-# "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
-# what = ""
-# try:
-# what = "{} in {} container {} on {} that was deployed by {}".format(
-# reconfig_script, container_type, container_id, docker_host, deployment_id)
-# if dcae_service_action == 'add':
-# add_action = {"dcae_service_action": "deploy"}
-# dti_event.update(add_action)
-#
-# if dcae_service_action == 'delete':
-# add_action = {"dcae_service_action": "undeploy"}
-# dti_event.update(add_action)
-#
-# # dkr = DockerClient(docker_host, reauth=False)
-# result = ''
-# # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
-# if dti_event.get('dcae_service_action') == 'undeploy':
-# # delete from dti_event_ack table
-# try:
-# db_access.deleteDomainObject(ack_item)
-# except Exception as e:
-# msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-# DTIProcessor.logger.warning(msg)
-# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-# else:
-# return (component_scn, "ran {}, got: {!s}".format(what, result))
-#
-# except Exception as e:
-# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
-
-def notify_svc(args_tuple):
- """
- add/update/delete event handler
- event notification executor inside a process pool to communicate with docker container and k8s services
- interacts with docker client library
- interacts with k8s node port services using REST client
- """
- (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
- dti_event = copy.deepcopy(orig_dti_event)
- try:
- dcae_service_action = dti_event.get('dcae_service_action').lower()
-
- component_scn = res_tuple[0]
- deployment_id = res_tuple[1]
- container_id = res_tuple[2]
- node_id = res_tuple[3]
- docker_host = res_tuple[6]
- reconfig_script = res_tuple[7]
- container_type = res_tuple[8]
- except Exception as e:
- return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
-
- what = ""
- if container_type == "docker":
- # exec reconfigure.sh in docker container
- try:
- what = "{} in {} container {} on {} that was deployed by {} node {}".format(
- reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
- if dcae_service_action == 'add':
- add_action = {"dcae_service_action": "deploy"}
- dti_event.update(add_action)
-
- if dcae_service_action == 'delete':
- add_action = {"dcae_service_action": "undeploy"}
- dti_event.update(add_action)
-
- dkr = DockerClient(docker_host, reauth=False)
- result = ''
- if dti_event.get('dcae_service_action') == 'update':
- # undeploy + deploy
- DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
- dti_event.update({"dcae_service_action": "undeploy"})
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
- dti_event.update({"dcae_service_action": "deploy"})
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- try:
- upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
- container_id)
- upd_evt_ack.update_action('update')
- db_access.saveDomainObject(upd_evt_ack)
- except Exception as e:
- msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- DTIProcessor.logger.debug("running {}".format(what))
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- if dti_event.get('dcae_service_action') == 'deploy':
- # add into dti_event_ack table
- try:
- add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
- container_type='docker', docker_host=docker_host,
- container_id=container_id, reconfig_script=reconfig_script,
- event=curr_evt,
- action='add')
- db_access.saveDomainObject(add_evt_ack)
- except Exception as e:
- msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- # remove from dtih_event_ack if present
- if curr_evt:
- try:
- del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
- container_id)
- db_access.deleteDomainObject(del_evt_ack)
- except Exception as e:
- msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- except Exception as e:
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- return (component_scn, "ran {}, got: {!s}".format(what, result))
- elif container_type == "k8s":
- DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
- # if action is 'update', check if k8s pod info exists already for this event in app db
- if dcae_service_action == 'add':
- DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
- return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
- elif dcae_service_action == 'update':
- # handle update for pods being tracked and handle add for new pods
- k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
- if k8s_scn_result:
- # update
- DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
- return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
- else:
- # add
- DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
- add_action = {"dcae_service_action": "add"}
- dti_event.update(add_action)
- return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
-
-
-def notify_k8s(args_tuple):
- """
- add event handler
- event notification executor inside a process pool to communicate with k8s statefulset nodeport service
- uses REST API client to call k8s services
- """
- (dti_event, db_access, curr_evt, res_tuple) = args_tuple
- component_scn = res_tuple[0]
- deployment_id = res_tuple[1]
- node_id = res_tuple[3]
- container_type = res_tuple[8]
- service_address = res_tuple[9]
- service_port = res_tuple[10]
- what = "{} in {} deployment {} that was deployed by {} node {}".format(
- "add", container_type, "statefulset", deployment_id, node_id)
- # call scn node port service REST API
- svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "collector nodeport service({}) threw exception {}: {!s}".format(
- svc_nodeport_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- try:
- event_ack_info = response.json()
- except Exception as e:
- msg = "collector nodeport service({}) threw exception {}: {!s}".format(
- svc_nodeport_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- if not event_ack_info:
- msg = "collector nodeport service returned bad data"
- DTIProcessor.logger.error(msg)
- return (component_scn, "collector nodeport service returned bad data")
-
- namespace = event_ack_info.get("KubeNamespace")
- svc_name = event_ack_info.get("KubeServiceName")
- svc_port = event_ack_info.get("KubeServicePort")
- proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
- cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
- pod_name = event_ack_info.get("KubePod")
- statefulset = pod_name[0:pod_name.rindex('-')]
-
- what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
- "add", container_type, statefulset, namespace, deployment_id, node_id)
- try:
- add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
- k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
- k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
- service_component=component_scn)
- db_access.saveDomainObject(add_evt_ack)
- return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
- except Exception as e:
- msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
-
-def notify_pods(args_tuple):
- """
- notify event handler
- event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
- uses REST API client to call k8s services
- """
- event_ack_info = ''
- (dti_event, res_tuple) = args_tuple
- try:
- cluster = res_tuple[0]
- port = K8S_CLUSTER_PROXY_NODE_PORT
- namespace = res_tuple[1]
- svc_name = res_tuple[2]
- svc_port = res_tuple[4]
- replicas = res_tuple[3]
-
- for replica in range(replicas):
- pod_id = "sts-{}-{}".format(svc_name, replica)
- item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
- pod_id, svc_name,
- svc_port)
- what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- response = requests.put(item_pod_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
- else:
- try:
- event_ack_info = response.json()
- except Exception as e:
- msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
-
- if not event_ack_info:
- msg = "stateful set proxy service returned bad data"
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
-
- with lock:
- notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
- except Exception as e:
- with lock:
- notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
-
-def notify_k8s_pod(args_tuple):
- """
- update event handler
- event notification executor inside a process pool to communicate with k8s DTIH proxy service
- uses REST API client to call k8s services
- """
- item_pod_url = ''
- component_scn = ''
- (dti_event, db_access, ack_item) = args_tuple
- # call ingress proxy to dispatch delete event
-
- action = dti_event.get('dcae_service_action')
- what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
- action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
- ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
- ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
- component_scn = ack_item.service_component
- response = requests.put(item_pod_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "ran {}, got: {!s}".format(what, msg))
- else:
- if action == 'delete':
- try:
- db_access.deleteDomainObject(ack_item)
- except Exception as e:
- msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- try:
- ack_item.update_action('update')
- db_access.saveDomainObject(ack_item)
- except Exception as e:
- msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
-
-
-class DTIProcessor(object):
- """
- Main event processing class that encapsulates all the logic of this handler application!
- An instance of this class is created per incoming client request.
-
- Generates input data by querying platform services - cloudify, consul, postgresSql
-
- It creates a pool of worker processes using a multiprocessing Pool class instance.
- Tasks are offloaded to the worker processes that exist in the pool.
- The input data is distributed across processes of the Pool object to enable parallel execution of
- event notification function across multiple input values (data parallelism).
- """
-
- logger = logging.getLogger("oti_handler.dti_processor")
- K8S_CLUSTER_PROXY_NODE_PORT = '30132'
- db_access = None
- docker_pool = None
- k8s_pool = None
-
- def __init__(self, dti_event, send_notification=True):
- self._result = {}
- self.event = dti_event
- self.is_notify = send_notification
- self.action = dti_event.get('dcae_service_action').lower()
- self.target_name = dti_event.get('dcae_target_name')
- self.target_type = dti_event.get('dcae_target_type', '').lower()
- self.event_clli = dti_event.get('dcae_service_location')
- res_dict = None
- try:
- self.docker_pool = ThreadPool(8)
- self.k8s_pool = ThreadPool(8)
- except Exception as e:
- msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- raise e
- else:
- self.db_access = EventDbAccess()
- self.prim_db_event = None
- try:
- res_dict = self.dispatcher()
- except:
- raise
- finally:
- try:
- self.docker_pool.close()
- self.k8s_pool.close()
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- try:
- self.docker_pool.join()
- self.k8s_pool.join()
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- # if not send_notification:
- # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
- # return
-
- if res_dict:
- try:
- utils.update_dict(self._result, res_dict)
- except Exception as e:
- msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
- type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
-
- def dispatcher(self):
- """ dispatch method to execute specific method based on event type """
-
- arg = str(self.action)
- method = getattr(self, arg, lambda: "Invalid action")
- return method()
-
- def undeploy(self):
- """
- delete event from consul KV store, this functionality will be retired as events are stored
- in postgresSql oti database
- """
- global key
- try:
- # update Consul KV store with DTI Event - storing them in a folder for all components
- key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
- result = ConsulClient.delete_key(key)
- except Exception as e:
- msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- else:
- if not result:
- msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- def deploy(self):
- """
- add event to consul KV store, this functionality will be retired as events are stored
- in postgresSql oti database
- """
- dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
- try:
- # update Consul KV store with DTI Event - storing them in a folder for all components
- result = ConsulClient.store_kvs({dep_key: self.event})
- except Exception as e:
- msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- def add(self):
- """
- process DTI event that contains a new VNF instance that has to be configured in the collector microservices
- """
- res_dict = None
- try:
- msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
- DTIProcessor.logger.debug(msg)
- # insert add event into dtih_event table
- self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
- location_clli=self.event_clli)
- self.db_access.saveDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
- raise Exception(msg)
- else:
- if self.is_notify:
- try:
- # force the action to add, to avoid bad things later
- add_action = {"dcae_service_action": "add"}
- self.event.update(add_action)
- # mock up data
- mock_tp11 = (
- "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
- "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
- "dcae-d1.idns.cip.corp.com", "30996")
- mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
- "docker_node_instance_id1",
- "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
- "dcae-d1.idns.cip.corp.com", "30996")
- # tpl_arr = []
- # tpl_arr.append(mock_tp11)
- # tpl_arr.append(mock_tp12)
- # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
- res_dict = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp) for tp in
- CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- return res_dict
-
- def add_replay(self):
- """
- convert an update event flow and replay as an add event type since the event acknowledgement is missing
- from application database
- """
- res_dict = None
- try:
- # force the action to add, to avoid bad things later
- add_action = {"dcae_service_action": "add"}
- self.event.update(add_action)
- # mock up data
- mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
- "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
- "dcae-d1.idns.cip.corp.com", "30996")
- mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
- "docker_node_instance_id1",
- "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
- "dcae-d1.idns.cip.corp.com", "30996")
- # tpl_arr = []
- # tpl_arr.append(mock_tp11)
- # tpl_arr.append(mock_tp12)
- # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
- res_dict = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp) for tp in
- CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- return res_dict
-
- def delete(self):
- """
- process DTI event that indicates a VNF instance has to be removed from the collector microservices
- """
- res_dict = {}
- res_dict_k8s = {}
- res_dict_docker = {}
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.is_notify:
- try:
- msg = "processing delete event for {}/{} to relate with any docker hosts".format(
- self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- res_dict_docker = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp)
- for tp
- in CfyClient().iter_components_for_docker(
- self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- try:
- msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
- self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- if self.prim_db_event:
- result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
- res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
- ((self.event, self.db_access, ack_item) for ack_item in result))))
- except Exception as e:
- msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- try:
- if self.prim_db_event:
- self.db_access.deleteDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
- except Exception as e:
- msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
-
- if res_dict_k8s:
- utils.update_dict(res_dict, res_dict_k8s)
-
- if res_dict_docker:
- utils.update_dict(res_dict, res_dict_docker)
-
- return res_dict
-
- def update(self):
- """
- process DTI event that indicates VNF instance has to be updated in the collector microservices
- """
- res_dict = {}
- res_dict_k8s = {}
- res_dict_docker = {}
-
- if self.is_notify:
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.prim_db_event:
- self.db_access.update_event_item(self.event, self.target_type, self.target_name)
- result = self.db_access.query_event_data(self.target_type, self.target_name)
- if len(result) == 0:
- msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
- "replaying this event to cluster if required". \
- format(self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- res_dict = self.add_replay()
- else:
- msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
- "identify new vs update cases".format(self.target_type, self.target_name)
- DTIProcessor.logger.debug(msg)
- try:
- tpl_arr = CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli)
- res_dict_docker = dict(self.docker_pool.map(notify_svc,
- ((
- self.event, self.db_access,
- self.prim_db_event,
- tp)
- for tp in tpl_arr)))
- except Exception as e:
- msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
- type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- else:
- # event is new for the handler
- msg = "processing update event for {}/{}, but current event info is not found in database, " \
- "executing add event".format(self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- res_dict = self.add()
- except Exception as e:
- msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- if res_dict_k8s:
- utils.update_dict(res_dict, res_dict_k8s)
-
- if res_dict_docker:
- utils.update_dict(res_dict, res_dict_docker)
-
- return res_dict
-
- def notify(self):
- """
- event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
- This notification is meant for the cluster failover.
- """
- res_dict = {}
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.prim_db_event:
- self.db_access.update_event_item(self.event, self.target_type, self.target_name)
- else:
- self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
- location_clli=self.event_clli)
- self.db_access.saveDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
-
- try:
- self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
- CfyClient().query_k8_components(self.target_name)))
- for k, v in notify_response_arr:
- res_dict[k] = v
- except Exception as e:
- msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- return res_dict
-
- def get_result(self):
- return self._result
-
- @classmethod
- def get_k8_raw_events(cls, pod, cluster, namespace):
- """
- Get DTI events for a k8 stateful set pod container
-
- :param pod: required
- k8s stateful set pod ID that was configured with a specific set of DTI Events
- :param cluster: required
- k8s cluster FQDN where the mS was deployed
- :param namespace: required
- k8s namespace where the stateful set was deployed in that namespace
- :return:
- Dictionary of DTI event(s).
- DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
- """
- db_access = EventDbAccess()
- results = db_access.query_raw_k8_events(cluster, pod, namespace)
-
- target_types = set([])
- outer_dict = {}
-
- for evnt_item in results:
- target_types.add(evnt_item.target_type)
-
- for targ_type in target_types:
- inner_name_evt_dict = {}
- for evnt in results:
- if targ_type == evnt.target_type:
- inner_name_evt_dict[evnt.target_name] = evnt.event
-
- outer_dict[targ_type] = inner_name_evt_dict
-
- return outer_dict
-
- @classmethod
- def get_docker_raw_events(cls, service_name, service_location):
- """
- Get DTI events for docker container.
-
- Parameters
- ----------
- service_name : string
- required. The service component name assigned by dockerplugin to the component that is unique to the
- cloudify node instance and used in its Consul key(s).
- service_location : string
- optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
- in service_location.
- If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
- TAGs if service_name is provided,
- otherwise results are not location filtered.
-
- Returns
- -------
- dict
- Dictionary of DTI event(s).
- DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
-
- """
-
- r_dict = {}
-
- want_locs = []
- if service_location:
- want_locs = service_location.split(',')
-
- give_types = []
- if service_name:
- if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
- try:
- node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
- if node_name:
- services = ConsulClient.lookup_node(node_name).get("Services")
- if services:
- for node_svc in list(services.keys()):
- if "-component-dockerhost-" in node_svc:
- want_locs = services[node_svc].get("Tags", [])
- break
- except:
- pass
-
- try:
- supported_types = ConsulClient.get_value(service_name + ":oti")
- except:
- return r_dict
- else:
- if supported_types:
- supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
- give_types = supported_types
- if not give_types or (len(give_types) == 1 and give_types[0] == ''):
- return r_dict
-
- db_access = EventDbAccess()
- results = db_access.query_raw_docker_events(give_types, want_locs)
-
- target_types = set([])
- outer_dict = {}
-
- for evnt_item in results:
- target_types.add(evnt_item.target_type)
-
- for targ_type in target_types:
- inner_name_evt_dict = {}
- for evnt in results:
- if targ_type == evnt.target_type:
- inner_name_evt_dict[evnt.target_name] = evnt.event
-
- outer_dict[targ_type] = inner_name_evt_dict
-
- return outer_dict