summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler/dti_processor.py
blob: 58022333e88d096ff4e81b809170fd451e7670ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

"""OTI Event processor for handling all the event types"""

import copy
import json
import logging
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock

import requests

from otihandler import utils
from otihandler.cfy_client import CfyClient
from otihandler.consul_client import ConsulClient
from otihandler.dbclient.apis import EventDbAccess
from otihandler.dbclient.models import Event, EventAck
from otihandler.docker_client import DockerClient

# Shared result list filled by notify_pods(): each entry is a
# (pod_id or "ERROR", message) tuple.  It is module-level state and nothing in
# this part of the file clears it.
# NOTE(review): confirm the consumer resets it between events.
notify_response_arr = []
# Held around every append to notify_response_arr in notify_pods().
lock = Lock()
# Port (kept as a string) used when building the k8s proxy URLs in
# notify_pods() and notify_k8s_pod().
K8S_CLUSTER_PROXY_NODE_PORT = '30132'


# def notify_docker(args_tuple):
#     """
#     event notification executor inside a process pool to communicate with docker container
#     interacts with docker client library
#     """
#     (dti_event, db_access, ack_item) = args_tuple
#     try:
#         dcae_service_action = dti_event.get('dcae_service_action')
#         component_scn = ack_item.service_component
#         deployment_id = ack_item.deployment_id
#         container_id = ack_item.container_id
#         docker_host = ack_item.docker_host
#         reconfig_script = ack_item.reconfig_script
#         container_type = 'docker'
#     except Exception as e:
#         return (
#             "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
#     what = ""
#     try:
#         what = "{} in {} container {} on {} that was deployed by {}".format(
#             reconfig_script, container_type, container_id, docker_host, deployment_id)
#         if dcae_service_action == 'add':
#             add_action = {"dcae_service_action": "deploy"}
#             dti_event.update(add_action)
# 
#         if dcae_service_action == 'delete':
#             add_action = {"dcae_service_action": "undeploy"}
#             dti_event.update(add_action)
# 
#         # dkr = DockerClient(docker_host, reauth=False)
#         result = ''
#         # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
#         if dti_event.get('dcae_service_action') == 'undeploy':
#             # delete from dti_event_ack table
#             try:
#                 db_access.deleteDomainObject(ack_item)
#             except Exception as e:
#                 msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
#                 DTIProcessor.logger.warning(msg)
#                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
#             else:
#                 return (component_scn, "ran {}, got: {!s}".format(what, result))
# 
#     except Exception as e:
#         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))


