summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler
diff options
context:
space:
mode:
authorvv770d <vv770d@att.com>2021-03-17 22:44:49 +0000
committervv770d <vv770d@att.com>2021-03-17 22:51:01 +0000
commit15dd0a3541db1e7ac9f38680d9dbe83cf3d303ae (patch)
treebf908cb1e47da9eb8c34913f00b2f773b3d9b39e /oti/event-handler/otihandler
parent18512176d62ae211d97952027ca6a6dc59b50992 (diff)
[DCAE] Remove OTI component
OTI has been deprecated Change-Id: Ic2051f9262744081880d8471c31a2491d2e4451c Signed-off-by: vv770d <vv770d@att.com> Issue-ID: DCAEGEN2-2676 Signed-off-by: vv770d <vv770d@att.com>
Diffstat (limited to 'oti/event-handler/otihandler')
-rw-r--r--oti/event-handler/otihandler/__init__.py15
-rw-r--r--oti/event-handler/otihandler/__main__.py74
-rw-r--r--oti/event-handler/otihandler/cbs_rest.py202
-rw-r--r--oti/event-handler/otihandler/cfy_client.py638
-rw-r--r--oti/event-handler/otihandler/config.py180
-rw-r--r--oti/event-handler/otihandler/consul_client.py617
-rw-r--r--oti/event-handler/otihandler/dbclient/__init__.py19
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/__init__.py18
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/db_access.py50
-rw-r--r--oti/event-handler/otihandler/dbclient/apis/event_db_access.py154
-rw-r--r--oti/event-handler/otihandler/dbclient/db_dao.py33
-rw-r--r--oti/event-handler/otihandler/dbclient/models/__init__.py19
-rw-r--r--oti/event-handler/otihandler/dbclient/models/event.py40
-rw-r--r--oti/event-handler/otihandler/dbclient/models/event_ack.py51
-rw-r--r--oti/event-handler/otihandler/docker_client.py175
-rw-r--r--oti/event-handler/otihandler/dti_processor.py815
-rw-r--r--oti/event-handler/otihandler/onap/CommonLogger.py958
-rw-r--r--oti/event-handler/otihandler/onap/__init__.py15
-rw-r--r--oti/event-handler/otihandler/onap/audit.py375
-rw-r--r--oti/event-handler/otihandler/onap/health.py102
-rw-r--r--oti/event-handler/otihandler/utils.py83
-rw-r--r--oti/event-handler/otihandler/web_server.py605
22 files changed, 0 insertions, 5238 deletions
diff --git a/oti/event-handler/otihandler/__init__.py b/oti/event-handler/otihandler/__init__.py
deleted file mode 100644
index 87cf002..0000000
--- a/oti/event-handler/otihandler/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
diff --git a/oti/event-handler/otihandler/__main__.py b/oti/event-handler/otihandler/__main__.py
deleted file mode 100644
index 59a7087..0000000
--- a/oti/event-handler/otihandler/__main__.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""run as server: python -m otihandler"""
-
-import logging
-import os
-import sys
-
-from otihandler.config import Config
-from otihandler.onap.audit import Audit
-from otihandler.web_server import DTIWeb
-from otihandler.dbclient import DaoBase
-
-
-class LogWriter(object):
- """redirect the standard out + err to the logger"""
-
- def __init__(self, logger_func):
- self.logger_func = logger_func
-
- def write(self, log_line):
- """actual writer to be used in place of stdout or stderr"""
-
- log_line = log_line.rstrip()
- if log_line:
- self.logger_func(log_line)
-
- def flush(self):
- """no real flushing of the buffer"""
-
- pass
-
-
-def run_event_handler():
- """main run function for event_handler"""
-
- Config.load_from_file()
- # Config.discover()
-
- logger = logging.getLogger("event_handler")
- sys.stdout = LogWriter(logger.info)
- sys.stderr = LogWriter(logger.error)
-
- logger.info("========== run_event_handler ==========")
- app_version = os.getenv("APP_VER")
- logger.info("app_version %s", app_version)
- Audit.init(Config.get_system_name(), app_version, Config.LOGGER_CONFIG_FILE_PATH)
-
- logger.info("starting event_handler with config:")
- logger.info(Audit.log_json_dumps(Config.config))
-
- audit = Audit(req_message="start event_handler")
-
- audit = Audit(req_message="DB init start")
- DaoBase.init_db(os.environ.get("DB_CONN_URL"))
-
- DTIWeb.run_forever(audit)
-
-if __name__ == "__main__":
- run_event_handler()
diff --git a/oti/event-handler/otihandler/cbs_rest.py b/oti/event-handler/otihandler/cbs_rest.py
deleted file mode 100644
index 2e3e7de..0000000
--- a/oti/event-handler/otihandler/cbs_rest.py
+++ /dev/null
@@ -1,202 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""REST for high-level information retrievals from Consul KVs"""
-
-import copy
-import logging
-
-from otihandler.consul_client import ConsulClient
-
-
-class CBSRest(object):
- _logger = logging.getLogger("oti_handler.cbs_rest")
-
- @staticmethod
- def get_value(key, default=None):
- """Wrap ConsulClient.get_value() to ignore exceptions."""
-
- data = default
- try:
- data = ConsulClient.get_value(key)
- except Exception as e:
- pass
-
- return data
-
- @staticmethod
- def get_kvs(key):
- """Wrap ConsulClient.get_kvs() to ignore exceptions."""
-
- data = {}
- try:
- data = ConsulClient.get_kvs(key, trim_prefix=True)
- except Exception as e:
- data = {}
-
- if not data:
- data = {}
- return data
-
- @staticmethod
- def get_service_component(service_name):
- """Get the fully-bound config for a service_name."""
-
- return ConsulClient.get_service_component(service_name)
-
- @staticmethod
- def get_service_component_all(service_name, service_location=None, policies_as_list=False):
- """Get all Consul objects for a service_name."""
-
- r_dict = ConsulClient.get_service_component_all(service_name, policies_as_list=policies_as_list)
- if r_dict and r_dict.get('oti'):
- r_dict['oti'] = CBSRest.get_oti(service_name, service_location=service_location)
- return r_dict
-
- @staticmethod
- def get_oti(service_name=None, vnf_type=None, vnf_id=None, service_location=None):
- """
- Get DTI events.
-
- Parameters
- ----------
- service_name : string
- optional. The service component name assigned by dockerplugin to the component that is unique to the cloudify node instance and used in its Consul key(s).
- vnf_type : string
- optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s).
- vnf_id : string
- optional. Requires vnf_type also. Gets DTI event for this vnf_id.
- service_location : string
- optional, allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
- If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
- otherwise results are not location filtered.
-
- Returns
- -------
- dict
- Dictionary of DTI event(s).
- If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event.
- If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id.
- Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
-
- """
-
- lc_vnf_type = vnf_type
- if vnf_type:
- lc_vnf_type = vnf_type.lower()
-
- r_dict = {}
-
- want_locs = []
- if service_location:
- want_locs = service_location.split(',')
-
- give_types = []
- if service_name:
- if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
- try:
- node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
- if node_name:
- services = ConsulClient.lookup_node(node_name).get("Services")
- if services:
- for node_svc in list(services.keys()):
- if "-component-dockerhost-" in node_svc or "_component_kubernetes_master" in node_svc:
- want_locs = services[node_svc].get("Tags", [])
- break
- except:
- pass
- supported_types = ConsulClient.get_value(service_name + ":oti")
- if supported_types:
- supported_types = [type.lower() for type in list(supported_types.keys())]
- if supported_types:
- if lc_vnf_type: # If user specifies vnf_type(s), constrain to supported ones
- for type in lc_vnf_type.split(','):
- if type in supported_types:
- give_types.append(type)
- else:
- give_types = supported_types
- if not give_types or (len(give_types) == 1 and give_types[0] == ''):
- return r_dict
- elif lc_vnf_type:
- give_types = lc_vnf_type.split(',')
-
-
- # If they specified only one vnf_type ...
- if lc_vnf_type and ',' not in lc_vnf_type:
- type = give_types[0]
-
- # ... and vnf_id
- if vnf_id:
- # get just one vnf_id
- t_dict = CBSRest.get_value("oti_events/" + type + "/" + vnf_id, default=None)
- if t_dict:
- event_loc = t_dict.get('dcae_service_location')
- if not event_loc or not want_locs or event_loc in want_locs:
- r_dict = copy.deepcopy(t_dict)
-
- # ... and not vnf_id
- else:
- # get all DTI events of just one type, indexed by vnf_id
- t_dict = CBSRest.get_kvs("oti_events/" + type + "/")
- if t_dict:
- if not want_locs:
- r_dict = copy.deepcopy(t_dict)
- else:
- for id in t_dict:
- event_loc = t_dict[id].get('dcae_service_location')
- if not event_loc or event_loc in want_locs:
- r_dict[id] = copy.deepcopy(t_dict[id])
-
- # If they did not specify either service_name or vnf_type (the only way give_types=[])
- elif not give_types:
- # get all DTI events, indexed by vnf_type then vnf_id
- t_dict = CBSRest.get_kvs("oti_events/")
- if t_dict:
- for type in t_dict:
- for id in t_dict[type]:
- if not vnf_id or vnf_id == id:
- if want_locs:
- event_loc = t_dict[type][id].get('dcae_service_location')
- if not want_locs or not event_loc or event_loc in want_locs:
- if type not in r_dict:
- r_dict[type] = {}
- r_dict[type][id] = copy.deepcopy(t_dict[type][id])
-
- # If they speclfied multiple vnf_types
- else:
- # get all DTI events of give_types, indexed by vnf_type then vnf_id
- for type in give_types:
- t_dict = CBSRest.get_kvs("oti_events/" + type + "/")
- if t_dict:
- for id in t_dict:
- if not vnf_id or vnf_id == id:
- if want_locs:
- event_loc = t_dict[id].get('dcae_service_location')
- if not want_locs or not event_loc or event_loc in want_locs:
- if type not in r_dict:
- r_dict[type] = {}
- r_dict[type][id] = copy.deepcopy(t_dict[id])
-
- return r_dict
-
- @staticmethod
- def get_policies(service_name, policy_id=None):
- """Get one or all policies for a service_name."""
-
- if policy_id:
- return ConsulClient.get_value(service_name + ":policies/items/" + policy_id)
- else:
- return ConsulClient.get_kvs(service_name + ":policies/items/", trim_prefix=True)
diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py
deleted file mode 100644
index 4e3de87..0000000
--- a/oti/event-handler/otihandler/cfy_client.py
+++ /dev/null
@@ -1,638 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""Our client interface to Cloudify"""
-import base64
-import copy
-import json
-import logging
-import os
-
-import requests
-
-from otihandler.consul_client import ConsulClient
-
-
-class CfyClientConsulError(RuntimeError):
- pass
-
-
-class CloudifyClient(object):
- """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants"""
-
- def __init__(self, **kwargs):
- self._protocol = kwargs.get('protocol', 'http')
- self._host = kwargs.get('host')
- self._port = kwargs.get('port')
- self._headers = kwargs.get('headers')
-
- self.node_instances = self
-
- def list(self, **kwargs):
- url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port)
- # s = Session()
- # req = Request('GET', url_mask, headers=self._headers)
- # prepped = req.prepare()
- # response = s.send(prepped,verify=False,timeout=30)
- response = requests.get(url_mask, headers=self._headers, timeout=30)
- obj = response.json()
- tenants = [x["name"] for x in obj["items"]]
- tenants_with_containers = [x for x in tenants if 'DCAE' in x]
-
- size = 1000
- url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format(
- self._protocol, self._host, self._port, size, "{}")
- if kwargs:
- for (key,val) in kwargs.items():
- if isinstance(val, str):
- url_mask = url_mask + '&{}={}'.format(key, val)
- elif isinstance(val, list):
- url_mask = url_mask + '&{}={}'.format(key, ','.join(val))
-
- for tenant in tenants_with_containers:
- self._headers_with_tenant = copy.deepcopy(self._headers)
- self._headers_with_tenant['Tenant'] = tenant
-
- offset = 0
- total = 1
- while offset < total:
- # s = Session()
- # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant)
- # prepped = req.prepare()
- # response = s.send(prepped, verify=False, timeout=30)
- response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30)
- response.raise_for_status()
- obj = response.json()
- offset = offset + len(obj["items"])
- total = obj["metadata"]["pagination"]["total"]
- for item in obj["items"]:
- yield NodeInstance(item)
-
- def update_node_instance(self, node_instance_id, body, **kwargs):
- headers = copy.deepcopy(self._headers_with_tenant)
- headers['Content-Type'] = "application/json"
- url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format(
- self._protocol, self._host, self._port, node_instance_id)
- response = requests.patch(url_mask, json=body, headers=headers, timeout=30)
- obj = response.json()
- return obj
-
-
-class NodeInstance(object):
- """quick replacement for cloudify_rest_client"""
-
- def __init__(self, instance):
- self.id = instance.get("id")
- self.deployment_id = instance.get("deployment_id")
- self.host_id = instance.get("host_id")
- self.runtime_properties = instance.get("runtime_properties")
- self.relationships = instance.get("relationships")
- self.state = instance.get("state")
- self.version = instance.get("version")
- self.node_id = instance.get("node_id")
- self.scaling_groups = instance.get("scaling_groups")
-
-
-class CfyClient(object):
- _logger = logging.getLogger("oti_handler.cfy_client")
- _client = None
-
-
- @staticmethod
- def __set_cloudify_manager_client():
- """Create connection to Cloudify_Manager."""
-
- if CfyClient._client:
- return
-
- host = None
- port = None
- obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify")
- source = "CLOUDIFY environment variable"
- if not obj:
- CM_KEY = 'cloudify_manager'
- source = "Consul key '{}'".format(CM_KEY)
-
- try:
- results = ConsulClient.lookup_service(CM_KEY)
- except Exception as e:
- msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY)
- CfyClient._logger.error(msg)
- raise CfyClientConsulError(msg)
- result = results[0]
- host = result['ServiceAddress']
- port = result['ServicePort']
-
- try:
- obj = ConsulClient.get_value(CM_KEY)
- except Exception as e:
- msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY)
- CfyClient._logger.error(msg)
- raise CfyClientConsulError(msg)
- if not obj:
- raise CfyClientConsulError("{} value is empty or invalid".format(source))
-
- obj = obj.get('cloudify')
-
- if not obj:
- raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source))
-
- host = obj.get('address', host)
- if not host:
- raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source))
-
- port = obj.get('port', port)
- if not port:
- raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source))
-
- protocol = obj.get('protocol')
- if not protocol:
- raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source))
- username = obj.get('user')
- if not username:
- raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source))
- password = obj.get('password')
- if not password:
- raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
-
- b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8")
- headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
- #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
-
- CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers)
-
-
- @staticmethod
- def query_k8_components(in_cluster_fqdn):
- """
- Iterate components that belong to a cluster fqdn.
-
- Parameters
- ----------
- in_cluster_fqdn : string
- k8s cluster FQDN
-
- Returns
- -------
- A generator of tuples of component information
- [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ]
- """
-
- cnt_found = 0
- CfyClient.__set_cloudify_manager_client()
- for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
- rtp = node_instance.runtime_properties
- scn_port = None
- cluster_fqdn = None
- proxy_fqdn = None
- dti_info = rtp.get('dti_info')
- if dti_info:
- env_items = dti_info.get('env')
- for env in env_items:
- if env.get("name") == 'KUBE_CLUSTER_FQDN':
- cluster_fqdn = env.get("value")
- if env.get("name") == 'KUBE_PROXY_FQDN':
- proxy_fqdn = env.get("value")
- ports = dti_info.get('ports')
- if ports:
- scn_port = ports[0].split(':')[0]
- else:
- continue
-
- if in_cluster_fqdn != cluster_fqdn:
- continue
-
- controller_type = rtp.get('k8s_controller_type')
- if not controller_type:
- CfyClient._logger.debug("controller type is missing")
- continue
- elif controller_type != "statefulset":
- CfyClient._logger.debug("not a stateful set")
- continue
-
- container_id = rtp.get('k8s_deployment')
- if not container_id:
- CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(
- node_instance.deployment_id, node_instance.id))
- continue
-
- try:
- namespace = container_id.get('namespace')
- except:
- namespace = ''
- pass
-
- replicas = 1
- try:
- replicas = rtp.get('replicas')
- except:
- pass
-
- scn = rtp.get('service_component_name')
- if not scn:
- CfyClient._logger.debug(
- "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
- node_instance.id))
- continue
-
- cnt_found += 1
- yield (proxy_fqdn, namespace, scn, replicas, scn_port)
- continue
-
- msg = "Found {} components (collectors) for cluster={}" \
- .format(cnt_found, in_cluster_fqdn)
- CfyClient._logger.debug(msg)
-
-
- @staticmethod
- def iter_components(dcae_target_type, dcae_service_location='', component_type=''):
- """
- Iterate components that handle a given dcae_target_type.
-
- Parameters
- ----------
- dcae_target_type : string
- VNF Type
- dcae_service_location : string
- Location of the component (optional)
- component_type : string
- Type of the component (optional)
-
- Returns
- -------
- A generator of tuples of component information
- [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
- or
- [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
-
- """
-
- cnt_found = 0
-
- # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
- dockerhosts = []
- k8s_svcs_tagged_with_clli = []
- if dcae_service_location:
- try:
- dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
- except Exception as e:
- msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
- CfyClient._logger.error(msg)
- raise CfyClientConsulError(msg)
- try:
- k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location])
- except Exception as e:
- msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location])
- CfyClient._logger.error(msg)
- raise CfyClientConsulError(msg)
-
- CfyClient.__set_cloudify_manager_client()
- for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
- rtp = node_instance.runtime_properties
-
- # Skip this node_instance if it is not a collector
- container_type = "docker"
- container_id = rtp.get('container_id')
- docker_host = ''
- svc_with_my_clli_tags = ''
- if not container_id:
- container_type = "k8s"
- container_id = rtp.get('k8s_deployment')
- if not container_id:
- CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
- continue
- docker_config = rtp.get('docker_config')
- if not docker_config:
- CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id))
- continue
- dti_reconfig_script = ""
- if container_type == "docker":
- dti_reconfig_script = rtp.get('dti_reconfig_script')
- if not dti_reconfig_script:
- CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id))
- continue
- elif container_type == "k8s":
- dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti')
- if not dti_reconfig_script:
- CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id))
- continue
-
- scn = rtp.get('service_component_name')
- scn_address = None
- scn_port = None
- if not scn:
- CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
- continue
- if container_type == "docker":
- docker_host = rtp.get('selected_container_destination')
- if not docker_host:
- CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
- continue
- elif container_type == "k8s":
- try:
- srvcCatalogItem = ConsulClient.lookup_service(scn)[0]
- scn_address = srvcCatalogItem.get("ServiceAddress")
- except:
- CfyClient._logger.debug(
- "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id,
- node_instance.id))
- continue
- svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags')
- # e.g., scn="s908d92e232ed43..."
- if not svc_with_my_clli_tags:
- # We should not incur this burden. k8splugin should store this into runtime properties.
- try:
- node_name = srvcCatalogItem.get("Node")
- if node_name:
- # e.g., node_name="zldcdyh1adce3kpma00"
- services = ConsulClient.lookup_node(node_name).get("Services")
- if services:
- for node_svc in list(services.keys()):
- if "_component_kubernetes_master" in node_svc:
- # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master"
- svc_with_my_clli_tags = node_svc
- break
- except:
- pass
- # ... cache results we find into runtime properties to avoid searching again
- if svc_with_my_clli_tags:
- CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format(
- node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags))
- rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags
- body = {
- "runtime_properties": rtp,
- "state": node_instance.state,
- "version": 1 + int(node_instance.version)
- }
- try:
- CfyClient._client.update_node_instance(node_instance.id, body)
- except:
- pass
-
- if not svc_with_my_clli_tags:
- CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id))
- continue
-
- # get the nodeport for statefulset sidecar service
- dti_info = rtp.get('dti_info')
- if dti_info:
- ports = dti_info.get('ports')
- if ports:
- scn_port = ports[0].split(':')[1]
- docker_host = rtp.get('configuration',{}).get('file_content')
- if not docker_host:
- CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
- continue
-
- # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
- if dcae_service_location:
- if container_type == "docker" and docker_host not in dockerhosts:
- CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
- .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location))
- continue
- elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli:
- CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}"
- .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location))
- continue
-
- # If DTI Event specifies component_type, then collector's service_component_type must match
- if component_type:
- c_component_type = rtp.get('service_component_type')
- if component_type != c_component_type:
- CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
- continue
-
- # Check if the collector supports this VNF Type
- # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
- dti_key = scn + ':oti'
- try:
- obj = ConsulClient.get_value(dti_key)
- except Exception as e:
- CfyClient._logger.error(
- "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
- .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
- )
- continue
- if not obj:
- CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key))
- continue
- obj_types = set(k.lower() for k in obj)
- if dcae_target_type.lower() in obj_types:
- CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type))
- cnt_found += 1
- yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port )
- continue
- else:
- CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key))
- continue
-
- msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\
- .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
- CfyClient._logger.debug(msg)
-
- @staticmethod
- def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''):
- """
- Iterate components that handle a given dcae_target_type to find the components of docker type
-
- Parameters
- ----------
- dcae_target_type : string
- VNF Type
- dcae_service_location : string
- Location of the component (optional)
- component_type : string
- Type of the component (optional)
-
- Returns
- -------
- A generator of tuples of component information
- [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
-
- """
-
- cnt_found = 0
- # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
- dockerhosts = []
-
- if dcae_service_location:
- try:
- dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
- except Exception as e:
- msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(
- type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
- CfyClient._logger.error(msg)
- raise CfyClientConsulError(msg)
-
- CfyClient.__set_cloudify_manager_client()
- for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
- rtp = node_instance.runtime_properties
-
- # Skip this node_instance if it is not a collector
- container_type = "docker"
- container_id = rtp.get('container_id')
- if not container_id:
- if not container_id:
- CfyClient._logger.debug("{} {} runtime_properties has no container_id".format(
- node_instance.deployment_id, node_instance.id))
- continue
- docker_config = rtp.get('docker_config')
- if not docker_config:
- CfyClient._logger.debug(
- "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id,
- node_instance.id))
- continue
- dti_reconfig_script = ""
- dti_reconfig_script = rtp.get('dti_reconfig_script')
- if not dti_reconfig_script:
- CfyClient._logger.debug(
- "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id,
- node_instance.id))
- continue
- scn = rtp.get('service_component_name')
- if not scn:
- CfyClient._logger.debug(
- "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
- node_instance.id))
- continue
- docker_host = rtp.get('selected_container_destination')
- if not docker_host:
- CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(
- node_instance.deployment_id, node_instance.id))
- continue
-
- # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
- if dcae_service_location:
- if docker_host not in dockerhosts:
- CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
- .format(node_instance.deployment_id, node_instance.id, docker_host,
- dcae_service_location))
- continue
-
- # If DTI Event specifies component_type, then collector's service_component_type must match
- if component_type:
- c_component_type = rtp.get('service_component_type')
- if component_type != c_component_type:
- CfyClient._logger.debug(
- "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
- continue
-
- # Check if the collector supports this VNF Type
- # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
- dti_key = scn + ':oti'
- try:
- obj = ConsulClient.get_value(dti_key)
- except Exception as e:
- CfyClient._logger.error(
- "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
- .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
- )
- continue
- if not obj:
- CfyClient._logger.debug(
- "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id,
- dti_key))
- continue
- obj_types = set(k.lower() for k in obj)
- if dcae_target_type.lower() in obj_types:
- CfyClient._logger.debug(
- "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id,
- dcae_target_type))
- cnt_found += 1
- yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id,
- node_instance.state, docker_host, dti_reconfig_script, container_type, '', '')
- continue
- else:
- CfyClient._logger.debug(
- "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id,
- dcae_target_type, dti_key))
- continue
-
- msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \
- .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
- CfyClient._logger.debug(msg)
-
-
- @staticmethod
- def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"):
- """
- Iterate components of a specific deployment_id.
-
- Parameters
- ----------
- deployment_id : string
- Cloudify deployment ID that created the component(s).
- node_id : string
- Cloudify node ID that created the component.
- reconfig_type : string
- "app"
-
- Returns
- -------
- A generator of tuples of component information
- [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
- or
- [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
-
- """
-
- cnt_found = 0
-
- CfyClient.__set_cloudify_manager_client()
- for node_instance in CfyClient._client.node_instances.list(
- deployment_id=deployment_id,
- _include=['id','node_id','deployment_id','state','runtime_properties']
- ):
- if node_id and node_instance.node_id != node_id:
- continue
-
- rtp = node_instance.runtime_properties
-
- # Skip this node_instance if it is not a collector
- container_type = "docker"
- container_id = rtp.get('container_id')
- if not container_id:
- container_type = "k8s"
- container_id = rtp.get('k8s_deployment')
- if not container_id:
- CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
- continue
- reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type)
- if not reconfig_script:
- CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type))
- continue
- scn = rtp.get('service_component_name')
- if not scn:
- CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
- continue
- if container_type == "docker":
- docker_host = rtp.get('selected_container_destination')
- if not docker_host:
- CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
- continue
- elif container_type == "k8s":
- docker_host = rtp.get('configuration',{}).get('file_content')
- if not docker_host:
- CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
- continue
-
- CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type))
- cnt_found += 1
- yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type)
- continue
-
- msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type)
- CfyClient._logger.debug(msg)
diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py
deleted file mode 100644
index 5c87f43..0000000
--- a/oti/event-handler/otihandler/config.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""read and use the config"""
-
-import copy
-import json
-import logging
-import logging.config
-import os
-
-from otihandler.consul_client import ConsulClient
-
-os.makedirs('logs', exist_ok=True)
-logging.basicConfig(
- filename='logs/oti_handler.log', \
- format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
- '%(threadName)s %(name)s.%(funcName)s: %(message)s', \
- datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG)
-
-class Config(object):
- """main config of the application"""
-
- CONFIG_FILE_PATH = "etc/config.json"
- LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
- SERVICE_NAME = "oti_handler"
-
- FIELD_SYSTEM = "system"
- FIELD_WSERVICE_PORT = "wservice_port"
- FIELD_TLS = "tls"
-
- _logger = logging.getLogger("oti_handler.config")
- config = None
-
- cloudify_proto = None
- cloudify_addr = None
- cloudify_port = None
- cloudify_user = None
- cloudify_pass = None
- cloudify = None
- consul_url = "http://consul:8500"
- tls_cacert_file = None
- tls_server_cert_file = None
- tls_private_key_file = None
- tls_server_ca_chain_file = None
- wservice_port = 9443
-
- @staticmethod
- def _get_tls_file_path(tls_config, cert_directory, tls_name):
- """calc file path and verify its existance"""
-
- file_name = tls_config.get(tls_name)
- if not file_name:
- return None
- tls_file_path = os.path.join(cert_directory, file_name)
- if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
- Config._logger.error("invalid %s: %s", tls_name, tls_file_path)
- return None
- return tls_file_path
-
- @staticmethod
- def _set_tls_config(tls_config):
- """verify and set tls certs in config"""
-
- try:
- Config.tls_cacert_file = None
- Config.tls_server_cert_file = None
- Config.tls_private_key_file = None
- Config.tls_server_ca_chain_file = None
-
- if not (tls_config and isinstance(tls_config, dict)):
- Config._logger.info("no tls in config: %s", json.dumps(tls_config))
- return
-
- cert_directory = tls_config.get("cert_directory")
-
- if not (cert_directory and isinstance(cert_directory, str)):
- Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory)
- return
-
- cert_directory = os.path.join(
- os.path.dirname(os.path.dirname(os.path.realpath(__file__))), str(cert_directory))
- if not (cert_directory and os.path.isdir(cert_directory)):
- Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory)
- return
-
- Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
- Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory,
- "server_cert")
- Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory,
- "private_key")
- Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory,
- "server_ca_chain")
-
- finally:
- Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
- Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
- Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
- Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)
-
- @staticmethod
- def merge(new_config):
- """merge the new_config into current config - override the values"""
-
- if not new_config:
- return
-
- if not Config.config:
- Config.config = new_config
- return
-
- new_config = copy.deepcopy(new_config)
- Config.config.update(new_config)
-
- @staticmethod
- def get_system_name():
- """find the name of the oti_handler system
- to be used as the key in consul-kv for config of oti_handler
- """
-
- return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME)
-
- @staticmethod
- def discover():
- """bring and merge the config settings from Consul"""
-
- system_key = Config.get_system_name()
- new_config = ConsulClient.get_value(system_key)
-
- if not new_config or not isinstance(new_config, dict):
- Config._logger.warn("unexpected config from Consul: %s", new_config)
- return
-
- Config._logger.debug("loaded config from Consul(%s): %s",
- system_key, json.dumps(new_config))
- Config._logger.debug("config before merge from Consul: %s", json.dumps(Config.config))
- Config.merge(new_config.get(Config.SERVICE_NAME))
- Config._logger.debug("merged config from Consul: %s", json.dumps(Config.config))
-
- @staticmethod
- def load_from_file(file_path=None):
- """read and store the config from config file"""
-
- if not file_path:
- file_path = Config.CONFIG_FILE_PATH
-
- loaded_config = None
- if os.access(file_path, os.R_OK):
- with open(file_path, 'r') as config_json:
- loaded_config = json.load(config_json)
-
- if not loaded_config:
- Config._logger.info("config not loaded from file: %s", file_path)
- return
-
- Config._logger.info("config loaded from file: %s", file_path)
- logging_config = loaded_config.get("logging")
- if logging_config:
- logging.config.dictConfig(logging_config)
-
- Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
-
- local_config = loaded_config.get(Config.SERVICE_NAME, {})
- Config._set_tls_config(local_config.get(Config.FIELD_TLS))
-
- Config.merge(loaded_config.get(Config.SERVICE_NAME))
- return True
diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py
deleted file mode 100644
index 1b25f3e..0000000
--- a/oti/event-handler/otihandler/consul_client.py
+++ /dev/null
@@ -1,617 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""client to talk to consul at consul port 8500"""
-
-import base64
-import copy
-import json
-import logging
-import os
-import re
-import socket
-
-import requests
-
-
-class ConsulClientError(RuntimeError):
- pass
-
-class ConsulClientConnectionError(RuntimeError):
- pass
-
-class ConsulClientServiceNotFoundError(RuntimeError):
- pass
-
-class ConsulClientNodeNotFoundError(RuntimeError):
- pass
-
-class ConsulClientKVEntryNotFoundError(RuntimeError):
- pass
-
-
-class ConsulClient(object):
- """talking to consul"""
-
- CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
- CONSUL_KV_MASK = "{}/v1/kv/{}"
- CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
- CONSUL_TRANSACTION_URL = "{}/v1/txn"
- _logger = logging.getLogger("oti_handler.consul_client")
-
- MAX_OPS_PER_TXN = 64
- # MAX_VALUE_LEN = 512 * 1000
-
- OPERATION_SET = "set"
- OPERATION_DELETE = "delete"
- OPERATION_DELETE_FOLDER = "delete-tree"
-
-
- #----- Methods for Consul services
-
- @staticmethod
- def lookup_service(service_name):
- """find the service record in consul"""
-
- service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
-
- ConsulClient._logger.info("lookup_service(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- # except requests.exceptions.HTTPError as e:
- # except requests.exceptions.ConnectionError as e:
- # except requests.exceptions.Timeout as e:
- except requests.exceptions.RequestException as e:
- msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_list = response.json()
- # except ValueError as e:
- except Exception as e:
- msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not return_list:
- msg = "lookup_service({}) got empty or no value from requests.get({})".format(
- service_name, service_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- return return_list
-
-
- @staticmethod
- def get_all_services():
- """List all services from consul"""
-
- service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/"))
-
- ConsulClient._logger.info("get_all_services(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format(
- service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_dict = response.json()
- except Exception as e:
- msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not return_dict:
- msg = "get_all_services() got empty or no value from requests.get({})".format(
- service_path)
- ConsulClient._logger.info(msg)
- # raise ConsulClientServiceNotFoundError(msg)
-
- return return_dict
-
-
- @staticmethod
- def _find_matching_services(services, name_search, tags):
- """Find matching services given search criteria"""
- sub_tags = tags[0][4:6]
- tags.append(sub_tags)
-
- def is_match(service):
- srv_name, srv_tags = service
- return name_search in srv_name and \
- any([tag in srv_tags for tag in tags])
-
- return [ srv[0] for srv in list(services.items()) if is_match(srv) ]
-
-
- @staticmethod
- def search_services(name_search, tags):
- """
- Search for services that match criteria
-
- Args:
- -----
- name_search: (string) Name to search for as a substring
- tags: (list) List of strings that are tags. A service must match **ANY OF** the
- tags in the list.
-
- Returns:
- --------
- List of names of services that matched
- """
-
- matches = []
-
- # srvs is dict where key is service name and value is list of tags
- srvs = ConsulClient.get_all_services()
-
- if srvs:
- matches = ConsulClient._find_matching_services(srvs, name_search, tags)
-
- return matches
-
-
- @staticmethod
- def get_service_fqdn_port(service_name, node_meta=False):
- """find the service record in consul"""
-
- service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
-
- ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- service = response.json()
- except Exception as e:
- msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not service:
- msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format(
- service_name, service_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- try:
- service = service[0] # arbitrarily choose the first one
- port = service["ServicePort"]
-
- # HTTPS certificate validation requires FQDN not IP address
- fqdn = ""
- if node_meta:
- meta = service.get("NodeMeta")
- if meta:
- fqdn = meta.get("fqdn")
- if not fqdn:
- fqdn = socket.getfqdn(str(service["ServiceAddress"]))
- except Exception as e:
- msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- return (fqdn, port)
-
-
- #----- Methods for Consul nodes
-
- @staticmethod
- def lookup_node(node_name):
- """find the node record in consul"""
-
- node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name)
-
- ConsulClient._logger.info("lookup_node(%s)", node_path)
-
- try:
- response = requests.get(node_path, timeout=30)
- response.raise_for_status()
- # except requests.exceptions.HTTPError as e:
- # except requests.exceptions.ConnectionError as e:
- # except requests.exceptions.Timeout as e:
- except requests.exceptions.RequestException as e:
- msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format(
- node_name, node_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_dict = response.json()
- # except ValueError as e:
- except Exception as e:
- msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- node_name, node_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientNodeNotFoundError(msg)
-
- if not return_dict:
- msg = "lookup_node({}) got empty or no value from requests.get({})".format(
- node_name, node_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientNodeNotFoundError(msg)
-
- return return_dict
-
-
- #----- Methods for Consul key-values
-
- @staticmethod
- def put_value(key, data, cas=None):
- """put the value for key into consul-kv"""
-
- # ConsulClient._logger.info("put_value(%s)", str(key))
-
- URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
- if cas is not None:
- URL = '{}?cas={}'.format(URL, cas)
-
- try:
- response = requests.put(URL, data=json.dumps(data), timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- updated = response.json()
- except Exception as e:
- msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- return updated
-
-
- @staticmethod
- def get_value(key, get_index=False):
- """get the value for key from consul-kv"""
-
- URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
-
- try:
- response = requests.get(URL, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- data = response.json()
- except Exception as e:
- msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- if not data:
- msg = "get_value({}) got empty or no value from requests.get({})".format(
- key, URL)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- try:
- value = base64.b64decode(data[0]["Value"]).decode("utf-8")
- value_dict = json.loads(value)
- except Exception as e:
- msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
- key, value, json.dumps(data))
-
- if get_index:
- return data[0]["ModifyIndex"], value_dict
-
- return value_dict
-
-
- @staticmethod
- def get_kvs(prefix, nest=True, trim_prefix=False):
- """get key-values for keys beginning with prefix from consul-kv"""
-
- URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix)
-
- try:
- response = requests.get(URL, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format(
- prefix, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- data = response.json()
- except Exception as e:
- msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- prefix, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- if not data:
- msg = "get_kvs({}) got empty or no value from requests.get({})".format(
- prefix, URL)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- def put_level_value(level_keys, value, level_dict={}):
- if level_keys:
- key = level_keys.pop(0)
- level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {}))
- return level_dict
- else:
- return value
-
- rdict = {}
- for item in data:
- v = base64.b64decode(item["Value"]).decode("utf-8")
- try:
- value = json.loads(v)
- except Exception as e:
- value = v
- key = item['Key']
- if trim_prefix:
- key = key[len(prefix):]
- if nest:
- level_keys = key.split('/')
- rdict = put_level_value(level_keys, value, rdict)
- else:
- rdict[key] = value
-
- ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
- prefix, json.dumps(rdict), json.dumps(data))
- return rdict
-
-
- @staticmethod
- def _gen_txn_operation(verb, key, value=None):
- """returns the properly formatted operation to be used inside transaction"""
-
- # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key
- if value:
- return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}}
- return {"KV": {"Verb": verb, "Key": key}}
-
-
- @staticmethod
- def _run_transaction(operation_name, txn):
- """run a single transaction of several operations at consul /txn"""
-
- if not txn:
- return
-
- txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/"))
- response = None
- try:
- response = requests.put(txn_url, json=txn, timeout=30)
- except requests.exceptions.RequestException as e:
- ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
- .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
- return
-
- if response.status_code != requests.codes.ok:
- ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
- .format(operation_name, txn_url, response.status_code,
- response.text, json.dumps(txn),
- json.dumps(dict(list(response.request.headers.items())))))
- return
-
- ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
- .format(operation_name, txn_url, response.status_code,
- response.text, json.dumps(txn),
- json.dumps(dict(list(response.request.headers.items())))))
-
- return True
-
-
- @staticmethod
- def store_kvs(kvs):
- """put kvs into consul-kv"""
-
- if not kvs:
- ConsulClient._logger.warning("kvs not supplied to store_kvs()")
- return
-
- store_kvs = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET,
- key, json.dumps(value))
- for key, value in kvs.items()
- ]
- txn = []
- idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn)
- for idx in range(0, len(store_kvs), idx_step):
- txn += store_kvs[idx : idx + idx_step]
- if not ConsulClient._run_transaction("store_kvs", txn):
- return False
- txn = []
-
- return ConsulClient._run_transaction("store_kvs", txn)
-
-
- @staticmethod
- def delete_key(key):
- """delete key from consul-kv"""
-
- if not key:
- ConsulClient._logger.warning("key not supplied to delete_key()")
- return
-
- delete_key = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key)
- ]
- return ConsulClient._run_transaction("delete_key", delete_key)
-
-
- @staticmethod
- def delete_kvs(key):
- """delete key from consul-kv"""
-
- if not key:
- ConsulClient._logger.warning("key not supplied to delete_kvs()")
- return
-
- delete_kvs = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key)
- ]
- return ConsulClient._run_transaction("delete_kvs", delete_kvs)
-
-
- #----- Methods for Config Binding Service
-
- @staticmethod
- def get_service_component(scn):
- config = json.dumps(ConsulClient.get_value(scn))
-
- try:
- dmaap = ConsulClient.get_value(scn + ":dmaap")
- except Exception as e:
- dmaap = None
- if dmaap:
- for key in list(dmaap.keys()):
- config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config)
-
- try:
- rel = ConsulClient.get_value(scn + ":rel")
- except Exception as e:
- rel = None
- if rel:
- for key in list(rel.keys()):
- config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config)
-
- return json.loads(config)
-
-
- @staticmethod
- def get_service_component_all(scn, policies_as_list=True):
- t_scn = scn + ":"
- t_len = len(t_scn)
- a_dict = ConsulClient.get_kvs(scn)
- b_dict = {}
- for key in a_dict:
- b_key = None
- if key == scn:
- b_dict["config"] = ConsulClient.get_service_component(scn)
- elif key == scn + ":dmaap":
- continue
- elif key[0:t_len] == t_scn:
- b_key = key[t_len:]
- # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys
- if policies_as_list and b_key == "policies": # convert items from KVs to a values list
- b_dict[b_key] = {}
- for sub_key in a_dict[key]:
- if sub_key == "items":
- b_dict[b_key][sub_key] = []
- d_dict = a_dict[key][sub_key]
- for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate
- b_dict[b_key][sub_key].append(d_dict[item])
- else:
- b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key])
- else:
- b_dict[b_key] = copy.deepcopy(a_dict[key])
- return b_dict
-
-
- @staticmethod
- def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
- """
- Add VNF instance to Consul scn:oti key.
-
- Treat its value as a JSON string representing a dict.
- Extend the dict by adding a dti_dict for vnf_id under vnf_type.
- Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:oti key.
- Watch out for conflicting concurrent updates.
- """
-
- key = scn + ':oti'
- lc_vnf_type = vnf_type.lower()
- while True: # do until update succeeds
- (mod_index, v) = ConsulClient.get_value(key, get_index=True)
- lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
- # but DCAE-C doesn't create such keys
-
- if lc_vnf_type not in lc_v:
- return # That VNF type is not supported by this component
- lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance
-
- updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
- if updated:
- return lc_v
-
-
- @staticmethod
- def delete_vnf_id(scn, vnf_type, vnf_id):
- """
- Delete VNF instance from Consul scn:oti key.
-
- Treat its value as a JSON string representing a dict.
- Modify the dict by deleting the vnf_id key entry from under vnf_type.
- Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:oti key.
- Watch out for conflicting concurrent updates.
- """
-
- key = scn + ':oti'
- lc_vnf_type = vnf_type.lower()
- while True: # do until update succeeds
- (mod_index, v) = ConsulClient.get_value(key, get_index=True)
- lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
- # but DCAE-C doesn't create such keys
-
- if lc_vnf_type not in lc_v:
- return # That VNF type is not supported by this component
- if vnf_id not in lc_v[lc_vnf_type]:
- return lc_v
- del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance
-
- updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
- if updated:
- return lc_v
-
-
-if __name__ == "__main__":
- value = None
-
- if value:
- print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': ')))
diff --git a/oti/event-handler/otihandler/dbclient/__init__.py b/oti/event-handler/otihandler/dbclient/__init__.py
deleted file mode 100644
index ee3ec3e..0000000
--- a/oti/event-handler/otihandler/dbclient/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-from .models import Event
-from .models import EventAck
-from .db_dao import DaoBase
diff --git a/oti/event-handler/otihandler/dbclient/apis/__init__.py b/oti/event-handler/otihandler/dbclient/apis/__init__.py
deleted file mode 100644
index 05e3800..0000000
--- a/oti/event-handler/otihandler/dbclient/apis/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-from .db_access import DbAccess
-from .event_db_access import EventDbAccess
diff --git a/oti/event-handler/otihandler/dbclient/apis/db_access.py b/oti/event-handler/otihandler/dbclient/apis/db_access.py
deleted file mode 100644
index f064b30..0000000
--- a/oti/event-handler/otihandler/dbclient/apis/db_access.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""
-Base class for APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver
-"""
-
-from sqlalchemy.orm import sessionmaker
-from ..db_dao import DaoBase
-import psycopg2
-from psycopg2.extras import execute_values
-import os
-import logging
-
-
-class DbAccess(object):
- logger = logging.getLogger("dti_handler.DbAccess")
- engine = None
- session = None
-
- def __init__(self):
- self.engine = DaoBase.getDbEngine()
- # create a configured "Session" class
- Session = sessionmaker(bind=self.engine)
-
- # create a Session
- self.session = Session()
-
- def saveDomainObject(self, obj):
- self.session.add(obj)
- self.session.commit()
- self.session.close()
-
- def deleteDomainObject(self,obj):
- self.session.delete(obj)
- self.session.commit()
- self.session.close()
diff --git a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py b/oti/event-handler/otihandler/dbclient/apis/event_db_access.py
deleted file mode 100644
index 898ee8e..0000000
--- a/oti/event-handler/otihandler/dbclient/apis/event_db_access.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-""" DB APIs to interact with application database using sqlAlchemy ORM lib and postgresSql driver"""
-
-from sqlalchemy import and_
-from sqlalchemy.orm.exc import NoResultFound
-
-from .db_access import DbAccess
-from ..models import Event, EventAck
-
-
-class EventDbAccess(DbAccess):
-
- def __init__(self):
- DbAccess.__init__(self)
-
- def query_event_item(self, target_type, target_name):
- try:
- query = self.session.query(Event).filter(Event.target_type == target_type).\
- filter(Event.target_name == target_name)
- evt = query.one()
- except NoResultFound:
- return None
- else:
- return evt
-
- def query_event_data(self, target_type, target_name):
- try:
- query = self.session.query(Event).filter(Event.target_type == target_type).\
- filter(Event.target_name == target_name)
- evt = query.one()
- except NoResultFound:
- return []
- else:
- try:
- ack_result = self.session.query(EventAck).filter(EventAck.event == evt).all()
- except NoResultFound:
- return []
- else:
- return ack_result
-
- def query_event_data_k8s(self, target_type, target_name):
- try:
- query = self.session.query(Event).filter(Event.target_type == target_type).\
- filter(Event.target_name == target_name)
- evt = query.one()
- except NoResultFound:
- return []
- else:
- try:
- ack_result = self.session.query(EventAck).filter(EventAck.event == evt).\
- filter(EventAck.container_type != 'docker').all()
- except NoResultFound:
- return []
- else:
- return ack_result
-
- def query_event_info_docker(self, prim_evt, service_component, deployment_id, container_id):
- try:
- query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter(
- and_(EventAck.service_component == service_component,
- EventAck.deployment_id == deployment_id,
- EventAck.container_id == container_id,
- EventAck.container_type == 'docker'))
- evt = query.one()
- except NoResultFound as nrf:
- raise nrf
- else:
- return evt
-
- def update_event_item(self, dti_event, target_type, target_name):
- self.session.query(Event).filter(Event.target_type == target_type). \
- filter(Event.target_name == target_name).update({Event.event:dti_event})
- self.session.commit()
-
- def query_raw_k8_events(self, cluster, pod, namespace):
- """
- run an inner JOIN query to dtih_event and dtih_event_ack tables using supplied query predicates
-
- :param cluster:
- :param pod:
- :param namespace:
- :return:
- Set of event objects related to k8s pods
- """
- try:
- return self.session.query(Event).filter(Event.dtih_event_id.in_(
- self.session.query(EventAck.dtih_event_id).filter(and_(EventAck.k8s_cluster_fqdn == cluster,
- EventAck.k8s_pod_id == pod,
- EventAck.k8s_namespace == namespace)))).all()
- except NoResultFound:
- print("invalid query or no data")
- return ()
-
- def query_raw_docker_events(self, target_types, locations):
- """
- run a query to dtih_event table using supplied query predicates
-
- :param target_types: required
- :param locations: optional
- :return:
- set of event objects related to docker container
- """
- try:
- if not locations or (len(locations) == 1 and locations[0] == ''):
- return self.session.query(Event).filter(Event.target_type.in_(target_types)).all()
- else:
- return self.session.query(Event).filter(Event.target_type.in_(target_types)).filter(
- Event.location_clli.in_(locations)).all()
- except NoResultFound:
- print("invalid query or no data")
- return ()
-
- def query_pod_info2(self, cluster):
- try:
- return self.session.query(EventAck).filter(EventAck.k8s_cluster_fqdn == cluster).all()
- except NoResultFound:
- print("invalid query or no data")
- return ()
-
- def query_pod_info(self, cluster):
- try:
- return self.session.query(EventAck.k8s_pod_id, EventAck.k8s_namespace,
- EventAck.k8s_proxy_fqdn, EventAck.k8s_service_name,
- EventAck.k8s_service_port)\
- .filter(EventAck.k8s_cluster_fqdn == cluster) \
- .distinct().order_by(EventAck.k8s_cluster_fqdn).all()
- except NoResultFound:
- print("invalid query or no data")
- return ()
-
- def query_event_data_k8s_pod(self, prim_evt, scn):
- try:
- query = self.session.query(EventAck).filter(EventAck.event == prim_evt).filter(
- and_(EventAck.service_component == scn))
- event_info = query.one()
- except NoResultFound:
- return None
- else:
- return event_info
diff --git a/oti/event-handler/otihandler/dbclient/db_dao.py b/oti/event-handler/otihandler/dbclient/db_dao.py
deleted file mode 100644
index 78fa058..0000000
--- a/oti/event-handler/otihandler/dbclient/db_dao.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-""" SqlAlchemy ORM engine for postgresSql dti database """
-
-from sqlalchemy import create_engine
-
-class DaoBase:
- _engine = None
-
- @staticmethod
- def init_db(dbConStr):
- if DaoBase._engine:
- return
- DaoBase._engine = create_engine(dbConStr)
-
- @staticmethod
- def getDbEngine():
- return DaoBase._engine
-
diff --git a/oti/event-handler/otihandler/dbclient/models/__init__.py b/oti/event-handler/otihandler/dbclient/models/__init__.py
deleted file mode 100644
index bc802f5..0000000
--- a/oti/event-handler/otihandler/dbclient/models/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-
-from .event import Event
-from .event_ack import EventAck
diff --git a/oti/event-handler/otihandler/dbclient/models/event.py b/oti/event-handler/otihandler/dbclient/models/event.py
deleted file mode 100644
index 553bec2..0000000
--- a/oti/event-handler/otihandler/dbclient/models/event.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-""" ORM - mapping class for dtih_event table """
-
-from sqlalchemy import Column, String, Integer, ForeignKey, func
-from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
-from sqlalchemy.ext.declarative import declarative_base
-import datetime
-
-
-Base = declarative_base()
-
-class Event(Base):
- __tablename__ = 'dtih_event'
- __table_args__ = {'schema': 'dti'}
- dtih_event_id = Column(Integer, primary_key=True)
- event = Column(JSONB)
- create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
- last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())
- target_name = Column(String)
- target_type = Column(String)
- location_clli = Column(String)
- # def __repr__(self):
- # return "<Event(event_id='%s', target_type='%s', target_name='%s')" % (
- # self.event_id, self.target_type, self.target_name
- # )
diff --git a/oti/event-handler/otihandler/dbclient/models/event_ack.py b/oti/event-handler/otihandler/dbclient/models/event_ack.py
deleted file mode 100644
index 2b19316..0000000
--- a/oti/event-handler/otihandler/dbclient/models/event_ack.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-""" ORM - mapping class for dtih_event_ack table """
-
-import datetime
-from sqlalchemy import Column, String, Integer, ForeignKey, func
-from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
-from sqlalchemy.orm import relationship
-from sqlalchemy.ext.declarative import declarative_base
-from ..models import Event
-
-Base = declarative_base()
-
-class EventAck(Base):
- __tablename__ = 'dtih_event_ack'
- __table_args__ = {'schema': 'dti'}
- dtih_event_ack_id = Column(Integer, primary_key=True)
- create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
- last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())
- action = Column(String)
- k8s_namespace = Column(String)
- k8s_service_name = Column(String)
- k8s_service_port = Column(String)
- k8s_cluster_fqdn = Column(String)
- k8s_proxy_fqdn = Column(String)
- k8s_pod_id = Column(String)
- service_component = Column(String)
- deployment_id = Column(String)
- container_type = Column(String)
- docker_host = Column(String)
- container_id = Column(String)
- reconfig_script = Column(String)
- dtih_event_id = Column(Integer, ForeignKey(Event.dtih_event_id))
- event = relationship(Event)
-
- def update_action(self, action):
- setattr(self, 'action', action)
diff --git a/oti/event-handler/otihandler/docker_client.py b/oti/event-handler/otihandler/docker_client.py
deleted file mode 100644
index 621a1ec..0000000
--- a/oti/event-handler/otihandler/docker_client.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""client interface to docker"""
-
-import docker
-import json
-import logging
-import time
-
-from otihandler.config import Config
-from otihandler.consul_client import ConsulClient
-from otihandler.utils import decrypt
-
-
-# class DockerClientError(RuntimeError):
-# pass
-
-class DockerClientConnectionError(RuntimeError):
- pass
-
-
-class DockerClient(object):
- """
- All Docker logins are in Consul's key-value store under
- "docker_plugin/docker_logins" as a list of json objects where
- each object is a single login:
-
- [{ "username": "XXXX", "password": "yyyy",
- "registry": "hostname.domain:18443" }]
- """
-
- _logger = logging.getLogger("oti_handler.docker_client")
-
- def __init__(self, docker_host, reauth=False):
- """Create Docker client
-
- Args:
- -----
- reauth: (boolean) Forces reauthentication, e.g., Docker login
- """
-
- (fqdn, port) = ConsulClient.get_service_fqdn_port(docker_host, node_meta=True)
- base_url = "https://{}:{}".format(fqdn, port)
-
- try:
- tls_config = docker.tls.TLSConfig(
- client_cert=(
- Config.tls_server_ca_chain_file,
- Config.tls_private_key_file
- )
- )
- self._client = docker.APIClient(base_url=base_url, tls=tls_config, version='auto', timeout=60)
-
- for dcl in ConsulClient.get_value("docker_plugin/docker_logins"):
- dcl['password'] = decrypt(dcl['password'])
- dcl["reauth"] = reauth
- self._client.login(**dcl)
-
- # except requests.exceptions.RequestException as e:
- except Exception as e:
- msg = "DockerClient.__init__({}) attempt to {} with TLS got exception {}: {!s}".format(
- docker_host, base_url, type(e).__name__, e)
-
- # Then try connecting to dockerhost without TLS
- try:
- base_url = "tcp://{}:{}".format(fqdn, port)
- self._client = docker.APIClient(base_url=base_url, tls=False, version='auto', timeout=60)
-
- for dcl in ConsulClient.get_value("docker_plugin/docker_logins"):
- dcl['password'] = decrypt(dcl['password'])
- dcl["reauth"] = reauth
- self._client.login(**dcl)
-
- # except requests.exceptions.RequestException as e:
- except Exception as e:
- msg = "{}\nDockerClient.__init__({}) attempt to {} without TLS got exception {}: {!s}".format(
- msg, docker_host, base_url, type(e).__name__, e)
- DockerClient._logger.error(msg)
- raise DockerClientConnectionError(msg)
-
- @staticmethod
- def build_cmd(script_path, use_sh=True, msg_type="dti", **kwargs):
- """Build command to execute"""
-
- data = json.dumps(kwargs or {})
-
- if use_sh:
- return ['/bin/sh', script_path, msg_type, data]
- else:
- return [script_path, msg_type, data]
-
- def notify_for_reconfiguration(self, container_id, cmd):
- """Notify Docker container that reconfiguration occurred
-
- Notify the Docker container by doing Docker exec of passed-in command
-
- Args:
- -----
- container_id: (string)
- cmd: (list) of strings each entry being part of the command
- """
-
- for attempts_remaining in range(11,-1,-1):
- try:
- result = self._client.exec_create(container=container_id, cmd=cmd)
- except docker.errors.APIError as e:
- # e # 500 Server Error: Internal Server Error ("{"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}")
- DockerClient._logger.debug("exec_create() returned APIError: {!s}".format(e))
-
- # e.message # 500 Server Error: Internal Server Error
- # DockerClient._logger.debug("e.message: {}".format(e.message))
- # e.response.status_code # 500
- # DockerClient._logger.debug("e.response.status_code: {}".format(e.response.status_code))
- # e.response.reason # Internal Server Error
- # DockerClient._logger.debug("e.response.reason: {}".format(e.response.reason))
- # e.explanation # {"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}
- # DockerClient._logger.debug("e.explanation: {}".format(e.explanation))
-
- # docker container restarting can wait
- if e.explanation and 'is restarting' in e.explanation.lower():
- DockerClient._logger.debug("notification exec_create() experienced: {!s}".format(e))
- if attempts_remaining == 0:
- result = None
- break
- time.sleep(10)
- # elif e.explanation and 'no such container' in e.explanation.lower():
- # elif e.explanation and 'is not running' in e.explanation.lower():
- else:
- DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(type(e).__name__, e))
- return str(e) # don't raise or CM will retry usually forever
- # raise DockerClientError(e)
- except Exception as e:
- DockerClient._logger.warn("aborting notification exec_create() because exception {}: {!s}".format(
- type(e).__name__, e))
- return str(e) # don't raise or CM will retry usually forever
- # raise DockerClientError(e)
- else:
- break
- if not result:
- DockerClient._logger.warn("aborting notification exec_create() because docker exec failed")
- return "notification unsuccessful" # failed to get an exec_id, perhaps trying multiple times, so don't raise or CM will retry usually forever
- DockerClient._logger.debug("notification exec_create() succeeded")
-
- for attempts_remaining in range(11,-1,-1):
- try:
- result = self._client.exec_start(exec_id=result['Id'])
- except Exception as e:
- DockerClient._logger.debug("notification exec_start() got exception {}: {!s}".format(type(e).__name__, e))
- if attempts_remaining == 0:
- DockerClient._logger.warn("aborting notification exec_start() because exception {}: {!s}".format(type(e).__name__, e))
- return str(e) # don't raise or CM will retry usually forever
- # raise DockerClientError(e)
- time.sleep(10)
- else:
- break
- DockerClient._logger.debug("notification exec_start() succeeded")
-
- DockerClient._logger.info("Pass to docker exec {} {} {}".format(
- container_id, cmd, result))
-
- return result
diff --git a/oti/event-handler/otihandler/dti_processor.py b/oti/event-handler/otihandler/dti_processor.py
deleted file mode 100644
index 5802233..0000000
--- a/oti/event-handler/otihandler/dti_processor.py
+++ /dev/null
@@ -1,815 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""OTI Event processor for handling all the event types"""
-
-import copy
-import json
-import logging
-from multiprocessing.dummy import Pool as ThreadPool
-from threading import Lock
-
-import requests
-
-from otihandler import utils
-from otihandler.cfy_client import CfyClient
-from otihandler.consul_client import ConsulClient
-from otihandler.dbclient.apis import EventDbAccess
-from otihandler.dbclient.models import Event, EventAck
-from otihandler.docker_client import DockerClient
-
-notify_response_arr = []
-lock = Lock()
-K8S_CLUSTER_PROXY_NODE_PORT = '30132'
-
-
-# def notify_docker(args_tuple):
-# """
-# event notification executor inside a process pool to communicate with docker container
-# interacts with docker client library
-# """
-# (dti_event, db_access, ack_item) = args_tuple
-# try:
-# dcae_service_action = dti_event.get('dcae_service_action')
-# component_scn = ack_item.service_component
-# deployment_id = ack_item.deployment_id
-# container_id = ack_item.container_id
-# docker_host = ack_item.docker_host
-# reconfig_script = ack_item.reconfig_script
-# container_type = 'docker'
-# except Exception as e:
-# return (
-# "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
-# what = ""
-# try:
-# what = "{} in {} container {} on {} that was deployed by {}".format(
-# reconfig_script, container_type, container_id, docker_host, deployment_id)
-# if dcae_service_action == 'add':
-# add_action = {"dcae_service_action": "deploy"}
-# dti_event.update(add_action)
-#
-# if dcae_service_action == 'delete':
-# add_action = {"dcae_service_action": "undeploy"}
-# dti_event.update(add_action)
-#
-# # dkr = DockerClient(docker_host, reauth=False)
-# result = ''
-# # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
-# if dti_event.get('dcae_service_action') == 'undeploy':
-# # delete from dti_event_ack table
-# try:
-# db_access.deleteDomainObject(ack_item)
-# except Exception as e:
-# msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-# DTIProcessor.logger.warning(msg)
-# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-# else:
-# return (component_scn, "ran {}, got: {!s}".format(what, result))
-#
-# except Exception as e:
-# return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
-
-def notify_svc(args_tuple):
- """
- add/update/delete event handler
- event notification executor inside a process pool to communicate with docker container and k8s services
- interacts with docker client library
- interacts with k8s node port services using REST client
- """
- (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
- dti_event = copy.deepcopy(orig_dti_event)
- try:
- dcae_service_action = dti_event.get('dcae_service_action').lower()
-
- component_scn = res_tuple[0]
- deployment_id = res_tuple[1]
- container_id = res_tuple[2]
- node_id = res_tuple[3]
- docker_host = res_tuple[6]
- reconfig_script = res_tuple[7]
- container_type = res_tuple[8]
- except Exception as e:
- return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
-
- what = ""
- if container_type == "docker":
- # exec reconfigure.sh in docker container
- try:
- what = "{} in {} container {} on {} that was deployed by {} node {}".format(
- reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
- if dcae_service_action == 'add':
- add_action = {"dcae_service_action": "deploy"}
- dti_event.update(add_action)
-
- if dcae_service_action == 'delete':
- add_action = {"dcae_service_action": "undeploy"}
- dti_event.update(add_action)
-
- dkr = DockerClient(docker_host, reauth=False)
- result = ''
- if dti_event.get('dcae_service_action') == 'update':
- # undeploy + deploy
- DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
- dti_event.update({"dcae_service_action": "undeploy"})
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
- dti_event.update({"dcae_service_action": "deploy"})
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- try:
- upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
- container_id)
- upd_evt_ack.update_action('update')
- db_access.saveDomainObject(upd_evt_ack)
- except Exception as e:
- msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- DTIProcessor.logger.debug("running {}".format(what))
- result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
- if dti_event.get('dcae_service_action') == 'deploy':
- # add into dti_event_ack table
- try:
- add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
- container_type='docker', docker_host=docker_host,
- container_id=container_id, reconfig_script=reconfig_script,
- event=curr_evt,
- action='add')
- db_access.saveDomainObject(add_evt_ack)
- except Exception as e:
- msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- # remove from dtih_event_ack if present
- if curr_evt:
- try:
- del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
- container_id)
- db_access.deleteDomainObject(del_evt_ack)
- except Exception as e:
- msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- except Exception as e:
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- return (component_scn, "ran {}, got: {!s}".format(what, result))
- elif container_type == "k8s":
- DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
- # if action is 'update', check if k8s pod info exists already for this event in app db
- if dcae_service_action == 'add':
- DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
- return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
- elif dcae_service_action == 'update':
- # handle update for pods being tracked and handle add for new pods
- k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
- if k8s_scn_result:
- # update
- DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
- return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
- else:
- # add
- DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
- add_action = {"dcae_service_action": "add"}
- dti_event.update(add_action)
- return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
-
-
-def notify_k8s(args_tuple):
- """
- add event handler
- event notification executor inside a process pool to communicate with k8s statefulset nodeport service
- uses REST API client to call k8s services
- """
- (dti_event, db_access, curr_evt, res_tuple) = args_tuple
- component_scn = res_tuple[0]
- deployment_id = res_tuple[1]
- node_id = res_tuple[3]
- container_type = res_tuple[8]
- service_address = res_tuple[9]
- service_port = res_tuple[10]
- what = "{} in {} deployment {} that was deployed by {} node {}".format(
- "add", container_type, "statefulset", deployment_id, node_id)
- # call scn node port service REST API
- svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "collector nodeport service({}) threw exception {}: {!s}".format(
- svc_nodeport_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- try:
- event_ack_info = response.json()
- except Exception as e:
- msg = "collector nodeport service({}) threw exception {}: {!s}".format(
- svc_nodeport_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- if not event_ack_info:
- msg = "collector nodeport service returned bad data"
- DTIProcessor.logger.error(msg)
- return (component_scn, "collector nodeport service returned bad data")
-
- namespace = event_ack_info.get("KubeNamespace")
- svc_name = event_ack_info.get("KubeServiceName")
- svc_port = event_ack_info.get("KubeServicePort")
- proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
- cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
- pod_name = event_ack_info.get("KubePod")
- statefulset = pod_name[0:pod_name.rindex('-')]
-
- what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
- "add", container_type, statefulset, namespace, deployment_id, node_id)
- try:
- add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
- k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
- k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
- service_component=component_scn)
- db_access.saveDomainObject(add_evt_ack)
- return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
- except Exception as e:
- msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
-
-def notify_pods(args_tuple):
- """
- notify event handler
- event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
- uses REST API client to call k8s services
- """
- event_ack_info = ''
- (dti_event, res_tuple) = args_tuple
- try:
- cluster = res_tuple[0]
- port = K8S_CLUSTER_PROXY_NODE_PORT
- namespace = res_tuple[1]
- svc_name = res_tuple[2]
- svc_port = res_tuple[4]
- replicas = res_tuple[3]
-
- for replica in range(replicas):
- pod_id = "sts-{}-{}".format(svc_name, replica)
- item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
- pod_id, svc_name,
- svc_port)
- what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- response = requests.put(item_pod_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
- else:
- try:
- event_ack_info = response.json()
- except Exception as e:
- msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
-
- if not event_ack_info:
- msg = "stateful set proxy service returned bad data"
- DTIProcessor.logger.error(msg)
- with lock:
- notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
-
- with lock:
- notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
- except Exception as e:
- with lock:
- notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
-
-def notify_k8s_pod(args_tuple):
- """
- update event handler
- event notification executor inside a process pool to communicate with k8s DTIH proxy service
- uses REST API client to call k8s services
- """
- item_pod_url = ''
- component_scn = ''
- (dti_event, db_access, ack_item) = args_tuple
- # call ingress proxy to dispatch delete event
-
- action = dti_event.get('dcae_service_action')
- what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
- action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
- try:
- DTIProcessor.logger.debug("running {}".format(what))
- item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
- ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
- ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
- component_scn = ack_item.service_component
- response = requests.put(item_pod_url, json=dti_event, timeout=50)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
- item_pod_url, type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- return (component_scn, "ran {}, got: {!s}".format(what, msg))
- else:
- if action == 'delete':
- try:
- db_access.deleteDomainObject(ack_item)
- except Exception as e:
- msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
- else:
- try:
- ack_item.update_action('update')
- db_access.saveDomainObject(ack_item)
- except Exception as e:
- msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-
- return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
-
-
-class DTIProcessor(object):
- """
- Main event processing class that encapsulates all the logic of this handler application!
- An instance of this class is created per incoming client request.
-
- Generates input data by querying platform services - cloudify, consul, postgresSql
-
- It creates a pool of worker processes using a multiprocessing Pool class instance.
- Tasks are offloaded to the worker processes that exist in the pool.
- The input data is distributed across processes of the Pool object to enable parallel execution of
- event notification function across multiple input values (data parallelism).
- """
-
- logger = logging.getLogger("oti_handler.dti_processor")
- K8S_CLUSTER_PROXY_NODE_PORT = '30132'
- db_access = None
- docker_pool = None
- k8s_pool = None
-
- def __init__(self, dti_event, send_notification=True):
- self._result = {}
- self.event = dti_event
- self.is_notify = send_notification
- self.action = dti_event.get('dcae_service_action').lower()
- self.target_name = dti_event.get('dcae_target_name')
- self.target_type = dti_event.get('dcae_target_type', '').lower()
- self.event_clli = dti_event.get('dcae_service_location')
- res_dict = None
- try:
- self.docker_pool = ThreadPool(8)
- self.k8s_pool = ThreadPool(8)
- except Exception as e:
- msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- raise e
- else:
- self.db_access = EventDbAccess()
- self.prim_db_event = None
- try:
- res_dict = self.dispatcher()
- except:
- raise
- finally:
- try:
- self.docker_pool.close()
- self.k8s_pool.close()
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- try:
- self.docker_pool.join()
- self.k8s_pool.join()
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- # if not send_notification:
- # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
- # return
-
- if res_dict:
- try:
- utils.update_dict(self._result, res_dict)
- except Exception as e:
- msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
- type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
-
- def dispatcher(self):
- """ dispatch method to execute specific method based on event type """
-
- arg = str(self.action)
- method = getattr(self, arg, lambda: "Invalid action")
- return method()
-
- def undeploy(self):
- """
- delete event from consul KV store, this functionality will be retired as events are stored
- in postgresSql oti database
- """
- global key
- try:
- # update Consul KV store with DTI Event - storing them in a folder for all components
- key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
- result = ConsulClient.delete_key(key)
- except Exception as e:
- msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- else:
- if not result:
- msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- def deploy(self):
- """
- add event to consul KV store, this functionality will be retired as events are stored
- in postgresSql oti database
- """
- dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
- try:
- # update Consul KV store with DTI Event - storing them in a folder for all components
- result = ConsulClient.store_kvs({dep_key: self.event})
- except Exception as e:
- msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- def add(self):
- """
- process DTI event that contains a new VNF instance that has to be configured in the collector microservices
- """
- res_dict = None
- try:
- msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
- DTIProcessor.logger.debug(msg)
- # insert add event into dtih_event table
- self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
- location_clli=self.event_clli)
- self.db_access.saveDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
- raise Exception(msg)
- else:
- if self.is_notify:
- try:
- # force the action to add, to avoid bad things later
- add_action = {"dcae_service_action": "add"}
- self.event.update(add_action)
- # mock up data
- mock_tp11 = (
- "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
- "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
- "dcae-d1.idns.cip.corp.com", "30996")
- mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
- "docker_node_instance_id1",
- "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
- "dcae-d1.idns.cip.corp.com", "30996")
- # tpl_arr = []
- # tpl_arr.append(mock_tp11)
- # tpl_arr.append(mock_tp12)
- # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
- res_dict = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp) for tp in
- CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- return res_dict
-
- def add_replay(self):
- """
- convert an update event flow and replay as an add event type since the event acknowledgement is missing
- from application database
- """
- res_dict = None
- try:
- # force the action to add, to avoid bad things later
- add_action = {"dcae_service_action": "add"}
- self.event.update(add_action)
- # mock up data
- mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
- "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
- "dcae-d1.idns.cip.corp.com", "30996")
- mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
- "docker_node_instance_id1",
- "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
- "dcae-d1.idns.cip.corp.com", "30996")
- # tpl_arr = []
- # tpl_arr.append(mock_tp11)
- # tpl_arr.append(mock_tp12)
- # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
- res_dict = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp) for tp in
- CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- return res_dict
-
- def delete(self):
- """
- process DTI event that indicates a VNF instance has to be removed from the collector microservices
- """
- res_dict = {}
- res_dict_k8s = {}
- res_dict_docker = {}
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.is_notify:
- try:
- msg = "processing delete event for {}/{} to relate with any docker hosts".format(
- self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- res_dict_docker = dict(self.docker_pool.map(notify_svc,
- ((self.event, self.db_access, self.prim_db_event, tp)
- for tp
- in CfyClient().iter_components_for_docker(
- self.target_type,
- dcae_service_location=self.event_clli))
- ))
- except Exception as e:
- msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
- e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- try:
- msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
- self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- if self.prim_db_event:
- result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
- res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
- ((self.event, self.db_access, ack_item) for ack_item in result))))
- except Exception as e:
- msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- try:
- if self.prim_db_event:
- self.db_access.deleteDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
- except Exception as e:
- msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
-
- if res_dict_k8s:
- utils.update_dict(res_dict, res_dict_k8s)
-
- if res_dict_docker:
- utils.update_dict(res_dict, res_dict_docker)
-
- return res_dict
-
- def update(self):
- """
- process DTI event that indicates VNF instance has to be updated in the collector microservices
- """
- res_dict = {}
- res_dict_k8s = {}
- res_dict_docker = {}
-
- if self.is_notify:
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.prim_db_event:
- self.db_access.update_event_item(self.event, self.target_type, self.target_name)
- result = self.db_access.query_event_data(self.target_type, self.target_name)
- if len(result) == 0:
- msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
- "replaying this event to cluster if required". \
- format(self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- res_dict = self.add_replay()
- else:
- msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
- "identify new vs update cases".format(self.target_type, self.target_name)
- DTIProcessor.logger.debug(msg)
- try:
- tpl_arr = CfyClient().iter_components(self.target_type,
- dcae_service_location=self.event_clli)
- res_dict_docker = dict(self.docker_pool.map(notify_svc,
- ((
- self.event, self.db_access,
- self.prim_db_event,
- tp)
- for tp in tpl_arr)))
- except Exception as e:
- msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
- type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
- else:
- # event is new for the handler
- msg = "processing update event for {}/{}, but current event info is not found in database, " \
- "executing add event".format(self.target_type, self.target_name)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
- res_dict = self.add()
- except Exception as e:
- msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
- DTIProcessor.logger.error(msg)
- self._result['ERROR'] = msg
-
- if res_dict_k8s:
- utils.update_dict(res_dict, res_dict_k8s)
-
- if res_dict_docker:
- utils.update_dict(res_dict, res_dict_docker)
-
- return res_dict
-
- def notify(self):
- """
- event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
- This notification is meant for the cluster failover.
- """
- res_dict = {}
- try:
- self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
- if self.prim_db_event:
- self.db_access.update_event_item(self.event, self.target_type, self.target_name)
- else:
- self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
- location_clli=self.event_clli)
- self.db_access.saveDomainObject(self.prim_db_event)
- except Exception as e:
- msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['ERROR'] = msg
-
- try:
- self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
- CfyClient().query_k8_components(self.target_name)))
- for k, v in notify_response_arr:
- res_dict[k] = v
- except Exception as e:
- msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
- DTIProcessor.logger.warning(msg)
- self._result['WARNING'] = msg
-
- return res_dict
-
- def get_result(self):
- return self._result
-
- @classmethod
- def get_k8_raw_events(cls, pod, cluster, namespace):
- """
- Get DTI events for a k8 stateful set pod container
-
- :param pod: required
- k8s stateful set pod ID that was configured with a specific set of DTI Events
- :param cluster: required
- k8s cluster FQDN where the mS was deployed
- :param namespace: required
- k8s namespace where the stateful set was deployed in that namespace
- :return:
- Dictionary of DTI event(s).
- DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
- """
- db_access = EventDbAccess()
- results = db_access.query_raw_k8_events(cluster, pod, namespace)
-
- target_types = set([])
- outer_dict = {}
-
- for evnt_item in results:
- target_types.add(evnt_item.target_type)
-
- for targ_type in target_types:
- inner_name_evt_dict = {}
- for evnt in results:
- if targ_type == evnt.target_type:
- inner_name_evt_dict[evnt.target_name] = evnt.event
-
- outer_dict[targ_type] = inner_name_evt_dict
-
- return outer_dict
-
- @classmethod
- def get_docker_raw_events(cls, service_name, service_location):
- """
- Get DTI events for docker container.
-
- Parameters
- ----------
- service_name : string
- required. The service component name assigned by dockerplugin to the component that is unique to the
- cloudify node instance and used in its Consul key(s).
- service_location : string
- optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
- in service_location.
- If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
- TAGs if service_name is provided,
- otherwise results are not location filtered.
-
- Returns
- -------
- dict
- Dictionary of DTI event(s).
- DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
-
- """
-
- r_dict = {}
-
- want_locs = []
- if service_location:
- want_locs = service_location.split(',')
-
- give_types = []
- if service_name:
- if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
- try:
- node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
- if node_name:
- services = ConsulClient.lookup_node(node_name).get("Services")
- if services:
- for node_svc in list(services.keys()):
- if "-component-dockerhost-" in node_svc:
- want_locs = services[node_svc].get("Tags", [])
- break
- except:
- pass
-
- try:
- supported_types = ConsulClient.get_value(service_name + ":oti")
- except:
- return r_dict
- else:
- if supported_types:
- supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
- give_types = supported_types
- if not give_types or (len(give_types) == 1 and give_types[0] == ''):
- return r_dict
-
- db_access = EventDbAccess()
- results = db_access.query_raw_docker_events(give_types, want_locs)
-
- target_types = set([])
- outer_dict = {}
-
- for evnt_item in results:
- target_types.add(evnt_item.target_type)
-
- for targ_type in target_types:
- inner_name_evt_dict = {}
- for evnt in results:
- if targ_type == evnt.target_type:
- inner_name_evt_dict[evnt.target_name] = evnt.event
-
- outer_dict[targ_type] = inner_name_evt_dict
-
- return outer_dict
diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py
deleted file mode 100644
index 3b5b477..0000000
--- a/oti/event-handler/otihandler/onap/CommonLogger.py
+++ /dev/null
@@ -1,958 +0,0 @@
-#!/usr/bin/python
-# -*- indent-tabs-mode: nil -*- vi: set expandtab:
-
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-""" Common Logging library in Python.
-
-CommonLogger.py
-
-Original Written by: Terry Schmalzried
-Date written: October 1, 2015
-Last updated: December 1, 2016
-
-version 0.8
-"""
-
-import os
-import logging
-import logging.handlers
-import re
-import socket
-import sys
-import threading
-import time
-
-
-class CommonLogger:
- """ Common Logging object.
-
- Public methods:
- __init__
- setFields
- debug
- info
- warn
- error
- fatal
- """
-
- UnknownFile = -1
- ErrorFile = 0
- DebugFile = 1
- AuditFile = 2
- MetricsFile = 3
- DateFmt = '%Y-%m-%dT%H:%M:%S'
- verbose = False
-
- def __init__(self, configFile, logKey, **kwargs):
- """Construct a Common Logger for one Log File.
-
- Arguments:
- configFile -- configuration filename.
- logKey -- the keyword in configFile that identifies the log filename.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages,
- one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
- CommonLogger.AuditFile and CommonLogger.MetricsFile, or
- one of the strings "error", "debug", "audit" or "metrics".
- May also be set in the config file using a field named
- <logKey>Style (where <logKey> is the value of the logKey
- parameter). The keyword value overrides the value in the
- config file.
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._monitorFlag = False
-
- # Get configuration parameters
- self._logKey = str(logKey)
- self._configFile = str(configFile)
- self._rotateMethod = 'time'
- self._timeRotateIntervalType = 'midnight'
- self._timeRotateInterval = 1
- self._sizeMaxBytes = 0
- self._sizeRotateMode = 'a'
- self._socketHost = None
- self._socketPort = 0
- self._typeLogger = 'filelogger'
- self._backupCount = 6
- self._logLevelThreshold = self._intLogLevel('')
- self._logFile = None
- self._begTime = None
- self._begMsec = 0
- self._fields = {}
- self._fields["style"] = CommonLogger.UnknownFile
- try:
- self._configFileModified = os.path.getmtime(self._configFile)
- for line in open(self._configFile):
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
- self._rotateMethod = value.lower()
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- elif key == self._logKey + 'SocketHost':
- self._socketHost = value
- elif key == self._logKey + 'SocketPort' and int( value ) == 0:
- self._socketPort = int( value )
- elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- elif key == self._logKey + 'LogLevel':
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- elif key == self._logKey:
- self._logFile = value
- except Exception as x:
- print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)))
- sys.exit(2)
- except:
- print("exception reading '%s' configuration file" %(self._configFile))
- sys.exit(2)
-
- if self._logFile is None:
- print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey))
- sys.exit(2)
-
-
- # initialize default log fields
- # timestamp will automatically be generated
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs and kwargs[key] != None:
- self._fields[key] = kwargs[key]
-
- self._resetStyleField()
-
- # Set up logger
- self._logLock = threading.Lock()
- with self._logLock:
- self._logger = logging.getLogger(self._logKey)
- self._logger.propagate = False
- self._createLogger()
-
- self._defaultServerInfo()
-
- # spawn a thread to monitor configFile for logLevel and logFile changes
- self._monitorFlag = True
- self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
- self._monitorThread.daemon = True
- self._monitorThread.start()
-
-
- def _createLogger(self):
- if self._typeLogger == 'filelogger':
- self._mkdir_p(self._logFile)
- if self._rotateMethod == 'time':
- self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
- when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
- backupCount=self._backupCount, encoding=None, delay=False, utc=True)
- elif self._rotateMethod == 'size':
- self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
- mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
- backupCount=self._backupCount, encoding=None, delay=False)
-
- else:
- self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
- mode=self._sizeRotateMode, \
- encoding=None, delay=False)
- elif self._typeLogger == 'stderrlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stderr)
- elif self._typeLogger == 'stdoutlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stdout)
- elif self._typeLogger == 'socketlogger':
- self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
- elif self._typeLogger == 'nulllogger':
- self._logHandler = logging.handlers.NullHandler()
-
- if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
- self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
- else:
- self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
- self._logFormatter.converter = time.gmtime
- self._logHandler.setFormatter(self._logFormatter)
- self._logger.addHandler(self._logHandler)
-
- def _resetStyleField(self):
- styleFields = ["error", "debug", "audit", "metrics"]
- if self._fields['style'] in styleFields:
- self._fields['style'] = styleFields.index(self._fields['style'])
-
- def __del__(self):
- if not self._monitorFlag:
- return
-
- self._monitorFlag = False
-
- if self._monitorThread is not None and self._monitorThread.is_alive():
- self._monitorThread.join()
-
- self._monitorThread = None
-
-
- def _defaultServerInfo(self):
-
- # If not set or purposely set = None, then set default
- if self._fields.get('server') is None:
- try:
- self._fields['server'] = socket.getfqdn()
- except Exception as err:
- try:
- self._fields['server'] = socket.gethostname()
- except Exception as err:
- self._fields['server'] = ""
-
- # If not set or purposely set = None, then set default
- if self._fields.get('serverIPAddress') is None:
- try:
- self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
- except Exception as err:
- self._fields['serverIPAddress'] = ""
-
-
- def _monitorConfigFile(self):
- while self._monitorFlag:
- try:
- fileTime = os.path.getmtime(self._configFile)
- if fileTime > self._configFileModified:
- self._configFileModified = fileTime
- ReopenLogFile = False
- logFile = self._logFile
- with open(self._configFile) as fp:
- for line in fp:
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
- self._rotateMethod = value.lower()
- ReopenLogFile = True
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- ReopenLogFile = True
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- ReopenLogFile = True
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- ReopenLogFile = True
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- ReopenLogFile = True
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'SocketHost' and self._socketHost != value:
- self._socketHost = value
- ReopenLogFile = True
- elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
- self._socketPort = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- ReopenLogFile = True
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- self._resetStyleField()
- elif key == self._logKey and self._logFile != value:
- logFile = value
- ReopenLogFile = True
- if ReopenLogFile:
- with self._logLock:
- self._logger.removeHandler(self._logHandler)
- self._logFile = logFile
- self._createLogger()
- except Exception as err:
- pass
-
- time.sleep(5)
-
-
- def setFields(self, **kwargs):
- """Set default values for log fields.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs:
- if kwargs[key] != None:
- self._fields[key] = kwargs[key]
- elif key in self._fields:
- del self._fields[key]
-
- self._defaultServerInfo()
-
-
- def debug(self, message, **kwargs):
- """Write a DEBUG level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
-
- def info(self, message, **kwargs):
- """Write an INFO level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('INFO', message, errorCategory = 'INFO', **kwargs)
-
- def warn(self, message, **kwargs):
- """Write a WARN level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('WARN', message, errorCategory = 'WARN', **kwargs)
-
- def error(self, message, **kwargs):
- """Write an ERROR level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
-
- def fatal(self, message, **kwargs):
- """Write a FATAL level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
-
- def _log(self, logLevel, message, **kwargs):
- """Write a message to the log file.
-
- Arguments:
- logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- # timestamp will automatically be inserted
- style = int(self._getVal('style', '', **kwargs))
- requestID = self._getVal('requestID', '', **kwargs)
- serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
- threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
- serverName = self._getVal('serverName', '', **kwargs)
- serviceName = self._getVal('serviceName', '', **kwargs)
- instanceUUID = self._getVal('instanceUUID', '', **kwargs)
- upperLogLevel = self._noSep(logLevel.upper())
- severity = self._getVal('severity', '', **kwargs)
- serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
- server = self._getVal('server', '', **kwargs)
- IPAddress = self._getVal('IPAddress', '', **kwargs)
- className = self._getVal('className', '', **kwargs)
- timer = self._getVal('timer', '', **kwargs)
- partnerName = self._getVal('partnerName', '', **kwargs)
- targetEntity = self._getVal('targetEntity', '', **kwargs)
- targetServiceName = self._getVal('targetServiceName', '', **kwargs)
- statusCode = self._getVal('statusCode', '', **kwargs)
- responseCode = self._getVal('responseCode', '', **kwargs)
- responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
- processKey = self._getVal('processKey', '', **kwargs)
- targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
- customField1 = self._getVal('customField1', '', **kwargs)
- customField2 = self._getVal('customField2', '', **kwargs)
- customField3 = self._getVal('customField3', '', **kwargs)
- customField4 = self._getVal('customField4', '', **kwargs)
- errorCategory = self._getVal('errorCategory', '', **kwargs)
- errorCode = self._getVal('errorCode', '', **kwargs)
- errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
- nbegTime = self._getArg('begTime', {}, **kwargs)
-
- detailMessage = self._noSep(message)
- if bool(re.match(r" *$", detailMessage)):
- return # don't log empty messages
-
- useLevel = self._intLogLevel(upperLogLevel)
- if CommonLogger.verbose: print("logger STYLE=%s" % style)
- if useLevel < self._logLevelThreshold:
- if CommonLogger.verbose: print("skipping because of level")
- pass
- else:
- with self._logLock:
- if style == CommonLogger.ErrorFile:
- if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
- errorCategory, errorCode, errorDescription, detailMessage))
- elif style == CommonLogger.DebugFile:
- if CommonLogger.verbose: print("using CommonLogger.DebugFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
- severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
- elif style == CommonLogger.AuditFile:
- if CommonLogger.verbose: print("using CommonLogger.AuditFile")
- endAuditTime, endAuditMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- else:
- d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
- severity, serverIPAddress, timer, server, IPAddress, className, unused,
- processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
- elif style == CommonLogger.MetricsFile:
- if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
- endMetricsTime, endMetricsMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- else:
- d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
- instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
- className, unused, processKey, targetVirtualEntity, customField1, customField2,
- customField3, customField4, detailMessage), extra=d)
- else:
- print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
-
- def _getTime(self):
- ct = time.time()
- lt = time.localtime(ct)
- return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000)
-
- def setStartRecordEvent(self):
- """
- Set the start time to be saved for both audit and metrics records
- """
- self._begTime, self._begMsec = self._getTime()
-
- def getStartRecordEvent(self):
- """
- Retrieve the start time to be used for either audit and metrics records
- """
- begTime, begMsec = self._getTime()
- return {'begTime':begTime, 'begMsec':begMsec}
-
- def _getVal(self, key, default, **kwargs):
- val = self._fields.get(key)
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return self._noSep(val)
-
- def _getArg(self, key, default, **kwargs):
- val = None
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return val
-
- def _noSep(self, message):
- if message is None: return ''
- return re.sub(r'[\|\n]', ' ', str(message))
-
- def _intLogLevel(self, logLevel):
- if logLevel == 'FATAL': useLevel = 50
- elif logLevel == 'ERROR': useLevel = 40
- elif logLevel == 'WARN': useLevel = 30
- elif logLevel == 'INFO': useLevel = 20
- elif logLevel == 'DEBUG': useLevel = 10
- else: useLevel = 0
- return useLevel
-
- def _mkdir_p(self, filename):
- """Create missing directories from a full filename path like mkdir -p"""
-
- if filename is None:
- return
-
- folder=os.path.dirname(filename)
-
- if folder == "":
- return
-
- if not os.path.exists(folder):
- try:
- os.makedirs(folder)
- except OSError as err:
- print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror))
- sys.exit(2)
- except Exception as err:
- print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)))
- sys.exit(2)
-
-if __name__ == "__main__": # pragma: no cover
-
- def __checkOneTime(line):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time string did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- return 0
-
- def __checkTwoTimes(line, different):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time strings did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- second1 = int(m.group(1))
- msec1 = int(m.group(2))
- second2 = int(m.group(3))
- msec2 = int(m.group(4))
- if second1 > second2: second2 += 60
- t1 = second1 * 1000 + msec1
- t2 = second2 * 1000 + msec2
- diff = t2 - t1
- # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
- if different:
- if diff < 500:
- print("ERROR: times did not differ enough: %s" % line)
- return 1
- else:
- if diff > 10:
- print("ERROR: times were too far apart: %s" % line)
- return 1
- return 0
-
- def __checkBegTime(line):
- format = "begTime should be ([-0-9T:]+)"
- # print("checkBegTime(%s)" % line)
- strt = 'begTime should be '
- i = line.index(strt)
- rest = line[i+len(strt):].rstrip()
- if not line.startswith(rest + ","):
- print("ERROR: line %s should start with %s" % (line,rest))
- return 1
- return 0
-
- def __checkLog(logfile, numLines, numFields):
- lineCount = 0
- errorCount = 0
- with open(logfile, "r") as fp:
- for line in fp:
- # print("saw line %s" % line)
- lineCount += 1
- c = line.count('|')
- if c != numFields:
- print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
- errorCount += 1
- if re.search("should not appear", line):
- print("ERROR: a line appeared that should not have appeared, %s" % line)
- errorCount += 1
- elif re.search("single time", line):
- errorCount += __checkOneTime(line)
- elif re.search("time should be the same", line):
- errorCount += __checkTwoTimes(line, different=False)
- elif re.search("time should be ", line):
- errorCount += __checkTwoTimes(line, different=True)
- elif re.search("begTime should be ", line):
- errorCount += __checkBegTime(line)
- else:
- print("ERROR: an unknown message appeared, %s" % line)
- errorCount += 1
-
- if lineCount != numLines:
- print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
- errorCount += 1
- return errorCount
-
- import os, argparse
- parser = argparse.ArgumentParser(description="test the CommonLogger functions")
- parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
- parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
- args = parser.parse_args()
-
- spid = str(os.getpid())
- if args.keeplogs:
- spid = ""
- logcfg = "/tmp/cl.log" + spid + ".cfg"
- errorLog = "/tmp/cl.error" + spid + ".log"
- metricsLog = "/tmp/cl.metrics" + spid + ".log"
- auditLog = "/tmp/cl.audit" + spid + ".log"
- debugLog = "/tmp/cl.debug" + spid + ".log"
- if args.verbose: CommonLogger.verbose = True
-
- import atexit
- def cleanupTmps():
- for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
- try:
- os.remove(f)
- except:
- pass
- if not args.keeplogs:
- atexit.register(cleanupTmps)
-
- with open(logcfg, "w") as o:
- o.write("error = " + errorLog + "\n" +
- "errorLogLevel = WARN\n" +
- "metrics = " + metricsLog + "\n" +
- "metricsLogLevel = INFO\n" +
- "audit = " + auditLog + "\n" +
- "auditLogLevel = INFO\n" +
- "debug = " + debugLog + "\n" +
- "debugLogLevel = DEBUG\n")
-
- import uuid
- instanceUUID = uuid.uuid1()
- serviceName = "testharness"
- errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
- debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
- auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
- metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
-
- testsRun = 0
- errorCount = 0
- errorLogger.debug("error calling debug (should not appear)")
- errorLogger.info("error calling info (should not appear)")
- errorLogger.warn("error calling warn (single time)")
- errorLogger.error("error calling error (single time)")
- errorLogger.setStartRecordEvent()
- time.sleep(1)
- errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
- testsRun += 6
- errorCount += __checkLog(errorLog, 3, 10)
-
- auditLogger.debug("audit calling debug (should not appear)")
- auditLogger.info("audit calling info (time should be the same)")
- auditLogger.warn("audit calling warn (time should be the same)")
- auditLogger.error("audit calling error (time should be the same)")
- bt = auditLogger.getStartRecordEvent()
- # print("bt=%s" % bt)
- time.sleep(1)
- auditLogger.setStartRecordEvent()
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 7
- errorCount += __checkLog(auditLog, 5, 25)
-
- debugLogger.debug("debug calling debug (single time)")
- debugLogger.info("debug calling info (single time)")
- debugLogger.warn("debug calling warn (single time)")
- debugLogger.setStartRecordEvent()
- time.sleep(1)
- debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
- debugLogger.fatal("debug calling fatal (single time)")
- errorCount += __checkLog(debugLog, 5, 13)
- testsRun += 6
-
- metricsLogger.debug("metrics calling debug (should not appear)")
- metricsLogger.info("metrics calling info (time should be the same)")
- metricsLogger.warn("metrics calling warn (time should be the same)")
- bt = metricsLogger.getStartRecordEvent()
- time.sleep(1)
- metricsLogger.setStartRecordEvent()
- time.sleep(1)
- metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
- metricsLogger.fatal("metrics calling fatal (time should be the same)")
- time.sleep(1)
- metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 6
- errorCount += __checkLog(metricsLog, 5, 28)
-
- print("%d tests run, %d errors found" % (testsRun, errorCount))
diff --git a/oti/event-handler/otihandler/onap/__init__.py b/oti/event-handler/otihandler/onap/__init__.py
deleted file mode 100644
index 87cf002..0000000
--- a/oti/event-handler/otihandler/onap/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
diff --git a/oti/event-handler/otihandler/onap/audit.py b/oti/event-handler/otihandler/onap/audit.py
deleted file mode 100644
index 8cd16cf..0000000
--- a/oti/event-handler/otihandler/onap/audit.py
+++ /dev/null
@@ -1,375 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""generic class to keep track of request handling
- from receiving it through reponse and log all the activities
-
- call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests
-
- start each outside request with creation of the Audit object
- audit = Audit(request_id=None, headers=None, msg=None)
-"""
-
-import os
-import sys
-import json
-import uuid
-import time
-import copy
-from datetime import datetime
-from threading import Lock
-from enum import Enum
-
-from .CommonLogger import CommonLogger
-from .health import Health
-
-REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
-REQUEST_REMOTE_ADDR = "Remote-Addr"
-REQUEST_HOST = "Host"
-HOSTNAME = "HOSTNAME"
-
-AUDIT_REQUESTID = 'requestID'
-AUDIT_IPADDRESS = 'IPAddress'
-AUDIT_SERVER = 'server'
-AUDIT_TARGET_ENTITY = 'targetEntity'
-
-HEADER_CLIENTAUTH = "clientauth"
-HEADER_AUTHORIZATION = "authorization"
-
-class AuditHttpCode(Enum):
- """audit http codes"""
- HTTP_OK = 200
- PERMISSION_UNAUTHORIZED_ERROR = 401
- PERMISSION_FORBIDDEN_ERROR = 403
- RESPONSE_ERROR = 400
- DATA_NOT_FOUND_ERROR = 404
- SERVER_INTERNAL_ERROR = 500
- SERVICE_UNAVAILABLE_ERROR = 503
- DATA_ERROR = 1030
- SCHEMA_ERROR = 1040
-
-class AuditResponseCode(Enum):
- """audit response codes"""
- SUCCESS = 0
- PERMISSION_ERROR = 100
- AVAILABILITY_ERROR = 200
- DATA_ERROR = 300
- SCHEMA_ERROR = 400
- BUSINESS_PROCESS_ERROR = 500
- UNKNOWN_ERROR = 900
-
- @staticmethod
- def get_response_code(http_status_code):
- """calculates the response_code from max_http_status_code"""
- response_code = AuditResponseCode.UNKNOWN_ERROR
- if http_status_code <= AuditHttpCode.HTTP_OK.value:
- response_code = AuditResponseCode.SUCCESS
-
- elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
- AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
- response_code = AuditResponseCode.PERMISSION_ERROR
- elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
- response_code = AuditResponseCode.AVAILABILITY_ERROR
- elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
- elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
- AuditHttpCode.RESPONSE_ERROR.value,
- AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
- response_code = AuditResponseCode.DATA_ERROR
- elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
- response_code = AuditResponseCode.SCHEMA_ERROR
-
- return response_code
-
- @staticmethod
- def get_human_text(response_code):
- """convert enum name into human readable text"""
- if not response_code:
- return "unknown"
- return response_code.name.lower().replace("_", " ")
-
-class Audit(object):
- """put the audit object on stack per each initiating request in the system
-
- :request_id: is the X-ECOMP-RequestID for tracing
-
- :req_message: is the request message string for logging
-
- :aud_parent: is the parent request - used for sub-query metrics to other systems
-
- :kwargs: - put any request related params into kwargs
- """
- _service_name = ""
- _service_version = ""
- _service_instance_uuid = str(uuid.uuid4())
- _started = datetime.now()
- _logger_debug = None
- _logger_error = None
- _logger_metrics = None
- _logger_audit = None
- _health = Health()
- _py_ver = sys.version.replace("\n", "")
-
- @staticmethod
- def init(service_name, service_version, config_file_path):
- """init static invariants and loggers"""
- Audit._service_name = service_name
- Audit._service_version = service_version
- Audit._logger_debug = CommonLogger(config_file_path, "debug", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_error = CommonLogger(config_file_path, "error", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_audit = CommonLogger(config_file_path, "audit", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-
- @staticmethod
- def health():
- """returns json for health check"""
- now = datetime.now()
- return {
- "service_name" : Audit._service_name,
- "service_version" : Audit._service_version,
- "service_instance_UUID" : Audit._service_instance_uuid,
- "python" : Audit._py_ver,
- "started" : str(Audit._started),
- "now" : str(now),
- "uptime" : str(now - Audit._started),
- "stats" : Audit._health.dump(),
- "packages" : "N/A" # Audit._packages
- }
-
- def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
- """create audit object per each request in the system
-
- :request_id: is the X-ECOMP-RequestID for tracing
- :req_message: is the request message string for logging
- :aud_parent: is the parent Audit - used for sub-query metrics to other systems
- :kwargs: - put any request related params into kwargs
- """
- self.request_id = request_id
- self.req_message = req_message or ""
- self.aud_parent = aud_parent
- self.kwargs = kwargs or {}
-
- self.retry_get_config = False
- self.max_http_status_code = 0
- self._lock = Lock()
-
- if self.aud_parent:
- if not self.request_id:
- self.request_id = self.aud_parent.request_id
- if not self.req_message:
- self.req_message = self.aud_parent.req_message
- self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
- else:
- headers = self.kwargs.get("headers", {})
- if headers:
- if not self.request_id:
- self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
- if AUDIT_IPADDRESS not in self.kwargs:
- self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
-
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
-
- created_req = ""
- if not self.request_id:
- created_req = " with new"
- self.request_id = str(uuid.uuid4())
-
- self.kwargs[AUDIT_REQUESTID] = self.request_id
-
- self._started = time.time()
- self._start_event = Audit._logger_audit.getStartRecordEvent()
- self.metrics_start()
-
- if not self.aud_parent:
- self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
- .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
-
- def merge_all_kwargs(self, **kwargs):
- """returns the merge of copy of self.kwargs with the param kwargs"""
- all_kwargs = self.kwargs.copy()
- if kwargs:
- all_kwargs.update(kwargs)
- return all_kwargs
-
- def set_http_status_code(self, http_status_code):
- """accumulate the highest(worst) http status code"""
- self._lock.acquire()
- if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- self.max_http_status_code = max(http_status_code, self.max_http_status_code)
- self._lock.release()
-
- def get_max_http_status_code(self):
- """returns the highest(worst) http status code"""
- self._lock.acquire()
- max_http_status_code = self.max_http_status_code
- self._lock.release()
- return max_http_status_code
-
- @staticmethod
- def get_status_code(success):
- """COMPLETE versus ERROR"""
- if success:
- return 'COMPLETE'
- return 'ERROR'
-
- @staticmethod
- def hide_secrets(obj):
- """hides the known secret field values of the dictionary"""
- if not isinstance(obj, dict):
- return obj
-
- for key in obj:
- if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
- obj[key] = "*"
- elif isinstance(obj[key], dict):
- obj[key] = Audit.hide_secrets(obj[key])
-
- return obj
-
- @staticmethod
- def log_json_dumps(obj, **kwargs):
- """hide the known secret field values of the dictionary and return json.dumps"""
- if not isinstance(obj, dict):
- return json.dumps(obj, **kwargs)
-
- return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
-
- def is_serious_error(self, status_code):
- """returns whether the response_code is success and a human text for response code"""
- return AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value \
- or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
-
- def _get_response_status(self):
- """calculates the response status fields from max_http_status_code"""
- max_http_status_code = self.get_max_http_status_code()
- response_code = AuditResponseCode.get_response_code(max_http_status_code)
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
- response_description = AuditResponseCode.get_human_text(response_code)
- return success, max_http_status_code, response_code, response_description
-
- def is_success(self):
- """returns whether the response_code is success and a human text for response code"""
- success, _, _, _ = self._get_response_status()
- return success
-
- def debug(self, log_line, **kwargs):
- """debug - the debug=lowest level of logging"""
- Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
-
- def info(self, log_line, **kwargs):
- """debug - the info level of logging"""
- Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
-
- def info_requested(self, result=None, **kwargs):
- """info "requested ..." - the info level of logging"""
- self.info("requested {0} {1}".format(self.req_message, result or ""), \
- **self.merge_all_kwargs(**kwargs))
-
- def warn(self, log_line, **kwargs):
- """debug+error - the warn level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.warn(log_line, **all_kwargs)
- Audit._logger_error.warn(log_line, **all_kwargs)
-
- def error(self, log_line, **kwargs):
- """debug+error - the error level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.error(log_line, **all_kwargs)
- Audit._logger_error.error(log_line, **all_kwargs)
-
- def fatal(self, log_line, **kwargs):
- """debug+error - the fatal level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.fatal(log_line, **all_kwargs)
- Audit._logger_error.fatal(log_line, **all_kwargs)
-
- @staticmethod
- def get_elapsed_time(started):
- """returns the elapsed time since started in milliseconds"""
- return int(round(1000 * (time.time() - started)))
-
- def metrics_start(self, log_line=None, **kwargs):
- """reset metrics timing"""
- self._metrics_started = time.time()
- self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
- if log_line:
- self.info(log_line, **self.merge_all_kwargs(**kwargs))
-
- def metrics(self, log_line, **kwargs):
- """debug+metrics - the metrics=sub-audit level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- success, max_http_status_code, response_code, response_description = \
- self._get_response_status()
- metrics_func = None
- timer = Audit.get_elapsed_time(self._metrics_started)
- if success:
- log_line = "done: {0}".format(log_line)
- self.info(log_line, **all_kwargs)
- metrics_func = Audit._logger_metrics.info
- Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
- else:
- log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=response_description, **all_kwargs)
- metrics_func = Audit._logger_metrics.error
- Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
-
- metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
- statusCode=Audit.get_status_code(success), responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
-
- self.metrics_start()
- return (success, max_http_status_code, response_description)
-
- def audit_done(self, result=None, **kwargs):
- """debug+audit - the audit=top level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- success, max_http_status_code, response_code, response_description = \
- self._get_response_status()
- log_line = "{0} {1}".format(self.req_message, result or "").strip()
- audit_func = None
- timer = Audit.get_elapsed_time(self._started)
- if success:
- log_line = "done: {0}".format(log_line)
- self.info(log_line, **all_kwargs)
- audit_func = Audit._logger_audit.info
- Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
- else:
- log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value,
- errorDescription=response_description, **all_kwargs)
- audit_func = Audit._logger_audit.error
- Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
-
- audit_func(log_line, begTime=self._start_event, timer=timer,
- statusCode=Audit.get_status_code(success),
- responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
-
- return (success, max_http_status_code, response_description)
- # this line added to test
diff --git a/oti/event-handler/otihandler/onap/health.py b/oti/event-handler/otihandler/onap/health.py
deleted file mode 100644
index 39edc0d..0000000
--- a/oti/event-handler/otihandler/onap/health.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""generic class to keep track of app health"""
-
-import uuid
-from datetime import datetime
-from threading import Lock
-
-
-class HealthStats(object):
- """keep track of stats for calls"""
- def __init__(self, name):
- """keep track of stats for metrics calls"""
- self._name = name or "stats_" + str(uuid.uuid4())
- self._lock = Lock()
- self._call_count = 0
- self._error_count = 0
- self._longest_timer = 0
- self._total_timer = 0
- self._last_success = None
- self._last_error = None
-
- def dump(self):
- """returns dict of stats"""
- dump = None
- with self._lock:
- dump = {
- "call_count" : self._call_count,
- "error_count" : self._error_count,
- "last_success" : str(self._last_success),
- "last_error" : str(self._last_error),
- "longest_timer_millisecs" : self._longest_timer,
- "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
- if self._call_count else 0)
- }
- return dump
-
- def success(self, timer):
- """records the successful execution"""
- with self._lock:
- self._call_count += 1
- self._last_success = datetime.now()
- self._total_timer += timer
- if not self._longest_timer or self._longest_timer < timer:
- self._longest_timer = timer
-
- def error(self, timer):
- """records the errored execution"""
- with self._lock:
- self._call_count += 1
- self._error_count += 1
- self._last_error = datetime.now()
- self._total_timer += timer
- if not self._longest_timer or self._longest_timer < timer:
- self._longest_timer = timer
-
-class Health(object):
- """Health stats for multiple requests"""
- def __init__(self):
- """Health stats for application"""
- self._all_stats = {}
- self._lock = Lock()
-
- def _add_or_get_stats(self, stats_name):
- """add to or get from the ever growing dict of HealthStats"""
- stats = None
- with self._lock:
- stats = self._all_stats.get(stats_name)
- if not stats:
- self._all_stats[stats_name] = stats = HealthStats(stats_name)
- return stats
-
- def success(self, stats_name, timer):
- """records the successful execution on stats_name"""
- stats = self._add_or_get_stats(stats_name)
- stats.success(timer)
-
- def error(self, stats_name, timer):
- """records the error execution on stats_name"""
- stats = self._add_or_get_stats(stats_name)
- stats.error(timer)
-
- def dump(self):
- """returns dict of stats"""
- with self._lock:
- stats = dict((k, v.dump()) for (k, v) in self._all_stats.items())
-
- return stats
diff --git a/oti/event-handler/otihandler/utils.py b/oti/event-handler/otihandler/utils.py
deleted file mode 100644
index 4f9dbda..0000000
--- a/oti/event-handler/otihandler/utils.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-import base64
-import collections
-import copy
-import os
-
-from Crypto import Random
-from Crypto.Cipher import PKCS1_v1_5
-from Crypto.Hash import SHA
-from Crypto.PublicKey import RSA
-
-
-def update_dict(d, u):
- """Recursively updates dict
-
- Update dict d with dict u
- """
- for k, v in u.items():
- if isinstance(v, collections.Mapping):
- r = update_dict(d.get(k, {}), v)
- d[k] = r
- else:
- d[k] = u[k]
- return d
-
-def replace_token(configure_content):
- try:
- with open("/opt/app/config-map/dcae-k8s-cluster-token",'r') as fh:
- dcae_token = fh.readline().rstrip('\n')
-
- new_config = copy.deepcopy(configure_content)
-
- # override the default-user token
- ix=0
- for user in new_config['users'][:]:
- if user['name'] == "default-user":
- new_config['users'][ix] = {
- "name": "default-user",
- "user": {
- "token": dcae_token
- }
- }
- ix += 1
-
- return new_config
-
- except Exception as e:
- return configure_content
-
-def decrypt(b64_ciphertext):
- """returns decrypted b64_ciphertext that was encoded like this:
-
- echo "cleartext" | openssl pkeyutl -encrypt -pubin -inkey rsa.pub | base64 --wrap=0
-
- requires private key in environment variable EOMUSER_PRIVATE
- """
-
- if len(b64_ciphertext) <= 30: # For transition, assume short values are not encrypted
- return b64_ciphertext
-
- try:
- ciphertext = base64.b64decode(b64_ciphertext)
- key = RSA.importKey(os.getenv('EOMUSER_PRIVATE'))
- cleartext = PKCS1_v1_5.new(key).decrypt(ciphertext, Random.new().read(15+SHA.digest_size))
- except Exception as e:
- return b64_ciphertext
-
- return cleartext
diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py
deleted file mode 100644
index 45c407f..0000000
--- a/oti/event-handler/otihandler/web_server.py
+++ /dev/null
@@ -1,605 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""web-service for oti_handler"""
-
-import json
-import logging
-import os
-import time
-from datetime import datetime
-
-import cherrypy
-
-from otihandler.cbs_rest import CBSRest
-from otihandler.config import Config
-from otihandler.dti_processor import DTIProcessor
-from otihandler.onap.audit import Audit
-
-
-class DTIWeb(object):
- """run REST API of OTI Handler"""
-
- logger = logging.getLogger("oti_handler.web_server")
- HOST_INADDR_ANY = ".".join("0"*4)
-
- @staticmethod
- def run_forever(audit):
- """run the web-server of OTI Handler forever"""
-
- cherrypy.config.update({"server.socket_host": DTIWeb.HOST_INADDR_ANY,
- "server.socket_port": Config.wservice_port})
-
- protocol = "http"
- tls_info = ""
- if Config.tls_server_cert_file and Config.tls_private_key_file:
- tm_cert = os.path.getmtime(Config.tls_server_cert_file)
- tm_key = os.path.getmtime(Config.tls_private_key_file)
- #cherrypy.server.ssl_module = 'builtin'
- cherrypy.server.ssl_module = 'pyOpenSSL'
- cherrypy.server.ssl_certificate = Config.tls_server_cert_file
- cherrypy.server.ssl_private_key = Config.tls_private_key_file
- if Config.tls_server_ca_chain_file:
- cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file
- protocol = "https"
- tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file,
- Config.tls_private_key_file,
- Config.tls_server_ca_chain_file)
-
- cherrypy.tree.mount(_DTIWeb(), '/')
-
- DTIWeb.logger.info(
- "%s with config: %s", audit.info("running oti_handler as {}://{}:{} {}".format(
- protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)),
- json.dumps(cherrypy.config))
- cherrypy.engine.start()
-
- # If HTTPS server certificate changes, exit to let kubernetes restart us
- if Config.tls_server_cert_file and Config.tls_private_key_file:
- while True:
- time.sleep(600)
- c_tm_cert = os.path.getmtime(Config.tls_server_cert_file)
- c_tm_key = os.path.getmtime(Config.tls_private_key_file)
- if c_tm_cert > tm_cert or c_tm_key > tm_key:
- DTIWeb.logger.info("cert or key file updated")
- cherrypy.engine.stop()
- cherrypy.engine.exit()
- break
-
-
-class _DTIWeb(object):
- """REST API of DTI Handler"""
-
- VALID_EVENT_TYPES = ['deploy', 'undeploy', 'add', 'delete', 'update', 'notify']
-
- @staticmethod
- def _get_request_info(request):
- """Returns info about the http request."""
-
- return "{} {}{}".format(request.method, request.script_name, request.path_info)
-
-
- #----- Common endpoint methods
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def healthcheck(self):
- """Returns healthcheck results."""
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
-
- DTIWeb.logger.info("%s", req_info)
-
- result = Audit.health()
-
- DTIWeb.logger.info("healthcheck %s: result=%s", req_info, json.dumps(result))
-
- audit.audit_done(result=json.dumps(result))
- return result
-
- @cherrypy.expose
- def shutdown(self):
- """Shutdown the web server."""
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
-
- DTIWeb.logger.info("%s: --- stopping REST API of DTI Handler ---", req_info)
-
- cherrypy.engine.exit()
-
- health = json.dumps(Audit.health())
- audit.info("oti_handler health: {}".format(health))
- DTIWeb.logger.info("oti_handler health: %s", health)
- DTIWeb.logger.info("%s: --------- the end -----------", req_info)
- result = str(datetime.now())
- audit.info_requested(result)
- return "goodbye! shutdown requested {}".format(result)
-
- # ----- DTI Handler mock endpoint methods
- @cherrypy.expose
- @cherrypy.tools.json_out()
- @cherrypy.tools.json_in()
- def mockevents(self):
-
- result = {"KubeNamespace":"com-my-dcae-test", "KubePod":"pod-0", "KubeServiceName":"pod-0.service.local", "KubeServicePort":"8880", "KubeClusterFqdn":"fqdn-1"}
-
- return result
-
- #----- DTI Handler endpoint methods
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- @cherrypy.tools.json_in()
- def events(self, notify="y"):
- """
- Run dti reconfig script in service component instances configured to accept the DTI Event.
-
- POST /events < <dcae_event>
-
- POST /events?ndtify="n" < <dcae_event>
-
- where <dcae_event> is the entire DTI Event passed as a JSON object and contains at least these top-level keys:
- dcae_service_action : string
- required, 'deploy' or 'undeploy'
- dcae_target_name : string
- required, VNF Instance ID
- dcae_target_type : string
- required, VNF Type of the VNF Instance
- dcae_service_location : string
- optional, CLLI location. Not provided or '' infers all locations.
-
- Parameters
- ----------
- notify : string
- optional, default "y", any of these will not notify components: [ "f", "false", "False", "FALSE", "n", "no" ]
- When "n" will **not** notify components of this DTI Event update to Consul.
-
- Returns
- -------
- dict
- JSON object containing success or error of executing the dti reconfig script on
- each component instance's docker container, keyed by service_component_name.
-
- """
-
- if cherrypy.request.method != "POST":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
-
- msg = ""
-
- dti_event = cherrypy.request.json or {}
- str_dti_event = json.dumps(dti_event)
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message="{}: {}".format(req_info, str_dti_event), \
- headers=cherrypy.request.headers)
- DTIWeb.logger.info("%s: dti_event=%s headers=%s", \
- req_info, str_dti_event, json.dumps(cherrypy.request.headers))
-
- dcae_service_action = dti_event.get('dcae_service_action')
- if not dcae_service_action:
- msg = 'dcae_service_action is missing'
- elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES:
- msg = 'dcae_service_action is invalid'
-
- dcae_target_name = dti_event.get('dcae_target_name')
- if not msg and not dcae_target_name:
- msg = 'dcae_target_name is missing'
-
- dcae_target_type = dti_event.get('dcae_target_type', '')
- if not msg and not dcae_target_type:
- msg = 'dcae_target_type is missing'
-
- if msg:
- result = {"ERROR": msg}
-
- DTIWeb.logger.error("%s: dti_event=%s result=%s", \
- req_info, str_dti_event, json.dumps(result))
- else:
- send_notification = True
- if (isinstance(notify, bool) and not notify) or \
- (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]):
- send_notification = False
-
- prc = DTIProcessor(dti_event, send_notification=send_notification)
- result = prc.get_result()
-
- DTIWeb.logger.info("%s: dti_event=%s result=%s", \
- req_info, str_dti_event, json.dumps(result))
-
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if msg:
- cherrypy.response.status = "400 Bad Request"
- elif not success:
- cherrypy.response.status = http_status_code
-
- return result
-
- def get_docker_events(self, request, service, location):
- """
- common routine for dti_docker_events and oti_docker_events
-
- :param request: HTTP GET request
- :param service: HTTP request query parameter for service name
- :param location: HTTP request query parameter for location CLLI
- :return:
- """
-
- if request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method))
-
- req_info = _DTIWeb._get_request_info(request)
- audit = Audit(req_message=req_info, headers=request.headers)
-
- return DTIProcessor.get_docker_raw_events(service, location)
-
- def get_k8s_events(self, request, **params):
- """
- common routine for dti_k8s_events and oti_k8s_events
-
- :param request: HTTP GET request
- :param params: HTTP request query parameters
- :return:
- """
- if request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method))
-
- req_info = _DTIWeb._get_request_info(request)
- audit = Audit(req_message=req_info, headers=request.headers)
-
- pod = request.params['pod']
- namespace = request.params['namespace']
- cluster = request.params['cluster']
-
- return DTIProcessor.get_k8_raw_events(pod, cluster, namespace)
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def oti_k8s_events(self, **params):
- """
- Retrieve raw JSON events from application events database
-
- GET /oti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1>
-
- Parameters
- ----------
- pod ID : string
- POD ID of the stateful set POD
- namespace: string
- kubernetes namespace
- cluster: string
- kubernetes cluster FQDN
-
- Returns
- -------
- dict
- JSON object containing the fully-bound configuration.
-
- """
-
- return self.get_k8s_events(cherrypy.request, params)
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def dti_k8s_events(self, **params):
- """
- Retrieve raw JSON events from application events database
-
- GET /dti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1>
-
- Parameters
- ----------
- pod ID : string
- POD ID of the stateful set POD
- namespace: string
- kubernetes namespace
- cluster: string
- kubernetes cluster FQDN
-
- Returns
- -------
- dict
- JSON object containing the fully-bound configuration.
-
- """
-
- return self.get_k8s_events(cherrypy.request, params)
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def oti_docker_events(self, service, location=None):
- """
- Retrieve raw JSON events from application events database related to docker deployments
-
- GET /oti_docker_events?service=<svc>&location=<location>
-
- Parameters
- ----------
- service : string
- The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
- location : string
- optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
- If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
- otherwise results are not location filtered.
-
- Returns
- -------
- dict
- JSON object containing the fully-bound configuration.
-
- """
-
- return self.get_docker_events(cherrypy.request, service, location)
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def dti_docker_events(self, service, location=None):
- """
- Retrieve raw JSON events from application events database related to docker deployments
-
- GET /dti_docker_events?service=<svc>&location=<location>
-
- Parameters
- ----------
- service : string
- The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
- location : string
- optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location.
- If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided,
- otherwise results are not location filtered.
-
- Returns
- -------
- dict
- JSON object containing the fully-bound configuration.
-
- """
-
- return self.get_docker_events(cherrypy.request, service, location)
-
- #----- Config Binding Service (CBS) endpoint methods
-
- @cherrypy.expose
- @cherrypy.popargs('service_name')
- @cherrypy.tools.json_out()
- def service_component(self, service_name):
- """
- Retrieve fully-bound configuration for service_name from Consul KVs.
-
- GET /service_component/<service_name>
-
- Parameters
- ----------
- service_name : string
- The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
-
- Returns
- -------
- dict
- JSON object containing the fully-bound configuration.
-
- """
-
- if cherrypy.request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
- DTIWeb.logger.info("%s: service_name=%s headers=%s", \
- req_info, service_name, json.dumps(cherrypy.request.headers))
-
- try:
- result = CBSRest.get_service_component(service_name)
- except Exception as e:
- result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
- audit.set_http_status_code(404)
-
- DTIWeb.logger.info("%s: service_name=%s result=%s", \
- req_info, service_name, json.dumps(result))
-
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
-
- return result
-
- @cherrypy.expose
- @cherrypy.popargs('service_name')
- @cherrypy.tools.json_out()
- def service_component_all(self, service_name, service_location=None, policy_ids="y"):
- """
- Retrieve all information for service_name (config, dti, dti_events, and policies) from Consul KVs.
-
- GET /service_component_all/<service_name>
-
- GET /service_component_all/<service_name>?service_location=<service_location>
-
- GET /service_component_all/<service_name>?service_location=<service_location>;policy_ids=n
-
- Parameters
- ----------
- service_name : string
- The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
- service_location : string
- optional, allows multiple values separated by commas.
- Filters DTI events with dcae_service_location in service_location.
- policy_ids : string
- optional, default "y", any of these will unset: [ "f", "false", "False", "FALSE", "n", "no" ]
- When unset, formats policies items as a list (without policy_ids) rather than as an object indexed by policy_id.
-
- Returns
- -------
- dict
- JSON object containing all information for component service_name.
- The top-level keys may include the following:
- config : dict
- The cloudify node's application_config property from when the start workflow was executed.
- dti : dict
- Keys are VNF Types that the component currently is assigned to monitor. Policy can change them.
- dti_events : dict
- The latest deploy DTI events, keyed by VNF Type and sub-keyed by VNF Instance ID.
- policies : dict
- event : dict
- Contains information about when the policies folder was last written.
- items : dict
- Contains all policy bodies for the service_name component, keyed by policy_id.
-
- """
-
- if cherrypy.request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
- DTIWeb.logger.info("%s: service_name=%s headers=%s", \
- req_info, service_name, json.dumps(cherrypy.request.headers))
-
- policies_as_list = False
- if (isinstance(policy_ids, bool) and not policy_ids) or \
- (isinstance(policy_ids, str) and policy_ids.lower() in [ "f", "false", "n", "no" ]):
- policies_as_list = True
- try:
- result = CBSRest.get_service_component_all(service_name, service_location=service_location, policies_as_list=policies_as_list)
- except Exception as e:
- result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
- audit.set_http_status_code(404)
-
- DTIWeb.logger.info("%s: service_name=%s result=%s", \
- req_info, service_name, json.dumps(result))
-
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
-
- return result
-
- @cherrypy.expose
- @cherrypy.popargs('service_name')
- @cherrypy.tools.json_out()
- def dti(self, service_name=None, vnf_type=None, vnf_id=None, service_location=None):
- """
- Retrieve current (latest, not undeployed) DTI events from Consul KVs.
-
- GET /dti/<service_name>
-
- GET /dti/<service_name>?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location>
-
- GET /dti
-
- GET /dti?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location>
-
- Parameters
- ----------
- service_name : string
- optional. The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
- vnf_type : string
- optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s).
- vnf_id : string
- optional. Requires vnf_type also. Gets DTI event for this vnf_id.
- service_location : string
- optional, allows multiple values separated by commas.
- Filters DTI events with dcae_service_location in service_location.
-
- Returns
- -------
- dict
- Dictionary of DTI event(s).
- If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event.
- If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id.
- Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
-
- """
-
- if cherrypy.request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
- DTIWeb.logger.info("%s: service_name=%s headers=%s", \
- req_info, service_name, json.dumps(cherrypy.request.headers))
-
- try:
- result = CBSRest.get_oti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location)
- except Exception as e:
- result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
- audit.set_http_status_code(404)
-
- DTIWeb.logger.info("%s: service_name=%s result=%s", \
- req_info, service_name, json.dumps(result))
-
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
-
- return result
-
- @cherrypy.expose
- @cherrypy.popargs('service_name')
- @cherrypy.tools.json_out()
- def policies(self, service_name, policy_id=None):
- """
- Retrieve policies for service_name from Consul KVs.
-
- GET /policies/<service_name>
-
- GET /policies/<service_name>?policy_id=<policy_id>
-
- Parameters
- ----------
- service_name : string
- The service component name assigned by dockerplugin to the component
- that is unique to the cloudify node instance and used in its Consul key(s).
- policy_id : string
- optional. Limits returned policy to this policy_id.
-
- Returns
- -------
- dict
- JSON object containing policy bodies for the service_name component.
- If policy_id is specified, then object returned will be just the one policy body.
- If policy_id is not specified, then object will contain all policy bodies, keyed by policy_id.
-
- """
-
- if cherrypy.request.method != "GET":
- raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method))
-
- req_info = _DTIWeb._get_request_info(cherrypy.request)
- audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
- DTIWeb.logger.info("%s: service_name=%s headers=%s", \
- req_info, service_name, json.dumps(cherrypy.request.headers))
-
- try:
- result = CBSRest.get_policies(service_name, policy_id=policy_id)
- except Exception as e:
- result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)}
- audit.set_http_status_code(404)
-
- DTIWeb.logger.info("%s: service_name=%s result=%s", \
- req_info, service_name, json.dumps(result))
-
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
-
- return result