summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler
diff options
context:
space:
mode:
authorKotagiri, Ramprasad (rp5662) <rp5662@att.com>2019-12-19 17:41:16 -0500
committerKotagiri, Ramprasad (rp5662) <rp5662@att.com>2020-01-21 16:50:17 -0500
commit158b75abd6095a3155f5057832ec868bc99ced36 (patch)
treed374ba4adcfa6db9a036cb2bf018fe529c215eee /oti/event-handler/otihandler
parent77900bb3097491cd9fca964c111ea70724e53989 (diff)
Add OTI event-handler project
OTI event handler application in DCAEGEN2 platform Change-Id: Ie64f26f851e2045f00043f90279d503c2dc62948 Issue-ID: DCAEGEN2-1910 Signed-off-by: Kotagiri, Ramprasad (rp5662) <rp5662@att.com>
Diffstat (limited to 'oti/event-handler/otihandler')
-rw-r--r--oti/event-handler/otihandler/__init__.py15
-rw-r--r--oti/event-handler/otihandler/__main__.py74
-rw-r--r--oti/event-handler/otihandler/cbs_rest.py202
-rw-r--r--oti/event-handler/otihandler/cfy_client.py638
-rw-r--r--oti/event-handler/otihandler/config.py179
-rw-r--r--oti/event-handler/otihandler/consul_client.py617
-rw-r--r--oti/event-handler/otihandler/dbclient/__init__.py19
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/__init__.py18
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/db_access.py50
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/event_db_access.py154
-rw-r--r--oti/event-handler/otihandler/dbclient/db_dao.py33
-rw-r--r--oti/event-handler/otihandler/dbclient/models/__init__.py19
-rw-r--r--oti/event-handler/otihandler/dbclient/models/event.py40
-rw-r--r--oti/event-handler/otihandler/dbclient/models/event_ack.py51
-rw-r--r--oti/event-handler/otihandler/docker_client.py175
-rw-r--r--oti/event-handler/otihandler/dti_processor.py815
-rw-r--r--oti/event-handler/otihandler/onap/CommonLogger.py958
-rw-r--r--oti/event-handler/otihandler/onap/__init__.py15
-rw-r--r--oti/event-handler/otihandler/onap/audit.py375
-rw-r--r--oti/event-handler/otihandler/onap/health.py102
-rw-r--r--oti/event-handler/otihandler/utils.py83
-rw-r--r--oti/event-handler/otihandler/web_server.py603
22 files changed, 5235 insertions, 0 deletions
diff --git a/oti/event-handler/otihandler/__init__.py b/oti/event-handler/otihandler/__init__.py
new file mode 100644
index 0000000..87cf002
--- /dev/null
+++ b/oti/event-handler/otihandler/__init__.py
@@ -0,0 +1,15 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
diff --git a/oti/event-handler/otihandler/__main__.py b/oti/event-handler/otihandler/__main__.py
new file mode 100644
index 0000000..59a7087
--- /dev/null
+++ b/oti/event-handler/otihandler/__main__.py
@@ -0,0 +1,74 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""run as server: python -m otihandler"""
+
+import logging
+import os
+import sys
+
+from otihandler.config import Config
+from otihandler.onap.audit import Audit
+from otihandler.web_server import DTIWeb
+from otihandler.dbclient import DaoBase
+
+
+class LogWriter(object):
+ """redirect the standard out + err to the logger"""
+
+ def __init__(self, logger_func):
+ self.logger_func = logger_func
+
+ def write(self, log_line):
+ """actual writer to be used in place of stdout or stderr"""
+
+ log_line = log_line.rstrip()
+ if log_line:
+ self.logger_func(log_line)
+
+ def flush(self):
+ """no real flushing of the buffer"""
+
+ pass
+
+
+def run_event_handler():
+ """main run function for event_handler"""
+
+ Config.load_from_file()
+ # Config.discover()
+
+ logger = logging.getLogger("event_handler")
+ sys.stdout = LogWriter(logger.info)
+ sys.stderr = LogWriter(logger.error)
+
+ logger.info("========== run_event_handler ==========")
+ app_version = os.getenv("APP_VER")
+ logger.info("app_version %s", app_version)
+ Audit.init(Config.get_system_name(), app_version, Config.LOGGER_CONFIG_FILE_PATH)
+
+ logger.info("starting event_handler with config:")
+ logger.info(Audit.log_json_dumps(Config.config))
+
+ audit = Audit(req_message="start event_handler")
+
+ audit = Audit(req_message="DB init start")
+ DaoBase.init_db(os.environ.get("DB_CONN_URL"))
+
+ DTIWeb.run_forever(audit)
+
+if __name__ == "__main__":
+ run_event_handler()
diff --git a/oti/event-handler/otihandler/cbs_rest.py b/oti/event-handler/otihandler/cbs_rest.py
new file mode 100644
index 0000000..2e3e7de
--- /dev/null
+++ b/oti/event-handler/otihandler/cbs_rest.py
@@ -0,0 +1,202 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""REST for high-level information retrievals from Consul KVs"""
+
+import copy
+import logging
+
+from otihandler.consul_client import ConsulClient
+
+
+class CBSRest(object):
+ _logger = logging.getLogger("oti_handler.cbs_rest")
+
+ @staticmethod
+ def get_value(key, default=None):
+ """Wrap ConsulClient.get_value() to ignore exceptions."""
+
+ data = default
+ try:
+ data = ConsulClient.get_value(key)
+ except Exception as e:
+ pass
+
+ return data
+
+ @staticmethod
+ def get_kvs(key):
+ """Wrap ConsulClient.get_kvs() to ignore exceptions."""
+
+ data = {}
+ try:
+ data = ConsulClient.get_kvs(key, trim_prefix=True)
+ except Exception as e:
+ data = {}
+
+ if not data:
+ data = {}
+ return data
+
+ @staticmethod
+ def get_service_component(service_name):
+ """Get the fully-bound config for a service_name."""
+
+ return ConsulClient.get_service_component(service_name)
+
+ @staticmethod
+ def get_service_component_all(service_name, service_location=None, policies_as_list=False):
+ """Get all Consul objects for a service_name."""
+
+ r_dict = ConsulClient.get_service_component_all(service_name, policies_as_list=policies_as_list)
+ if r_dict and r_dict.get('oti'):
+ r_dict['oti'] = CBSRest.get_oti(service_name, service_location=service_location)
+ return r_dict
+
+ @staticmethod
+ def get_oti(service_name=None, vnf_type=None, vnf_id=None, service_location=None):
+ """
+ Get DTI events.
+
+ Parameters
+ ----------
+ service_name : string
+ optional. The service component name assigned by dockerplugin to the component that is unique to the cloudify node instance and used in its Consul key(s).
+ vnf_type : string
+ optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s).
+ vnf_id : string
+ optional. Requires vnf_type also. Gets DTI event for this vnf_id.
+ service_location : string
+ optional, allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
+ If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
+ otherwise results are not location filtered.
+
+ Returns
+ -------
+ dict
+ Dictionary of DTI event(s).
+ If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event.
+ If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id.
+ Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+
+ """
+
+ lc_vnf_type = vnf_type
+ if vnf_type:
+ lc_vnf_type = vnf_type.lower()
+
+ r_dict = {}
+
+ want_locs = []
+ if service_location:
+ want_locs = service_location.split(',')
+
+ give_types = []
+ if service_name:
+ if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
+ try:
+ node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
+ if node_name:
+ services = ConsulClient.lookup_node(node_name).get("Services")
+ if services:
+ for node_svc in list(services.keys()):
+ if "-component-dockerhost-" in node_svc or "_component_kubernetes_master" in node_svc:
+ want_locs = services[node_svc].get("Tags", [])
+ break
+ except:
+ pass
+ supported_types = ConsulClient.get_value(service_name + ":oti")
+ if supported_types:
+ supported_types = [type.lower() for type in list(supported_types.keys())]
+ if supported_types:
+ if lc_vnf_type: # If user specifies vnf_type(s), constrain to supported ones
+ for type in lc_vnf_type.split(','):
+ if type in supported_types:
+ give_types.append(type)
+ else:
+ give_types = supported_types
+ if not give_types or (len(give_types) == 1 and give_types[0] == ''):
+ return r_dict
+ elif lc_vnf_type:
+ give_types = lc_vnf_type.split(',')
+
+
+ # If they specified only one vnf_type ...
+ if lc_vnf_type and ',' not in lc_vnf_type:
+ type = give_types[0]
+
+ # ... and vnf_id
+ if vnf_id:
+ # get just one vnf_id
+ t_dict = CBSRest.get_value("oti_events/" + type + "/" + vnf_id, default=None)
+ if t_dict:
+ event_loc = t_dict.get('dcae_service_location')
+ if not event_loc or not want_locs or event_loc in want_locs:
+ r_dict = copy.deepcopy(t_dict)
+
+ # ... and not vnf_id
+ else:
+ # get all DTI events of just one type, indexed by vnf_id
+ t_dict = CBSRest.get_kvs("oti_events/" + type + "/")
+ if t_dict:
+ if not want_locs:
+ r_dict = copy.deepcopy(t_dict)
+ else:
+ for id in t_dict:
+ event_loc = t_dict[id].get('dcae_service_location')
+ if not event_loc or event_loc in want_locs:
+ r_dict[id] = copy.deepcopy(t_dict[id])
+
+ # If they did not specify either service_name or vnf_type (the only way give_types=[])
+ elif not give_types:
+ # get all DTI events, indexed by vnf_type then vnf_id
+ t_dict = CBSRest.get_kvs("oti_events/")
+ if t_dict:
+ for type in t_dict:
+ for id in t_dict[type]:
+ if not vnf_id or vnf_id == id:
+ if want_locs:
+ event_loc = t_dict[type][id].get('dcae_service_location')
+ if not want_locs or not event_loc or event_loc in want_locs:
+ if type not in r_dict:
+ r_dict[type] = {}
+ r_dict[type][id] = copy.deepcopy(t_dict[type][id])
+
+ # If they speclfied multiple vnf_types
+ else:
+ # get all DTI events of give_types, indexed by vnf_type then vnf_id
+ for type in give_types:
+ t_dict = CBSRest.get_kvs("oti_events/" + type + "/")
+ if t_dict:
+ for id in t_dict:
+ if not vnf_id or vnf_id == id:
+ if want_locs:
+ event_loc = t_dict[id].get('dcae_service_location')
+ if not want_locs or not event_loc or event_loc in want_locs:
+ if type not in r_dict:
+ r_dict[type] = {}
+ r_dict[type][id] = copy.deepcopy(t_dict[id])
+
+ return r_dict
+
+ @staticmethod
+ def get_policies(service_name, policy_id=None):
+ """Get one or all policies for a service_name."""
+
+ if policy_id:
+ return ConsulClient.get_value(service_name + ":policies/items/" + policy_id)
+ else:
+ return ConsulClient.get_kvs(service_name + ":policies/items/", trim_prefix=True)
diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py
new file mode 100644
index 0000000..c823340
--- /dev/null
+++ b/oti/event-handler/otihandler/cfy_client.py
@@ -0,0 +1,638 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""Our client interface to Cloudify"""
+import base64
+import copy
+import json
+import logging
+import os
+
+import requests
+
+from otihandler.consul_client import ConsulClient
+
+
+class CfyClientConsulError(RuntimeError):
+ pass
+
+
+class CloudifyClient(object):
+ """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants"""
+
+ def __init__(self, **kwargs):
+ self._protocol = kwargs.get('protocol', 'http')
+ self._host = kwargs.get('host')
+ self._port = kwargs.get('port')
+ self._headers = kwargs.get('headers')
+
+ self.node_instances = self
+
+ def list(self, **kwargs):
+ url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port)
+ # s = Session()
+ # req = Request('GET', url_mask, headers=self._headers)
+ # prepped = req.prepare()
+ # response = s.send(prepped,verify=False,timeout=30)
+ response = requests.get(url_mask, headers=self._headers, timeout=30)
+ obj = response.json()
+ tenants = [x["name"] for x in obj["items"]]
+ tenants_with_containers = [x for x in tenants if 'DCAE' in x]
+
+ size = 1000
+ url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format(
+ self._protocol, self._host, self._port, size, "{}")
+ if kwargs:
+ for (key,val) in kwargs.items():
+ if isinstance(val, str):
+ url_mask = url_mask + '&{}={}'.format(key, val)
+ elif isinstance(val, list):
+ url_mask = url_mask + '&{}={}'.format(key, ','.join(val))
+
+ for tenant in tenants_with_containers:
+ self._headers_with_tenant = copy.deepcopy(self._headers)
+ self._headers_with_tenant['Tenant'] = tenant
+
+ offset = 0
+ total = 1
+ while offset < total:
+ # s = Session()
+ # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant)
+ # prepped = req.prepare()
+ # response = s.send(prepped, verify=False, timeout=30)
+ response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30)
+ response.raise_for_status()
+ obj = response.json()
+ offset = offset + len(obj["items"])
+ total = obj["metadata"]["pagination"]["total"]
+ for item in obj["items"]:
+ yield NodeInstance(item)
+
+ def update_node_instance(self, node_instance_id, body, **kwargs):
+ headers = copy.deepcopy(self._headers_with_tenant)
+ headers['Content-Type'] = "application/json"
+ url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format(
+ self._protocol, self._host, self._port, node_instance_id)
+ response = requests.patch(url_mask, json=body, headers=headers, timeout=30)
+ obj = response.json()
+ return obj
+
+
+class NodeInstance(object):
+ """quick replacement for cloudify_rest_client"""
+
+ def __init__(self, instance):
+ self.id = instance.get("id")
+ self.deployment_id = instance.get("deployment_id")
+ self.host_id = instance.get("host_id")
+ self.runtime_properties = instance.get("runtime_properties")
+ self.relationships = instance.get("relationships")
+ self.state = instance.get("state")
+ self.version = instance.get("version")
+ self.node_id = instance.get("node_id")
+ self.scaling_groups = instance.get("scaling_groups")
+
+
+class CfyClient(object):
+ _logger = logging.getLogger("oti_handler.cfy_client")
+ _client = None
+
+
+ @staticmethod
+ def __set_cloudify_manager_client():
+ """Create connection to Cloudify_Manager."""
+
+ if CfyClient._client:
+ return
+
+ host = None
+ port = None
+ obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify")
+ source = "CLOUDIFY environment variable"
+ if not obj:
+ CM_KEY = 'cloudify_manager'
+ source = "Consul key '{}'".format(CM_KEY)
+
+ try:
+ results = ConsulClient.lookup_service(CM_KEY)
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY)
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ result = results[0]
+ host = result['ServiceAddress']
+ port = result['ServicePort']
+
+ try:
+ obj = ConsulClient.get_value(CM_KEY)
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY)
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ if not obj:
+ raise CfyClientConsulError("{} value is empty or invalid".format(source))
+
+ obj = obj.get('cloudify')
+
+ if not obj:
+ raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source))
+
+ host = obj.get('address', host)
+ if not host:
+ raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source))
+
+ port = obj.get('port', port)
+ if not port:
+ raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source))
+
+ protocol = obj.get('protocol')
+ if not protocol:
+ raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source))
+ username = obj.get('user')
+ if not username:
+ raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source))
+ password = obj.get('password')
+ if not password:
+ raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
+
+ b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8")
+ headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
+ #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
+
+ CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers)
+
+
+ @staticmethod
+ def query_k8_components(in_cluster_fqdn):
+ """
+ Iterate components that belong to a cluster fqdn.
+
+ Parameters
+ ----------
+ in_cluster_fqdn : string
+ k8s cluster FQDN
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ]
+ """
+
+ cnt_found = 0
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+ scn_port = None
+ cluster_fqdn = None
+ proxy_fqdn = None
+ dti_info = rtp.get('dti_info')
+ if dti_info:
+ env_items = dti_info.get('env')
+ for env in env_items:
+ if env.get("name") == 'KUBE_CLUSTER_FQDN':
+ cluster_fqdn = env.get("value")
+ if env.get("name") == 'KUBE_PROXY_FQDN':
+ proxy_fqdn = env.get("value")
+ ports = dti_info.get('ports')
+ if ports:
+ scn_port = ports[0].split(':')[0]
+ else:
+ continue
+
+ if in_cluster_fqdn != cluster_fqdn:
+ continue
+
+ controller_type = rtp.get('k8s_controller_type')
+ if not controller_type:
+ CfyClient._logger.debug("controller type is missing")
+ continue
+ elif controller_type != "statefulset":
+ CfyClient._logger.debug("not a stateful set")
+ continue
+
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+
+ try:
+ namespace = container_id.get('namespace')
+ except:
+ namespace = ''
+ pass
+
+ replicas = 1
+ try:
+ replicas = rtp.get('replicas')
+ except:
+ pass
+
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+
+ cnt_found += 1
+ yield (proxy_fqdn, namespace, scn, replicas, scn_port)
+ continue
+
+ msg = "Found {} components (collectors) for cluster={}" \
+ .format(cnt_found, in_cluster_fqdn)
+ CfyClient._logger.debug(msg)
+
+
+ @staticmethod
+ def iter_components(dcae_target_type, dcae_service_location='', component_type=''):
+ """
+ Iterate components that handle a given dcae_target_type.
+
+ Parameters
+ ----------
+ dcae_target_type : string
+ VNF Type
+ dcae_service_location : string
+ Location of the component (optional)
+ component_type : string
+ Type of the component (optional)
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+ or
+ [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
+
+ """
+
+ cnt_found = 0
+
+ # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
+ dockerhosts = []
+ k8s_svcs_tagged_with_clli = []
+ if dcae_service_location:
+ try:
+ dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ try:
+ k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ docker_host = ''
+ svc_with_my_clli_tags = ''
+ if not container_id:
+ container_type = "k8s"
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
+ continue
+ docker_config = rtp.get('docker_config')
+ if not docker_config:
+ CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id))
+ continue
+ dti_reconfig_script = ""
+ if container_type == "docker":
+ dti_reconfig_script = rtp.get('dti_reconfig_script')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ scn = rtp.get('service_component_name')
+ scn_address = None
+ scn_port = None
+ if not scn:
+ CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
+ continue
+ if container_type == "docker":
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ try:
+ srvcCatalogItem = ConsulClient.lookup_service(scn)[0]
+ scn_address = srvcCatalogItem.get("ServiceAddress")
+ except:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags')
+ # e.g., scn="s908d92e232ed43..."
+ if not svc_with_my_clli_tags:
+ # We should not incur this burden. k8splugin should store this into runtime properties.
+ try:
+ node_name = srvcCatalogItem.get("Node")
+ if node_name:
+ # e.g., node_name="zldcdyh1adce3kpma00"
+ services = ConsulClient.lookup_node(node_name).get("Services")
+ if services:
+ for node_svc in list(services.keys()):
+ if "_component_kubernetes_master" in node_svc:
+ # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master"
+ svc_with_my_clli_tags = node_svc
+ break
+ except:
+ pass
+ # ... cache results we find into runtime properties to avoid searching again
+ if svc_with_my_clli_tags:
+ CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format(
+ node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags))
+ rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags
+ body = {
+ "runtime_properties": rtp,
+ "state": node_instance.state,
+ "version": 1 + int(node_instance.version)
+ }
+ try:
+ CfyClient._client.update_node_instance(node_instance.id, body)
+ except:
+ pass
+
+ if not svc_with_my_clli_tags:
+ CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # get the nodeport for statefulset sidecar service
+ dti_info = rtp.get('dti_info')
+ if dti_info:
+ ports = dti_info.get('ports')
+ if ports:
+ scn_port = ports[0].split(':')[1]
+ docker_host = rtp.get('configuration',{}).get('file_content')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
+ if dcae_service_location:
+ if container_type == "docker" and docker_host not in dockerhosts:
+ CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location))
+ continue
+ elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli:
+ CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location))
+ continue
+
+ # If DTI Event specifies component_type, then collector's service_component_type must match
+ if component_type:
+ c_component_type = rtp.get('service_component_type')
+ if component_type != c_component_type:
+ CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # Check if the collector supports this VNF Type
+ # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':dti'
+ try:
+ obj = ConsulClient.get_value(dti_key)
+ except Exception as e:
+ CfyClient._logger.error(
+ "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
+ .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
+ )
+ continue
+ if not obj:
+ CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key))
+ continue
+ obj_types = set(k.lower() for k in obj)
+ if dcae_target_type.lower() in obj_types:
+ CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port )
+ continue
+ else:
+ CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key))
+ continue
+
+ msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\
+ .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
+ CfyClient._logger.debug(msg)
+
+ @staticmethod
+ def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''):
+ """
+ Iterate components that handle a given dcae_target_type to find the components of docker type
+
+ Parameters
+ ----------
+ dcae_target_type : string
+ VNF Type
+ dcae_service_location : string
+ Location of the component (optional)
+ component_type : string
+ Type of the component (optional)
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+
+ """
+
+ cnt_found = 0
+ # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
+ dockerhosts = []
+
+ if dcae_service_location:
+ try:
+ dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(
+ type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ if not container_id:
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+ docker_config = rtp.get('docker_config')
+ if not docker_config:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ dti_reconfig_script = ""
+ dti_reconfig_script = rtp.get('dti_reconfig_script')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+
+ # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
+ if dcae_service_location:
+ if docker_host not in dockerhosts:
+ CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, docker_host,
+ dcae_service_location))
+ continue
+
+ # If DTI Event specifies component_type, then collector's service_component_type must match
+ if component_type:
+ c_component_type = rtp.get('service_component_type')
+ if component_type != c_component_type:
+ CfyClient._logger.debug(
+ "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # Check if the collector supports this VNF Type
+ # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':dti'
+ try:
+ obj = ConsulClient.get_value(dti_key)
+ except Exception as e:
+ CfyClient._logger.error(
+ "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
+ .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
+ )
+ continue
+ if not obj:
+ CfyClient._logger.debug(
+ "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id,
+ dti_key))
+ continue
+ obj_types = set(k.lower() for k in obj)
+ if dcae_target_type.lower() in obj_types:
+ CfyClient._logger.debug(
+ "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id,
+ dcae_target_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id,
+ node_instance.state, docker_host, dti_reconfig_script, container_type, '', '')
+ continue
+ else:
+ CfyClient._logger.debug(
+ "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id,
+ dcae_target_type, dti_key))
+ continue
+
+ msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \
+ .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
+ CfyClient._logger.debug(msg)
+
+
+ @staticmethod
+ def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"):
+ """
+ Iterate components of a specific deployment_id.
+
+ Parameters
+ ----------
+ deployment_id : string
+ Cloudify deployment ID that created the component(s).
+ node_id : string
+ Cloudify node ID that created the component.
+ reconfig_type : string
+ "app"
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+ or
+ [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
+
+ """
+
+ cnt_found = 0
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(
+ deployment_id=deployment_id,
+ _include=['id','node_id','deployment_id','state','runtime_properties']
+ ):
+ if node_id and node_instance.node_id != node_id:
+ continue
+
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ if not container_id:
+ container_type = "k8s"
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
+ continue
+ reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type)
+ if not reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type))
+ continue
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
+ continue
+ if container_type == "docker":
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ docker_host = rtp.get('configuration',{}).get('file_content')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type)
+ continue
+
+ msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type)
+ CfyClient._logger.debug(msg)
diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py
new file mode 100644
index 0000000..d5149cc
--- /dev/null
+++ b/oti/event-handler/otihandler/config.py
@@ -0,0 +1,179 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""read and use the config"""
+
+import copy
+import json
+import logging
+import logging.config
+import os
+
+from otihandler.consul_client import ConsulClient
+
+logging.basicConfig(
+ filename='logs/oti_handler.log', \
+ format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
+ '%(threadName)s %(name)s.%(funcName)s: %(message)s', \
+ datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG)
+
+class Config(object):
+ """main config of the application"""
+
+ CONFIG_FILE_PATH = "etc/config.json"
+ LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
+ SERVICE_NAME = "oti_handler"
+
+ FIELD_SYSTEM = "system"
+ FIELD_WSERVICE_PORT = "wservice_port"
+ FIELD_TLS = "tls"
+
+ _logger = logging.getLogger("oti_handler.config")
+ config = None
+
+ cloudify_proto = None
+ cloudify_addr = None
+ cloudify_port = None
+ cloudify_user = None
+ cloudify_pass = None
+ cloudify = None
+ consul_url = "http://consul:8500"
+ tls_cacert_file = None
+ tls_server_cert_file = None
+ tls_private_key_file = None
+ tls_server_ca_chain_file = None
+ wservice_port = 9443
+
+ @staticmethod
+ def _get_tls_file_path(tls_config, cert_directory, tls_name):
+ """calc file path and verify its existance"""
+
+ file_name = tls_config.get(tls_name)
+ if not file_name:
+ return None
+ tls_file_path = os.path.join(cert_directory, file_name)
+ if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
+ Config._logger.error("invalid %s: %s", tls_name, tls_file_path)
+ return None
+ return tls_file_path
+
+ @staticmethod
+ def _set_tls_config(tls_config):
+ """verify and set tls certs in config"""
+
+ try:
+ Config.tls_cacert_file = None
+ Config.tls_server_cert_file = None
+ Config.tls_private_key_file = None
+ Config.tls_server_ca_chain_file = None
+
+ if not (tls_config and isinstance(tls_config, dict)):
+ Config._logger.info("no tls in config: %s", json.dumps(tls_config))
+ return
+
+ cert_directory = tls_config.get("cert_directory")
+
+ if not (cert_directory and isinstance(cert_directory, str)):
+ Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory)
+ return
+
+ cert_directory = os.path.join(
+ os.path.dirname(os.path.dirname(os.path.realpath(__file__))), str(cert_directory))
+ if not (cert_directory and os.path.isdir(cert_directory)):
+ Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory)
+ return
+
+ Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
+ Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "server_cert")
+ Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "private_key")
+ Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "server_ca_chain")
+
+ finally:
+ Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
+ Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
+ Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
+ Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)
+
+ @staticmethod
+ def merge(new_config):
+ """merge the new_config into current config - override the values"""
+
+ if not new_config:
+ return
+
+ if not Config.config:
+ Config.config = new_config
+ return
+
+ new_config = copy.deepcopy(new_config)
+ Config.config.update(new_config)
+
+ @staticmethod
+ def get_system_name():
+ """find the name of the oti_handler system
+ to be used as the key in consul-kv for config of oti_handler
+ """
+
+ return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME)
+
+ @staticmethod
+ def discover():
+ """bring and merge the config settings from Consul"""
+
+ system_key = Config.get_system_name()
+ new_config = ConsulClient.get_value(system_key)
+
+ if not new_config or not isinstance(new_config, dict):
+ Config._logger.warn("unexpected config from Consul: %s", new_config)
+ return
+
+ Config._logger.debug("loaded config from Consul(%s): %s",
+ system_key, json.dumps(new_config))
+ Config._logger.debug("config before merge from Consul: %s", json.dumps(Config.config))
+ Config.merge(new_config.get(Config.SERVICE_NAME))
+ Config._logger.debug("merged config from Consul: %s", json.dumps(Config.config))
+
+ @staticmethod
+ def load_from_file(file_path=None):
+ """read and store the config from config file"""
+
+ if not file_path:
+ file_path = Config.CONFIG_FILE_PATH
+
+ loaded_config = None
+ if os.access(file_path, os.R_OK):
+ with open(file_path, 'r') as config_json:
+ loaded_config = json.load(config_json)
+
+ if not loaded_config:
+ Config._logger.info("config not loaded from file: %s", file_path)
+ return
+
+ Config._logger.info("config loaded from file: %s", file_path)
+ logging_config = loaded_config.get("logging")
+ if logging_config:
+ logging.config.dictConfig(logging_config)
+
+ Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
+
+ local_config = loaded_config.get(Config.SERVICE_NAME, {})
+ Config._set_tls_config(local_config.get(Config.FIELD_TLS))
+
+ Config.merge(loaded_config.get(Config.SERVICE_NAME))
+ return True
diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py
new file mode 100644
index 0000000..d26d3a1
--- /dev/null
+++ b/oti/event-handler/otihandler/consul_client.py
@@ -0,0 +1,617 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""client to talk to consul at consul port 8500"""
+
+import base64
+import copy
+import json
+import logging
+import os
+import re
+import socket
+
+import requests
+
+
+class ConsulClientError(RuntimeError):
+ pass
+
+class ConsulClientConnectionError(RuntimeError):
+ pass
+
+class ConsulClientServiceNotFoundError(RuntimeError):
+ pass
+
+class ConsulClientNodeNotFoundError(RuntimeError):
+ pass
+
+class ConsulClientKVEntryNotFoundError(RuntimeError):
+ pass
+
+
+class ConsulClient(object):
+ """talking to consul"""
+
+ CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
+ CONSUL_KV_MASK = "{}/v1/kv/{}"
+ CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
+ CONSUL_TRANSACTION_URL = "{}/v1/txn"
+ _logger = logging.getLogger("oti_handler.consul_client")
+
+ MAX_OPS_PER_TXN = 64
+ # MAX_VALUE_LEN = 512 * 1000
+
+ OPERATION_SET = "set"
+ OPERATION_DELETE = "delete"
+ OPERATION_DELETE_FOLDER = "delete-tree"
+
+
+ #----- Methods for Consul services
+
+ @staticmethod
+ def lookup_service(service_name):
+ """find the service record in consul"""
+
+ service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
+
+ ConsulClient._logger.info("lookup_service(%s)", service_path)
+
+ try:
+ response = requests.get(service_path, timeout=30)
+ response.raise_for_status()
+ # except requests.exceptions.HTTPError as e:
+ # except requests.exceptions.ConnectionError as e:
+ # except requests.exceptions.Timeout as e:
+ except requests.exceptions.RequestException as e:
+ msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format(
+ service_name, service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ return_list = response.json()
+ # except ValueError as e:
+ except Exception as e:
+ msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ service_name, service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ if not return_list:
+ msg = "lookup_service({}) got empty or no value from requests.get({})".format(
+ service_name, service_path)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ return return_list
+
+
+ @staticmethod
+ def get_all_services():
+ """List all services from consul"""
+
+ service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/"))
+
+ ConsulClient._logger.info("get_all_services(%s)", service_path)
+
+ try:
+ response = requests.get(service_path, timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format(
+ service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ return_dict = response.json()
+ except Exception as e:
+ msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ if not return_dict:
+ msg = "get_all_services() got empty or no value from requests.get({})".format(
+ service_path)
+ ConsulClient._logger.info(msg)
+ # raise ConsulClientServiceNotFoundError(msg)
+
+ return return_dict
+
+
+ @staticmethod
+ def _find_matching_services(services, name_search, tags):
+ """Find matching services given search criteria"""
+ sub_tags = tags[0][4:6]
+ tags.append(sub_tags)
+
+ def is_match(service):
+ srv_name, srv_tags = service
+ return name_search in srv_name and \
+ any([tag in srv_tags for tag in tags])
+
+ return [ srv[0] for srv in list(services.items()) if is_match(srv) ]
+
+
+ @staticmethod
+ def search_services(name_search, tags):
+ """
+ Search for services that match criteria
+
+ Args:
+ -----
+ name_search: (string) Name to search for as a substring
+ tags: (list) List of strings that are tags. A service must match **ANY OF** the
+ tags in the list.
+
+ Returns:
+ --------
+ List of names of services that matched
+ """
+
+ matches = []
+
+ # srvs is dict where key is service name and value is list of tags
+ srvs = ConsulClient.get_all_services()
+
+ if srvs:
+ matches = ConsulClient._find_matching_services(srvs, name_search, tags)
+
+ return matches
+
+
+ @staticmethod
+ def get_service_fqdn_port(service_name, node_meta=False):
+ """find the service record in consul"""
+
+ service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
+
+ ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)
+
+ try:
+ response = requests.get(service_path, timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format(
+ service_name, service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ service = response.json()
+ except Exception as e:
+ msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ service_name, service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ if not service:
+ msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format(
+ service_name, service_path)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ try:
+ service = service[0] # arbitrarily choose the first one
+ port = service["ServicePort"]
+
+ # HTTPS certificate validation requires FQDN not IP address
+ fqdn = ""
+ if node_meta:
+ meta = service.get("NodeMeta")
+ if meta:
+ fqdn = meta.get("fqdn")
+ if not fqdn:
+ fqdn = socket.getfqdn(str(service["ServiceAddress"]))
+ except Exception as e:
+ msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
+ service_name, service_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientServiceNotFoundError(msg)
+
+ return (fqdn, port)
+
+
+ #----- Methods for Consul nodes
+
+ @staticmethod
+ def lookup_node(node_name):
+ """find the node record in consul"""
+
+ node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name)
+
+ ConsulClient._logger.info("lookup_node(%s)", node_path)
+
+ try:
+ response = requests.get(node_path, timeout=30)
+ response.raise_for_status()
+ # except requests.exceptions.HTTPError as e:
+ # except requests.exceptions.ConnectionError as e:
+ # except requests.exceptions.Timeout as e:
+ except requests.exceptions.RequestException as e:
+ msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format(
+ node_name, node_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ return_dict = response.json()
+ # except ValueError as e:
+ except Exception as e:
+ msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ node_name, node_path, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientNodeNotFoundError(msg)
+
+ if not return_dict:
+ msg = "lookup_node({}) got empty or no value from requests.get({})".format(
+ node_name, node_path)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientNodeNotFoundError(msg)
+
+ return return_dict
+
+
+ #----- Methods for Consul key-values
+
+ @staticmethod
+ def put_value(key, data, cas=None):
+ """put the value for key into consul-kv"""
+
+ # ConsulClient._logger.info("put_value(%s)", str(key))
+
+ URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
+ if cas is not None:
+ URL = '{}?cas={}'.format(URL, cas)
+
+ try:
+ response = requests.put(URL, data=json.dumps(data), timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format(
+ key, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ updated = response.json()
+ except Exception as e:
+ msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format(
+ key, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ return updated
+
+
+ @staticmethod
+ def get_value(key, get_index=False):
+ """get the value for key from consul-kv"""
+
+ URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
+
+ try:
+ response = requests.get(URL, timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format(
+ key, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ data = response.json()
+ except Exception as e:
+ msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ key, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ if not data:
+ msg = "get_value({}) got empty or no value from requests.get({})".format(
+ key, URL)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ try:
+ value = base64.b64decode(data[0]["Value"]).decode("utf-8")
+ value_dict = json.loads(value)
+ except Exception as e:
+ msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
+ key, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
+ key, value, json.dumps(data))
+
+ if get_index:
+ return data[0]["ModifyIndex"], value_dict
+
+ return value_dict
+
+
+ @staticmethod
+ def get_kvs(prefix, nest=True, trim_prefix=False):
+ """get key-values for keys beginning with prefix from consul-kv"""
+
+ URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix)
+
+ try:
+ response = requests.get(URL, timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format(
+ prefix, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientConnectionError(msg)
+
+ try:
+ data = response.json()
+ except Exception as e:
+ msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
+ prefix, URL, type(e).__name__, e)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ if not data:
+ msg = "get_kvs({}) got empty or no value from requests.get({})".format(
+ prefix, URL)
+ ConsulClient._logger.error(msg)
+ raise ConsulClientKVEntryNotFoundError(msg)
+
+ def put_level_value(level_keys, value, level_dict={}):
+ if level_keys:
+ key = level_keys.pop(0)
+ level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {}))
+ return level_dict
+ else:
+ return value
+
+ rdict = {}
+ for item in data:
+ v = base64.b64decode(item["Value"]).decode("utf-8")
+ try:
+ value = json.loads(v)
+ except Exception as e:
+ value = v
+ key = item['Key']
+ if trim_prefix:
+ key = key[len(prefix):]
+ if nest:
+ level_keys = key.split('/')
+ rdict = put_level_value(level_keys, value, rdict)
+ else:
+ rdict[key] = value
+
+ ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
+ prefix, json.dumps(rdict), json.dumps(data))
+ return rdict
+
+
+ @staticmethod
+ def _gen_txn_operation(verb, key, value=None):
+ """returns the properly formatted operation to be used inside transaction"""
+
+ # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key
+ if value:
+ return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}}
+ return {"KV": {"Verb": verb, "Key": key}}
+
+
+ @staticmethod
+ def _run_transaction(operation_name, txn):
+ """run a single transaction of several operations at consul /txn"""
+
+ if not txn:
+ return
+
+ txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/"))
+ response = None
+ try:
+ response = requests.put(txn_url, json=txn, timeout=30)
+ except requests.exceptions.RequestException as e:
+ ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
+ .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
+ return
+
+ if response.status_code != requests.codes.ok:
+ ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
+ .format(operation_name, txn_url, response.status_code,
+ response.text, json.dumps(txn),
+ json.dumps(dict(list(response.request.headers.items())))))
+ return
+
+ ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
+ .format(operation_name, txn_url, response.status_code,
+ response.text, json.dumps(txn),
+ json.dumps(dict(list(response.request.headers.items())))))
+
+ return True
+
+
+ @staticmethod
+ def store_kvs(kvs):
+ """put kvs into consul-kv"""
+
+ if not kvs:
+ ConsulClient._logger.warn("kvs not supplied to store_kvs()")
+ return
+
+ store_kvs = [
+ ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET,
+ key, json.dumps(value))
+ for key, value in kvs.items()
+ ]
+ txn = []
+ idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn)
+ for idx in range(0, len(store_kvs), idx_step):
+ txn += store_kvs[idx : idx + idx_step]
+ if not ConsulClient._run_transaction("store_kvs", txn):
+ return False
+ txn = []
+
+ return ConsulClient._run_transaction("store_kvs", txn)
+
+
+ @staticmethod
+ def delete_key(key):
+ """delete key from consul-kv"""
+
+ if not key:
+ ConsulClient._logger.warn("key not supplied to delete_key()")
+ return
+
+ delete_key = [
+ ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key)
+ ]
+ return ConsulClient._run_transaction("delete_key", delete_key)
+
+
+ @staticmethod
+ def delete_kvs(key):
+ """delete key from consul-kv"""
+
+ if not key:
+ ConsulClient._logger.warn("key not supplied to delete_kvs()")
+ return
+
+ delete_kvs = [
+ ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key)
+ ]
+ return ConsulClient._run_transaction("delete_kvs", delete_kvs)
+
+
+ #----- Methods for Config Binding Service
+
+ @staticmethod
+ def get_service_component(scn):
+ config = json.dumps(ConsulClient.get_value(scn))
+
+ try:
+ dmaap = ConsulClient.get_value(scn + ":dmaap")
+ except Exception as e:
+ dmaap = None
+ if dmaap:
+ for key in list(dmaap.keys()):
+ config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config)
+
+ try:
+ rel = ConsulClient.get_value(scn + ":rel")
+ except Exception as e:
+ rel = None
+ if rel:
+ for key in list(rel.keys()):
+ config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config)
+
+ return json.loads(config)
+
+
+ @staticmethod
+ def get_service_component_all(scn, policies_as_list=True):
+ t_scn = scn + ":"
+ t_len = len(t_scn)
+ a_dict = ConsulClient.get_kvs(scn)
+ b_dict = {}
+ for key in a_dict:
+ b_key = None
+ if key == scn:
+ b_dict["config"] = ConsulClient.get_service_component(scn)
+ elif key == scn + ":dmaap":
+ continue
+ elif key[0:t_len] == t_scn:
+ b_key = key[t_len:]
+ # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys
+ if policies_as_list and b_key == "policies": # convert items from KVs to a values list
+ b_dict[b_key] = {}
+ for sub_key in a_dict[key]:
+ if sub_key == "items":
+ b_dict[b_key][sub_key] = []
+ d_dict = a_dict[key][sub_key]
+ for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate
+ b_dict[b_key][sub_key].append(d_dict[item])
+ else:
+ b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key])
+ else:
+ b_dict[b_key] = copy.deepcopy(a_dict[key])
+ return b_dict
+
+
+ @staticmethod
+ def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
+ """
+ Add VNF instance to Consul scn:dti key.
+
+ Treat its value as a JSON string representing a dict.
+ Extend the dict by adding a dti_dict for vnf_id under vnf_type.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under scn:dti key.
+ Watch out for conflicting concurrent updates.
+ """
+
+ key = scn + ':dti'
+ lc_vnf_type = vnf_type.lower()
+ while True: # do until update succeeds
+ (mod_index, v) = ConsulClient.get_value(key, get_index=True)
+ lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
+ # but DCAE-C doesn't create such keys
+
+ if lc_vnf_type not in lc_v:
+ return # That VNF type is not supported by this component
+ lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance
+
+ updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
+ if updated:
+ return lc_v
+
+
+ @staticmethod
+ def delete_vnf_id(scn, vnf_type, vnf_id):
+ """
+ Delete VNF instance from Consul scn:dti key.
+
+ Treat its value as a JSON string representing a dict.
+ Modify the dict by deleting the vnf_id key entry from under vnf_type.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under scn:dti key.
+ Watch out for conflicting concurrent updates.
+ """
+
+ key = scn + ':dti'
+ lc_vnf_type = vnf_type.lower()
+ while True: # do until update succeeds
+ (mod_index, v) = ConsulClient.get_value(key, get_index=True)
+ lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
+ # but DCAE-C doesn't create such keys
+
+ if lc_vnf_type not in lc_v:
+ return # That VNF type is not supported by this component
+ if vnf_id not in lc_v[lc_vnf_type]:
+ return lc_v
+ del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance
+
+ updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
+ if updated:
+ return lc_v
+
+
+if __name__ == "__main__":
+ value = None
+
+ if value:
+ print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': ')))
diff --git a/oti/event-handler/otihandler/dbclient/__init__.py b/oti/event-handler/otihandler/dbclient/__init__.py
new file mode 100644
index 0000000..ee3ec3e
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/__init__.py
@@ -0,0 +1,19 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from .models import Event
+from .models import EventAck
+from .db_dao import DaoBase
diff --git a/oti/event-handler/otihandler/dbclient/apis/__init__.py b/oti/event-handler/otihandler/dbclient/apis/__init__.py
new file mode 100644
index 0000000..05e3800
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/apis/__init__.py
@@ -0,0 +1,18 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from .db_access import DbAccess
+from .event_db_access import EventDbAccess
diff --git a/oti/event-handler/otihandler/dbclient/apis/db_access.py b/oti/event-handler/otihandler/dbclient/apis/db_access.py
new file mode 100644
index 0000000..f064b30
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/apis/db_access.py
@@ -0,0 +1,50 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""
+Base class for APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver
+"""
+
+from sqlalchemy.orm import sessionmaker
+from ..db_dao import DaoBase
+import psycopg2
+from psycopg2.extras import execute_values
+import os
+import logging
+
+
+class DbAccess(object):
+ logger = logging.getLogger("dti_handler.DbAccess")
+ engine = None
+ session = None
+
+ def __init__(self):
+ self.engine = DaoBase.getDbEngine()
+ # create a configured "Session" class
+ Session = sessionmaker(bind=self.engine)
+
+ # create a Session
+ self.session = Session()
+
+ def saveDomainObject(self, obj):
+ self.session.add(obj)
+ self.session.commit()
+ self.session.close()
+
+ def deleteDomainObject(self,obj):
+ self.session.delete(obj)
+ self.session.commit()
+ self.session.close()
diff --git a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py
new file mode 100644
index 0000000..898ee8e
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py
@@ -0,0 +1,154 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+""" DB APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver"""
+
+from sqlalchemy import and_
+from sqlalchemy.orm.exc import NoResultFound
+
+from .db_access import DbAccess
+from ..models import Event, EventAck
+
+
+class EventDbAccess(DbAccess):
+
+ def __init__(self):
+ DbAccess.__init__(self)
+
+ def query_event_item(self, target_type, target_name):
+ try:
+ query = self.session.query(Event).filter(Event.target_type == target_type).\
+ filter(Event.target_name == target_name)
+ evt = query.one()
+ except NoResultFound:
+ return None
+ else:
+ return evt
+
+ def query_event_data(self, target_type, target_name):
+ try:
+ query = self.session.query(Event).filter(Event.target_type == target_type).\
+ filter(Event.target_name == target_name)
+ evt = query.one()
+ except NoResultFound:
+ return []
+ else:
+ try:
+ ack_result = self.session.query(EventAck).filter(EventAck.event == evt).all()
+ except NoResultFound:
+ return []
+ else:
+ return ack_result
+
+ def query_event_data_k8s(self, target_type, target_name):
+ try:
+ query = self.session.query(Event).filter(Event.target_type == target_type).\
+ filter(Event.target_name == target_name)
+ evt = query.one()
+ except NoResultFound:
+ return []
+ else:
+ try:
+ ack_result = self.session.query(EventAck).filter(EventAck.event == evt).\
+ filter(EventAck.container_type != 'docker').all()
+ except NoResultFound:
+ return []
+ else:
+ return ack_result
+
+ def query_event_info_docker(self, prim_evt, service_component, deployment_id, container_id):
+ try:
+ query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter(
+ and_(EventAck.service_component == service_component,
+ EventAck.deployment_id == deployment_id,
+ EventAck.container_id == container_id,
+ EventAck.container_type == 'docker'))
+ evt = query.one()
+ except NoResultFound as nrf:
+ raise nrf
+ else:
+ return evt
+
+ def update_event_item(self, dti_event, target_type, target_name):
+ self.session.query(Event).filter(Event.target_type == target_type). \
+ filter(Event.target_name == target_name).update({Event.event:dti_event})
+ self.session.commit()
+
+ def query_raw_k8_events(self, cluster, pod, namespace):
+ """
+ run an inner JOIN query to dtih_event and dtih_event_ack tables using supplied query predicates
+
+ :param cluster:
+ :param pod:
+ :param namespace:
+ :return:
+ Set of event objects related to k8s pods
+ """
+ try:
+ return self.session.query(Event).filter(Event.dtih_event_id.in_(
+ self.session.query(EventAck.dtih_event_id).filter(and_(EventAck.k8s_cluster_fqdn == cluster,
+ EventAck.k8s_pod_id == pod,
+ EventAck.k8s_namespace == namespace)))).all()
+ except NoResultFound:
+ print("invalid query or no data")
+ return ()
+
+ def query_raw_docker_events(self, target_types, locations):
+ """
+ run a query to dtih_event table using supplied query predicates
+
+ :param target_types: required
+ :param locations: optional
+ :return:
+ set of event objects related to docker container
+ """
+ try:
+ if not locations or (len(locations) == 1 and locations[0] == ''):
+ return self.session.query(Event).filter(Event.target_type.in_(target_types)).all()
+ else:
+ return self.session.query(Event).filter(Event.target_type.in_(target_types)).filter(
+ Event.location_clli.in_(locations)).all()
+ except NoResultFound:
+ print("invalid query or no data")
+ return ()
+
+ def query_pod_info2(self, cluster):
+ try:
+ return self.session.query(EventAck).filter(EventAck.k8s_cluster_fqdn == cluster).all()
+ except NoResultFound:
+ print("invalid query or no data")
+ return ()
+
+ def query_pod_info(self, cluster):
+ try:
+ return self.session.query(EventAck.k8s_pod_id, EventAck.k8s_namespace,
+ EventAck.k8s_proxy_fqdn, EventAck.k8s_service_name,
+ EventAck.k8s_service_port)\
+ .filter(EventAck.k8s_cluster_fqdn == cluster) \
+ .distinct().order_by(EventAck.k8s_cluster_fqdn).all()
+ except NoResultFound:
+ print("invalid query or no data")
+ return ()
+
+ def query_event_data_k8s_pod(self, prim_evt, scn):
+ try:
+ query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter(
+ and_(EventAck.service_component == scn))
+ event_info = query.one()
+ except NoResultFound:
+ return None
+ else:
+ return event_info
diff --git a/oti/event-handler/otihandler/dbclient/db_dao.py b/oti/event-handler/otihandler/dbclient/db_dao.py
new file mode 100644
index 0000000..78fa058
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/db_dao.py
@@ -0,0 +1,33 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+""" SqlAlchemy ORM engine for postgresSql dti database """
+
+from sqlalchemy import create_engine
+
+class DaoBase:
+ _engine = None
+
+ @staticmethod
+ def init_db(dbConStr):
+ if DaoBase._engine:
+ return
+ DaoBase._engine = create_engine(dbConStr)
+
+ @staticmethod
+ def getDbEngine():
+ return DaoBase._engine
+
diff --git a/oti/event-handler/otihandler/dbclient/models/__init__.py b/oti/event-handler/otihandler/dbclient/models/__init__.py
new file mode 100644
index 0000000..bc802f5
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/models/__init__.py
@@ -0,0 +1,19 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+
+from .event import Event
+from .event_ack import EventAck
diff --git a/oti/event-handler/otihandler/dbclient/models/event.py b/oti/event-handler/otihandler/dbclient/models/event.py
new file mode 100644
index 0000000..553bec2
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/models/event.py
@@ -0,0 +1,40 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+""" ORM - mapping class for dtih_event table """
+
+from sqlalchemy import Column, String, Integer, ForeignKey, func
+from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+import datetime
+
+
+Base = declarative_base()
+
+class Event(Base):
+ __tablename__ = 'dtih_event'
+ __table_args__ = {'schema': 'dti'}
+ dtih_event_id = Column(Integer, primary_key=True)
+ event = Column(JSONB)
+ create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
+ last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())
+ target_name = Column(String)
+ target_type = Column(String)
+ location_clli = Column(String)
+ # def __repr__(self):
+ # return "<Event(event_id='%s', target_type='%s', target_name='%s')" % (
+ # self.event_id, self.target_type, self.target_name
+ # )
diff --git a/oti/event-handler/otihandler/dbclient/models/event_ack.py b/oti/event-handler/otihandler/dbclient/models/event_ack.py
new file mode 100644
index 0000000..2b19316
--- /dev/null
+++ b/oti/event-handler/otihandler/dbclient/models/event_ack.py
@@ -0,0 +1,51 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+""" ORM - mapping class for dtih_event_ack table """
+
+import datetime
+from sqlalchemy import Column, String, Integer, ForeignKey, func
+from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
+from sqlalchemy.orm import relationship
+from sqlalchemy.ext.declarative import declarative_base
+from ..models import Event
+
+Base = declarative_base()
+
+class EventAck(Base):
+ __tablename__ = 'dtih_event_ack'
+ __table_args__ = {'schema': 'dti'}
+ dtih_event_ack_id = Column(Integer, primary_key=True)
+ create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
+ last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())
+ action = Column(String)
+ k8s_namespace = Column(String)
+ k8s_service_name = Column(String)
+ k8s_service_port = Column(String)
+ k8s_cluster_fqdn = Column(String)
+ k8s_proxy_fqdn = Column(String)
+ k8s_pod_id = Column(String)
+ service_component = Column(String)
+ deployment_id = Column(String)
+ container_type = Column(String)
+ docker_host = Column(String)
+ container_id = Column(String)
+ reconfig_script = Column(String)
+ dtih_event_id = Column(Integer, ForeignKey(Event.dtih_event_id))
+ event = relationship(Event)
+
+ def update_action(self, action):
+ setattr(self, 'action', action)
diff --git a/oti/event-handler/otihandler/docker_client.py b/oti/event-handler/otihandler/docker_client.py
new file mode 100644
index 0000000..621a1ec
--- /dev/null
+++ b/oti/event-handler/otihandler/docker_client.py
@@ -0,0 +1,175 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""client interface to docker"""
+
+import docker
+import json
+import logging
+import time
+
+from otihandler.config import Config
+from otihandler.consul_client import ConsulClient
+from otihandler.utils import decrypt
+
+
+# class DockerClientError(RuntimeError):
+# pass
+
+class DockerClientConnectionError(RuntimeError):
+ pass
+
+
+class DockerClient(object):
+ """
+ All Docker logins are in Consul's key-value store under
+ "docker_plugin/docker_logins" as a list of json objects where
+ each object is a single login:
+
+ [{ "username": "XXXX", "password": "yyyy",
+ "registry": "hostname.domain:18443" }]
+ """
+
+ _logger = logging.getLogger("oti_handler.docker_client")
+
+ def __init__(self, docker_host, reauth=False):
+ """Create Docker client
+
+ Args:
+ -----
+ reauth: (boolean) Forces reauthentication, e.g., Docker login
+ """
+
+ (fqdn, port) = ConsulClient.get_service_fqdn_port(docker_host, node_meta=True)
+ base_url = "https://{}:{}".format(fqdn, port)
+
+ try:
+ tls_config = docker.tls.TLSConfig(
+ client_cert=(
+ Config.tls_server_ca_chain_file,
+ Config.tls_private_key_file
+ )
+ )
+ self._client = docker.APIClient(base_url=base_url, tls=tls_config, version='auto', timeout=60)
+
+ for dcl in ConsulClient.get_value("docker_plugin/docker_logins"):
+ dcl['password'] = decrypt(dcl['password'])
+ dcl["reauth"] = reauth
+ self._client.login(**dcl)
+
+ # except requests.exceptions.RequestException as e:
+ except Exception as e:
+ msg = "DockerClient.__init__({}) attempt to {} with TLS got exception {}: {!s}".format(
+ docker_host, base_url, type(e).__name__, e)
+
+ # Then try connecting to dockerhost without TLS
+ try:
+ base_url = "tcp://{}:{}".format(fqdn, port)
+ self._client = docker.APIClient(base_url=base_url, tls=False, version='auto', timeout=60)
+
+ for dcl in ConsulClient.get_value("docker_plugin/docker_logins"):
+ dcl['password'] = decrypt(dcl['password'])
+ dcl["reauth"] = reauth
+ self._client.login(**dcl)
+
+ # except requests.exceptions.RequestException as e:
+ except Exception as e:
+ msg = "{}\nDockerClient.__init__({}) attempt to {} without TLS got exception {}: {!s}".format(
+ msg, docker_host, base_url, type(e).__name__, e)
+ DockerClient._logger.error(msg)
+ raise DockerClientConnectionError(msg)
+
+ @staticmethod
+ def build_cmd(script_path, use_sh=True, msg_type="dti", **kwargs):
+ """Build command to execute"""
+
+ data = json.dumps(kwargs or {})
+
+ if use_sh:
+ return ['/bin/sh', script_path, msg_type, data]
+ else:
+ return [script_path, msg_type, data]
+
+ def notify_for_reconfiguration(self, container_id, cmd):
+ """Notify Docker container that reconfiguration occurred
+
+ Notify the Docker container by doing Docker exec of passed-in command
+
+ Args:
+ -----
+ container_id: (string)
+ cmd: (list) of strings each entry being part of the command
+ """
+
+ for attempts_remaining in range(11,-1,-1):
+ try:
+ result = self._client.exec_create(container=container_id, cmd=cmd)
+ except docker.errors.APIError as e:
+ # e # 500 Server Error: Internal Server Error ("{"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}")
+ DockerClient._logger.debug("exec_create() returned APIError: {!s}".format(e))
+
+ # e.message # 500 Server Error: Internal Server Error
+ # DockerClient._logger.debug("e.message: {}".format(e.message))
+ # e.response.status_code # 500
+ # DockerClient._logger.debug("e.response.status_code: {}".format(e.response.status_code))
+ # e.response.reason # Internal Server Error
+ # DockerClient._logger.debug("e.response.reason: {}".format(e.response.reason))
+ # e.explanation # {"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}
+ # DockerClient._logger.debug("e.explanation: {}".format(e.explanation))
+
+ # docker container restarting can wait
+ if e.explanation and 'is restarting' in e.explanation.lower():
+ DockerClient._logger.debug("notification exec_create() experienced: {!s}".format(e))
+ if attempts_remaining == 0:
+ result = None
+ break
+ time.sleep(10)
+ # elif e.explanation and 'no such container' in e.explanation.lower():
+ # elif e.explanation and 'is not running' in e.explanation.lower():
+ else:
+ DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(type(e).__name__, e))
+ return str(e) # don't raise or CM will retry usually forever
+ # raise DockerClientError(e)
+ except Exception as e:
+ DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(
+ type(e).__name__, e))
+ return str(e) # don't raise or CM will retry usually forever
+ # raise DockerClientError(e)
+ else:
+ break
+ if not result:
+ DockerClient._logger.warn("aborting notification exec_create() because docker exec failed")
+ return "notification unsuccessful" # failed to get an exec_id, perhaps trying multiple times, so don't raise or CM will retry usually forever
+ DockerClient._logger.debug("notification exec_create() succeeded")
+
+ for attempts_remaining in range(11,-1,-1):
+ try:
+ result = self._client.exec_start(exec_id=result['Id'])
+ except Exception as e:
+ DockerClient._logger.debug("notification exec_start() got exception {}: {!s}".format(type(e).__name__, e))
+ if attempts_remaining == 0:
+ DockerClient._logger.warn("aborting notification exec_start() because exception {}: {!s}".format(type(e).__name__, e))
+ return str(e) # don't raise or CM will retry usually forever
+ # raise DockerClientError(e)
+ time.sleep(10)
+ else:
+ break
+ DockerClient._logger.debug("notification exec_start() succeeded")
+
+ DockerClient._logger.info("Pass to docker exec {} {} {}".format(
+ container_id, cmd, result))
+
+ return result
diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py
new file mode 100644
index 0000000..970e020
--- /dev/null
+++ b/oti/event-handler/otihandler/dti_processor.py
@@ -0,0 +1,815 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""OTI Event processor for handling all the event types"""
+
+import copy
+import json
+import logging
+from multiprocessing.dummy import Pool as ThreadPool
+from threading import Lock
+
+import requests
+
+from otihandler import utils
+from otihandler.cfy_client import CfyClient
+from otihandler.consul_client import ConsulClient
+from otihandler.dbclient.apis import EventDbAccess
+from otihandler.dbclient.models import Event, EventAck
+from otihandler.docker_client import DockerClient
+
+notify_response_arr = []
+lock = Lock()
+K8S_CLUSTER_PROXY_NODE_PORT = '30132'
+
+
+def notify_docker(args_tuple):
+ """
+ event notification executor inside a process pool to communicate with docker container
+ interacts with docker client library
+ """
+ (dti_event, db_access, ack_item) = args_tuple
+ try:
+ dcae_service_action = dti_event.get('dcae_service_action')
+ component_scn = ack_item.service_component
+ deployment_id = ack_item.deployment_id
+ container_id = ack_item.container_id
+ docker_host = ack_item.docker_host
+ reconfig_script = ack_item.reconfig_script
+ container_type = 'docker'
+ except Exception as e:
+ return (
+ "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e))
+ what = ""
+ try:
+ what = "{} in {} container {} on {} that was deployed by {}".format(
+ reconfig_script, container_type, container_id, docker_host, deployment_id)
+ if dcae_service_action == 'add':
+ add_action = {"dcae_service_action": "deploy"}
+ dti_event.update(add_action)
+
+ if dcae_service_action == 'delete':
+ add_action = {"dcae_service_action": "undeploy"}
+ dti_event.update(add_action)
+
+ # dkr = DockerClient(docker_host, reauth=False)
+ result = ''
+ # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
+ if dti_event.get('dcae_service_action') == 'undeploy':
+ # delete from dti_event_ack table
+ try:
+ db_access.deleteDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ return (component_scn, "ran {}, got: {!s}".format(what, result))
+
+ except Exception as e:
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+
+def notify_svc(args_tuple):
+ """
+ add/update/delete event handler
+ event notification executor inside a process pool to communicate with docker container and k8s services
+ interacts with docker client library
+ interacts with k8s node port services using REST client
+ """
+ (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
+ dti_event = copy.deepcopy(orig_dti_event)
+ try:
+ dcae_service_action = dti_event.get('dcae_service_action').lower()
+
+ component_scn = res_tuple[0]
+ deployment_id = res_tuple[1]
+ container_id = res_tuple[2]
+ node_id = res_tuple[3]
+ docker_host = res_tuple[6]
+ reconfig_script = res_tuple[7]
+ container_type = res_tuple[8]
+ except Exception as e:
+ return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e))
+
+ what = ""
+ if container_type == "docker":
+ # exec reconfigure.sh in docker container
+ try:
+ what = "{} in {} container {} on {} that was deployed by {} node {}".format(
+ reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
+ if dcae_service_action == 'add':
+ add_action = {"dcae_service_action": "deploy"}
+ dti_event.update(add_action)
+
+ if dcae_service_action == 'delete':
+ add_action = {"dcae_service_action": "undeploy"}
+ dti_event.update(add_action)
+
+ dkr = DockerClient(docker_host, reauth=False)
+ result = ''
+ if dti_event.get('dcae_service_action') == 'update':
+ # undeploy + deploy
+ DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
+ dti_event.update({"dcae_service_action": "undeploy"})
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
+ dti_event.update({"dcae_service_action": "deploy"})
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ try:
+ upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
+ container_id)
+ upd_evt_ack.update_action('update')
+ db_access.saveDomainObject(upd_evt_ack)
+ except Exception as e:
+ msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ DTIProcessor.logger.debug("running {}".format(what))
+ result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
+ if dti_event.get('dcae_service_action') == 'deploy':
+ # add into dti_event_ack table
+ try:
+ add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
+ container_type='docker', docker_host=docker_host,
+ container_id=container_id, reconfig_script=reconfig_script,
+ event=curr_evt,
+ action='add')
+ db_access.saveDomainObject(add_evt_ack)
+ except Exception as e:
+ msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ # remove from dtih_event_ack if present
+ if curr_evt is not None:
+ try:
+ del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
+ container_id)
+ db_access.deleteDomainObject(del_evt_ack)
+ except Exception as e:
+ msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ except Exception as e:
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ return (component_scn, "ran {}, got: {!s}".format(what, result))
+ elif container_type == "k8s":
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component")
+ # if action is 'update', check if k8s pod info exists already for this event in app db
+ if dcae_service_action == 'add':
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action")
+ return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
+ elif dcae_service_action == 'update':
+ # handle update for pods being tracked and handle add for new pods
+ k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
+ if k8s_scn_result is not None:
+ # update
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action")
+ return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
+ else:
+ # add
+ DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ")
+ add_action = {"dcae_service_action": "add"}
+ dti_event.update(add_action)
+ return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
+
+
+def notify_k8s(args_tuple):
+ """
+ add event handler
+ event notification executor inside a process pool to communicate with k8s statefulset nodeport service
+ uses REST API client to call k8s services
+ """
+ (dti_event, db_access, curr_evt, res_tuple) = args_tuple
+ component_scn = res_tuple[0]
+ deployment_id = res_tuple[1]
+ node_id = res_tuple[3]
+ container_type = res_tuple[8]
+ service_address = res_tuple[9]
+ service_port = res_tuple[10]
+ what = "{} in {} deployment {} that was deployed by {} node {}".format(
+ "add", container_type, "statefulset", deployment_id, node_id)
+ # call scn node port service REST API
+ svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "collector nodeport service({}) threw exception {}: {!s}".format(
+ svc_nodeport_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ try:
+ event_ack_info = response.json()
+ except Exception as e:
+ msg = "collector nodeport service({}) threw exception {}: {!s}".format(
+ svc_nodeport_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ if not event_ack_info:
+ msg = "collector nodeport service returned bad data"
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "collector nodeport service returned bad data")
+
+ namespace = event_ack_info.get("KubeNamespace")
+ svc_name = event_ack_info.get("KubeServiceName")
+ svc_port = event_ack_info.get("KubeServicePort")
+ proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
+ cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
+ pod_name = event_ack_info.get("KubePod")
+ statefulset = pod_name[0:pod_name.rindex('-')]
+
+ what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
+ "add", container_type, statefulset, namespace, deployment_id, node_id)
+ try:
+ add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
+ k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
+ k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
+ service_component=component_scn)
+ db_access.saveDomainObject(add_evt_ack)
+ return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
+ except Exception as e:
+ msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+
+def notify_pods(args_tuple):
+ """
+ notify event handler
+ event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
+ uses REST API client to call k8s services
+ """
+ event_ack_info = ''
+ (dti_event, res_tuple) = args_tuple
+ try:
+ cluster = res_tuple[0]
+ port = K8S_CLUSTER_PROXY_NODE_PORT
+ namespace = res_tuple[1]
+ svc_name = res_tuple[2]
+ svc_port = res_tuple[4]
+ replicas = res_tuple[3]
+
+ for replica in range(replicas):
+ pod_id = "sts-{}-{}".format(svc_name, replica)
+ item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
+ pod_id, svc_name,
+ svc_port)
+ what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ response = requests.put(item_pod_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
+ else:
+ try:
+ event_ack_info = response.json()
+ except Exception as e:
+ msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
+
+ if not event_ack_info:
+ msg = "stateful set proxy service returned bad data"
+ DTIProcessor.logger.error(msg)
+ with lock:
+ notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
+
+ with lock:
+ notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
+ except Exception as e:
+ with lock:
+ notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e)))
+
+def notify_k8s_pod(args_tuple):
+ """
+ update event handler
+ event notification executor inside a process pool to communicate with k8s DTIH proxy service
+ uses REST API client to call k8s services
+ """
+ item_pod_url = ''
+ component_scn = ''
+ (dti_event, db_access, ack_item) = args_tuple
+ # call ingress proxy to dispatch delete event
+
+ action = dti_event.get('dcae_service_action')
+ what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
+ action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
+ try:
+ DTIProcessor.logger.debug("running {}".format(what))
+ item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
+ ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
+ ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
+ component_scn = ack_item.service_component
+ response = requests.put(item_pod_url, json=dti_event, timeout=50)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
+ item_pod_url, type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ return (component_scn, "ran {}, got: {!s}".format(what, msg))
+ else:
+ if action == 'delete':
+ try:
+ db_access.deleteDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+ else:
+ try:
+ ack_item.update_action('update')
+ db_access.saveDomainObject(ack_item)
+ except Exception as e:
+ msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+
+ return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
+
+
+class DTIProcessor(object):
+ """
+ Main event processing class that encapsulates all the logic of this handler application!
+ An instance of this class is created per incoming client request.
+
+ Generates input data by querying platform services - cloudify, consul, postgresSql
+
+ It creates a pool of worker processes using a multiprocessing Pool class instance.
+ Tasks are offloaded to the worker processes that exist in the pool.
+ The input data is distributed across processes of the Pool object to enable parallel execution of
+ event notification function across multiple input values (data parallelism).
+ """
+
+ logger = logging.getLogger("oti_handler.dti_processor")
+ K8S_CLUSTER_PROXY_NODE_PORT = '30132'
+ db_access = None
+ docker_pool = None
+ k8s_pool = None
+
+ def __init__(self, dti_event, send_notification=True):
+ self._result = {}
+ self.event = dti_event
+ self.is_notify = send_notification
+ self.action = dti_event.get('dcae_service_action').lower()
+ self.target_name = dti_event.get('dcae_target_name')
+ self.target_type = dti_event.get('dcae_target_type', '').lower()
+ self.event_clli = dti_event.get('dcae_service_location')
+ res_dict = None
+ try:
+ self.docker_pool = ThreadPool(8)
+ self.k8s_pool = ThreadPool(8)
+ except Exception as e:
+ msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ raise e
+ else:
+ self.db_access = EventDbAccess()
+ self.prim_db_event = None
+ try:
+ res_dict = self.dispatcher()
+ except:
+ raise
+ finally:
+ try:
+ self.docker_pool.close()
+ self.k8s_pool.close()
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ try:
+ self.docker_pool.join()
+ self.k8s_pool.join()
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ # if not send_notification:
+ # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
+ # return
+
+ if res_dict:
+ try:
+ utils.update_dict(self._result, res_dict)
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
+ type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
+
+ def dispatcher(self):
+ """ dispatch method to execute specific method based on event type """
+
+ arg = str(self.action)
+ method = getattr(self, arg, lambda: "Invalid action")
+ return method()
+
+ def undeploy(self):
+ """
+ delete event from consul KV store, this functionality will be retired as events are stored
+ in postgresSql oti database
+ """
+ global key
+ try:
+ # update Consul KV store with DTI Event - storing them in a folder for all components
+ key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
+ result = ConsulClient.delete_key(key)
+ except Exception as e:
+ msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ else:
+ if not result:
+ msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ def deploy(self):
+ """
+ add event to consul KV store, this functionality will be retired as events are stored
+ in postgresSql oti database
+ """
+ dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
+ try:
+ # update Consul KV store with DTI Event - storing them in a folder for all components
+ result = ConsulClient.store_kvs({dep_key: self.event})
+ except Exception as e:
+ msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ def add(self):
+ """
+ process DTI event that contains a new VNF instance that has to be configured in the collector microservices
+ """
+ res_dict = None
+ try:
+ msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
+ DTIProcessor.logger.debug(msg)
+ # insert add event into dtih_event table
+ self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
+ location_clli=self.event_clli)
+ self.db_access.saveDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+ raise Exception(msg)
+ else:
+ if self.is_notify:
+ try:
+ # force the action to add, to avoid bad things later
+ add_action = {"dcae_service_action": "add"}
+ self.event.update(add_action)
+ # mock up data
+ mock_tp11 = (
+ "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
+ "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
+ "docker_node_instance_id1",
+ "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ # tpl_arr = []
+ # tpl_arr.append(mock_tp11)
+ # tpl_arr.append(mock_tp12)
+ # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
+ res_dict = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp) for tp in
+ CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ return res_dict
+
+ def add_replay(self):
+ """
+ convert an update event flow and replay as an add event type since the event acknowledgement is missing
+ from application database
+ """
+ res_dict = None
+ try:
+ # force the action to add, to avoid bad things later
+ add_action = {"dcae_service_action": "add"}
+ self.event.update(add_action)
+ # mock up data
+ mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
+ "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
+ "docker_node_instance_id1",
+ "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
+ "dcae-d1.idns.cip.corp.com", "30996")
+ # tpl_arr = []
+ # tpl_arr.append(mock_tp11)
+ # tpl_arr.append(mock_tp12)
+ # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
+ res_dict = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp) for tp in
+ CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ return res_dict
+
+ def delete(self):
+ """
+ process DTI event that indicates a VNF instance has to be removed from the collector microservices
+ """
+ res_dict = {}
+ res_dict_k8s = {}
+ res_dict_docker = {}
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.is_notify:
+ try:
+ msg = "processing delete event for {}/{} to relate with any docker hosts".format(
+ self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ res_dict_docker = dict(self.docker_pool.map(notify_svc,
+ ((self.event, self.db_access, self.prim_db_event, tp)
+ for tp
+ in CfyClient().iter_components_for_docker(
+ self.target_type,
+ dcae_service_location=self.event_clli))
+ ))
+ except Exception as e:
+ msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
+ e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
+ self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ if self.prim_db_event is not None:
+ result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
+ res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
+ ((self.event, self.db_access, ack_item) for ack_item in result))))
+ except Exception as e:
+ msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ if self.prim_db_event is not None:
+ self.db_access.deleteDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+ except Exception as e:
+ msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+
+ if res_dict_k8s is not None:
+ utils.update_dict(res_dict, res_dict_k8s)
+
+ if res_dict_docker is not None:
+ utils.update_dict(res_dict, res_dict_docker)
+
+ return res_dict
+
+ def update(self):
+ """
+ process DTI event that indicates VNF instance has to be updated in the collector microservices
+ """
+ res_dict = {}
+ res_dict_k8s = {}
+ res_dict_docker = {}
+
+ if self.is_notify:
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.prim_db_event is not None:
+ self.db_access.update_event_item(self.event, self.target_type, self.target_name)
+ result = self.db_access.query_event_data(self.target_type, self.target_name)
+ if len(result) == 0:
+ msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
+ "replaying this event to cluster if required". \
+ format(self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ res_dict = self.add_replay()
+ else:
+ msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
+ "identify new vs update cases".format(self.target_type, self.target_name)
+ DTIProcessor.logger.debug(msg)
+ try:
+ tpl_arr = CfyClient().iter_components(self.target_type,
+ dcae_service_location=self.event_clli)
+ res_dict_docker = dict(self.docker_pool.map(notify_svc,
+ ((
+ self.event, self.db_access,
+ self.prim_db_event,
+ tp)
+ for tp in tpl_arr)))
+ except Exception as e:
+ msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
+ type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+ else:
+ # event is new for the handler
+ msg = "processing update event for {}/{}, but current event info is not found in database, " \
+ "executing add event".format(self.target_type, self.target_name)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+ res_dict = self.add()
+ except Exception as e:
+ msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
+ DTIProcessor.logger.error(msg)
+ self._result['ERROR'] = msg
+
+ if res_dict_k8s is not None:
+ utils.update_dict(res_dict, res_dict_k8s)
+
+ if res_dict_docker is not None:
+ utils.update_dict(res_dict, res_dict_docker)
+
+ return res_dict
+
+ def notify(self):
+ """
+ event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
+ This notification is meant for the cluster failover.
+ """
+ res_dict = {}
+ try:
+ self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
+ if self.prim_db_event is not None:
+ self.db_access.update_event_item(self.event, self.target_type, self.target_name)
+ else:
+ self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
+ location_clli=self.event_clli)
+ self.db_access.saveDomainObject(self.prim_db_event)
+ except Exception as e:
+ msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['ERROR'] = msg
+
+ try:
+ self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
+ CfyClient().query_k8_components(self.target_name)))
+ for k, v in notify_response_arr:
+ res_dict[k] = v
+ except Exception as e:
+ msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
+ DTIProcessor.logger.warn(msg)
+ self._result['WARNING'] = msg
+
+ return res_dict
+
+ def get_result(self):
+ return self._result
+
+ @classmethod
+ def get_k8_raw_events(cls, pod, cluster, namespace):
+ """
+ Get DTI events for a k8 stateful set pod container
+
+ :param pod: required
+ k8s stateful set pod ID that was configured with a specific set of DTI Events
+ :param cluster: required
+ k8s cluster FQDN where the mS was deployed
+ :param namespace: required
+ k8s namespace where the stateful set was deployed in that namespace
+ :return:
+ Dictionary of DTI event(s).
+ DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+ """
+ db_access = EventDbAccess()
+ results = db_access.query_raw_k8_events(cluster, pod, namespace)
+
+ target_types = set([])
+ outer_dict = {}
+
+ for evnt_item in results:
+ target_types.add(evnt_item.target_type)
+
+ for targ_type in target_types:
+ inner_name_evt_dict = {}
+ for evnt in results:
+ if targ_type == evnt.target_type:
+ inner_name_evt_dict[evnt.target_name] = evnt.event
+
+ outer_dict[targ_type] = inner_name_evt_dict
+
+ return outer_dict
+
+ @classmethod
+ def get_docker_raw_events(cls, service_name, service_location):
+ """
+ Get DTI events for docker container.
+
+ Parameters
+ ----------
+ service_name : string
+ required. The service component name assigned by dockerplugin to the component that is unique to the
+ cloudify node instance and used in its Consul key(s).
+ service_location : string
+ optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
+ in service_location.
+ If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
+ TAGs if service_name is provided,
+ otherwise results are not location filtered.
+
+ Returns
+ -------
+ dict
+ Dictionary of DTI event(s).
+ DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+
+ """
+
+ r_dict = {}
+
+ want_locs = []
+ if service_location:
+ want_locs = service_location.split(',')
+
+ give_types = []
+ if service_name:
+ if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
+ try:
+ node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
+ if node_name:
+ services = ConsulClient.lookup_node(node_name).get("Services")
+ if services:
+ for node_svc in list(services.keys()):
+ if "-component-dockerhost-" in node_svc:
+ want_locs = services[node_svc].get("Tags", [])
+ break
+ except:
+ pass
+
+ try:
+ supported_types = ConsulClient.get_value(service_name + ":dti")
+ except:
+ return r_dict
+ else:
+ if supported_types:
+ supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
+ give_types = supported_types
+ if not give_types or (len(give_types) == 1 and give_types[0] == ''):
+ return r_dict
+
+ db_access = EventDbAccess()
+ results = db_access.query_raw_docker_events(give_types, want_locs)
+
+ target_types = set([])
+ outer_dict = {}
+
+ for evnt_item in results:
+ target_types.add(evnt_item.target_type)
+
+ for targ_type in target_types:
+ inner_name_evt_dict = {}
+ for evnt in results:
+ if targ_type == evnt.target_type:
+ inner_name_evt_dict[evnt.target_name] = evnt.event
+
+ outer_dict[targ_type] = inner_name_evt_dict
+
+ return outer_dict
diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py
new file mode 100644
index 0000000..644534d
--- /dev/null
+++ b/oti/event-handler/otihandler/onap/CommonLogger.py
@@ -0,0 +1,958 @@
+#!/usr/bin/python
+# -*- indent-tabs-mode: nil -*- vi: set expandtab:
+
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+""" Common Logging library in Python.
+
+CommonLogger.py
+
+Original Written by: Terry Schmalzried
+Date written: October 1, 2015
+Last updated: December 1, 2016
+
+version 0.8
+"""
+
+import os
+import logging
+import logging.handlers
+import re
+import socket
+import sys
+import threading
+import time
+
+
+class CommonLogger:
+ """ Common Logging object.
+
+ Public methods:
+ __init__
+ setFields
+ debug
+ info
+ warn
+ error
+ fatal
+ """
+
+ UnknownFile = -1
+ ErrorFile = 0
+ DebugFile = 1
+ AuditFile = 2
+ MetricsFile = 3
+ DateFmt = '%Y-%m-%dT%H:%M:%S'
+ verbose = False
+
+ def __init__(self, configFile, logKey, **kwargs):
+ """Construct a Common Logger for one Log File.
+
+ Arguments:
+ configFile -- configuration filename.
+ logKey -- the keyword in configFile that identifies the log filename.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages,
+ one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
+ CommonLogger.AuditFile and CommonLogger.MetricsFile, or
+ one of the strings "error", "debug", "audit" or "metrics".
+ May also be set in the config file using a field named
+ <logKey>Style (where <logKey> is the value of the logKey
+ parameter). The keyword value overrides the value in the
+ config file.
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._monitorFlag = False
+
+ # Get configuration parameters
+ self._logKey = str(logKey)
+ self._configFile = str(configFile)
+ self._rotateMethod = 'time'
+ self._timeRotateIntervalType = 'midnight'
+ self._timeRotateInterval = 1
+ self._sizeMaxBytes = 0
+ self._sizeRotateMode = 'a'
+ self._socketHost = None
+ self._socketPort = 0
+ self._typeLogger = 'filelogger'
+ self._backupCount = 6
+ self._logLevelThreshold = self._intLogLevel('')
+ self._logFile = None
+ self._begTime = None
+ self._begMsec = 0
+ self._fields = {}
+ self._fields["style"] = CommonLogger.UnknownFile
+ try:
+ self._configFileModified = os.path.getmtime(self._configFile)
+ for line in open(self._configFile):
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
+ self._rotateMethod = value.lower()
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ elif key == self._logKey + 'SocketHost':
+ self._socketHost = value
+ elif key == self._logKey + 'SocketPort' and int( value ) == 0:
+ self._socketPort = int( value )
+ elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ elif key == self._logKey + 'LogLevel':
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ elif key == self._logKey:
+ self._logFile = value
+ except Exception as x:
+ print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)))
+ sys.exit(2)
+ except:
+ print("exception reading '%s' configuration file" %(self._configFile))
+ sys.exit(2)
+
+ if self._logFile is None:
+ print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey))
+ sys.exit(2)
+
+
+ # initialize default log fields
+ # timestamp will automatically be generated
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs and kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+
+ self._resetStyleField()
+
+ # Set up logger
+ self._logLock = threading.Lock()
+ with self._logLock:
+ self._logger = logging.getLogger(self._logKey)
+ self._logger.propagate = False
+ self._createLogger()
+
+ self._defaultServerInfo()
+
+ # spawn a thread to monitor configFile for logLevel and logFile changes
+ self._monitorFlag = True
+ self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
+ self._monitorThread.daemon = True
+ self._monitorThread.start()
+
+
+ def _createLogger(self):
+ if self._typeLogger == 'filelogger':
+ self._mkdir_p(self._logFile)
+ if self._rotateMethod == 'time':
+ self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
+ when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
+ backupCount=self._backupCount, encoding=None, delay=False, utc=True)
+ elif self._rotateMethod == 'size':
+ self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
+ backupCount=self._backupCount, encoding=None, delay=False)
+
+ else:
+ self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, \
+ encoding=None, delay=False)
+ elif self._typeLogger == 'stderrlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stderr)
+ elif self._typeLogger == 'stdoutlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stdout)
+ elif self._typeLogger == 'socketlogger':
+ self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
+ elif self._typeLogger == 'nulllogger':
+ self._logHandler = logging.handlers.NullHandler()
+
+ if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
+ self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
+ else:
+ self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
+ self._logFormatter.converter = time.gmtime
+ self._logHandler.setFormatter(self._logFormatter)
+ self._logger.addHandler(self._logHandler)
+
+ def _resetStyleField(self):
+ styleFields = ["error", "debug", "audit", "metrics"]
+ if self._fields['style'] in styleFields:
+ self._fields['style'] = styleFields.index(self._fields['style'])
+
+ def __del__(self):
+ if not self._monitorFlag:
+ return
+
+ self._monitorFlag = False
+
+ if self._monitorThread is not None and self._monitorThread.is_alive():
+ self._monitorThread.join()
+
+ self._monitorThread = None
+
+
+ def _defaultServerInfo(self):
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('server') is None:
+ try:
+ self._fields['server'] = socket.getfqdn()
+ except Exception as err:
+ try:
+ self._fields['server'] = socket.gethostname()
+ except Exception as err:
+ self._fields['server'] = ""
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('serverIPAddress') is None:
+ try:
+ self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
+ except Exception as err:
+ self._fields['serverIPAddress'] = ""
+
+
+ def _monitorConfigFile(self):
+ while self._monitorFlag:
+ try:
+ fileTime = os.path.getmtime(self._configFile)
+ if fileTime > self._configFileModified:
+ self._configFileModified = fileTime
+ ReopenLogFile = False
+ logFile = self._logFile
+ with open(self._configFile) as fp:
+ for line in fp:
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
+ self._rotateMethod = value.lower()
+ ReopenLogFile = True
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ ReopenLogFile = True
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ ReopenLogFile = True
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketHost' and self._socketHost != value:
+ self._socketHost = value
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
+ self._socketPort = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ ReopenLogFile = True
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ self._resetStyleField()
+ elif key == self._logKey and self._logFile != value:
+ logFile = value
+ ReopenLogFile = True
+ if ReopenLogFile:
+ with self._logLock:
+ self._logger.removeHandler(self._logHandler)
+ self._logFile = logFile
+ self._createLogger()
+ except Exception as err:
+ pass
+
+ time.sleep(5)
+
+
+ def setFields(self, **kwargs):
+ """Set default values for log fields.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs:
+ if kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+ elif key in self._fields:
+ del self._fields[key]
+
+ self._defaultServerInfo()
+
+
+ def debug(self, message, **kwargs):
+ """Write a DEBUG level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
+
+ def info(self, message, **kwargs):
+ """Write an INFO level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('INFO', message, errorCategory = 'INFO', **kwargs)
+
+ def warn(self, message, **kwargs):
+ """Write a WARN level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('WARN', message, errorCategory = 'WARN', **kwargs)
+
+ def error(self, message, **kwargs):
+ """Write an ERROR level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
+
+ def fatal(self, message, **kwargs):
+ """Write a FATAL level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
+
+ def _log(self, logLevel, message, **kwargs):
+ """Write a message to the log file.
+
+ Arguments:
+ logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ # timestamp will automatically be inserted
+ style = int(self._getVal('style', '', **kwargs))
+ requestID = self._getVal('requestID', '', **kwargs)
+ serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
+ threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
+ serverName = self._getVal('serverName', '', **kwargs)
+ serviceName = self._getVal('serviceName', '', **kwargs)
+ instanceUUID = self._getVal('instanceUUID', '', **kwargs)
+ upperLogLevel = self._noSep(logLevel.upper())
+ severity = self._getVal('severity', '', **kwargs)
+ serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
+ server = self._getVal('server', '', **kwargs)
+ IPAddress = self._getVal('IPAddress', '', **kwargs)
+ className = self._getVal('className', '', **kwargs)
+ timer = self._getVal('timer', '', **kwargs)
+ partnerName = self._getVal('partnerName', '', **kwargs)
+ targetEntity = self._getVal('targetEntity', '', **kwargs)
+ targetServiceName = self._getVal('targetServiceName', '', **kwargs)
+ statusCode = self._getVal('statusCode', '', **kwargs)
+ responseCode = self._getVal('responseCode', '', **kwargs)
+ responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
+ processKey = self._getVal('processKey', '', **kwargs)
+ targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
+ customField1 = self._getVal('customField1', '', **kwargs)
+ customField2 = self._getVal('customField2', '', **kwargs)
+ customField3 = self._getVal('customField3', '', **kwargs)
+ customField4 = self._getVal('customField4', '', **kwargs)
+ errorCategory = self._getVal('errorCategory', '', **kwargs)
+ errorCode = self._getVal('errorCode', '', **kwargs)
+ errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
+ nbegTime = self._getArg('begTime', {}, **kwargs)
+
+ detailMessage = self._noSep(message)
+ if bool(re.match(r" *$", detailMessage)):
+ return # don't log empty messages
+
+ useLevel = self._intLogLevel(upperLogLevel)
+ if CommonLogger.verbose: print("logger STYLE=%s" % style)
+ if useLevel < self._logLevelThreshold:
+ if CommonLogger.verbose: print("skipping because of level")
+ pass
+ else:
+ with self._logLock:
+ if style == CommonLogger.ErrorFile:
+ if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
+ errorCategory, errorCode, errorDescription, detailMessage))
+ elif style == CommonLogger.DebugFile:
+ if CommonLogger.verbose: print("using CommonLogger.DebugFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
+ elif style == CommonLogger.AuditFile:
+ if CommonLogger.verbose: print("using CommonLogger.AuditFile")
+ endAuditTime, endAuditMsec = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ else:
+ d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ self._begTime = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, timer, server, IPAddress, className, unused,
+ processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
+ elif style == CommonLogger.MetricsFile:
+ if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
+ endMetricsTime, endMetricsMsec = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ else:
+ d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ self._begTime = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
+ instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
+ className, unused, processKey, targetVirtualEntity, customField1, customField2,
+ customField3, customField4, detailMessage), extra=d)
+ else:
+ print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
+
+ def _getTime(self):
+ ct = time.time()
+ lt = time.localtime(ct)
+ return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000)
+
+ def setStartRecordEvent(self):
+ """
+ Set the start time to be saved for both audit and metrics records
+ """
+ self._begTime, self._begMsec = self._getTime()
+
+ def getStartRecordEvent(self):
+ """
+ Retrieve the start time to be used for either audit and metrics records
+ """
+ begTime, begMsec = self._getTime()
+ return {'begTime':begTime, 'begMsec':begMsec}
+
+ def _getVal(self, key, default, **kwargs):
+ val = self._fields.get(key)
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return self._noSep(val)
+
+ def _getArg(self, key, default, **kwargs):
+ val = None
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return val
+
+ def _noSep(self, message):
+ if message is None: return ''
+ return re.sub(r'[\|\n]', ' ', str(message))
+
+ def _intLogLevel(self, logLevel):
+ if logLevel == 'FATAL': useLevel = 50
+ elif logLevel == 'ERROR': useLevel = 40
+ elif logLevel == 'WARN': useLevel = 30
+ elif logLevel == 'INFO': useLevel = 20
+ elif logLevel == 'DEBUG': useLevel = 10
+ else: useLevel = 0
+ return useLevel
+
+ def _mkdir_p(self, filename):
+ """Create missing directories from a full filename path like mkdir -p"""
+
+ if filename is None:
+ return
+
+ folder=os.path.dirname(filename)
+
+ if folder == "":
+ return
+
+ if not os.path.exists(folder):
+ try:
+ os.makedirs(folder)
+ except OSError as err:
+ print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror))
+ sys.exit(2)
+ except Exception as err:
+ print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)))
+ sys.exit(2)
+
+if __name__ == "__main__":
+
+ def __checkOneTime(line):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time string did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ return 0
+
+ def __checkTwoTimes(line, different):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time strings did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ second1 = int(m.group(1))
+ msec1 = int(m.group(2))
+ second2 = int(m.group(3))
+ msec2 = int(m.group(4))
+ if second1 > second2: second2 += 60
+ t1 = second1 * 1000 + msec1
+ t2 = second2 * 1000 + msec2
+ diff = t2 - t1
+ # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
+ if different:
+ if diff < 500:
+ print("ERROR: times did not differ enough: %s" % line)
+ return 1
+ else:
+ if diff > 10:
+ print("ERROR: times were too far apart: %s" % line)
+ return 1
+ return 0
+
+ def __checkBegTime(line):
+ format = "begTime should be ([-0-9T:]+)"
+ # print("checkBegTime(%s)" % line)
+ strt = 'begTime should be '
+ i = line.index(strt)
+ rest = line[i+len(strt):].rstrip()
+ if not line.startswith(rest + ","):
+ print("ERROR: line %s should start with %s" % (line,rest))
+ return 1
+ return 0
+
+ def __checkLog(logfile, numLines, numFields):
+ lineCount = 0
+ errorCount = 0
+ with open(logfile, "r") as fp:
+ for line in fp:
+ # print("saw line %s" % line)
+ lineCount += 1
+ c = line.count('|')
+ if c != numFields:
+ print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
+ errorCount += 1
+ if re.search("should not appear", line):
+ print("ERROR: a line appeared that should not have appeared, %s" % line)
+ errorCount += 1
+ elif re.search("single time", line):
+ errorCount += __checkOneTime(line)
+ elif re.search("time should be the same", line):
+ errorCount += __checkTwoTimes(line, different=False)
+ elif re.search("time should be ", line):
+ errorCount += __checkTwoTimes(line, different=True)
+ elif re.search("begTime should be ", line):
+ errorCount += __checkBegTime(line)
+ else:
+ print("ERROR: an unknown message appeared, %s" % line)
+ errorCount += 1
+
+ if lineCount != numLines:
+ print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
+ errorCount += 1
+ return errorCount
+
+ import os, argparse
+ parser = argparse.ArgumentParser(description="test the CommonLogger functions")
+ parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
+ parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
+ args = parser.parse_args()
+
+ spid = str(os.getpid())
+ if args.keeplogs:
+ spid = ""
+ logcfg = "/tmp/cl.log" + spid + ".cfg"
+ errorLog = "/tmp/cl.error" + spid + ".log"
+ metricsLog = "/tmp/cl.metrics" + spid + ".log"
+ auditLog = "/tmp/cl.audit" + spid + ".log"
+ debugLog = "/tmp/cl.debug" + spid + ".log"
+ if args.verbose: CommonLogger.verbose = True
+
+ import atexit
+ def cleanupTmps():
+ for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
+ try:
+ os.remove(f)
+ except:
+ pass
+ if not args.keeplogs:
+ atexit.register(cleanupTmps)
+
+ with open(logcfg, "w") as o:
+ o.write("error = " + errorLog + "\n" +
+ "errorLogLevel = WARN\n" +
+ "metrics = " + metricsLog + "\n" +
+ "metricsLogLevel = INFO\n" +
+ "audit = " + auditLog + "\n" +
+ "auditLogLevel = INFO\n" +
+ "debug = " + debugLog + "\n" +
+ "debugLogLevel = DEBUG\n")
+
+ import uuid
+ instanceUUID = uuid.uuid1()
+ serviceName = "testharness"
+ errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
+
+ testsRun = 0
+ errorCount = 0
+ errorLogger.debug("error calling debug (should not appear)")
+ errorLogger.info("error calling info (should not appear)")
+ errorLogger.warn("error calling warn (single time)")
+ errorLogger.error("error calling error (single time)")
+ errorLogger.setStartRecordEvent()
+ time.sleep(1)
+ errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
+ testsRun += 6
+ errorCount += __checkLog(errorLog, 3, 10)
+
+ auditLogger.debug("audit calling debug (should not appear)")
+ auditLogger.info("audit calling info (time should be the same)")
+ auditLogger.warn("audit calling warn (time should be the same)")
+ auditLogger.error("audit calling error (time should be the same)")
+ bt = auditLogger.getStartRecordEvent()
+ # print("bt=%s" % bt)
+ time.sleep(1)
+ auditLogger.setStartRecordEvent()
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 7
+ errorCount += __checkLog(auditLog, 5, 25)
+
+ debugLogger.debug("debug calling debug (single time)")
+ debugLogger.info("debug calling info (single time)")
+ debugLogger.warn("debug calling warn (single time)")
+ debugLogger.setStartRecordEvent()
+ time.sleep(1)
+ debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
+ debugLogger.fatal("debug calling fatal (single time)")
+ errorCount += __checkLog(debugLog, 5, 13)
+ testsRun += 6
+
+ metricsLogger.debug("metrics calling debug (should not appear)")
+ metricsLogger.info("metrics calling info (time should be the same)")
+ metricsLogger.warn("metrics calling warn (time should be the same)")
+ bt = metricsLogger.getStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.setStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
+ metricsLogger.fatal("metrics calling fatal (time should be the same)")
+ time.sleep(1)
+ metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 6
+ errorCount += __checkLog(metricsLog, 5, 28)
+
+ print("%d tests run, %d errors found" % (testsRun, errorCount))
diff --git a/oti/event-handler/otihandler/onap/__init__.py b/oti/event-handler/otihandler/onap/__init__.py
new file mode 100644
index 0000000..87cf002
--- /dev/null
+++ b/oti/event-handler/otihandler/onap/__init__.py
@@ -0,0 +1,15 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
diff --git a/oti/event-handler/otihandler/onap/audit.py b/oti/event-handler/otihandler/onap/audit.py
new file mode 100644
index 0000000..8cd16cf
--- /dev/null
+++ b/oti/event-handler/otihandler/onap/audit.py
@@ -0,0 +1,375 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""generic class to keep track of request handling
+ from receiving it through reponse and log all the activities
+
+ call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests
+
+ start each outside request with creation of the Audit object
+ audit = Audit(request_id=None, headers=None, msg=None)
+"""
+
+import os
+import sys
+import json
+import uuid
+import time
+import copy
+from datetime import datetime
+from threading import Lock
+from enum import Enum
+
+from .CommonLogger import CommonLogger
+from .health import Health
+
+REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
+REQUEST_REMOTE_ADDR = "Remote-Addr"
+REQUEST_HOST = "Host"
+HOSTNAME = "HOSTNAME"
+
+AUDIT_REQUESTID = 'requestID'
+AUDIT_IPADDRESS = 'IPAddress'
+AUDIT_SERVER = 'server'
+AUDIT_TARGET_ENTITY = 'targetEntity'
+
+HEADER_CLIENTAUTH = "clientauth"
+HEADER_AUTHORIZATION = "authorization"
+
+class AuditHttpCode(Enum):
+ """audit http codes"""
+ HTTP_OK = 200
+ PERMISSION_UNAUTHORIZED_ERROR = 401
+ PERMISSION_FORBIDDEN_ERROR = 403
+ RESPONSE_ERROR = 400
+ DATA_NOT_FOUND_ERROR = 404
+ SERVER_INTERNAL_ERROR = 500
+ SERVICE_UNAVAILABLE_ERROR = 503
+ DATA_ERROR = 1030
+ SCHEMA_ERROR = 1040
+
+class AuditResponseCode(Enum):
+ """audit response codes"""
+ SUCCESS = 0
+ PERMISSION_ERROR = 100
+ AVAILABILITY_ERROR = 200
+ DATA_ERROR = 300
+ SCHEMA_ERROR = 400
+ BUSINESS_PROCESS_ERROR = 500
+ UNKNOWN_ERROR = 900
+
+ @staticmethod
+ def get_response_code(http_status_code):
+ """calculates the response_code from max_http_status_code"""
+ response_code = AuditResponseCode.UNKNOWN_ERROR
+ if http_status_code <= AuditHttpCode.HTTP_OK.value:
+ response_code = AuditResponseCode.SUCCESS
+
+ elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
+ AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
+ response_code = AuditResponseCode.PERMISSION_ERROR
+ elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
+ response_code = AuditResponseCode.AVAILABILITY_ERROR
+ elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
+ elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
+ AuditHttpCode.RESPONSE_ERROR.value,
+ AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
+ response_code = AuditResponseCode.DATA_ERROR
+ elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
+ response_code = AuditResponseCode.SCHEMA_ERROR
+
+ return response_code
+
+ @staticmethod
+ def get_human_text(response_code):
+ """convert enum name into human readable text"""
+ if not response_code:
+ return "unknown"
+ return response_code.name.lower().replace("_", " ")
+
+class Audit(object):
+ """put the audit object on stack per each initiating request in the system
+
+ :request_id: is the X-ECOMP-RequestID for tracing
+
+ :req_message: is the request message string for logging
+
+ :aud_parent: is the parent request - used for sub-query metrics to other systems
+
+ :kwargs: - put any request related params into kwargs
+ """
+ _service_name = ""
+ _service_version = ""
+ _service_instance_uuid = str(uuid.uuid4())
+ _started = datetime.now()
+ _logger_debug = None
+ _logger_error = None
+ _logger_metrics = None
+ _logger_audit = None
+ _health = Health()
+ _py_ver = sys.version.replace("\n", "")
+
+ @staticmethod
+ def init(service_name, service_version, config_file_path):
+ """init static invariants and loggers"""
+ Audit._service_name = service_name
+ Audit._service_version = service_version
+ Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+ instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
+ Audit._logger_error = CommonLogger(config_file_path, "error", \
+ instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
+ Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+ instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
+ Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+ instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
+
+ @staticmethod
+ def health():
+ """returns json for health check"""
+ now = datetime.now()
+ return {
+ "service_name" : Audit._service_name,
+ "service_version" : Audit._service_version,
+ "service_instance_UUID" : Audit._service_instance_uuid,
+ "python" : Audit._py_ver,
+ "started" : str(Audit._started),
+ "now" : str(now),
+ "uptime" : str(now - Audit._started),
+ "stats" : Audit._health.dump(),
+ "packages" : "N/A" # Audit._packages
+ }
+
+ def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
+ """create audit object per each request in the system
+
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ self.request_id = request_id
+ self.req_message = req_message or ""
+ self.aud_parent = aud_parent
+ self.kwargs = kwargs or {}
+
+ self.retry_get_config = False
+ self.max_http_status_code = 0
+ self._lock = Lock()
+
+ if self.aud_parent:
+ if not self.request_id:
+ self.request_id = self.aud_parent.request_id
+ if not self.req_message:
+ self.req_message = self.aud_parent.req_message
+ self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
+ else:
+ headers = self.kwargs.get("headers", {})
+ if headers:
+ if not self.request_id:
+ self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ if AUDIT_IPADDRESS not in self.kwargs:
+ self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
+
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+ created_req = ""
+ if not self.request_id:
+ created_req = " with new"
+ self.request_id = str(uuid.uuid4())
+
+ self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+ self._started = time.time()
+ self._start_event = Audit._logger_audit.getStartRecordEvent()
+ self.metrics_start()
+
+ if not self.aud_parent:
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
+ def merge_all_kwargs(self, **kwargs):
+ """returns the merge of copy of self.kwargs with the param kwargs"""
+ all_kwargs = self.kwargs.copy()
+ if kwargs:
+ all_kwargs.update(kwargs)
+ return all_kwargs
+
+ def set_http_status_code(self, http_status_code):
+ """accumulate the highest(worst) http status code"""
+ self._lock.acquire()
+ if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ self.max_http_status_code = max(http_status_code, self.max_http_status_code)
+ self._lock.release()
+
+ def get_max_http_status_code(self):
+ """returns the highest(worst) http status code"""
+ self._lock.acquire()
+ max_http_status_code = self.max_http_status_code
+ self._lock.release()
+ return max_http_status_code
+
+ @staticmethod
+ def get_status_code(success):
+ """COMPLETE versus ERROR"""
+ if success:
+ return 'COMPLETE'
+ return 'ERROR'
+
+ @staticmethod
+ def hide_secrets(obj):
+ """hides the known secret field values of the dictionary"""
+ if not isinstance(obj, dict):
+ return obj
+
+ for key in obj:
+ if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
+ obj[key] = "*"
+ elif isinstance(obj[key], dict):
+ obj[key] = Audit.hide_secrets(obj[key])
+
+ return obj
+
+ @staticmethod
+ def log_json_dumps(obj, **kwargs):
+ """hide the known secret field values of the dictionary and return json.dumps"""
+ if not isinstance(obj, dict):
+ return json.dumps(obj, **kwargs)
+
+ return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
+
+ def is_serious_error(self, status_code):
+ """returns whether the response_code is success and a human text for response code"""
+ return AuditResponseCode.PERMISSION_ERROR.value \
+ == AuditResponseCode.get_response_code(status_code).value \
+ or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
+
+ def _get_response_status(self):
+ """calculates the response status fields from max_http_status_code"""
+ max_http_status_code = self.get_max_http_status_code()
+ response_code = AuditResponseCode.get_response_code(max_http_status_code)
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ response_description = AuditResponseCode.get_human_text(response_code)
+ return success, max_http_status_code, response_code, response_description
+
+ def is_success(self):
+ """returns whether the response_code is success and a human text for response code"""
+ success, _, _, _ = self._get_response_status()
+ return success
+
+ def debug(self, log_line, **kwargs):
+ """debug - the debug=lowest level of logging"""
+ Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info(self, log_line, **kwargs):
+ """debug - the info level of logging"""
+ Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info_requested(self, result=None, **kwargs):
+ """info "requested ..." - the info level of logging"""
+ self.info("requested {0} {1}".format(self.req_message, result or ""), \
+ **self.merge_all_kwargs(**kwargs))
+
+ def warn(self, log_line, **kwargs):
+ """debug+error - the warn level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.warn(log_line, **all_kwargs)
+ Audit._logger_error.warn(log_line, **all_kwargs)
+
+ def error(self, log_line, **kwargs):
+ """debug+error - the error level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.error(log_line, **all_kwargs)
+ Audit._logger_error.error(log_line, **all_kwargs)
+
+ def fatal(self, log_line, **kwargs):
+ """debug+error - the fatal level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.fatal(log_line, **all_kwargs)
+ Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ @staticmethod
+ def get_elapsed_time(started):
+ """returns the elapsed time since started in milliseconds"""
+ return int(round(1000 * (time.time() - started)))
+
+ def metrics_start(self, log_line=None, **kwargs):
+ """reset metrics timing"""
+ self._metrics_started = time.time()
+ self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
+ if log_line:
+ self.info(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def metrics(self, log_line, **kwargs):
+ """debug+metrics - the metrics=sub-audit level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
+ metrics_func = None
+ timer = Audit.get_elapsed_time(self._metrics_started)
+ if success:
+ log_line = "done: {0}".format(log_line)
+ self.info(log_line, **all_kwargs)
+ metrics_func = Audit._logger_metrics.info
+ Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
+ else:
+ log_line = "failed: {0}".format(log_line)
+ self.error(log_line, errorCode=response_code.value, \
+ errorDescription=response_description, **all_kwargs)
+ metrics_func = Audit._logger_metrics.error
+ Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
+
+ metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
+ statusCode=Audit.get_status_code(success), responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs
+ )
+
+ self.metrics_start()
+ return (success, max_http_status_code, response_description)
+
+ def audit_done(self, result=None, **kwargs):
+ """debug+audit - the audit=top level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
+ log_line = "{0} {1}".format(self.req_message, result or "").strip()
+ audit_func = None
+ timer = Audit.get_elapsed_time(self._started)
+ if success:
+ log_line = "done: {0}".format(log_line)
+ self.info(log_line, **all_kwargs)
+ audit_func = Audit._logger_audit.info
+ Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
+ else:
+ log_line = "failed: {0}".format(log_line)
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
+ audit_func = Audit._logger_audit.error
+ Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
+
+ audit_func(log_line, begTime=self._start_event, timer=timer,
+ statusCode=Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs
+ )
+
+ return (success, max_http_status_code, response_description)
+ # this line added to test
diff --git a/oti/event-handler/otihandler/onap/health.py b/oti/event-handler/otihandler/onap/health.py
new file mode 100644
index 0000000..39edc0d
--- /dev/null
+++ b/oti/event-handler/otihandler/onap/health.py
@@ -0,0 +1,102 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""generic class to keep track of app health"""
+
+import uuid
+from datetime import datetime
+from threading import Lock
+
+
+class HealthStats(object):
+ """keep track of stats for calls"""
+ def __init__(self, name):
+ """keep track of stats for metrics calls"""
+ self._name = name or "stats_" + str(uuid.uuid4())
+ self._lock = Lock()
+ self._call_count = 0
+ self._error_count = 0
+ self._longest_timer = 0
+ self._total_timer = 0
+ self._last_success = None
+ self._last_error = None
+
+ def dump(self):
+ """returns dict of stats"""
+ dump = None
+ with self._lock:
+ dump = {
+ "call_count" : self._call_count,
+ "error_count" : self._error_count,
+ "last_success" : str(self._last_success),
+ "last_error" : str(self._last_error),
+ "longest_timer_millisecs" : self._longest_timer,
+ "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
+ if self._call_count else 0)
+ }
+ return dump
+
+ def success(self, timer):
+ """records the successful execution"""
+ with self._lock:
+ self._call_count += 1
+ self._last_success = datetime.now()
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+
+ def error(self, timer):
+ """records the errored execution"""
+ with self._lock:
+ self._call_count += 1
+ self._error_count += 1
+ self._last_error = datetime.now()
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+
+class Health(object):
+ """Health stats for multiple requests"""
+ def __init__(self):
+ """Health stats for application"""
+ self._all_stats = {}
+ self._lock = Lock()
+
+ def _add_or_get_stats(self, stats_name):
+ """add to or get from the ever growing dict of HealthStats"""
+ stats = None
+ with self._lock:
+ stats = self._all_stats.get(stats_name)
+ if not stats:
+ self._all_stats[stats_name] = stats = HealthStats(stats_name)
+ return stats
+
+ def success(self, stats_name, timer):
+ """records the successful execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.success(timer)
+
+ def error(self, stats_name, timer):
+ """records the error execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.error(timer)
+
+ def dump(self):
+ """returns dict of stats"""
+ with self._lock:
+ stats = dict((k, v.dump()) for (k, v) in self._all_stats.items())
+
+ return stats
diff --git a/oti/event-handler/otihandler/utils.py b/oti/event-handler/otihandler/utils.py
new file mode 100644
index 0000000..4f9dbda
--- /dev/null
+++ b/oti/event-handler/otihandler/utils.py
@@ -0,0 +1,83 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import base64
+import collections
+import copy
+import os
+
+from Crypto import Random
+from Crypto.Cipher import PKCS1_v1_5
+from Crypto.Hash import SHA
+from Crypto.PublicKey import RSA
+
+
+def update_dict(d, u):
+ """Recursively updates dict
+
+ Update dict d with dict u
+ """
+ for k, v in u.items():
+ if isinstance(v, collections.Mapping):
+ r = update_dict(d.get(k, {}), v)
+ d[k] = r
+ else:
+ d[k] = u[k]
+ return d
+
+def replace_token(configure_content):
+ try:
+ with open("/opt/app/config-map/dcae-k8s-cluster-token",'r') as fh:
+ dcae_token = fh.readline().rstrip('\n')
+
+ new_config = copy.deepcopy(configure_content)
+
+ # override the default-user token
+ ix=0
+ for user in new_config['users'][:]:
+ if user['name'] == "default-user":
+ new_config['users'][ix] = {
+ "name": "default-user",
+ "user": {
+ "token": dcae_token
+ }
+ }
+ ix += 1
+
+ return new_config
+
+ except Exception as e:
+ return configure_content
+
+def decrypt(b64_ciphertext):
+ """returns decrypted b64_ciphertext that was encoded like this:
+
+ echo "cleartext" | openssl pkeyutl -encrypt -pubin -inkey rsa.pub | base64 --wrap=0
+
+ requires private key in environment variable EOMUSER_PRIVATE
+ """
+
+ if len(b64_ciphertext) <= 30: # For transition, assume short values are not encrypted
+ return b64_ciphertext
+
+ try:
+ ciphertext = base64.b64decode(b64_ciphertext)
+ key = RSA.importKey(os.getenv('EOMUSER_PRIVATE'))
+ cleartext = PKCS1_v1_5.new(key).decrypt(ciphertext, Random.new().read(15+SHA.digest_size))
+ except Exception as e:
+ return b64_ciphertext
+
+ return cleartext
diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py
new file mode 100644
index 0000000..f3eb071
--- /dev/null
+++ b/oti/event-handler/otihandler/web_server.py
@@ -0,0 +1,603 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""web-service for oti_handler"""
+
+import json
+import logging
+import os
+import time
+from datetime import datetime
+
+import cherrypy
+
+from otihandler.cbs_rest import CBSRest
+from otihandler.config import Config
+from otihandler.dti_processor import DTIProcessor
+from otihandler.onap.audit import Audit
+
+
+class DTIWeb(object):
+ """run REST API of OTI Handler"""
+
+ logger = logging.getLogger("oti_handler.web_server")
+ HOST_INADDR_ANY = ".".join("0"*4)
+
+ @staticmethod
+ def run_forever(audit):
+ """run the web-server of OTI Handler forever"""
+
+ cherrypy.config.update({"server.socket_host": DTIWeb.HOST_INADDR_ANY,
+ "server.socket_port": Config.wservice_port})
+
+ protocol = "http"
+ tls_info = ""
+ if Config.tls_server_cert_file and Config.tls_private_key_file:
+ tm_cert = os.path.getmtime(Config.tls_server_cert_file)
+ tm_key = os.path.getmtime(Config.tls_private_key_file)
+ #cherrypy.server.ssl_module = 'builtin'
+ cherrypy.server.ssl_module = 'pyOpenSSL'
+ cherrypy.server.ssl_certificate = Config.tls_server_cert_file
+ cherrypy.server.ssl_private_key = Config.tls_private_key_file
+ if Config.tls_server_ca_chain_file:
+ cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file
+ protocol = "https"
+ tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file,
+ Config.tls_private_key_file,
+ Config.tls_server_ca_chain_file)
+
+ cherrypy.tree.mount(_DTIWeb(), '/')
+
+ DTIWeb.logger.info(
+ "%s with config: %s", audit.info("running oti_handler as {}://{}:{} {}".format(
+ protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)),
+ json.dumps(cherrypy.config))
+ cherrypy.engine.start()
+
+ # If HTTPS server certificate changes, exit to let kubernetes restart us
+ if Config.tls_server_cert_file and Config.tls_private_key_file:
+ while True:
+ time.sleep(600)
+ c_tm_cert = os.path.getmtime(Config.tls_server_cert_file)
+ c_tm_key = os.path.getmtime(Config.tls_private_key_file)
+ if c_tm_cert > tm_cert or c_tm_key > tm_key:
+ DTIWeb.logger.info("cert or key file updated")
+ cherrypy.engine.stop()
+ cherrypy.engine.exit()
+ break
+
+
+class _DTIWeb(object):
+ """REST API of DTI Handler"""
+
+ VALID_EVENT_TYPES = ['deploy', 'undeploy', 'add', 'delete', 'update', 'notify']
+
+ @staticmethod
+ def _get_request_info(request):
+ """Returns info about the http request."""
+
+ return "{} {}{}".format(request.method, request.script_name, request.path_info)
+
+
+ #----- Common endpoint methods
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def healthcheck(self):
+ """Returns healthcheck results."""
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ DTIWeb.logger.info("%s", req_info)
+
+ result = Audit.health()
+
+ DTIWeb.logger.info("healthcheck %s: result=%s", req_info, json.dumps(result))
+
+ audit.audit_done(result=json.dumps(result))
+ return result
+
+ @cherrypy.expose
+ def shutdown(self):
+ """Shutdown the web server."""
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ DTIWeb.logger.info("%s: --- stopping REST API of DTI Handler ---", req_info)
+
+ cherrypy.engine.exit()
+
+ health = json.dumps(Audit.health())
+ audit.info("oti_handler health: {}".format(health))
+ DTIWeb.logger.info("oti_handler health: %s", health)
+ DTIWeb.logger.info("%s: --------- the end -----------", req_info)
+ result = str(datetime.now())
+ audit.info_requested(result)
+ return "goodbye! shutdown requested {}".format(result)
+
+ # ----- DTI Handler mock endpoint methods
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ @cherrypy.tools.json_in()
+ def mockevents(self):
+
+ result = {"KubeNamespace":"com-my-dcae-test", "KubePod":"pod-0", "KubeServiceName":"pod-0.service.local", "KubeServicePort":"8880", "KubeClusterFqdn":"fqdn-1"}
+
+ return result
+
+ #----- DTI Handler endpoint methods
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ @cherrypy.tools.json_in()
+ def events(self, notify="y"):
+ """
+ Run dti reconfig script in service component instances configured to accept the DTI Event.
+
+ POST /events < <dcae_event>
+
+ POST /events?ndtify="n" < <dcae_event>
+
+ where <dcae_event> is the entire DTI Event passed as a JSON object and contains at least these top-level keys:
+ dcae_service_action : string
+ required, 'deploy' or 'undeploy'
+ dcae_target_name : string
+ required, VNF Instance ID
+ dcae_target_type : string
+ required, VNF Type of the VNF Instance
+ dcae_service_location : string
+ optional, CLLI location. Not provided or '' infers all locations.
+
+ Parameters
+ ----------
+ notify : string
+ optional, default "y", any of these will not notify components: [ "f", "false", "False", "FALSE", "n", "no" ]
+ When "n" will **not** notify components of this DTI Event update to Consul.
+
+ Returns
+ -------
+ dict
+ JSON object containing success or error of executing the dti reconfig script on
+ each component instance's docker container, keyed by service_component_name.
+
+ """
+
+ if cherrypy.request.method != "POST":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+
+ dti_event = cherrypy.request.json or {}
+ str_dti_event = json.dumps(dti_event)
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message="{}: {}".format(req_info, str_dti_event), \
+ headers=cherrypy.request.headers)
+ DTIWeb.logger.info("%s: dti_event=%s headers=%s", \
+ req_info, str_dti_event, json.dumps(cherrypy.request.headers))
+
+ dcae_service_action = dti_event.get('dcae_service_action')
+ if not dcae_service_action:
+ msg = 'dcae_service_action is missing'
+ DTIWeb.logger.error(msg)
+ raise cherrypy.HTTPError(400, msg)
+ elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES:
+ msg = 'dcae_service_action is invalid'
+ DTIWeb.logger.error(msg)
+ raise cherrypy.HTTPError(400,msg)
+
+ dcae_target_name = dti_event.get('dcae_target_name')
+ if not dcae_target_name:
+ msg = 'dcae_target_name is missing'
+ DTIWeb.logger.error(msg)
+ raise cherrypy.HTTPError(400, msg)
+
+ dcae_target_type = dti_event.get('dcae_target_type', '')
+ if not dcae_target_type:
+ msg = 'dcae_target_type is missing'
+ DTIWeb.logger.error(msg)
+ raise cherrypy.HTTPError(400, msg)
+
+ send_notification = True
+ if (isinstance(notify, bool) and not notify) or \
+ (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]):
+ send_notification = False
+
+ prc = DTIProcessor(dti_event, send_notification=send_notification)
+ result = prc.get_result()
+
+ DTIWeb.logger.info("%s: dti_event=%s result=%s", \
+ req_info, str_dti_event, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
+
+ def get_docker_events(self, request, service, location):
+ """
+ common routine for dti_docker_events and oti_docker_events
+
+ :param request: HTTP GET request
+ :param service: HTTP request query parameter for service name
+ :param location: HTTP request query parameter for location CLLI
+ :return:
+ """
+
+ if request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method))
+
+ req_info = _DTIWeb._get_request_info(request)
+ audit = Audit(req_message=req_info, headers=request.headers)
+
+ return DTIProcessor.get_docker_raw_events(service, location)
+
+ def get_k8s_events(self, request, **params):
+ """
+ common routine for dti_k8s_events and oti_k8s_events
+
+ :param request: HTTP GET request
+ :param params: HTTP request query parameters
+ :return:
+ """
+ if request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method))
+
+ req_info = _DTIWeb._get_request_info(request)
+ audit = Audit(req_message=req_info, headers=request.headers)
+
+ pod = request.params['pod']
+ namespace = request.params['namespace']
+ cluster = request.params['cluster']
+
+ return DTIProcessor.get_k8_raw_events(pod, cluster, namespace)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def oti_k8s_events(self, **params):
+ """
+ Retrieve raw JSON events from application events database
+
+ GET /oti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1>
+
+ Parameters
+ ----------
+ pod ID : string
+ POD ID of the stateful set POD
+ namespace: string
+ kubernetes namespace
+ cluster: string
+ kubernetes cluster FQDN
+
+ Returns
+ -------
+ dict
+ JSON object containing the fully-bound configuration.
+
+ """
+
+ return self.get_k8s_events(cherrypy.request, params)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def dti_k8s_events(self, **params):
+ """
+ Retrieve raw JSON events from application events database
+
+ GET /dti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1>
+
+ Parameters
+ ----------
+ pod ID : string
+ POD ID of the stateful set POD
+ namespace: string
+ kubernetes namespace
+ cluster: string
+ kubernetes cluster FQDN
+
+ Returns
+ -------
+ dict
+ JSON object containing the fully-bound configuration.
+
+ """
+
+ return self.get_k8s_events(cherrypy.request, params)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def oti_docker_events(self, service, location=None):
+ """
+ Retrieve raw JSON events from application events database related to docker deployments
+
+ GET /oti_docker_events?service=<svc>&location=<location>
+
+ Parameters
+ ----------
+ service : string
+ The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+ location : string
+ optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
+ If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
+ otherwise results are not location filtered.
+
+ Returns
+ -------
+ dict
+ JSON object containing the fully-bound configuration.
+
+ """
+
+ return self.get_docker_events(cherrypy.request, service, location)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def dti_docker_events(self, service, location=None):
+ """
+ Retrieve raw JSON events from application events database related to docker deployments
+
+ GET /dti_docker_events?service=<svc>&location=<location>
+
+ Parameters
+ ----------
+ service : string
+ The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+ location : string
+ optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
+ If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
+ otherwise results are not location filtered.
+
+ Returns
+ -------
+ dict
+ JSON object containing the fully-bound configuration.
+
+ """
+
+ return self.get_docker_events(cherrypy.request, service, location)
+
+ #----- Config Binding Service (CBS) endpoint methods
+
+ @cherrypy.expose
+ @cherrypy.popargs('service_name')
+ @cherrypy.tools.json_out()
+ def service_component(self, service_name):
+ """
+ Retrieve fully-bound configuration for service_name from Consul KVs.
+
+ GET /service_component/<service_name>
+
+ Parameters
+ ----------
+ service_name : string
+ The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+
+ Returns
+ -------
+ dict
+ JSON object containing the fully-bound configuration.
+
+ """
+
+ if cherrypy.request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+ DTIWeb.logger.info("%s: service_name=%s headers=%s", \
+ req_info, service_name, json.dumps(cherrypy.request.headers))
+
+ try:
+ result = CBSRest.get_service_component(service_name)
+ except Exception as e:
+ result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
+ audit.set_http_status_code(404)
+
+ DTIWeb.logger.info("%s: service_name=%s result=%s", \
+ req_info, service_name, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
+
+ @cherrypy.expose
+ @cherrypy.popargs('service_name')
+ @cherrypy.tools.json_out()
+ def service_component_all(self, service_name, service_location=None, policy_ids="y"):
+ """
+ Retrieve all information for service_name (config, dti, dti_events, and policies) from Consul KVs.
+
+ GET /service_component_all/<service_name>
+
+ GET /service_component_all/<service_name>?service_location=<service_location>
+
+ GET /service_component_all/<service_name>?service_location=<service_location>;policy_ids=n
+
+ Parameters
+ ----------
+ service_name : string
+ The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+ service_location : string
+ optional, allows multiple values separated by commas.
+ Filters DTI events with dcae_service_location in service_location.
+ policy_ids : string
+ optional, default "y", any of these will unset: [ "f", "false", "False", "FALSE", "n", "no" ]
+ When unset, formats policies items as a list (without policy_ids) rather than as an object indexed by policy_id.
+
+ Returns
+ -------
+ dict
+ JSON object containing all information for component service_name.
+ The top-level keys may include the following:
+ config : dict
+ The cloudify node's application_config property from when the start workflow was executed.
+ dti : dict
+ Keys are VNF Types that the component currently is assigned to monitor. Policy can change them.
+ dti_events : dict
+ The latest deploy DTI events, keyed by VNF Type and sub-keyed by VNF Instance ID.
+ policies : dict
+ event : dict
+ Contains information about when the policies folder was last written.
+ items : dict
+ Contains all policy bodies for the service_name component, keyed by policy_id.
+
+ """
+
+ if cherrypy.request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+ DTIWeb.logger.info("%s: service_name=%s headers=%s", \
+ req_info, service_name, json.dumps(cherrypy.request.headers))
+
+ policies_as_list = False
+ if (isinstance(policy_ids, bool) and not policy_ids) or \
+ (isinstance(policy_ids, str) and policy_ids.lower() in [ "f", "false", "n", "no" ]):
+ policies_as_list = True
+ try:
+ result = CBSRest.get_service_component_all(service_name, service_location=service_location, policies_as_list=policies_as_list)
+ except Exception as e:
+ result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
+ audit.set_http_status_code(404)
+
+ DTIWeb.logger.info("%s: service_name=%s result=%s", \
+ req_info, service_name, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
+
+ @cherrypy.expose
+ @cherrypy.popargs('service_name')
+ @cherrypy.tools.json_out()
+ def dti(self, service_name=None, vnf_type=None, vnf_id=None, service_location=None):
+ """
+ Retrieve current (latest, not undeployed) DTI events from Consul KVs.
+
+ GET /dti/<service_name>
+
+ GET /dti/<service_name>?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location>
+
+ GET /dti
+
+ GET /dti?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location>
+
+ Parameters
+ ----------
+ service_name : string
+ optional. The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+ vnf_type : string
+ optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s).
+ vnf_id : string
+ optional. Requires vnf_type also. Gets DTI event for this vnf_id.
+ service_location : string
+ optional, allows multiple values separated by commas.
+ Filters DTI events with dcae_service_location in service_location.
+
+ Returns
+ -------
+ dict
+ Dictionary of DTI event(s).
+ If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event.
+ If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id.
+ Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
+
+ """
+
+ if cherrypy.request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+ DTIWeb.logger.info("%s: service_name=%s headers=%s", \
+ req_info, service_name, json.dumps(cherrypy.request.headers))
+
+ try:
+ result = CBSRest.get_dti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location)
+ except Exception as e:
+ result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
+ audit.set_http_status_code(404)
+
+ DTIWeb.logger.info("%s: service_name=%s result=%s", \
+ req_info, service_name, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
+
+ @cherrypy.expose
+ @cherrypy.popargs('service_name')
+ @cherrypy.tools.json_out()
+ def policies(self, service_name, policy_id=None):
+ """
+ Retrieve policies for service_name from Consul KVs.
+
+ GET /policies/<service_name>
+
+ GET /policies/<service_name>?policy_id=<policy_id>
+
+ Parameters
+ ----------
+ service_name : string
+ The service component name assigned by dockerplugin to the component
+ that is unique to the cloudify node instance and used in its Consul key(s).
+ policy_id : string
+ optional. Limits returned policy to this policy_id.
+
+ Returns
+ -------
+ dict
+ JSON object containing policy bodies for the service_name component.
+ If policy_id is specified, then object returned will be just the one policy body.
+ If policy_id is not specified, then object will contain all policy bodies, keyed by policy_id.
+
+ """
+
+ if cherrypy.request.method != "GET":
+ raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
+
+ req_info = _DTIWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+ DTIWeb.logger.info("%s: service_name=%s headers=%s", \
+ req_info, service_name, json.dumps(cherrypy.request.headers))
+
+ try:
+ result = CBSRest.get_policies(service_name, policy_id=policy_id)
+ except Exception as e:
+ result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
+ audit.set_http_status_code(404)
+
+ DTIWeb.logger.info("%s: service_name=%s result=%s", \
+ req_info, service_name, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result