def notify_svc(args_tuple):
    """
    add/update/delete event handler
    event notification executor inside a worker pool to communicate with docker container and k8s services
    interacts with docker client library
    interacts with k8s node port services using REST client

    :param args_tuple: (orig_dti_event, db_access, curr_evt, res_tuple)
        orig_dti_event - event dict; it is deep-copied and never modified here
        db_access      - DB accessor used to query/save/delete event ack records
        curr_evt       - event object handed to the DB queries and stored on new EventAck records
        res_tuple      - positional record read as: [0] service component name, [1] deployment id,
                         [2] container id, [3] node id, [6] docker host, [7] reconfig script,
                         [8] container type ("docker" or "k8s")
    :return: (component_scn, message) describing the outcome,
        ("ERROR", message) when the arguments cannot be unpacked,
        or None when nothing is dispatched (a k8s action other than add/update,
        or a container type other than docker/k8s)
    """
    (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
    # the action field is rewritten below, so work on a private copy
    dti_event = copy.deepcopy(orig_dti_event)
    try:
        dcae_service_action = dti_event.get('dcae_service_action').lower()

        component_scn = res_tuple[0]
        deployment_id = res_tuple[1]
        container_id = res_tuple[2]
        node_id = res_tuple[3]
        docker_host = res_tuple[6]
        reconfig_script = res_tuple[7]
        container_type = res_tuple[8]
    except Exception as e:
        return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))

    what = ""
    if container_type == "docker":
        # exec reconfigure.sh in docker container
        try:
            what = "{} in {} container {} on {} that was deployed by {} node {}".format(
                reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
            # the reconfig script is invoked with deploy/undeploy instead of add/delete
            if dcae_service_action == 'add':
                dti_event.update({"dcae_service_action": "deploy"})
            elif dcae_service_action == 'delete':
                dti_event.update({"dcae_service_action": "undeploy"})

            dkr = DockerClient(docker_host, reauth=False)
            result = ''
            # Compare the normalized action: checking the raw event value here let a
            # mixed-case "Update" fall into the generic branch, which removes the ack record.
            if dcae_service_action == 'update':
                # undeploy + deploy
                DTIProcessor.logger.debug("update 1 - running undeploy %s", what)
                dti_event.update({"dcae_service_action": "undeploy"})
                result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
                DTIProcessor.logger.debug("update 2 - running deploy %s", what)
                dti_event.update({"dcae_service_action": "deploy"})
                result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
                try:
                    upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
                                                                    container_id)
                    upd_evt_ack.update_action('update')
                    db_access.saveDomainObject(upd_evt_ack)
                except Exception as e:
                    msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
                    DTIProcessor.logger.warning(msg)
                    return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
            else:
                DTIProcessor.logger.debug("running %s", what)
                result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
                if dti_event.get('dcae_service_action') == 'deploy':
                    # add into dti_event_ack table
                    try:
                        add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
                                               container_type='docker', docker_host=docker_host,
                                               container_id=container_id, reconfig_script=reconfig_script,
                                               event=curr_evt,
                                               action='add')
                        db_access.saveDomainObject(add_evt_ack)
                    except Exception as e:
                        msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
                        DTIProcessor.logger.warning(msg)
                        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
                elif curr_evt:
                    # remove from dtih_event_ack if present
                    try:
                        del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
                                                                        container_id)
                        db_access.deleteDomainObject(del_evt_ack)
                    except Exception as e:
                        msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
                        DTIProcessor.logger.warning(msg)
                        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
        except Exception as e:
            return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))

        return (component_scn, "ran {}, got: {!s}".format(what, result))

    if container_type == "k8s":
        DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
        if dcae_service_action == 'add':
            DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
            return notify_k8s((dti_event, db_access, curr_evt, res_tuple))

        if dcae_service_action == 'update':
            # handle update for pods being tracked and handle add for new pods
            what = "{} for {} component {} that was deployed by {} node {}".format(
                dcae_service_action, container_type, component_scn, deployment_id, node_id)
            try:
                k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
            except Exception as e:
                # report a failed lookup as a result tuple, like every other DB failure in
                # this function, instead of letting it escape from the worker
                msg = "trying to query event ack record for k8s service, got exception {}: {!s}".format(
                    type(e).__name__, e)
                DTIProcessor.logger.warning(msg)
                return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))

            if k8s_scn_result:
                # update
                DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
                return notify_k8s_pod((dti_event, db_access, k8s_scn_result))

            # no pod is tracked for this event yet: deliver the update as an add
            DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
            dti_event.update({"dcae_service_action": "add"})
            return notify_k8s((dti_event, db_access, curr_evt, res_tuple))

    # nothing to dispatch for this action / container type combination
    return None


def notify_k8s(args_tuple):
    """
    add event handler
    event notification executor inside a worker pool to communicate with k8s statefulset nodeport service
    uses REST API client to call k8s services

    PUTs the event to the component's nodeport service, then stores the pod details from
    the JSON acknowledgement as an EventAck record.

    :param args_tuple: (dti_event, db_access, curr_evt, res_tuple)
        dti_event - event dict sent as the JSON body
        db_access - DB accessor used to save the EventAck record
        curr_evt  - event object stored on the EventAck record
        res_tuple - positional record read as: [0] service component name, [1] deployment id,
                    [3] node id, [8] container type, [9] service address, [10] service port
    :return: (component_scn, message); the message starts with "ran" on success and with
        "exception" (or reports bad data) on failure
    """
    (dti_event, db_access, curr_evt, res_tuple) = args_tuple
    component_scn = res_tuple[0]
    deployment_id = res_tuple[1]
    node_id = res_tuple[3]
    container_type = res_tuple[8]
    service_address = res_tuple[9]
    service_port = res_tuple[10]
    what = "{} in {} deployment {} that was deployed by {} node {}".format(
        "add", container_type, "statefulset", deployment_id, node_id)
    # call scn node port service REST API
    svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
    try:
        DTIProcessor.logger.debug("running {}".format(what))
        response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        msg = "collector nodeport service({}) threw exception {}: {!s}".format(
            svc_nodeport_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
    try:
        event_ack_info = response.json()
    except Exception as e:
        msg = "collector nodeport service({}) threw exception {}: {!s}".format(
            svc_nodeport_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))

    if not event_ack_info:
        msg = "collector nodeport service returned bad data"
        DTIProcessor.logger.error(msg)
        return (component_scn, "collector nodeport service returned bad data")

    try:
        namespace = event_ack_info.get("KubeNamespace")
        svc_name = event_ack_info.get("KubeServiceName")
        svc_port = event_ack_info.get("KubeServicePort")
        proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
        cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
        pod_name = event_ack_info.get("KubePod")
        # statefulset name = everything before the last '-' of the pod name
        statefulset = pod_name[0:pod_name.rindex('-')]
    except Exception as e:
        # A non-dict payload, a missing KubePod or a pod name without '-' used to raise
        # out of the worker; report it like the other failure paths instead.
        msg = "collector nodeport service({}) returned unusable acknowledgement {!s}, got exception {}: {!s}".format(
            svc_nodeport_url, event_ack_info, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))

    what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
        "add", container_type, statefulset, namespace, deployment_id, node_id)
    try:
        add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
                               k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
                               k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
                               service_component=component_scn)
        db_access.saveDomainObject(add_evt_ack)
        return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
    except Exception as e:
        msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
        DTIProcessor.logger.warning(msg)
        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))


def notify_pods(args_tuple):
    """
    notify event handler
    event notification executor inside a worker pool to communicate with k8s DTIH proxy nodeport service
    uses REST API client to call k8s services

    PUTs the event once per replica ("sts-<service name>-<replica index>") and appends
    exactly one (pod_id, message) tuple per replica to the module-level
    notify_response_arr, holding the module-level lock for each append.

    :param args_tuple: (dti_event, res_tuple)
        dti_event - event dict sent as the JSON body
        res_tuple - positional record read as: [0] cluster, [1] namespace, [2] service name,
                    [3] replica count, [4] service port
    :return: None; results are reported through notify_response_arr.  If the arguments
        cannot be processed a single ("ERROR", message) tuple is appended.
    """
    (dti_event, res_tuple) = args_tuple
    try:
        cluster = res_tuple[0]
        port = K8S_CLUSTER_PROXY_NODE_PORT
        namespace = res_tuple[1]
        svc_name = res_tuple[2]
        replicas = res_tuple[3]
        svc_port = res_tuple[4]

        for replica in range(replicas):
            pod_id = "sts-{}-{}".format(svc_name, replica)
            item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
                                                                                    pod_id, svc_name,
                                                                                    svc_port)
            what = "{} for pod id {} in cluster {} and  namespace {}".format("notify", pod_id, cluster, namespace)
            try:
                DTIProcessor.logger.debug("running {}".format(what))
                response = requests.put(item_pod_url, json=dti_event, timeout=50)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
                    item_pod_url, type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                with lock:
                    notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
                continue

            # Each failure below ends this replica's iteration.  Previously control fell
            # through and also appended a "ran ..., got" entry for the same pod, using an
            # acknowledgement that could be stale from an earlier replica.
            try:
                event_ack_info = response.json()
            except Exception as e:
                msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
                    item_pod_url, type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                with lock:
                    notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
                continue

            if not event_ack_info:
                msg = "stateful set proxy service returned bad data"
                DTIProcessor.logger.error(msg)
                with lock:
                    notify_response_arr.append((pod_id, "no acknowledgement - running {}".format(what)))
                continue

            with lock:
                notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
    except Exception as e:
        with lock:
            notify_response_arr.append(("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))

def notify_k8s_pod(args_tuple):
    """
    update/delete event handler
    event notification executor inside a worker pool to communicate with k8s DTIH proxy service
    uses REST API client to call k8s services

    PUTs the event to the pod recorded on ack_item.  After a successful call the ack record
    is deleted when the action is "delete", otherwise it is marked as updated and saved.

    :param args_tuple: (dti_event, db_access, ack_item)
        dti_event - event dict sent as the JSON body; its 'dcae_service_action' selects delete vs update
        db_access - DB accessor used to delete/save the ack record
        ack_item  - event ack record providing k8s_proxy_fqdn, k8s_namespace, k8s_pod_id,
                    k8s_service_name, k8s_service_port, k8s_cluster_fqdn and service_component
    :return: (component_scn, message) describing the outcome
    """
    item_pod_url = ''
    component_scn = ''
    (dti_event, db_access, ack_item) = args_tuple
    # call ingress proxy to dispatch the update/delete event to the tracked pod

    action = dti_event.get('dcae_service_action')
    what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
        action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
    try:
        DTIProcessor.logger.debug("running {}".format(what))
        item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
            ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
            ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
        component_scn = ack_item.service_component
        response = requests.put(item_pod_url, json=dti_event, timeout=50)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
            item_pod_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return (component_scn, "ran {}, got: {!s}".format(what, msg))

    # The rest of the module lower-cases the action before comparing it; do the same here
    # so a mixed-case "Delete" removes the ack record instead of marking it updated.
    if str(action).lower() == 'delete':
        try:
            db_access.deleteDomainObject(ack_item)
        except Exception as e:
            msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
    else:
        try:
            ack_item.update_action('update')
            db_access.saveDomainObject(ack_item)
        except Exception as e:
            msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))

    try:
        ack_info = response.json()
    except Exception as e:
        # The call succeeded and the ack record is already updated; a body that is not
        # JSON must not raise out of the worker, so report it in the result message.
        msg = "stateful set proxy service({}) returned a non-JSON response, got exception {}: {!s}".format(
            item_pod_url, type(e).__name__, e)
        DTIProcessor.logger.warning(msg)
        return (component_scn, "ran {}, got: {!s}".format(what, msg))

    return (component_scn, "ran {}, got: {!s}".format(what, ack_info))


class DTIProcessor(object):
    """
    Main event processing class that encapsulates all the logic of this handler application!
    An instance of this class is created per incoming client request.

    Generates input data by querying platform services - cloudify, consul, postgresSql

    It creates a pool of worker processes using a multiprocessing Pool class instance.
    Tasks are offloaded to the worker processes that exist in the pool.
    The input data is distributed across processes of the Pool object to enable parallel execution of
    event notification function across multiple input values (data parallelism).
    """

    logger = logging.getLogger("oti_handler.dti_processor")
    K8S_CLUSTER_PROXY_NODE_PORT = '30132'
    db_access = None
    docker_pool = None
    k8s_pool = None

    def __init__(self, dti_event, send_notification=True):
        self._result = {}
        self.event = dti_event
        self.is_notify = send_notification
        self.action = dti_event.get('dcae_service_action').lower()
        self.target_name = dti_event.get('dcae_target_name')
        self.target_type = dti_event.get('dcae_target_type', '').lower()
        self.event_clli = dti_event.get('dcae_service_location')
        res_dict = None
        try:
            self.docker_pool = ThreadPool(8)
            self.k8s_pool = ThreadPool(8)
        except Exception as e:
            msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.error(msg)
            self._result['ERROR'] = msg
            raise e
        else:
            self.db_access = EventDbAccess()
            self.prim_db_event = None
            try:
                res_dict = self.dispatcher()
            except:
                raise
            finally:
                try:
                    self.docker_pool.close()
                    self.k8s_pool.close()
                except Exception as e:
                    msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
                                                                                                       e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg
                try:
                    self.docker_pool.join()
                    self.k8s_pool.join()
                except Exception as e:
                    msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
                                                                                                      e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

            # if not send_notification:
            #     DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
            #     return

            if res_dict:
                try:
                    utils.update_dict(self._result, res_dict)
                except Exception as e:
                    msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
                        type(e).__name__, e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

        DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")

    def dispatcher(self):
        """ dispatch method to execute specific method based on event type """

        arg = str(self.action)
        method = getattr(self, arg, lambda: "Invalid action")
        return method()

    def undeploy(self):
        """
        delete event from consul KV store, this functionality will be retired as events are stored
        in postgresSql oti database
        """
        global key
        try:
            # update Consul KV store with DTI Event - storing them in a folder for all components
            key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
            result = ConsulClient.delete_key(key)
        except Exception as e:
            msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg
        else:
            if not result:
                msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
                DTIProcessor.logger.warning(msg)
                self._result['WARNING'] = msg

    def deploy(self):
        """
        add event to consul KV store, this functionality will be retired as events are stored
        in postgresSql oti database
        """
        dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
        try:
            # update Consul KV store with DTI Event - storing them in a folder for all components
            result = ConsulClient.store_kvs({dep_key: self.event})
        except Exception as e:
            msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg

    def add(self):
        """
        process DTI event that contains a new VNF instance that has to be configured in the collector microservices
        """
        res_dict = None
        try:
            msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
            DTIProcessor.logger.debug(msg)
            # insert add event into dtih_event table
            self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
                                       location_clli=self.event_clli)
            self.db_access.saveDomainObject(self.prim_db_event)
        except Exception as e:
            msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['ERROR'] = msg
            raise Exception(msg)
        else:
            if self.is_notify:
                try:
                    # force the action to add, to avoid bad things later
                    add_action = {"dcae_service_action": "add"}
                    self.event.update(add_action)
                    # mock up data
                    mock_tp11 = (
                        "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
                        "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
                        "dcae-d1.idns.cip.corp.com", "30996")
                    mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
                                 "docker_node_instance_id1",
                                 "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
                                 "dcae-d1.idns.cip.corp.com", "30996")
                    # tpl_arr = []
                    # tpl_arr.append(mock_tp11)
                    # tpl_arr.append(mock_tp12)
                    # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
                    res_dict = dict(self.docker_pool.map(notify_svc,
                                                         ((self.event, self.db_access, self.prim_db_event, tp) for tp in
                                                          CfyClient().iter_components(self.target_type,
                                                                                      dcae_service_location=self.event_clli))
                                                         ))
                except Exception as e:
                    msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
                                                                                                     e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg
        return res_dict

    def add_replay(self):
        """
        convert an update event flow and replay as an add event type since the event acknowledgement is missing
        from application database
        """
        res_dict = None
        try:
            # force the action to add, to avoid bad things later
            add_action = {"dcae_service_action": "add"}
            self.event.update(add_action)
            # mock up data
            mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
                         "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
                         "dcae-d1.idns.cip.corp.com", "30996")
            mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
                         "docker_node_instance_id1",
                         "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
                         "dcae-d1.idns.cip.corp.com", "30996")
            # tpl_arr = []
            # tpl_arr.append(mock_tp11)
            # tpl_arr.append(mock_tp12)
            # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
            res_dict = dict(self.docker_pool.map(notify_svc,
                                                 ((self.event, self.db_access, self.prim_db_event, tp) for tp in
                                                  CfyClient().iter_components(self.target_type,
                                                                              dcae_service_location=self.event_clli))
                                                 ))
        except Exception as e:
            msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.error(msg)
            self._result['ERROR'] = msg
        return res_dict

    def delete(self):
        """
        process DTI event that indicates a VNF instance has to be removed from the collector microservices
        """
        res_dict = {}
        res_dict_k8s = {}
        res_dict_docker = {}
        try:
            self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
            if self.is_notify:
                try:
                    msg = "processing delete event for {}/{} to relate with any docker hosts".format(
                        self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    res_dict_docker = dict(self.docker_pool.map(notify_svc,
                                                                ((self.event, self.db_access, self.prim_db_event, tp)
                                                                 for tp
                                                                 in CfyClient().iter_components_for_docker(
                                                                    self.target_type,
                                                                    dcae_service_location=self.event_clli))
                                                                ))
                except Exception as e:
                    msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
                                                                                                          e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

                try:
                    msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
                        self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    if self.prim_db_event: 
                        result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
                        res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
                            ((self.event, self.db_access, ack_item) for ack_item in result))))
                except Exception as e:
                    msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

            try:
                if self.prim_db_event: 
                    self.db_access.deleteDomainObject(self.prim_db_event)
            except Exception as e:
                msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
                DTIProcessor.logger.warning(msg)
                self._result['ERROR'] = msg
        except Exception as e:
                msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
                DTIProcessor.logger.warning(msg)
                self._result['ERROR'] = msg

        if res_dict_k8s: 
            utils.update_dict(res_dict, res_dict_k8s)

        if res_dict_docker: 
            utils.update_dict(res_dict, res_dict_docker)

        return res_dict

    def update(self):
        """
        process DTI event that indicates VNF instance has to be updated in the collector microservices
        """
        res_dict = {}
        res_dict_k8s = {}
        res_dict_docker = {}

        if self.is_notify:
            try:
                self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
                if self.prim_db_event: 
                    self.db_access.update_event_item(self.event, self.target_type, self.target_name)
                    result = self.db_access.query_event_data(self.target_type, self.target_name)
                    if len(result) == 0:
                        msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
                              "replaying this event to cluster if required". \
                            format(self.target_type, self.target_name)
                        DTIProcessor.logger.warning(msg)
                        self._result['WARNING'] = msg
                        res_dict = self.add_replay()
                    else:
                        msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
                              "identify new vs update cases".format(self.target_type, self.target_name)
                        DTIProcessor.logger.debug(msg)
                        try:
                            tpl_arr = CfyClient().iter_components(self.target_type,
                                                                  dcae_service_location=self.event_clli)
                            res_dict_docker = dict(self.docker_pool.map(notify_svc,
                                                                        ((
                                                                            self.event, self.db_access,
                                                                            self.prim_db_event,
                                                                            tp)
                                                                            for tp in tpl_arr)))
                        except Exception as e:
                            msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
                                type(e).__name__, e)
                            DTIProcessor.logger.error(msg)
                            self._result['ERROR'] = msg
                else:
                    # event is new for the handler
                    msg = "processing update event for {}/{}, but current event info is not found in database, " \
                          "executing add event".format(self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    self._result['WARNING'] = msg
                    res_dict = self.add()
            except Exception as e:
                msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                self._result['ERROR'] = msg

        if res_dict_k8s: 
            utils.update_dict(res_dict, res_dict_k8s)

        if res_dict_docker: 
            utils.update_dict(res_dict, res_dict_docker)

        return res_dict

    def notify(self):
        """
        event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
        This notification is meant for the cluster failover.
        """
        res_dict = {}
        try:
            self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
            if self.prim_db_event: 
                self.db_access.update_event_item(self.event, self.target_type, self.target_name)
            else:
                self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
                                           location_clli=self.event_clli)
                self.db_access.saveDomainObject(self.prim_db_event)
        except Exception as e:
            msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['ERROR'] = msg

        try:
            self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
                                            CfyClient().query_k8_components(self.target_name)))
            for k, v in notify_response_arr:
                res_dict[k] = v
        except Exception as e:
            msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg

        return res_dict

    def get_result(self):
        """
        return the result dict of this processor: the handlers store their 'ERROR' / 'WARNING'
        messages in it and __init__ merges the dict returned by the dispatched handler into it
        """
        return self._result

    @classmethod
    def get_k8_raw_events(cls, pod, cluster, namespace):
        """
        Get DTI events for a k8 stateful set pod container

        :param pod: required
            k8s stateful set pod ID that was configured with a specific set of DTI Events
        :param cluster: required
            k8s cluster FQDN where the mS was deployed
        :param namespace: required
            k8s namespace where the stateful set was deployed in that namespace
        :return:
            Dictionary of DTI event(s).
            DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
        """
        db_access = EventDbAccess()
        results = db_access.query_raw_k8_events(cluster, pod, namespace)

        # Group in a single pass over the query result: {target_type: {target_name: event}}.
        # As before, a later row with the same type and name replaces an earlier one.
        outer_dict = {}
        for evnt in results:
            outer_dict.setdefault(evnt.target_type, {})[evnt.target_name] = evnt.event

        return outer_dict

    @classmethod
    def get_docker_raw_events(cls, service_name, service_location):
        """
        Get DTI events for docker container.

        Parameters
        ----------
        service_name : string
            required.  The service component name assigned by dockerplugin to the component that is unique to the
            cloudify node instance and used in its Consul key(s).
        service_location : string
            optional.  allows multiple values separated by commas.  Filters DTI events with dcae_service_location
            in service_location.
            If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
            TAGs if service_name is provided,
            otherwise results are not location filtered.

        Returns
        -------
        dict
            Dictionary of DTI event(s).
            DTI events will be keyed by vnf_type, sub-keyed by vnf_id.

        """

        r_dict = {}

        want_locs = []
        if service_location:
            want_locs = service_location.split(',')

        give_types = []
        if service_name:
            if not want_locs:  # default to TAGs of container's dockerhost or k8s cluster master node
                try:
                    node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
                    if node_name:
                        services = ConsulClient.lookup_node(node_name).get("Services")
                        if services:
                            for node_svc in list(services.keys()):
                                if "-component-dockerhost-" in node_svc:
                                    want_locs = services[node_svc].get("Tags", [])
                                    break
                except:
                    pass

            try:
                supported_types = ConsulClient.get_value(service_name + ":oti")
            except:
                return r_dict
            else:
                if supported_types:
                    supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
                    give_types = supported_types
                if not give_types or (len(give_types) == 1 and give_types[0] == ''):
                    return r_dict

        db_access = EventDbAccess()
        results = db_access.query_raw_docker_events(give_types, want_locs)

        target_types = set([])
        outer_dict = {}

        for evnt_item in results:
            target_types.add(evnt_item.target_type)

        for targ_type in target_types:
            inner_name_evt_dict = {}
            for evnt in results:
                if targ_type == evnt.target_type:
                    inner_name_evt_dict[evnt.target_name] = evnt.event

            outer_dict[targ_type] = inner_name_evt_dict

        return outer_dict