diff options
author | Kotagiri, Ramprasad (rp5662) <rp5662@att.com> | 2019-12-19 17:41:16 -0500 |
---|---|---|
committer | Kotagiri, Ramprasad (rp5662) <rp5662@att.com> | 2020-01-21 16:50:17 -0500 |
commit | 158b75abd6095a3155f5057832ec868bc99ced36 (patch) | |
tree | d374ba4adcfa6db9a036cb2bf018fe529c215eee /oti/event-handler | |
parent | 77900bb3097491cd9fca964c111ea70724e53989 (diff) |
Add OTI event-handler project
OTI event handler application in DCAEGEN2 platform
Change-Id: Ie64f26f851e2045f00043f90279d503c2dc62948
Issue-ID: DCAEGEN2-1910
Signed-off-by: Kotagiri, Ramprasad (rp5662) <rp5662@att.com>
Diffstat (limited to 'oti/event-handler')
34 files changed, 6529 insertions, 0 deletions
diff --git a/oti/event-handler/Dockerfile b/oti/event-handler/Dockerfile new file mode 100644 index 0000000..30134c8 --- /dev/null +++ b/oti/event-handler/Dockerfile @@ -0,0 +1,57 @@ +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +FROM python:3.6 + +ENV INSROOT /opt/app +ENV APPUSER oti_handler +ENV HOME ${INSROOT}/${APPUSER} + +RUN useradd -d ${HOME} ${APPUSER} + +WORKDIR ${HOME} + +EXPOSE 9443 + +COPY ./*.py ./ +COPY ./*.in ./ +COPY ./*.txt ./ +COPY ./*.sh ./ +COPY ./otihandler/ ./otihandler/ +COPY ./etc/ ./etc/ +COPY create_schema.sql /tmp/create_schema.sql + +RUN mkdir -p ${HOME}/logs \ + && chown -R ${APPUSER}:${APPUSER} ${HOME} \ + && chmod 500 ${HOME}/etc \ + && chmod 500 ${HOME}/*.sh \ + && sleep 5 \ + && pip install -r requirements.txt + +RUN apt-get update \ + && apt-get -y --allow-unauthenticated install vim-tiny \ + && apt-get -y --allow-unauthenticated install dos2unix \ + && apt-get -y --allow-unauthenticated install postgresql \ + && dos2unix /tmp/create_schema.sql \ + && dos2unix run.sh \ + && chmod 777 /tmp/create_schema.sql + +USER ${APPUSER} + +RUN ls -lanR ${HOME} + +ENTRYPOINT ["./run.sh"] diff --git 
a/oti/event-handler/MANIFEST.in b/oti/event-handler/MANIFEST.in new file mode 100644 index 0000000..f9bd145 --- /dev/null +++ b/oti/event-handler/MANIFEST.in @@ -0,0 +1 @@ +include requirements.txt diff --git a/oti/event-handler/OTI-API.md b/oti/event-handler/OTI-API.md new file mode 100644 index 0000000..e5f7cae --- /dev/null +++ b/oti/event-handler/OTI-API.md @@ -0,0 +1,850 @@ +# ONAP Topology Interface (OTI) Handler API +*Version 1.0.0* + +--- +<a name="toc"></a> +## Contents + +* Overview +* Managing OTI Handler + - [GET /healthcheck](#healthcheck) + - [GET /shutdown](#shutdown) +* Triggering notifications to microservices + - [POST /events](#events-post) +* Queries for information from Consul KVs + - [GET /policies](#policies) + - [GET /service_component](#service_component) + - [GET /service_component_all](#service_component_all) +* Queries for OTI events from OTI database + - [GET /oti_docker_events](#oti_docker_events) + - [GET /oti_k8s_events](#oti_k8s_events) + - [GET /dti_docker_events](#dti_docker_events) + - [GET /dti_k8s_events](#dti_k8s_events) +--- +<a name="overview"></a> +## Overview +This document describes OTI Handler's HTTPS API for: +1. accepting events from OTI to reconfigure microservices. +1. sending reconfig notifications to microservices. +1. retrieving service component information from Consul. + +--- +<a name="manage"></a> +## Managing OTI Handler + +--- +<a name="healthcheck">healthcheck</a> +### GET /healthcheck + +#### Description +Get health status of OTI Handler service + +#### Parameters +None + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[HealthCheckResponse](#healthcheckresponse)| + +<a name="healthcheckresponse"></a> +##### HealthCheckResponse +OTI Handler returns a JSON object containing current health check information. 
+ +Example JSON response, formatted: +``` +{ + "now": "2019-10-19 18:30:34.987514", + "packages": "N/A", + "python": "3.6.0 (default, Oct 10 2019, 02:49:49) [GCC 4.9.2]", + "service_instance_UUID": "0cf593cd-83d4-4cdc-b8bb-a33f8edc28f4", + "service_name": "oti_handler", + "service_version": "3.6.0", + "started": "2019-10-18 18:50:10.209063", + "stats": { + "oti_handler": { + "ave_timer_millisecs": 207.0, + "call_count": 5, + "error_count": 0, + "last_error": "None", + "last_success": "2019-10-18 19:25:36.604613", + "longest_timer_millisecs": 348 + } + }, + "uptime": "23:40:24.778451" +} +``` + +--- +<a name="shutdown"></a> +### GET /shutdown + +#### Description +Shutdown and restart OTI Handler + +#### Parameters +None + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|string| + +OTI Handler returns a string acknowledging the shutdown request. + +Example response: +``` +goodbye! shutdown requested 2019-10-19 18:34:22.642045 +``` + + +--- +<a name="notifications"></a> +## Triggering notifications to microservices +--- +<a name="events-post"></a> +### POST /events?**notify**=n + +#### Description +(For OTI to) Submit a OTI Event to OTI Handler + +**/events** is +for OTI to signal add, update, delete of a VNF instance or to signal an entire site activation/deactivation. +OTI POSTs an Event when the VNF in question needs monitoring by DCAE monitoring services. +The OTI Event includes information that identifies the type of VNF, the specific VNF instance, +the location where the VNF is running, the type of operation, and +additional information that may be useful for setting up the DCAE monitoring services. 
+ +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|Body|**dcae_event** <br>*required*|JSON event from OTI that must contain<br>at least the few fields identified below.|[DCAEEvent](#dcaeevent)|| +|query component|**notify** <br>*optional*|When set to "n", oti handler will **not** notify components of this OTI Event<br> and only persist the event into OTI postgreSQL database.|string|y| + +<a name="dcaeevent"></a> +##### DCAEEvent +OTI Handler uses the following fields of the OTI Event JSON object to identify which monitoring services to reconfigure: + +|Name|Description|Schema| +|---|---|---| +|**dcae_service_action** <br>*required*|Indicates the action to be taken by a DCAE service [deploy, modify or remove a VNF instance monitoring].<br>Valid values are: "add", "update", "delete", "notify". <br>Docker hosted microservices will continue to be signaled with "deploy" or "undeploy action".<br> A "notify" action for kubernetes hosted collector services can signal all the services to be activated or deactivated.|string| +|**dcae_target_name** <br>*required*|The name of the unique VNF Instance monitored by DCAE services.|string| +|**dcae_target_type** <br>*required*|The VNF Type of the VNF Instance monitored by a DCAE service.|string| +|**dcae_service_location** <br>*optional*|CLLI location. Not provided or "" infers all locations.|string| + +Any additional fields of the OTI Event are ignored by OTI Handler and are passed over to the collector services. +The entire OTI Event object is saved in OTI handler application database, for Config Binding Service to provide to the monitoring services. + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Accepted. 
Any errors will be noted in the JSON object response.|[DCAEEventResponse](#dcaeeventresponse)| + +<a name="dcaeeventresponse"></a> +##### DCAEEventResponse +OTI Handler returns a JSON object containing results of running each affected service component's dti reconfig script in case of docker containers and the HTTP response acknowledgement from kubernetes pods. +If the object is empty or only contains an ERROR, then no collector component's reconfig script was run. + +|Name|Description|Schema| +|---|---|---| +|**\<service_component_name\>** <br>*optional*,<br>*can be multiple*|Identification of a component's docker container<br>and results of running its reconfig script.<br>One for each service notified about this event.|string| +|**ERROR** <br>*optional*|An error not specific to any one **\<service_component_name\>**.|string| + +Examples of JSON responses, formatted: +``` +{ + "0b231f372c6f42a89817a097549f4af5_green-eggs-and-ham": "ran /opt/app/reconfigure.py in container 9a9045657e097a4e41b077d10a0c8b2e860a9993e90e1c2a6997b03c2287d86f on zldcmtn23adcc1-platform-dockerhost-1 that was deployed by policy-test-terry node green_eggs_and_ham_docker_container, got: reconfigured\n", + "add6bcffdf16488cb961ac88605243a6_green-eggs-and-ham": "ran /opt/app/reconfigure.py in container dab026db7c33081f89b0de54a5a8ed1eed4bcf6bac783a1f657c3018a24f522e on zldcmtn23adcc1-platform-dockerhost-1 that was deployed by app_reconfigurable_tes node green_eggs_and_ham_docker_container, got: rpc error: code = 13 desc = invalid header field value \"oci runtime error: exec failed: container_linux.go:247: starting container process caused \\\"exec: \\\\\\\"/opt/app/reconfigure.py\\\\\\\": stat /opt/app/reconfigure.py: no such file or directory\\\"\\n\"\r\n", + "DTIHandler": { + "deploymentIds": [ + "43b77ab2-7341-4929-ba27-ea91d00b253c" + ], + "requestId": "ab88d651-fa83-4342-9579-d383c1f29373" + } +} + +{ + "zldcdyh1bdce1d13-vcc-clicollector-cli-p1-v12": "ran add in k8s deployment 
sts-zldcdyh1bdce1d13-vcc-clicollector-cli-p1-v12 in namespace com-my-dcae-collgrp1-dev that was deployed by dcae_vcc-clicollector-cli +-p1-k8_2002_vp663p_1120v7 node vcc-clicollector-cli-p1_vcc-clicollector-cli-p1, got: {u'KubeServicePort': u'9999', u'KubeNamespace': u'com-my-dcae-collgrp1-dev', u'KubeServiceName': u'zldcdyh1bdce1d13-vcc-clicoll +ector-cli-p1-v12', u'KubeClusterFqdn': u'32.68.210.134', u'KubeProxyFqdn': u'dcae-kcdthp-site1-edge-d13.test.idns.cip.my.com', u'KubePod': u'sts-zldcdyh1bdce1d13-vcc-clicollector-cli-p1-v12-1'}" +} + +{ + "zldcdyh1bdce1d13-ovl-mib": "ran UPDATE to k8s pod ID sts-zldcdyh1bdce1d13-ovl-mib-0 in namespace com-my-dcae-poller-dev that was deployed in cluster 32.68.210.134, got: {u'KubeServicePort': u'9999', u'KubeNa +mespace': u'com-my-dcae-poller-dev', u'KubeServiceName': u'zldcdyh1bdce1d13-ovl-mib', u'KubeClusterFqdn': u'32.68.210.134', u'KubeProxyFqdn': u'dcae-kcdthp-site1-dyh1b-edge-d13.test.idns.cip.my.com', u'KubePod' +: u'sts-zldcdyh1bdce1d13-ovl-mib-0'}" +} + +``` + + +--- +<a name="info-queries"></a> +## Queries for information from Consul KVs + +--- +<a name="dti"></a> +### GET /dti/**\<service_name\>**?**vnf_type**=\<vnf_type\>;**vnf_id**=\<vnf_id\>;**service_location**=\<service_location\> + +#### Description +Retrieve current (latest, not undeployed) OTI Events + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|path segment|**\<service_name\>** <br>*optional*|The service component name assigned by dockerplugin to the component<br>that is unique to the cloudify node instance and used in its Consul key(s).|string|| +|query component|**vnf_type** <br>*optional*|Allows multiple values separated by commas. <br>Gets OTI events for these **\<vnf_type\>(s)**.|string|| +|query component|**vnf_id** <br>*optional*|Requires **vnf_type** also. 
Gets OTI event for this **\<vnf_id\>**.|string|| +|query component|**service_location** <br>*optional*|Allows multiple values separated by commas.<br>Filters OTI events with dcae_service_location in **\<service_location\>**.<br>Overrides locations defined in Consul for the **\<service_name\>**.|string|locations from Consul| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[DTIResponse](#dtiresponse)| + +<a name="dtiresponse"></a> +##### OTIResponse +OTI Handler returns a JSON object of OTI event(s). +- If one **vnf_type** and **vnf_id** are both specified, then object returned will be just the one OTI event. +- If one **vnf_type** is specified but not **vnf_id**, then OTI events will be keyed by **\<vnf_id\>**. +- Otherwise the OTI events will be keyed by **\<vnf_type\>**, sub-keyed by **\<vnf_id\>**. + +Example JSON response, formatted: +``` +{ + "anot-her": { + "another01ems003": { + "aai_additional_info": {}, + "dcae_generic-vnf_model-version-id": "1", + "dcae_service-instance_model-version-id": "1", + "dcae_service_action": "deploy", + "dcae_service_location": "LSLEILAA", + "dcae_snmp_community_string": "my_first_community", + "dcae_snmp_version": "2c", + "dcae_target_collection": "true", + "dcae_target_collection_ip": "107.239.85.3", + "dcae_target_in-maint": "false", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_name": "another01ems003", + "dcae_target_prov-status": "PROV", + "dcae_target_type": "anot-her", + "event": {} + }, + "another01ems044": { + "aai_additional_info": {}, + "dcae_generic-vnf_model-version-id": "1", + "dcae_service-instance_model-version-id": "1", + "dcae_service_action": "deploy", + "dcae_service_location": "LSLEILAA", + "dcae_snmp_community_string": "my_first_community", + "dcae_snmp_version": "2c", + "dcae_target_collection": "true", + "dcae_target_collection_ip": "107.239.85.44", + "dcae_target_in-maint": "false", + "dcae_target_is-closed-loop-disabled": "false", + 
"dcae_target_name": "another01ems044", + "dcae_target_prov-status": "PROV", + "dcae_target_type": "anot-her", + "event": {} + } + } +} +``` + +--- +<a name="policies"></a> +### GET /policies/**\<service_name\>**?**policy_id**=\<policy_id\> + +#### Description +Retrieve policy or policies for a service component instance + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|path segment|**\<service_name\>** <br>*required*|The service component name assigned by dockerplugin to the component<br>that is unique to the cloudify node instance and used in its Consul key(s).|string|| +|query component|**policy_id** <br>*optional*|Returns only the policy for this one **\<policy_id\>**.|string|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[PoliciesResponse](#policiesresponse)| + +<a name="policiesresponse"></a> +##### PoliciesResponse +OTI Handler returns a JSON object containing policy bodies for the **\<service_name\>** component. +- If **policy_id** is specified, then object returned will be just the one policy body. +- If **policy_id** is not specified, then object will contain all policy bodies for **\<service_name\>**, keyed by **\<policy_id\>**. + +Example JSON response, formatted: +``` +{ + "DCAE_FTL3B.Config_Green_Collectors": { + "config": { + "conflicting_key": "green_collectors_wins", + "package_type": "plastic", + "polling_frequency": "30m", + "power_source": "lemmings" + }, + "matchingConditions": { + "ConfigName": "Green_Collectors", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! 
", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_Green_Collectors.1.xml", + "policyVersion": "1", + "property": null, + "responseAttributes": {}, + "type": "JSON" + }, + "DCAE_FTL3B.Config_Green_Eggs_and_Ham_specific": { + "config": { + "bacon": "soft, not crispy", + "bread": "pumpernickel", + "conflicting_key": "green_eggs_and_ham_are_better", + "dcae_target_type": [ + "pnga-xxx", + "pcrf-oam", + "vhss-ems", + "anot-her", + "new-type" + ], + "egg_color": "green", + "preparation": "scrambled" + }, + "matchingConditions": { + "ConfigName": "Green_Eggs_and_Ham_specific", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_Green_Eggs_and_Ham_specific.5.xml", + "policyVersion": "5", + "property": null, + "responseAttributes": {}, + "type": "JSON" + }, + "DCAE_FTL3B.Config_In_Service": { + "config": { + "conflicting_key": "in_service_trumps!", + "in_service": true + }, + "matchingConditions": { + "ConfigName": "In_Service", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_In_Service.1.xml", + "policyVersion": "1", + "property": null, + "responseAttributes": {}, + "type": "JSON" + } +} +``` + +--- +<a name="service_component"></a> +### GET /service_component/**\<service_name\>** + +#### Description +Retrieve fully-bound configuration for a service component instance. 
+ +*Note: Response is the same as what Config Binding Service returns.* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|path segment|**\<service_name\>** <br>*required*|The service component name assigned by dockerplugin to the component<br>that is unique to the cloudify node instance and used in its Consul key(s).|string|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[ServiceComponentResponse](#servicecomponentresponse)| + +<a name="servicecomponentresponse"></a> +##### ServiceComponentResponse +OTI Handler returns a JSON object containing the install-time value of the service component node's +application_config property from the Cloudify deployment, +with any templating resolved from the current values of Consul dmaap and rel keys. + +Example JSON response, formatted: +``` +{ + "dcae_target_type": [ + "pnga-xxx" + ] +} +``` + +--- +<a name="oti_k8s_events"></a> +### GET /oti_k8s_events?pod=\<pod name\>&cluster=\<k8s cluster\>&namespace=\<k8s namespace\> + +#### Description +Retrieve oti events list associated with a specific kubernetes pod. + +*Note: Config Binding Service calls this API to fetch pod specific data* +1. *OTI events are queried from application database (not from consul)* +1. 
*events are associated with a specific k8s pod and the k8s location CLLI* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|query component|**pod** <br>*required*|pod ID of the kubernetes StatefulSet for this collector service|String|| +|query component|**cluster** <br>*required*|cluster FQDN of the kubernetes StatefulSet for this collector service|String|| +|query component|**namespace** <br>*required*|namespace of the kubernetes StatefulSet for this collector service|String|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[OtiEventsResponse](#otieventsresponse)| + +<a name="otieventsresponse"></a> +##### OtiEventsResponse +OTI Handler Service returns a JSON object with VNF Types as first-level keys, and<br>VNF Instance IDs as second-level keys with value of its latest OTI Event. + + +Example JSON response + +``` +{ + "ctsf": { + "dpa2actsf12345": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf12345", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "dcae_snmp_community_string": "", + "dcae_target_cloud-region-id": "dpa2a", + "dcae_target_cloud-region-version": "aic3.0", + "event": {} + }, + "dpa2actsf4421": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf4421", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "nodeType": "ctsf", + "description": "CTS 
metrics", + "nodeSubtype": "", + "serviceType": "VIRTUAL USP", + "priority": 1, + "subType": "camel", + "vnfType": "ctsf", + "taskId": "PVUVUALUCTSCTS1080", + "collectionType": "FOI", + "protocol": "sftp", + "collectionInterval": "300" + } + } +} +``` + +--- +<a name="oti_docker_events"></a> +### GET /oti_docker_events?service=\<service name\>&location=\<location CLLI\> + +#### Description +Retrieve oti events list associated with a docker hosted service or k8s service. + +*Note:Config Binding Service calls this API to fetch OTI events associated with a docker container* +1. *OTI events are queried from application database (not from consul)* +1. *events are associated with a specific docker container or all k8s pods related to the service,<br> further filtered by the input location CLLI* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|query component|**service** <br>*required*|service compnent name|String|| +|query component|**location** <br>*optional*|location CLLI associated with the docker host or k8s cluster|String|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[OtiEventsResponse](#otieventsresponse)| + +<a name="otieventsresponse"></a> +##### OtiEventsResponse +OTI Handler Service returns a JSON object with VNF Types as first-level keys, and<br>VNF Instance IDs as second-level keys with value of its latest OTI Event. 
+ + +Example JSON response + +``` +{ + "ctsf": { + "dpa2actsf12345": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf12345", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "dcae_snmp_community_string": "", + "dcae_target_cloud-region-id": "dpa2a", + "dcae_target_cloud-region-version": "aic3.0", + "event": {} + }, + "dpa2actsf4421": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf4421", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "nodeType": "ctsf", + "description": "CTS metrics", + "nodeSubtype": "", + "serviceType": "VIRTUAL USP", + "priority": 1, + "subType": "camel", + "vnfType": "ctsf", + "taskId": "PVUVUALUCTSCTS1080", + "collectionType": "FOI", + "protocol": "sftp", + "collectionInterval": "300" + } + } +} +``` + +--- +<a name="dti_k8_events"></a> +### GET /dti_k8_events?pod=\<pod name\>&cluster=\<k8s cluster\>&namespace=\<k8s namespace\> + +#### Description +Retrieve dti events list associated with a specific kubernetes pod. + +*Note:Config Binding Service calls this API to fetch pod specific data* +1. *OTI events are queried from application database (not from consul)* +1. 
*events are associated with a specific k8s pod and the k8s location CLLI* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|query component|**pod** <br>*required*|pod ID of the kubernetes StatefulSet for this collector service|String|| +|query component|**cluster** <br>*required*|cluster FQDN of the kubernetes StatefulSet for this collector service|String|| +|query component|**namespace** <br>*required*|namespace of the kubernetes StatefulSet for this collector service|String|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[DtiEventsResponse](#dtieventsresponse)| + +<a name="dtieventsresponse"></a> +##### DtiEventsResponse +OTI Handler Service returns a JSON object with VNF Types as first-level keys, and<br>VNF Instance IDs as second-level keys with value of its latest OTI Event. + + +Example JSON response + +``` +{ + "ctsf": { + "dpa2actsf12345": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf12345", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "dcae_snmp_community_string": "", + "dcae_target_cloud-region-id": "dpa2a", + "dcae_target_cloud-region-version": "aic3.0", + "event": {} + }, + "dpa2actsf4421": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf4421", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "nodeType": "ctsf", + "description": "CTS 
metrics", + "nodeSubtype": "", + "serviceType": "VIRTUAL USP", + "priority": 1, + "subType": "camel", + "vnfType": "ctsf", + "taskId": "PVUVUALUCTSCTS1080", + "collectionType": "FOI", + "protocol": "sftp", + "collectionInterval": "300" + } + } +} +``` + +--- +<a name="dti_docker_events"></a> +### GET /dti_docker_events?service=\<service name\>&location=\<location CLLI\> + +#### Description +Retrieve dti events list associated with a docker hosted service or k8s service. + +*Note:Config Binding Service calls this API to fetch OTI events associated with a docker container* +1. *OTI events are queried from application database (not from consul)* +1. *events are associated with a specific docker container or all k8s pods related to the service,<br> further filtered by the input location CLLI* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|query component|**service** <br>*required*|service compnent name|String|| +|query component|**location** <br>*optional*|location CLLI associated with the docker host or k8s cluster|String|| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[DtiEventsResponse](#dtieventsresponse)| + +<a name="dtieventsresponse"></a> +##### DtiEventsResponse +OTI Handler Service returns a JSON object with VNF Types as first-level keys, and<br>VNF Instance IDs as second-level keys with value of its latest OTI Event. 
+ + +Example JSON response + +``` +{ + "ctsf": { + "dpa2actsf12345": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf12345", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "dcae_snmp_community_string": "", + "dcae_target_cloud-region-id": "dpa2a", + "dcae_target_cloud-region-version": "aic3.0", + "event": {} + }, + "dpa2actsf4421": { + "dcae_service_location": "LSLEILAA", + "dcae_service_type": "vUSP - vCTS", + "dcae_target_type": "ctsf", + "dcae_service_action": "add", + "dcae_target_name": "dpa2actsf4421", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_collection_ip": "32.67.11.99", + "dcae_target_collection": "true", + "dcae_target_prov-status": "PROV", + "dcae_snmp_version": "", + "dcae_target_service-description": "VIRTUAL USP", + "nodeType": "ctsf", + "description": "CTS metrics", + "nodeSubtype": "", + "serviceType": "VIRTUAL USP", + "priority": 1, + "subType": "camel", + "vnfType": "ctsf", + "taskId": "PVUVUALUCTSCTS1080", + "collectionType": "FOI", + "protocol": "sftp", + "collectionInterval": "300" + } + } +} +``` + +<a name="service_component_all"></a> +### GET /service_component_all/**\<service_name\>**?**service_location**=\<service_location\>;**policy_ids**=n + +#### Description +Retrieve all available information for a service component instance (config, dti, and policies). + +*Note: Response differs from what prior Config Binding Service returned in that:* +1. *OTI events come from history (e.g., before a collector service component instance was deployed and are not lost if redeployed).* +1. *Can specify locations for OTI events to retrieve (e.g., for filtering, or for alternate sites).* +1. 
*Policies items is an object indexed by policy_id rather than a list (unless you specify **policy_ids**=n).* + +#### Parameters +|Type|Name|Description|Schema|Default| +|---|---|---|---|---| +|path segment|**\<service_name\>** <br>*required*|The service component name assigned by dockerplugin or k8splugin to the component<br>that is unique to the cloudify node instance and used in its Consul key(s).|string|| +|query component|**service_location** <br>*optional*|Allows multiple values separated by commas.<br>Filters OTI events with dcae_service_location in **\<service_location\>**.<br>Overrides locations defined in Consul for the **\<service_name\>**.|string|locations from Consul| +|query component|**policy_ids** <br>*optional*|When "n", formats policies items as a list without policy_ids rather than as an object indexed by policy_id.|string|y| + +#### Responses +|HTTP Code|Description|Schema| +|---|---|---| +|**200**|Success|[ServiceComponentAllResponse](#servicecomponentallresponse)| + +<a name="servicecomponentallresponse"></a> +##### ServiceComponentAllResponse +OTI Handler returns a JSON object containing all information for the component from Consul: + +|Name|Description|Schema| +|---|---|---| +|**config** <br>*required*|The install-time value of the service component node's application_config property<br>from the Cloudify deployment, with any templating resolved<br>from the current values of Consul dmaap and rel keys.|object| +|**dti** <br>*optional*|A JSON object with VNF Types as first-level keys, and<br>VNF Instance IDs as second-level keys with value of its latest OTI Event.|object| +|**policies** <br>*optional*|A JSON object with "event" and "items" first-level keys, and<br>policy_ids as second-level keys under "items"<br>(or if **policy_ids**=n then just a list without policy_ids)<br>with value of the complete policy body from Policy Manager.|object| + +Example JSON response, formatted: +``` +{ + "config": { + "dcae_target_type": [ + "pnga-xxx" + ] + }, + 
"dti": { + "anot-her": { + "another01ems003": { + "aai_additional_info": {}, + "dcae_generic-vnf_model-version-id": "1", + "dcae_service-instance_model-version-id": "1", + "dcae_service_action": "deploy", + "dcae_service_location": "LSLEILAA", + "dcae_snmp_community_string": "my_first_community", + "dcae_snmp_version": "2c", + "dcae_target_collection": "true", + "dcae_target_collection_ip": "107.239.85.3", + "dcae_target_in-maint": "false", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_name": "another01ems003", + "dcae_target_prov-status": "PROV", + "dcae_target_type": "anot-her", + "event": {} + }, + "another01ems044": { + "aai_additional_info": {}, + "dcae_generic-vnf_model-version-id": "1", + "dcae_service-instance_model-version-id": "1", + "dcae_service_action": "deploy", + "dcae_service_location": "LSLEILAA", + "dcae_snmp_community_string": "my_first_community", + "dcae_snmp_version": "2c", + "dcae_target_collection": "true", + "dcae_target_collection_ip": "107.239.85.44", + "dcae_target_in-maint": "false", + "dcae_target_is-closed-loop-disabled": "false", + "dcae_target_name": "another01ems044", + "dcae_target_prov-status": "PROV", + "dcae_target_type": "anot-her", + "event": {} + } + } + }, + "policies": { + "event": { + "action": "updated", + "policies_count": 3, + "timestamp": "2018-07-16T15:11:44.845Z", + "update_id": "e6102aab-3079-435a-ae0d-0397a2cb3c4d" + }, + "items": { + "DCAE_FTL3B.Config_Green_Collectors": { + "config": { + "conflicting_key": "green_collectors_wins", + "package_type": "plastic", + "polling_frequency": "30m", + "power_source": "lemmings" + }, + "matchingConditions": { + "ConfigName": "Green_Collectors", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! 
", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_Green_Collectors.1.xml", + "policyVersion": "1", + "property": null, + "responseAttributes": {}, + "type": "JSON" + }, + "DCAE_FTL3B.Config_Green_Eggs_and_Ham_specific": { + "config": { + "bacon": "soft, not crispy", + "bread": "pumpernickel", + "conflicting_key": "green_eggs_and_ham_are_better", + "dcae_target_type": [ + "pnga-xxx", + "pcrf-oam", + "vhss-ems", + "anot-her", + "new-type" + ], + "egg_color": "green", + "preparation": "scrambled" + }, + "matchingConditions": { + "ConfigName": "Green_Eggs_and_Ham_specific", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_Green_Eggs_and_Ham_specific.5.xml", + "policyVersion": "5", + "property": null, + "responseAttributes": {}, + "type": "JSON" + }, + "DCAE_FTL3B.Config_In_Service": { + "config": { + "conflicting_key": "in_service_trumps!", + "in_service": true + }, + "matchingConditions": { + "ConfigName": "In_Service", + "testName": "dcae", + "ONAPName": "dcae" + }, + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "policyName": "DCAE_FTL3B.Config_In_Service.1.xml", + "policyVersion": "1", + "property": null, + "responseAttributes": {}, + "type": "JSON" + } + } + } +} +``` diff --git a/oti/event-handler/README.md b/oti/event-handler/README.md new file mode 100644 index 0000000..0465e1b --- /dev/null +++ b/oti/event-handler/README.md @@ -0,0 +1,45 @@ +### ONAP Topology Interface (OTI) Handler + +ONAP Topology Interface (OTI) Handler is a python 3.6 web application designed to run as a microservice in the DCAE kubernetes platform. +This application serves as an event routing service for DCAE OTI events. It is packaged in a docker image and run as a docker container in the Kubernetes platform cluster. 
Its main business logic is to store incoming events in its database, determine the destination(s) for each event and then perform event routing to the target service (poller/collector). +Information present in the event [VNF type, location CLLI of a VNF instance] determines which services will be notified with this event. Event routing logic uses information available in the platform services +– +cloudify (service orchestrator), consul (service discovery). +A PostgreSQL database will be used by the application for storage and retrieval of current OTI events. + +Installation pre-requisites: + +1. OTI PostgreSQL database +1. Cloudify service +1. Consul service + +External interfaces/services for application processing: + +1. Cloudify service +1. Consul service + +OTI handler provides a REST API that is provided by an external node port service with a nodeport ID: 30134 (or as specified in the blueprint). External clients can access OTI handler service with an idns name and kubernetes nodeport, + +e.g. https://{OTI Handler IDNS}:{node port}/ + +OTI Handler: +1. receives OTI Events from the ONAP Topology Interface (OTI) and uses them to +assign or unassign VNF instances to DCAE monitoring services (data collector component instances). +1. accepts requests to run a reconfig script in a Cloudify deployment's service component instances. +1. sends event notification for service component instances that will be deployed in docker containers as a Kubernetes StatefulSet +1. stores the received events and the active event distribution data in an application database +1. using historical OTI event records from consul KV store or application database, answers queries for current (latest, not undeployed) OTI Events +1. answers queries for policy or policies of a service_component, indexed by policy_id. +1. answers queries for service_component (fully-bound config) -- +the same results as available from Config Binding Service (CBS). +1. 
answers queries for service_component_all with historical OTI events and policies indexed by policy_id +(not available from the prior Config Binding Service (CBS)). + +OTI Handler's HTTPS interface is documented [here](./OTI-API.md). + +Post-installation verification: + +Application health check - +From any external client or a web browser, access the oti handler health check URL: https://OTI Handler IDNS:nodeport/healthcheck + +If the application responds properly to a health check query, it is ready to process requests from clients. diff --git a/oti/event-handler/create_schema.sql b/oti/event-handler/create_schema.sql new file mode 100644 index 0000000..17baac5 --- /dev/null +++ b/oti/event-handler/create_schema.sql @@ -0,0 +1,87 @@ +-- ================================================================================ +-- Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +-- ================================================================================ +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- ============LICENSE_END========================================================= + +CREATE SCHEMA IF NOT EXISTS dti AUTHORIZATION dti_admin; + +CREATE SEQUENCE IF NOT EXISTS dti.dtih_event_event_id_seq; + +CREATE TABLE IF NOT EXISTS dti.dtih_event +( + dtih_event_id integer NOT NULL DEFAULT nextval('dti.dtih_event_event_id_seq'::regclass), + create_ts timestamp with time zone DEFAULT now(), + event jsonb, + target_name character varying(80), + target_type character varying(50), + last_modified_ts timestamp with time zone, + location_clli character varying(50), + CONSTRAINT dtih_event_pkey PRIMARY KEY (dtih_event_id) +); + +CREATE UNIQUE INDEX IF NOT EXISTS "dtih_event_UK" + ON dti.dtih_event USING btree + (target_name, target_type) + TABLESPACE pg_default; + +CREATE SEQUENCE IF NOT EXISTS dti.dtih_event_ack_event_ack_id_seq; + +CREATE TABLE IF NOT EXISTS dti.dtih_event_ack +( + dtih_event_ack_id integer NOT NULL DEFAULT nextval('dti.dtih_event_ack_event_ack_id_seq'::regclass), + last_modified_ts timestamp with time zone DEFAULT now(), + k8s_cluster_fqdn character varying(80), + k8s_proxy_fqdn character varying(80), + k8s_pod_id character varying(80), + dtih_event_id integer, + k8s_namespace character varying(100), + k8s_service_name character varying(120), + k8s_service_port character varying(6), + create_ts timestamp with time zone, + action character varying(10), + service_component character varying(120), + deployment_id character varying(120), + container_type character varying(10), + docker_host character varying(120), + container_id character varying(120), + reconfig_script character varying(100), + CONSTRAINT dtih_event_ack_pkey PRIMARY KEY (dtih_event_ack_id), + CONSTRAINT event_ack_fk FOREIGN KEY (dtih_event_id) + REFERENCES dti.dtih_event (dtih_event_id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE RESTRICT +); + +CREATE INDEX IF NOT EXISTS fki_dtih_event_ack_fk + ON dti.dtih_event_ack USING btree + (dtih_event_id) + TABLESPACE pg_default; + +CREATE 
OR REPLACE FUNCTION dti.trigger_set_timestamp() +RETURNS TRIGGER AS $$ +BEGIN + NEW.last_modified_ts = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER set_timestamp +BEFORE UPDATE ON dti.dtih_event_ack +FOR EACH ROW +EXECUTE PROCEDURE dti.trigger_set_timestamp(); + +CREATE TRIGGER set_timestamp_evt +BEFORE UPDATE ON dti.dtih_event +FOR EACH ROW +EXECUTE PROCEDURE dti.trigger_set_timestamp(); diff --git a/oti/event-handler/etc/common_logger.config b/oti/event-handler/etc/common_logger.config new file mode 100644 index 0000000..65ea5ad --- /dev/null +++ b/oti/event-handler/etc/common_logger.config @@ -0,0 +1,57 @@ +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + + +# You may change this file while your program is running and CommonLogger will automatically reconfigure accordingly. +# Changing these parameters may leave old log files lying around. + + +#--- Parameters that apply to all logs +# +# rotateMethod: time, size, stdout, stderr, none +#... 
Note: the following two parameters apply only when rotateMethod=time +# timeRotateIntervalType: S, M, H, D, W0 - W6, or midnight (seconds, minutes, hours, days, weekday (0=Monday), or midnight UTC) +# timeRotateInterval: >= 1 (1 means every timeRotateIntervalType, 2 every other, 3 every third, etc.) +#... Note: the following parameter applies only when rotateMethod=size +# sizeMaxBytes: >= 0 (0 means no limit, else maximum filesize in Bytes) +# backupCount: >= 0 (Number of rotated backup files to retain. If rotateMethod=time, 0 retains *all* backups. If rotateMethod=size, 0 retains *no* backups.) +# +rotateMethod = size +timeRotateIntervalType = midnight +timeRotateInterval = 1 +sizeMaxBytes = 10000000 +backupCount = 4 + + +#--- Parameters that define log filenames and their initial LogLevel threshold +#... Note: CommonLogger will exit if your process does not have permission to write to the file. +# + +error = logs/error.log +errorLogLevel = WARN +errorStyle = error + +metrics = logs/metrics.log +metricsLogLevel = INFO +metricsStyle = metrics + +audit = logs/audit.log +auditLogLevel = INFO +auditStyle = audit + +debug = logs/debug.log +debugLogLevel = DEBUG +debugStyle = debug diff --git a/oti/event-handler/etc/config.json b/oti/event-handler/etc/config.json new file mode 100644 index 0000000..dad2431 --- /dev/null +++ b/oti/event-handler/etc/config.json @@ -0,0 +1,44 @@ +{ + "version" : "1.0.0", + "wservice_port" : 9443, + "oti_handler" : { + "system" : "oti_handler", + "tls" : { + "cert_directory" : "/opt/app/oti_handler/certificate", + "cacert" : "/opt/app/oti_handler/certificate/cacert.pem", + "private_key" : "tls.key", + "server_cert" : "tls.crt", + "server_ca_chain" : "tls.chain.crt" + } + }, + "logging" : { + "version": 1, + "disable_existing_loggers": false, + "formatters": { + "local": { + "format": "%(asctime)s.%(msecs)03d %(levelname)+8s %(threadName)s %(name)s.%(funcName)s: %(message)s", + "datefmt": "%Y%m%d_%H%M%S" + } + }, + "handlers": { + "file": { 
+ "class": "logging.handlers.RotatingFileHandler", + "formatter": "local", + "filename" : "logs/oti_handler.log", + "level": "DEBUG", + "maxBytes": 200000000, + "backupCount": 5, + "delay": true + } + }, + "loggers": { + "dti_handler" : { + "handlers": ["file"], + "propagate":false + } + }, + "root": { + "handlers": ["file"] + } + } +} diff --git a/oti/event-handler/otihandler/__init__.py b/oti/event-handler/otihandler/__init__.py new file mode 100644 index 0000000..87cf002 --- /dev/null +++ b/oti/event-handler/otihandler/__init__.py @@ -0,0 +1,15 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/__main__.py b/oti/event-handler/otihandler/__main__.py new file mode 100644 index 0000000..59a7087 --- /dev/null +++ b/oti/event-handler/otihandler/__main__.py @@ -0,0 +1,74 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 
+# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""run as server: python -m otihandler""" + +import logging +import os +import sys + +from otihandler.config import Config +from otihandler.onap.audit import Audit +from otihandler.web_server import DTIWeb +from otihandler.dbclient import DaoBase + + +class LogWriter(object): + """redirect the standard out + err to the logger""" + + def __init__(self, logger_func): + self.logger_func = logger_func + + def write(self, log_line): + """actual writer to be used in place of stdout or stderr""" + + log_line = log_line.rstrip() + if log_line: + self.logger_func(log_line) + + def flush(self): + """no real flushing of the buffer""" + + pass + + +def run_event_handler(): + """main run function for event_handler""" + + Config.load_from_file() + # Config.discover() + + logger = logging.getLogger("event_handler") + sys.stdout = LogWriter(logger.info) + sys.stderr = LogWriter(logger.error) + + logger.info("========== run_event_handler ==========") + app_version = os.getenv("APP_VER") + logger.info("app_version %s", app_version) + Audit.init(Config.get_system_name(), app_version, Config.LOGGER_CONFIG_FILE_PATH) + + logger.info("starting event_handler with config:") + logger.info(Audit.log_json_dumps(Config.config)) + + audit = Audit(req_message="start event_handler") + + 
audit = Audit(req_message="DB init start") + DaoBase.init_db(os.environ.get("DB_CONN_URL")) + + DTIWeb.run_forever(audit) + +if __name__ == "__main__": + run_event_handler() diff --git a/oti/event-handler/otihandler/cbs_rest.py b/oti/event-handler/otihandler/cbs_rest.py new file mode 100644 index 0000000..2e3e7de --- /dev/null +++ b/oti/event-handler/otihandler/cbs_rest.py @@ -0,0 +1,202 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============LICENSE_END========================================================= + +"""REST for high-level information retrievals from Consul KVs""" + +import copy +import logging + +from otihandler.consul_client import ConsulClient + + +class CBSRest(object): + _logger = logging.getLogger("oti_handler.cbs_rest") + + @staticmethod + def get_value(key, default=None): + """Wrap ConsulClient.get_value() to ignore exceptions.""" + + data = default + try: + data = ConsulClient.get_value(key) + except Exception as e: + pass + + return data + + @staticmethod + def get_kvs(key): + """Wrap ConsulClient.get_kvs() to ignore exceptions.""" + + data = {} + try: + data = ConsulClient.get_kvs(key, trim_prefix=True) + except Exception as e: + data = {} + + if not data: + data = {} + return data + + @staticmethod + def get_service_component(service_name): + """Get the fully-bound config for a service_name.""" + + return ConsulClient.get_service_component(service_name) + + @staticmethod + def get_service_component_all(service_name, service_location=None, policies_as_list=False): + """Get all Consul objects for a service_name.""" + + r_dict = ConsulClient.get_service_component_all(service_name, policies_as_list=policies_as_list) + if r_dict and r_dict.get('oti'): + r_dict['oti'] = CBSRest.get_oti(service_name, service_location=service_location) + return r_dict + + @staticmethod + def get_oti(service_name=None, vnf_type=None, vnf_id=None, service_location=None): + """ + Get DTI events. + + Parameters + ---------- + service_name : string + optional. The service component name assigned by dockerplugin to the component that is unique to the cloudify node instance and used in its Consul key(s). + vnf_type : string + optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). + vnf_id : string + optional. Requires vnf_type also. Gets DTI event for this vnf_id. + service_location : string + optional, allows multiple values separated by commas. 
Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + Dictionary of DTI event(s). + If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. + If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. + Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. + + """ + + lc_vnf_type = vnf_type + if vnf_type: + lc_vnf_type = vnf_type.lower() + + r_dict = {} + + want_locs = [] + if service_location: + want_locs = service_location.split(',') + + give_types = [] + if service_name: + if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node + try: + node_name = ConsulClient.lookup_service(service_name)[0].get("Node") + if node_name: + services = ConsulClient.lookup_node(node_name).get("Services") + if services: + for node_svc in list(services.keys()): + if "-component-dockerhost-" in node_svc or "_component_kubernetes_master" in node_svc: + want_locs = services[node_svc].get("Tags", []) + break + except: + pass + supported_types = ConsulClient.get_value(service_name + ":oti") + if supported_types: + supported_types = [type.lower() for type in list(supported_types.keys())] + if supported_types: + if lc_vnf_type: # If user specifies vnf_type(s), constrain to supported ones + for type in lc_vnf_type.split(','): + if type in supported_types: + give_types.append(type) + else: + give_types = supported_types + if not give_types or (len(give_types) == 1 and give_types[0] == ''): + return r_dict + elif lc_vnf_type: + give_types = lc_vnf_type.split(',') + + + # If they specified only one vnf_type ... + if lc_vnf_type and ',' not in lc_vnf_type: + type = give_types[0] + + # ... 
and vnf_id + if vnf_id: + # get just one vnf_id + t_dict = CBSRest.get_value("oti_events/" + type + "/" + vnf_id, default=None) + if t_dict: + event_loc = t_dict.get('dcae_service_location') + if not event_loc or not want_locs or event_loc in want_locs: + r_dict = copy.deepcopy(t_dict) + + # ... and not vnf_id + else: + # get all DTI events of just one type, indexed by vnf_id + t_dict = CBSRest.get_kvs("oti_events/" + type + "/") + if t_dict: + if not want_locs: + r_dict = copy.deepcopy(t_dict) + else: + for id in t_dict: + event_loc = t_dict[id].get('dcae_service_location') + if not event_loc or event_loc in want_locs: + r_dict[id] = copy.deepcopy(t_dict[id]) + + # If they did not specify either service_name or vnf_type (the only way give_types=[]) + elif not give_types: + # get all DTI events, indexed by vnf_type then vnf_id + t_dict = CBSRest.get_kvs("oti_events/") + if t_dict: + for type in t_dict: + for id in t_dict[type]: + if not vnf_id or vnf_id == id: + if want_locs: + event_loc = t_dict[type][id].get('dcae_service_location') + if not want_locs or not event_loc or event_loc in want_locs: + if type not in r_dict: + r_dict[type] = {} + r_dict[type][id] = copy.deepcopy(t_dict[type][id]) + + # If they speclfied multiple vnf_types + else: + # get all DTI events of give_types, indexed by vnf_type then vnf_id + for type in give_types: + t_dict = CBSRest.get_kvs("oti_events/" + type + "/") + if t_dict: + for id in t_dict: + if not vnf_id or vnf_id == id: + if want_locs: + event_loc = t_dict[id].get('dcae_service_location') + if not want_locs or not event_loc or event_loc in want_locs: + if type not in r_dict: + r_dict[type] = {} + r_dict[type][id] = copy.deepcopy(t_dict[id]) + + return r_dict + + @staticmethod + def get_policies(service_name, policy_id=None): + """Get one or all policies for a service_name.""" + + if policy_id: + return ConsulClient.get_value(service_name + ":policies/items/" + policy_id) + else: + return ConsulClient.get_kvs(service_name + 
":policies/items/", trim_prefix=True) diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py new file mode 100644 index 0000000..c823340 --- /dev/null +++ b/oti/event-handler/otihandler/cfy_client.py @@ -0,0 +1,638 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============LICENSE_END========================================================= + +"""Our client interface to Cloudify""" +import base64 +import copy +import json +import logging +import os + +import requests + +from otihandler.consul_client import ConsulClient + + +class CfyClientConsulError(RuntimeError): + pass + + +class CloudifyClient(object): + """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants""" + + def __init__(self, **kwargs): + self._protocol = kwargs.get('protocol', 'http') + self._host = kwargs.get('host') + self._port = kwargs.get('port') + self._headers = kwargs.get('headers') + + self.node_instances = self + + def list(self, **kwargs): + url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port) + # s = Session() + # req = Request('GET', url_mask, headers=self._headers) + # prepped = req.prepare() + # response = s.send(prepped,verify=False,timeout=30) + response = requests.get(url_mask, headers=self._headers, timeout=30) + obj = response.json() + tenants = [x["name"] for x in obj["items"]] + tenants_with_containers = [x for x in tenants if 'DCAE' in x] + + size = 1000 + url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format( + self._protocol, self._host, self._port, size, "{}") + if kwargs: + for (key,val) in kwargs.items(): + if isinstance(val, str): + url_mask = url_mask + '&{}={}'.format(key, val) + elif isinstance(val, list): + url_mask = url_mask + '&{}={}'.format(key, ','.join(val)) + + for tenant in tenants_with_containers: + self._headers_with_tenant = copy.deepcopy(self._headers) + self._headers_with_tenant['Tenant'] = tenant + + offset = 0 + total = 1 + while offset < total: + # s = Session() + # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant) + # prepped = req.prepare() + # response = s.send(prepped, verify=False, timeout=30) + response = requests.get(url_mask.format(offset), 
headers=self._headers_with_tenant, timeout=30) + response.raise_for_status() + obj = response.json() + offset = offset + len(obj["items"]) + total = obj["metadata"]["pagination"]["total"] + for item in obj["items"]: + yield NodeInstance(item) + + def update_node_instance(self, node_instance_id, body, **kwargs): + headers = copy.deepcopy(self._headers_with_tenant) + headers['Content-Type'] = "application/json" + url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format( + self._protocol, self._host, self._port, node_instance_id) + response = requests.patch(url_mask, json=body, headers=headers, timeout=30) + obj = response.json() + return obj + + +class NodeInstance(object): + """quick replacement for cloudify_rest_client""" + + def __init__(self, instance): + self.id = instance.get("id") + self.deployment_id = instance.get("deployment_id") + self.host_id = instance.get("host_id") + self.runtime_properties = instance.get("runtime_properties") + self.relationships = instance.get("relationships") + self.state = instance.get("state") + self.version = instance.get("version") + self.node_id = instance.get("node_id") + self.scaling_groups = instance.get("scaling_groups") + + +class CfyClient(object): + _logger = logging.getLogger("oti_handler.cfy_client") + _client = None + + + @staticmethod + def __set_cloudify_manager_client(): + """Create connection to Cloudify_Manager.""" + + if CfyClient._client: + return + + host = None + port = None + obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify") + source = "CLOUDIFY environment variable" + if not obj: + CM_KEY = 'cloudify_manager' + source = "Consul key '{}'".format(CM_KEY) + + try: + results = ConsulClient.lookup_service(CM_KEY) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + result = results[0] + host = result['ServiceAddress'] + port = 
result['ServicePort'] + + try: + obj = ConsulClient.get_value(CM_KEY) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + if not obj: + raise CfyClientConsulError("{} value is empty or invalid".format(source)) + + obj = obj.get('cloudify') + + if not obj: + raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source)) + + host = obj.get('address', host) + if not host: + raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source)) + + port = obj.get('port', port) + if not port: + raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source)) + + protocol = obj.get('protocol') + if not protocol: + raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source)) + username = obj.get('user') + if not username: + raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source)) + password = obj.get('password') + if not password: + raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source)) + + b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8") + headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')} + #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')} + + CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers) + + + @staticmethod + def query_k8_components(in_cluster_fqdn): + """ + Iterate components that belong to a cluster fqdn. + + Parameters + ---------- + in_cluster_fqdn : string + k8s cluster FQDN + + Returns + ------- + A generator of tuples of component information + [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... 
] + """ + + cnt_found = 0 + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + scn_port = None + cluster_fqdn = None + proxy_fqdn = None + dti_info = rtp.get('dti_info') + if dti_info: + env_items = dti_info.get('env') + for env in env_items: + if env.get("name") == 'KUBE_CLUSTER_FQDN': + cluster_fqdn = env.get("value") + if env.get("name") == 'KUBE_PROXY_FQDN': + proxy_fqdn = env.get("value") + ports = dti_info.get('ports') + if ports: + scn_port = ports[0].split(':')[0] + else: + continue + + if in_cluster_fqdn != cluster_fqdn: + continue + + controller_type = rtp.get('k8s_controller_type') + if not controller_type: + CfyClient._logger.debug("controller type is missing") + continue + elif controller_type != "statefulset": + CfyClient._logger.debug("not a stateful set") + continue + + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format( + node_instance.deployment_id, node_instance.id)) + continue + + try: + namespace = container_id.get('namespace') + except: + namespace = '' + pass + + replicas = 1 + try: + replicas = rtp.get('replicas') + except: + pass + + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug( + "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, + node_instance.id)) + continue + + cnt_found += 1 + yield (proxy_fqdn, namespace, scn, replicas, scn_port) + continue + + msg = "Found {} components (collectors) for cluster={}" \ + .format(cnt_found, in_cluster_fqdn) + CfyClient._logger.debug(msg) + + + @staticmethod + def iter_components(dcae_target_type, dcae_service_location='', component_type=''): + """ + Iterate components that handle a given dcae_target_type. 
+ + Parameters + ---------- + dcae_target_type : string + VNF Type + dcae_service_location : string + Location of the component (optional) + component_type : string + Type of the component (optional) + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + or + [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] + + """ + + cnt_found = 0 + + # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) + dockerhosts = [] + k8s_svcs_tagged_with_clli = [] + if dcae_service_location: + try: + dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + try: + k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + docker_host = '' + svc_with_my_clli_tags = '' + if not container_id: + container_type = "k8s" + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} 
runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) + continue + docker_config = rtp.get('docker_config') + if not docker_config: + CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id)) + continue + dti_reconfig_script = "" + if container_type == "docker": + dti_reconfig_script = rtp.get('dti_reconfig_script') + if not dti_reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti') + if not dti_reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id)) + continue + + scn = rtp.get('service_component_name') + scn_address = None + scn_port = None + if not scn: + CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) + continue + if container_type == "docker": + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + try: + srvcCatalogItem = ConsulClient.lookup_service(scn)[0] + scn_address = srvcCatalogItem.get("ServiceAddress") + except: + CfyClient._logger.debug( + "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id, + node_instance.id)) + continue + svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags') + # e.g., scn="s908d92e232ed43..." + if not svc_with_my_clli_tags: + # We should not incur this burden. k8splugin should store this into runtime properties. 
+ try: + node_name = srvcCatalogItem.get("Node") + if node_name: + # e.g., node_name="zldcdyh1adce3kpma00" + services = ConsulClient.lookup_node(node_name).get("Services") + if services: + for node_svc in list(services.keys()): + if "_component_kubernetes_master" in node_svc: + # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master" + svc_with_my_clli_tags = node_svc + break + except: + pass + # ... cache results we find into runtime properties to avoid searching again + if svc_with_my_clli_tags: + CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format( + node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags)) + rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags + body = { + "runtime_properties": rtp, + "state": node_instance.state, + "version": 1 + int(node_instance.version) + } + try: + CfyClient._client.update_node_instance(node_instance.id, body) + except: + pass + + if not svc_with_my_clli_tags: + CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id)) + continue + + # get the nodeport for statefulset sidecar service + dti_info = rtp.get('dti_info') + if dti_info: + ports = dti_info.get('ports') + if ports: + scn_port = ports[0].split(':')[1] + docker_host = rtp.get('configuration',{}).get('file_content') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) + continue + + # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG + if dcae_service_location: + if container_type == "docker" and docker_host not in dockerhosts: + CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location)) + continue + elif container_type == "k8s" and 
svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli: + CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location)) + continue + + # If DTI Event specifies component_type, then collector's service_component_type must match + if component_type: + c_component_type = rtp.get('service_component_type') + if component_type != c_component_type: + CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) + continue + + # Check if the collector supports this VNF Type + # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':dti' + try: + obj = ConsulClient.get_value(dti_key) + except Exception as e: + CfyClient._logger.error( + "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" + .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) + ) + continue + if not obj: + CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key)) + continue + obj_types = set(k.lower() for k in obj) + if dcae_target_type.lower() in obj_types: + CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port ) + continue + else: + CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key)) + continue + + msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\ + .format(cnt_found, 
dcae_target_type, dcae_service_location, component_type) + CfyClient._logger.debug(msg) + + @staticmethod + def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''): + """ + Iterate components that handle a given dcae_target_type to find the components of docker type + + Parameters + ---------- + dcae_target_type : string + VNF Type + dcae_service_location : string + Location of the component (optional) + component_type : string + Type of the component (optional) + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + + """ + + cnt_found = 0 + # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) + dockerhosts = [] + + if dcae_service_location: + try: + dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) + except Exception as e: + msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format( + type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) + CfyClient._logger.error(msg) + raise CfyClientConsulError(msg) + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + if not container_id: + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id".format( + node_instance.deployment_id, node_instance.id)) + continue + docker_config = rtp.get('docker_config') + if not docker_config: + CfyClient._logger.debug( + "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, + node_instance.id)) + continue + dti_reconfig_script = "" + dti_reconfig_script = 
rtp.get('dti_reconfig_script') + if not dti_reconfig_script: + CfyClient._logger.debug( + "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, + node_instance.id)) + continue + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug( + "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, + node_instance.id)) + continue + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format( + node_instance.deployment_id, node_instance.id)) + continue + + # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG + if dcae_service_location: + if docker_host not in dockerhosts: + CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" + .format(node_instance.deployment_id, node_instance.id, docker_host, + dcae_service_location)) + continue + + # If DTI Event specifies component_type, then collector's service_component_type must match + if component_type: + c_component_type = rtp.get('service_component_type') + if component_type != c_component_type: + CfyClient._logger.debug( + "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) + continue + + # Check if the collector supports this VNF Type + # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) + dti_key = scn + ':dti' + try: + obj = ConsulClient.get_value(dti_key) + except Exception as e: + CfyClient._logger.error( + "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" + .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) + ) + continue + if not obj: + CfyClient._logger.debug( + "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, + 
dti_key)) + continue + obj_types = set(k.lower() for k in obj) + if dcae_target_type.lower() in obj_types: + CfyClient._logger.debug( + "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, + dcae_target_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, + node_instance.state, docker_host, dti_reconfig_script, container_type, '', '') + continue + else: + CfyClient._logger.debug( + "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, + dcae_target_type, dti_key)) + continue + + msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \ + .format(cnt_found, dcae_target_type, dcae_service_location, component_type) + CfyClient._logger.debug(msg) + + + @staticmethod + def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"): + """ + Iterate components of a specific deployment_id. + + Parameters + ---------- + deployment_id : string + Cloudify deployment ID that created the component(s). + node_id : string + Cloudify node ID that created the component. + reconfig_type : string + "app" + + Returns + ------- + A generator of tuples of component information + [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] + or + [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... 
] + + """ + + cnt_found = 0 + + CfyClient.__set_cloudify_manager_client() + for node_instance in CfyClient._client.node_instances.list( + deployment_id=deployment_id, + _include=['id','node_id','deployment_id','state','runtime_properties'] + ): + if node_id and node_instance.node_id != node_id: + continue + + rtp = node_instance.runtime_properties + + # Skip this node_instance if it is not a collector + container_type = "docker" + container_id = rtp.get('container_id') + if not container_id: + container_type = "k8s" + container_id = rtp.get('k8s_deployment') + if not container_id: + CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) + continue + reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type) + if not reconfig_script: + CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type)) + continue + scn = rtp.get('service_component_name') + if not scn: + CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) + continue + if container_type == "docker": + docker_host = rtp.get('selected_container_destination') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) + continue + elif container_type == "k8s": + docker_host = rtp.get('configuration',{}).get('file_content') + if not docker_host: + CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) + continue + + CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type)) + cnt_found += 1 + yield (scn, node_instance.deployment_id, container_id, 
node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type) + continue + + msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type) + CfyClient._logger.debug(msg) diff --git a/oti/event-handler/otihandler/config.py b/oti/event-handler/otihandler/config.py new file mode 100644 index 0000000..d5149cc --- /dev/null +++ b/oti/event-handler/otihandler/config.py @@ -0,0 +1,179 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
class Config(object):
    """main config of the application

    All settings are class attributes and every method is static, so the
    class itself is the single shared configuration object.
    """

    CONFIG_FILE_PATH = "etc/config.json"
    LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
    SERVICE_NAME = "oti_handler"

    # names of the fields read from the config dictionaries
    FIELD_SYSTEM = "system"
    FIELD_WSERVICE_PORT = "wservice_port"
    FIELD_TLS = "tls"

    _logger = logging.getLogger("oti_handler.config")
    # merged service configuration (dict) or None until something is merged
    config = None

    cloudify_proto = None
    cloudify_addr = None
    cloudify_port = None
    cloudify_user = None
    cloudify_pass = None
    cloudify = None
    consul_url = "http://consul:8500"
    # absolute paths of verified TLS files, None when absent or unreadable
    tls_cacert_file = None
    tls_server_cert_file = None
    tls_private_key_file = None
    tls_server_ca_chain_file = None
    wservice_port = 9443

    @staticmethod
    def _get_tls_file_path(tls_config, cert_directory, tls_name):
        """calc file path and verify its existence

        Returns the joined path when tls_config[tls_name] names a readable
        regular file inside cert_directory, otherwise None (an unreadable or
        missing file is logged as an error).
        """

        file_name = tls_config.get(tls_name)
        if not file_name:
            return None
        tls_file_path = os.path.join(cert_directory, file_name)
        if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
            Config._logger.error("invalid %s: %s", tls_name, tls_file_path)
            return None
        return tls_file_path

    @staticmethod
    def _set_tls_config(tls_config):
        """verify and set tls certs in config

        The four tls_* attributes are always reset first, so an invalid or
        missing tls section leaves them as None.  The resulting values are
        logged on every exit path.
        """

        try:
            Config.tls_cacert_file = None
            Config.tls_server_cert_file = None
            Config.tls_private_key_file = None
            Config.tls_server_ca_chain_file = None

            if not (tls_config and isinstance(tls_config, dict)):
                Config._logger.info("no tls in config: %s", json.dumps(tls_config))
                return

            cert_directory = tls_config.get("cert_directory")

            if not (cert_directory and isinstance(cert_directory, str)):
                Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory)
                return

            # cert_directory is resolved against the parent of this module's directory
            cert_directory = os.path.join(
                os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory)
            if not os.path.isdir(cert_directory):
                Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory)
                return

            Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
            Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                    "server_cert")
            Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                    "private_key")
            Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                        "server_ca_chain")

        finally:
            Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
            Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
            Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
            Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)

    @staticmethod
    def merge(new_config):
        """merge the new_config into current config - override the values

        The merge is shallow (dict.update on the top level).  new_config is
        always deep-copied, so later changes made by the caller to its own
        dictionary never leak into Config.config.
        """

        if not new_config:
            return

        new_config = copy.deepcopy(new_config)
        if not Config.config:
            Config.config = new_config
            return

        Config.config.update(new_config)

    @staticmethod
    def get_system_name():
        """find the name of the oti_handler system
        to be used as the key in consul-kv for config of oti_handler
        """

        return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME)

    @staticmethod
    def discover():
        """bring and merge the config settings from Consul

        Exceptions raised by ConsulClient.get_value are not caught here.
        """

        system_key = Config.get_system_name()
        new_config = ConsulClient.get_value(system_key)

        if not new_config or not isinstance(new_config, dict):
            # Logger.warn is a deprecated alias that newer Python versions dropped
            Config._logger.warning("unexpected config from Consul: %s", new_config)
            return

        Config._logger.debug("loaded config from Consul(%s): %s",
                             system_key, json.dumps(new_config))
        Config._logger.debug("config before merge from Consul: %s", json.dumps(Config.config))
        Config.merge(new_config.get(Config.SERVICE_NAME))
        Config._logger.debug("merged config from Consul: %s", json.dumps(Config.config))

    @staticmethod
    def load_from_file(file_path=None):
        """read and store the config from config file

        Returns True when a non-empty JSON object was loaded, None otherwise.
        Malformed JSON still raises from json.load.
        """

        if not file_path:
            file_path = Config.CONFIG_FILE_PATH

        loaded_config = None
        if os.access(file_path, os.R_OK):
            with open(file_path, 'r') as config_json:
                loaded_config = json.load(config_json)

        # a JSON root that is not an object (list, string, number) is unusable here
        if not loaded_config or not isinstance(loaded_config, dict):
            Config._logger.info("config not loaded from file: %s", file_path)
            return

        Config._logger.info("config loaded from file: %s", file_path)
        logging_config = loaded_config.get("logging")
        if logging_config:
            logging.config.dictConfig(logging_config)

        Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)

        # the service section may be absent, null or of an unexpected type
        local_config = loaded_config.get(Config.SERVICE_NAME)
        if not isinstance(local_config, dict):
            local_config = {}
        Config._set_tls_config(local_config.get(Config.FIELD_TLS))

        Config.merge(local_config)
        return True
class ConsulClientError(RuntimeError):
    """Generic Consul client error (not raised by the methods shown below)."""


class ConsulClientConnectionError(RuntimeError):
    """The HTTP request to Consul failed or CONSUL_URL is not configured."""


class ConsulClientServiceNotFoundError(RuntimeError):
    """A service catalog response was unparsable, empty or malformed."""


class ConsulClientNodeNotFoundError(RuntimeError):
    """A node catalog response was unparsable or empty."""


class ConsulClientKVEntryNotFoundError(RuntimeError):
    """A key-value response was unparsable, empty or could not be decoded."""


class ConsulClient(object):
    """talking to consul"""

    CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
    CONSUL_KV_MASK = "{}/v1/kv/{}"
    CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
    CONSUL_TRANSACTION_URL = "{}/v1/txn"
    _logger = logging.getLogger("oti_handler.consul_client")

    MAX_OPS_PER_TXN = 64
    # MAX_VALUE_LEN = 512 * 1000

    OPERATION_SET = "set"
    OPERATION_DELETE = "delete"
    OPERATION_DELETE_FOLDER = "delete-tree"


    #----- Shared private helpers

    @staticmethod
    def _consul_url():
        """Return $CONSUL_URL without trailing slashes.

        Raises ConsulClientConnectionError when the variable is unset or
        empty (previously this surfaced as an AttributeError on None).
        """

        url = os.environ.get("CONSUL_URL")
        if not url:
            msg = "CONSUL_URL environment variable is not set"
            ConsulClient._logger.error(msg)
            raise ConsulClientConnectionError(msg)
        return url.rstrip("/")

    @staticmethod
    def _request_json(caller, verb, url, parse_error, **kwargs):
        """Run requests.<verb>(url, timeout=30, **kwargs) and return the decoded JSON body.

        caller      -- text such as "lookup_service(name)" used as log-message prefix
        verb        -- "get" or "put"
        parse_error -- exception class raised when the body is not valid JSON

        Raises ConsulClientConnectionError on any requests failure, including
        a non-2xx status.
        """

        try:
            response = getattr(requests, verb)(url, timeout=30, **kwargs)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            msg = "{} requests.{}({}) threw exception {}: {!s}".format(
                caller, verb, url, type(e).__name__, e)
            ConsulClient._logger.error(msg)
            raise ConsulClientConnectionError(msg)

        try:
            return response.json()
        except Exception as e:
            msg = "{} parsing JSON from requests.{}({}) threw exception {}: {!s}".format(
                caller, verb, url, type(e).__name__, e)
            ConsulClient._logger.error(msg)
            raise parse_error(msg)

    @staticmethod
    def _require_value(caller, verb, url, value, error_cls):
        """Return value unchanged, or log and raise error_cls when it is empty."""

        if not value:
            msg = "{} got empty or no value from requests.{}({})".format(caller, verb, url)
            ConsulClient._logger.error(msg)
            raise error_cls(msg)
        return value

    @staticmethod
    def _put_level_value(level_keys, value, level_dict=None):
        """Store value in level_dict under the nested path level_keys.

        Returns the updated dictionary, or value itself when level_keys is
        empty.  An existing non-dict entry on the path is replaced by a
        dictionary so a key that is both a leaf and a folder cannot crash.
        """

        if not level_keys:
            return value
        if not isinstance(level_dict, dict):
            level_dict = {}
        key = level_keys[0]
        level_dict[key] = ConsulClient._put_level_value(level_keys[1:], value, level_dict.get(key))
        return level_dict


    #----- Methods for Consul services

    @staticmethod
    def lookup_service(service_name):
        """find the service record in consul

        Returns the non-empty list decoded from the catalog response.
        Raises ConsulClientConnectionError or ConsulClientServiceNotFoundError.
        """

        service_path = ConsulClient.CONSUL_SERVICE_MASK.format(ConsulClient._consul_url(), service_name)

        ConsulClient._logger.info("lookup_service(%s)", service_path)

        caller = "lookup_service({})".format(service_name)
        return_list = ConsulClient._request_json(
            caller, "get", service_path, ConsulClientServiceNotFoundError)
        return ConsulClient._require_value(
            caller, "get", service_path, return_list, ConsulClientServiceNotFoundError)


    @staticmethod
    def get_all_services():
        """List all services from consul

        Returns the decoded response; an empty result is only logged.
        Raises ConsulClientConnectionError or ConsulClientServiceNotFoundError.
        """

        service_path = "{}/v1/catalog/services".format(ConsulClient._consul_url())

        ConsulClient._logger.info("get_all_services(%s)", service_path)

        return_dict = ConsulClient._request_json(
            "get_all_services()", "get", service_path, ConsulClientServiceNotFoundError)

        if not return_dict:
            msg = "get_all_services() got empty or no value from requests.get({})".format(
                service_path)
            ConsulClient._logger.info(msg)
            # raise ConsulClientServiceNotFoundError(msg)

        return return_dict


    @staticmethod
    def _find_matching_services(services, name_search, tags):
        """Find matching services given search criteria

        services maps service name to its list of tags.  A service matches
        when name_search is a substring of its name and it carries any of
        tags, or characters 4-5 of the first tag.  The caller's tags list is
        not modified; an empty tags list matches nothing.
        """

        if not tags:
            return []

        wanted = list(tags)  # copy: never grow the caller's list
        sub_tag = wanted[0][4:6]
        if sub_tag:
            wanted.append(sub_tag)

        return [
            srv_name for srv_name, srv_tags in services.items()
            if name_search in srv_name and any(tag in (srv_tags or ()) for tag in wanted)
        ]


    @staticmethod
    def search_services(name_search, tags):
        """
        Search for services that match criteria

        Args:
        -----
        name_search: (string) Name to search for as a substring
        tags: (list) List of strings that are tags. A service must match **ANY OF** the
            tags in the list.

        Returns:
        --------
        List of names of services that matched
        """

        matches = []

        # srvs is dict where key is service name and value is list of tags
        srvs = ConsulClient.get_all_services()

        if srvs:
            matches = ConsulClient._find_matching_services(srvs, name_search, tags)

        return matches


    @staticmethod
    def get_service_fqdn_port(service_name, node_meta=False):
        """find the service record in consul

        Returns (fqdn, port) of the first catalog entry.  With node_meta the
        fqdn comes from NodeMeta.fqdn when present, otherwise from
        socket.getfqdn(ServiceAddress).
        Raises ConsulClientConnectionError or ConsulClientServiceNotFoundError.
        """

        service_path = ConsulClient.CONSUL_SERVICE_MASK.format(ConsulClient._consul_url(), service_name)

        ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)

        caller = "get_service_fqdn_port({})".format(service_name)
        service = ConsulClient._request_json(
            caller, "get", service_path, ConsulClientServiceNotFoundError)
        ConsulClient._require_value(
            caller, "get", service_path, service, ConsulClientServiceNotFoundError)

        try:
            service = service[0]  # arbitrarily choose the first one
            port = service["ServicePort"]

            # HTTPS certificate validation requires FQDN not IP address
            fqdn = ""
            if node_meta:
                meta = service.get("NodeMeta")
                if meta:
                    fqdn = meta.get("fqdn")
            if not fqdn:
                fqdn = socket.getfqdn(str(service["ServiceAddress"]))
        except Exception as e:
            msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
                service_name, service_path, type(e).__name__, e)
            ConsulClient._logger.error(msg)
            raise ConsulClientServiceNotFoundError(msg)

        return (fqdn, port)


    #----- Methods for Consul nodes

    @staticmethod
    def lookup_node(node_name):
        """find the node record in consul

        Returns the non-empty decoded response.
        Raises ConsulClientConnectionError or ConsulClientNodeNotFoundError.
        """

        node_path = "{}/v1/catalog/node/{}".format(ConsulClient._consul_url(), node_name)

        ConsulClient._logger.info("lookup_node(%s)", node_path)

        caller = "lookup_node({})".format(node_name)
        return_dict = ConsulClient._request_json(
            caller, "get", node_path, ConsulClientNodeNotFoundError)
        return ConsulClient._require_value(
            caller, "get", node_path, return_dict, ConsulClientNodeNotFoundError)


    #----- Methods for Consul key-values

    @staticmethod
    def put_value(key, data, cas=None):
        """put the value for key into consul-kv

        data is stored JSON-encoded; cas, when not None, is sent as the
        ?cas= query parameter.  Returns the decoded response body.
        Raises ConsulClientConnectionError or ConsulClientKVEntryNotFoundError.
        """

        # ConsulClient._logger.info("put_value(%s)", str(key))

        URL = ConsulClient.CONSUL_KV_MASK.format(ConsulClient._consul_url(), key)
        if cas is not None:
            URL = '{}?cas={}'.format(URL, cas)

        return ConsulClient._request_json(
            "put_value({})".format(key), "put", URL, ConsulClientKVEntryNotFoundError,
            data=json.dumps(data))


    @staticmethod
    def get_value(key, get_index=False):
        """get the value for key from consul-kv

        Returns the JSON-decoded value, or (ModifyIndex, value) when
        get_index is true.
        Raises ConsulClientConnectionError or ConsulClientKVEntryNotFoundError.
        """

        URL = ConsulClient.CONSUL_KV_MASK.format(ConsulClient._consul_url(), key)

        caller = "get_value({})".format(key)
        data = ConsulClient._request_json(caller, "get", URL, ConsulClientKVEntryNotFoundError)
        ConsulClient._require_value(caller, "get", URL, data, ConsulClientKVEntryNotFoundError)

        try:
            value = base64.b64decode(data[0]["Value"]).decode("utf-8")
            value_dict = json.loads(value)
        except Exception as e:
            msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
                key, URL, type(e).__name__, e)
            ConsulClient._logger.error(msg)
            raise ConsulClientKVEntryNotFoundError(msg)

        ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
                                  key, value, json.dumps(data))

        if get_index:
            return data[0]["ModifyIndex"], value_dict

        return value_dict


    @staticmethod
    def get_kvs(prefix, nest=True, trim_prefix=False):
        """get key-values for keys beginning with prefix from consul-kv

        nest        -- build nested dictionaries by splitting keys on '/'
        trim_prefix -- drop the first len(prefix) characters of every key

        Values that are valid JSON are decoded, others stay strings; a null
        Value becomes "".
        Raises ConsulClientConnectionError or ConsulClientKVEntryNotFoundError.
        """

        URL = ConsulClient.CONSUL_KVS_MASK.format(ConsulClient._consul_url(), prefix)

        caller = "get_kvs({})".format(prefix)
        data = ConsulClient._request_json(caller, "get", URL, ConsulClientKVEntryNotFoundError)
        ConsulClient._require_value(caller, "get", URL, data, ConsulClientKVEntryNotFoundError)

        rdict = {}
        for item in data:
            raw = item.get("Value")
            # b64decode(None) would raise, so a null Value is read as an empty string
            v = base64.b64decode(raw).decode("utf-8") if raw else ""
            try:
                value = json.loads(v)
            except Exception:
                value = v
            key = item['Key']
            if trim_prefix:
                key = key[len(prefix):]
            if nest:
                rdict = ConsulClient._put_level_value(key.split('/'), value, rdict)
            else:
                rdict[key] = value

        ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
                                  prefix, json.dumps(rdict), json.dumps(data))
        return rdict


    @staticmethod
    def _gen_txn_operation(verb, key, value=None):
        """returns the properly formatted operation to be used inside transaction

        value may be str or bytes; it is sent base64-encoded as an ASCII str
        so the operation stays JSON-serializable.
        """

        # key = urllib.quote(key)  # can't use urllib.quote() because it kills ':' in the key
        if value:
            if isinstance(value, str):
                value = value.encode("utf-8")  # b64encode accepts bytes only
            return {"KV": {"Verb": verb, "Key": key,
                           "Value": base64.b64encode(value).decode("ascii")}}
        return {"KV": {"Verb": verb, "Key": key}}


    @staticmethod
    def _run_transaction(operation_name, txn):
        """run a single transaction of several operations at consul /txn

        Returns True on HTTP 200, None otherwise; failures are logged.
        """

        if not txn:
            return

        txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(ConsulClient._consul_url())
        response = None
        try:
            response = requests.put(txn_url, json=txn, timeout=30)
        except requests.exceptions.RequestException as e:
            ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
                .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
            return

        if response.status_code != requests.codes.ok:
            ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
                .format(operation_name, txn_url, response.status_code,
                        response.text, json.dumps(txn),
                        json.dumps(dict(list(response.request.headers.items())))))
            return

        ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
            .format(operation_name, txn_url, response.status_code,
                    response.text, json.dumps(txn),
                    json.dumps(dict(list(response.request.headers.items())))))

        return True
ConsulClient.MAX_OPS_PER_TXN - len(txn) + for idx in range(0, len(store_kvs), idx_step): + txn += store_kvs[idx : idx + idx_step] + if not ConsulClient._run_transaction("store_kvs", txn): + return False + txn = [] + + return ConsulClient._run_transaction("store_kvs", txn) + + + @staticmethod + def delete_key(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_key()") + return + + delete_key = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key) + ] + return ConsulClient._run_transaction("delete_key", delete_key) + + + @staticmethod + def delete_kvs(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_kvs()") + return + + delete_kvs = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key) + ] + return ConsulClient._run_transaction("delete_kvs", delete_kvs) + + + #----- Methods for Config Binding Service + + @staticmethod + def get_service_component(scn): + config = json.dumps(ConsulClient.get_value(scn)) + + try: + dmaap = ConsulClient.get_value(scn + ":dmaap") + except Exception as e: + dmaap = None + if dmaap: + for key in list(dmaap.keys()): + config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config) + + try: + rel = ConsulClient.get_value(scn + ":rel") + except Exception as e: + rel = None + if rel: + for key in list(rel.keys()): + config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config) + + return json.loads(config) + + + @staticmethod + def get_service_component_all(scn, policies_as_list=True): + t_scn = scn + ":" + t_len = len(t_scn) + a_dict = ConsulClient.get_kvs(scn) + b_dict = {} + for key in a_dict: + b_key = None + if key == scn: + b_dict["config"] = ConsulClient.get_service_component(scn) + elif key == scn + ":dmaap": + continue + elif key[0:t_len] == t_scn: + b_key = key[t_len:] + # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps 
policy_ids keys + if policies_as_list and b_key == "policies": # convert items from KVs to a values list + b_dict[b_key] = {} + for sub_key in a_dict[key]: + if sub_key == "items": + b_dict[b_key][sub_key] = [] + d_dict = a_dict[key][sub_key] + for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate + b_dict[b_key][sub_key].append(d_dict[item]) + else: + b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key]) + else: + b_dict[b_key] = copy.deepcopy(a_dict[key]) + return b_dict + + + @staticmethod + def add_vnf_id(scn, vnf_type, vnf_id, dti_dict): + """ + Add VNF instance to Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Extend the dict by adding a dti_dict for vnf_id under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. + """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + + @staticmethod + def delete_vnf_id(scn, vnf_type, vnf_id): + """ + Delete VNF instance from Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Modify the dict by deleting the vnf_id key entry from under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. 
+ """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + if vnf_id not in lc_v[lc_vnf_type]: + return lc_v + del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + +if __name__ == "__main__": + value = None + + if value: + print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': '))) diff --git a/oti/event-handler/otihandler/dbclient/__init__.py b/oti/event-handler/otihandler/dbclient/__init__.py new file mode 100644 index 0000000..ee3ec3e --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/__init__.py @@ -0,0 +1,19 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============LICENSE_END========================================================= + +from .models import Event +from .models import EventAck +from .db_dao import DaoBase diff --git a/oti/event-handler/otihandler/dbclient/apis/__init__.py b/oti/event-handler/otihandler/dbclient/apis/__init__.py new file mode 100644 index 0000000..05e3800 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/apis/__init__.py @@ -0,0 +1,18 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from .db_access import DbAccess +from .event_db_access import EventDbAccess diff --git a/oti/event-handler/otihandler/dbclient/apis/db_access.py b/oti/event-handler/otihandler/dbclient/apis/db_access.py new file mode 100644 index 0000000..f064b30 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/apis/db_access.py @@ -0,0 +1,50 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 
class DbAccess(object):
    """Base class for database access objects built on a SQLAlchemy session.

    Each instance obtains the engine from ``DaoBase.getDbEngine()`` and opens
    one session bound to it.
    """

    logger = logging.getLogger("dti_handler.DbAccess")
    # class-level defaults; __init__ replaces both on every instance
    engine = None
    session = None

    def __init__(self):
        self.engine = DaoBase.getDbEngine()
        # create a configured "Session" class
        Session = sessionmaker(bind=self.engine)

        # create a Session
        self.session = Session()

    def _apply_and_commit(self, operation, obj):
        """Run ``operation(obj)`` on the session, commit, and always close.

        A failure in the operation or the commit is rolled back and re-raised
        so the session is never left holding a failed transaction.
        """
        try:
            operation(obj)
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        finally:
            self.session.close()

    def saveDomainObject(self, obj):
        """Add ``obj`` to the session, commit, and close the session.

        :raises: any exception from add/commit, after the rollback
        """
        self._apply_and_commit(self.session.add, obj)

    def deleteDomainObject(self, obj):
        """Delete ``obj`` through the session, commit, and close the session.

        :raises: any exception from delete/commit, after the rollback
        """
        self._apply_and_commit(self.session.delete, obj)
class EventDbAccess(DbAccess):
    """Queries and updates over the Event and EventAck mappings."""

    def __init__(self):
        super(EventDbAccess, self).__init__()

    def _events_for_target(self, target_type, target_name):
        """Build the Event query filtered by target type and target name."""
        by_type = self.session.query(Event).filter(Event.target_type == target_type)
        return by_type.filter(Event.target_name == target_name)

    def query_event_item(self, target_type, target_name):
        """Return the single matching Event, or None when there is none."""
        try:
            return self._events_for_target(target_type, target_name).one()
        except NoResultFound:
            return None

    def query_event_data(self, target_type, target_name):
        """Return all EventAck rows of the matching Event ([] when no Event)."""
        try:
            found = self._events_for_target(target_type, target_name).one()
        except NoResultFound:
            return []

        try:
            return self.session.query(EventAck).filter(EventAck.event == found).all()
        except NoResultFound:
            return []

    def query_event_data_k8s(self, target_type, target_name):
        """Like query_event_data, but leaves out acks whose container_type is 'docker'."""
        try:
            found = self._events_for_target(target_type, target_name).one()
        except NoResultFound:
            return []

        try:
            acks = self.session.query(EventAck).filter(EventAck.event == found)
            return acks.filter(EventAck.container_type != 'docker').all()
        except NoResultFound:
            return []

    def query_event_info_docker(self, prim_evt, service_component, deployment_id, container_id):
        """Return the single docker EventAck of prim_evt matching the given ids.

        NoResultFound from the lookup is passed on to the caller.
        """
        try:
            criteria = and_(EventAck.service_component == service_component,
                            EventAck.deployment_id == deployment_id,
                            EventAck.container_id == container_id,
                            EventAck.container_type == 'docker')
            acks = self.session.query(EventAck).filter(EventAck.event == prim_evt)
            return acks.filter(criteria).one()
        except NoResultFound:
            raise

    def update_event_item(self, dti_event, target_type, target_name):
        """Set the event column of the matching Event rows to dti_event and commit."""
        matching = self._events_for_target(target_type, target_name)
        matching.update({Event.event: dti_event})
        self.session.commit()

    def query_raw_k8_events(self, cluster, pod, namespace):
        """
        Select the events whose id appears among the acks matching the supplied predicates.

        :param cluster: value compared with EventAck.k8s_cluster_fqdn
        :param pod: value compared with EventAck.k8s_pod_id
        :param namespace: value compared with EventAck.k8s_namespace
        :return:
            list of Event objects related to k8s pods
        """
        try:
            acked_ids = self.session.query(EventAck.dtih_event_id).filter(
                and_(EventAck.k8s_cluster_fqdn == cluster,
                     EventAck.k8s_pod_id == pod,
                     EventAck.k8s_namespace == namespace))
            return self.session.query(Event).filter(Event.dtih_event_id.in_(acked_ids)).all()
        except NoResultFound:
            print("invalid query or no data")
            return ()

    def query_raw_docker_events(self, target_types, locations):
        """
        Select events by target type and, optionally, by location.

        :param target_types: required
        :param locations: optional; empty, None or [''] means no location filter
        :return:
            list of Event objects related to docker containers
        """
        try:
            selected = self.session.query(Event).filter(Event.target_type.in_(target_types))
            unrestricted = not locations or (len(locations) == 1 and locations[0] == '')
            if not unrestricted:
                selected = selected.filter(Event.location_clli.in_(locations))
            return selected.all()
        except NoResultFound:
            print("invalid query or no data")
            return ()

    def query_pod_info2(self, cluster):
        """Return every EventAck whose k8s_cluster_fqdn equals cluster."""
        try:
            in_cluster = self.session.query(EventAck).filter(EventAck.k8s_cluster_fqdn == cluster)
            return in_cluster.all()
        except NoResultFound:
            print("invalid query or no data")
            return ()

    def query_pod_info(self, cluster):
        """Return the distinct pod/namespace/proxy/service-name/service-port rows of a cluster."""
        try:
            pod_columns = self.session.query(EventAck.k8s_pod_id, EventAck.k8s_namespace,
                                             EventAck.k8s_proxy_fqdn, EventAck.k8s_service_name,
                                             EventAck.k8s_service_port)
            in_cluster = pod_columns.filter(EventAck.k8s_cluster_fqdn == cluster)
            return in_cluster.distinct().order_by(EventAck.k8s_cluster_fqdn).all()
        except NoResultFound:
            print("invalid query or no data")
            return ()

    def query_event_data_k8s_pod(self, prim_evt, scn):
        """Return the single EventAck of prim_evt for service component scn, or None."""
        try:
            acks = self.session.query(EventAck).filter(EventAck.event == prim_evt)
            return acks.filter(and_(EventAck.service_component == scn)).one()
        except NoResultFound:
            return None
class DaoBase:
    """Holder of the single SQLAlchemy engine shared through class state."""

    # engine created by the first init_db() call; None until then
    _engine = None

    @staticmethod
    def init_db(dbConStr):
        """Create the engine from ``dbConStr`` unless one is already set."""
        if not DaoBase._engine:
            DaoBase._engine = create_engine(dbConStr)

    @staticmethod
    def getDbEngine():
        """Return the stored engine (None when init_db() has not created one)."""
        return DaoBase._engine
+# ============LICENSE_END========================================================= + + +from .event import Event +from .event_ack import EventAck diff --git a/oti/event-handler/otihandler/dbclient/models/event.py b/oti/event-handler/otihandler/dbclient/models/event.py new file mode 100644 index 0000000..553bec2 --- /dev/null +++ b/oti/event-handler/otihandler/dbclient/models/event.py @@ -0,0 +1,40 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
Base = declarative_base()


class Event(Base):
    """Declarative mapping for the ``dtih_event`` table in schema ``dti``."""

    __tablename__ = 'dtih_event'
    __table_args__ = dict(schema='dti')

    # primary key
    dtih_event_id = Column(Integer, primary_key=True)

    # event payload, kept as JSONB
    event = Column(JSONB)

    # both timestamps default to the database's now() on insert
    create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
    last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())

    # plain string attributes
    target_name = Column(String)
    target_type = Column(String)
    location_clli = Column(String)

    # def __repr__(self):
    #    return "<Event(event_id='%s', target_type='%s', target_name='%s')" % (
    #        self.event_id, self.target_type, self.target_name
    #    )
Base = declarative_base()


class EventAck(Base):
    """Declarative mapping for the ``dtih_event_ack`` table in schema ``dti``."""

    __tablename__ = 'dtih_event_ack'
    __table_args__ = dict(schema='dti')

    # primary key
    dtih_event_ack_id = Column(Integer, primary_key=True)

    # both timestamps default to the database's now() on insert
    create_ts = Column(TIMESTAMP(timezone=True), default=func.now())
    last_modified_ts = Column(TIMESTAMP(timezone=True), default=func.now())

    action = Column(String)

    # k8s_* string attributes
    k8s_namespace = Column(String)
    k8s_service_name = Column(String)
    k8s_service_port = Column(String)
    k8s_cluster_fqdn = Column(String)
    k8s_proxy_fqdn = Column(String)
    k8s_pod_id = Column(String)

    # component / container string attributes
    service_component = Column(String)
    deployment_id = Column(String)
    container_type = Column(String)
    docker_host = Column(String)
    container_id = Column(String)
    reconfig_script = Column(String)

    # foreign key to the Event row, plus the relationship that loads it
    dtih_event_id = Column(Integer, ForeignKey(Event.dtih_event_id))
    event = relationship(Event)

    def update_action(self, action):
        """Assign ``action`` to this row's ``action`` attribute."""
        self.action = action
# class DockerClientError(RuntimeError):
#     pass

class DockerClientConnectionError(RuntimeError):
    """Raised when both the TLS and the non-TLS connection attempts fail."""
    pass


class DockerClient(object):
    """
    All Docker logins are in Consul's key-value store under
    "docker_plugin/docker_logins" as a list of json objects where
    each object is a single login:

        [{ "username": "XXXX", "password": "yyyy",
           "registry": "hostname.domain:18443" }]
    """

    _logger = logging.getLogger("oti_handler.docker_client")

    def __init__(self, docker_host, reauth=False):
        """Create Docker client

        The host is resolved through Consul, then a TLS connection is tried
        first and a plain "tcp://" connection second.

        Args:
        -----
        docker_host: (string) name resolved with ConsulClient.get_service_fqdn_port()
        reauth: (boolean) Forces reauthentication, e.g., Docker login

        Raises:
        -------
        DockerClientConnectionError: when both attempts fail; the message
            carries the error of each attempt
        """

        (fqdn, port) = ConsulClient.get_service_fqdn_port(docker_host, node_meta=True)
        base_url = "https://{}:{}".format(fqdn, port)

        try:
            tls_config = docker.tls.TLSConfig(
                client_cert=(
                    Config.tls_server_ca_chain_file,
                    Config.tls_private_key_file
                )
            )
            self._client = docker.APIClient(base_url=base_url, tls=tls_config, version='auto', timeout=60)
            self._login_all(reauth)

        # except requests.exceptions.RequestException as e:
        except Exception as tls_err:
            # kept for the combined message; only logged if the fallback fails too
            msg = "DockerClient.__init__({}) attempt to {} with TLS got exception {}: {!s}".format(
                docker_host, base_url, type(tls_err).__name__, tls_err)

            # Then try connecting to dockerhost without TLS
            try:
                base_url = "tcp://{}:{}".format(fqdn, port)
                self._client = docker.APIClient(base_url=base_url, tls=False, version='auto', timeout=60)
                self._login_all(reauth)

            # except requests.exceptions.RequestException as e:
            except Exception as tcp_err:
                msg = "{}\nDockerClient.__init__({}) attempt to {} without TLS got exception {}: {!s}".format(
                    msg, docker_host, base_url, type(tcp_err).__name__, tcp_err)
                DockerClient._logger.error(msg)
                raise DockerClientConnectionError(msg)

    def _login_all(self, reauth):
        """Log the current client in with every login stored in Consul."""
        for dcl in ConsulClient.get_value("docker_plugin/docker_logins"):
            dcl['password'] = decrypt(dcl['password'])
            dcl["reauth"] = reauth
            self._client.login(**dcl)

    @staticmethod
    def build_cmd(script_path, use_sh=True, msg_type="dti", **kwargs):
        """Build command to execute

        Returns the argument list [script_path, msg_type, <kwargs as JSON>],
        prefixed with '/bin/sh' when use_sh is true.
        """

        data = json.dumps(kwargs or {})

        if use_sh:
            return ['/bin/sh', script_path, msg_type, data]
        else:
            return [script_path, msg_type, data]

    def notify_for_reconfiguration(self, container_id, cmd):
        """Notify Docker container that reconfiguration occurred

        Notify the Docker container by doing Docker exec of passed-in command.
        Failures are reported through the return value instead of raised.

        Args:
        -----
        container_id: (string)
        cmd: (list) of strings each entry being part of the command

        Returns:
        --------
        the result of exec_start() on success, otherwise a string describing
        the failure
        """

        # up to 12 attempts, 10 seconds apart, while the container is restarting
        for attempts_remaining in range(11,-1,-1):
            try:
                result = self._client.exec_create(container=container_id, cmd=cmd)
            except docker.errors.APIError as e:
                # e  # 500 Server Error: Internal Server Error ("{"message":"Container 624108d1ab96f24b568662ca0e5ffc39b59c1c57431aec0bef231fb62b04e166 is not running"}")
                DockerClient._logger.debug("exec_create() returned APIError: {!s}".format(e))

                # docker container restarting can wait
                if e.explanation and 'is restarting' in e.explanation.lower():
                    DockerClient._logger.debug("notification exec_create() experienced: {!s}".format(e))
                    if attempts_remaining == 0:
                        result = None
                        break
                    time.sleep(10)
                # elif e.explanation and 'no such container' in e.explanation.lower():
                # elif e.explanation and 'is not running' in e.explanation.lower():
                else:
                    DockerClient._logger.warning("aborting notification exec_create() because exception {}: {!s}".format(type(e).__name__, e))
                    return str(e)  # don't raise or CM will retry usually forever
                    # raise DockerClientError(e)
            except Exception as e:
                DockerClient._logger.warning("aborting notification exec_create() because exception {}: {!s}".format(
                    type(e).__name__, e))
                return str(e)  # don't raise or CM will retry usually forever
                # raise DockerClientError(e)
            else:
                break
        if not result:
            DockerClient._logger.warning("aborting notification exec_create() because docker exec failed")
            return "notification unsuccessful"  # failed to get an exec_id, perhaps trying multiple times, so don't raise or CM will retry usually forever
        DockerClient._logger.debug("notification exec_create() succeeded")

        # same retry budget for starting the exec; any exception is retried
        for attempts_remaining in range(11,-1,-1):
            try:
                result = self._client.exec_start(exec_id=result['Id'])
            except Exception as e:
                DockerClient._logger.debug("notification exec_start() got exception {}: {!s}".format(type(e).__name__, e))
                if attempts_remaining == 0:
                    DockerClient._logger.warning("aborting notification exec_start() because exception {}: {!s}".format(type(e).__name__, e))
                    return str(e)  # don't raise or CM will retry usually forever
                    # raise DockerClientError(e)
                time.sleep(10)
            else:
                break
        DockerClient._logger.debug("notification exec_start() succeeded")

        DockerClient._logger.info("Pass to docker exec {} {} {}".format(
            container_id, cmd, result))

        return result
# ============LICENSE_END=========================================================

"""OTI Event processor for handling all the event types"""

import copy
import json
import logging
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock

import requests

from otihandler import utils
from otihandler.cfy_client import CfyClient
from otihandler.consul_client import ConsulClient
from otihandler.dbclient.apis import EventDbAccess
from otihandler.dbclient.models import Event, EventAck
from otihandler.docker_client import DockerClient

# NOTE: notify_response_arr and lock are kept only so that existing imports of
# these names keep working.  notify_pods() now *returns* its results instead of
# appending them to this process-wide list, because the list was never cleared:
# every notify request reported the results of all earlier requests and the
# list grew without bound.
notify_response_arr = []
lock = Lock()
K8S_CLUSTER_PROXY_NODE_PORT = '30132'


def _exception_result(component_scn, exc, what):
    """Build the (component, message) tuple reported when `what` failed with `exc`."""
    return (component_scn, "exception {}: {!s} running {}".format(type(exc).__name__, exc, what))


def notify_docker(args_tuple):
    """
    event notification executor inside a thread pool for a docker container event ack record

    :param args_tuple: (dti_event, db_access, ack_item); dti_event is updated in place
        ('add' becomes 'deploy', 'delete' becomes 'undeploy')
    :return: tuple (service component name or "ERROR", result message); never None

    The docker exec call itself is disabled here (see the commented-out lines), so the only
    side effect is removing the event ack record when the action is 'undeploy'.
    """
    (dti_event, db_access, ack_item) = args_tuple
    try:
        dcae_service_action = dti_event.get('dcae_service_action')
        component_scn = ack_item.service_component
        deployment_id = ack_item.deployment_id
        container_id = ack_item.container_id
        docker_host = ack_item.docker_host
        reconfig_script = ack_item.reconfig_script
        container_type = 'docker'
    except Exception as e:
        return (
            "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e))

    what = ""
    try:
        what = "{} in {} container {} on {} that was deployed by {}".format(
            reconfig_script, container_type, container_id, docker_host, deployment_id)
        if dcae_service_action == 'add':
            dti_event.update({"dcae_service_action": "deploy"})

        if dcae_service_action == 'delete':
            dti_event.update({"dcae_service_action": "undeploy"})

        # dkr = DockerClient(docker_host, reauth=False)
        result = ''
        # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
        if dti_event.get('dcae_service_action') == 'undeploy':
            # delete from dti_event_ack table
            try:
                db_access.deleteDomainObject(ack_item)
            except Exception as e:
                msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
                DTIProcessor.logger.warning(msg)
                return _exception_result(component_scn, e, what)

        # every successful path reports a tuple, so dict(pool.map(...)) in the caller cannot fail
        return (component_scn, "ran {}, got: {!s}".format(what, result))
    except Exception as e:
        return _exception_result(component_scn, e, what)


def _notify_svc_docker(dti_event, db_access, curr_evt, component_scn, deployment_id, container_id,
                       node_id, docker_host, reconfig_script, container_type, dcae_service_action):
    """Run the reconfig script inside one docker container and maintain its event ack record."""
    what = ""
    try:
        what = "{} in {} container {} on {} that was deployed by {} node {}".format(
            reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
        if dcae_service_action == 'add':
            dti_event.update({"dcae_service_action": "deploy"})

        if dcae_service_action == 'delete':
            dti_event.update({"dcae_service_action": "undeploy"})

        dkr = DockerClient(docker_host, reauth=False)
        result = ''
        if dti_event.get('dcae_service_action') == 'update':
            # an update is delivered to the container as undeploy + deploy
            DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
            dti_event.update({"dcae_service_action": "undeploy"})
            result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
            DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
            dti_event.update({"dcae_service_action": "deploy"})
            result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
            try:
                upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
                                                                container_id)
                upd_evt_ack.update_action('update')
                db_access.saveDomainObject(upd_evt_ack)
            except Exception as e:
                msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
                DTIProcessor.logger.warning(msg)
                return _exception_result(component_scn, e, what)
        else:
            DTIProcessor.logger.debug("running {}".format(what))
            result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
            if dti_event.get('dcae_service_action') == 'deploy':
                # add into dti_event_ack table
                try:
                    add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
                                           container_type='docker', docker_host=docker_host,
                                           container_id=container_id, reconfig_script=reconfig_script,
                                           event=curr_evt,
                                           action='add')
                    db_access.saveDomainObject(add_evt_ack)
                except Exception as e:
                    msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
                    DTIProcessor.logger.warning(msg)
                    return _exception_result(component_scn, e, what)
            elif curr_evt is not None:
                # remove from dtih_event_ack if present
                try:
                    del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
                                                                    container_id)
                    db_access.deleteDomainObject(del_evt_ack)
                except Exception as e:
                    msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(
                        type(e).__name__, e)
                    DTIProcessor.logger.warning(msg)
                    return _exception_result(component_scn, e, what)
    except Exception as e:
        return _exception_result(component_scn, e, what)

    return (component_scn, "ran {}, got: {!s}".format(what, result))


def notify_svc(args_tuple):
    """
    add/update/delete event handler
    event notification executor inside a thread pool to communicate with docker container and k8s services
    interacts with docker client library
    interacts with k8s node port services using REST client

    :param args_tuple: (orig_dti_event, db_access, curr_evt, res_tuple).  The event is deep-copied
        before it is modified.  res_tuple is read positionally: [0] service component name,
        [1] deployment id, [2] container id, [3] node id, [6] docker host, [7] reconfig script,
        [8] container type ("docker" or "k8s").
    :return: tuple (service component name or "ERROR", result message); never None, because the
        callers feed the results of pool.map() straight into dict()
    """
    (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
    dti_event = copy.deepcopy(orig_dti_event)
    try:
        dcae_service_action = dti_event.get('dcae_service_action').lower()

        component_scn = res_tuple[0]
        deployment_id = res_tuple[1]
        container_id = res_tuple[2]
        node_id = res_tuple[3]
        docker_host = res_tuple[6]
        reconfig_script = res_tuple[7]
        container_type = res_tuple[8]
    except Exception as e:
        return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e))

    if container_type == "docker":
        # exec reconfigure.sh in docker container
        return _notify_svc_docker(dti_event, db_access, curr_evt, component_scn, deployment_id, container_id,
                                  node_id, docker_host, reconfig_script, container_type, dcae_service_action)

    if container_type == "k8s":
        DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component")
        what = "{} for {} component {}".format(dcae_service_action, container_type, component_scn)
        try:
            if dcae_service_action == 'add':
                DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action")
                return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
            if dcae_service_action == 'update':
                # handle update for pods being tracked and handle add for new pods
                k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
                if k8s_scn_result is not None:
                    # update
                    DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action")
                    return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
                # add
                DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ")
                dti_event.update({"dcae_service_action": "add"})
                return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
        except Exception as e:
            # a raising worker would abort the caller's whole pool.map(); report it as a result instead
            return _exception_result(component_scn, e, what)

    # Unhandled container type / action combination.  Previously this fell off the end and
    # returned None, which made dict(pool.map(...)) raise and discarded every other result.
    return (component_scn, "no notification sent: action {} is not handled for container type {}".format(
        dcae_service_action, container_type))


def notify_k8s(args_tuple):
    """
    add event handler
    event notification executor inside a thread pool to communicate with k8s statefulset nodeport service
    uses REST API client to call k8s services

    :param args_tuple: (dti_event, db_access, curr_evt, res_tuple).  res_tuple is read positionally:
        [0] service component name, [1] deployment id, [3] node id, [8] container type,
        [9] service address, [10] service port.
    :return: tuple (service component name, result message)

    The JSON body returned by the service is read for the keys KubeNamespace, KubeServiceName,
    KubeServicePort, KubeProxyFqdn, KubeClusterFqdn and KubePod, which are stored as an EventAck.
    """
    (dti_event, db_access, curr_evt, res_tuple) = args_tuple
    component_scn = res_tuple[0]
    deployment_id = res_tuple[1]
    node_id = res_tuple[3]
    container_type = res_tuple[8]
    service_address = res_tuple[9]
    service_port = res_tuple[10]
    what = "{} in {} deployment {} that was deployed by {} node {}".format(
        "add", container_type, "statefulset", deployment_id, node_id)
    # call scn node port service REST API
    svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
    try:
        DTIProcessor.logger.debug("running {}".format(what))
        response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        msg = "collector nodeport service({}) threw exception {}: {!s}".format(
            svc_nodeport_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return _exception_result(component_scn, e, what)
    try:
        event_ack_info = response.json()
    except Exception as e:
        msg = "collector nodeport service({}) threw exception {}: {!s}".format(
            svc_nodeport_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return _exception_result(component_scn, e, what)

    # the body must be a non-empty JSON object, otherwise the .get() calls below cannot work
    if not event_ack_info or not isinstance(event_ack_info, dict):
        msg = "collector nodeport service returned bad data"
        DTIProcessor.logger.error(msg)
        return (component_scn, "collector nodeport service returned bad data")

    namespace = event_ack_info.get("KubeNamespace")
    svc_name = event_ack_info.get("KubeServiceName")
    svc_port = event_ack_info.get("KubeServicePort")
    proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
    cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
    pod_name = event_ack_info.get("KubePod")
    # the statefulset name is the pod name without its trailing "-<ordinal>"; a missing or
    # dash-less KubePod used to raise TypeError/ValueError out of the worker thread
    if not isinstance(pod_name, str) or '-' not in pod_name:
        msg = "collector nodeport service({}) returned invalid KubePod {!r}".format(svc_nodeport_url, pod_name)
        DTIProcessor.logger.error(msg)
        return (component_scn, "{} running {}".format(msg, what))
    statefulset = pod_name[0:pod_name.rindex('-')]

    what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
        "add", container_type, statefulset, namespace, deployment_id, node_id)
    try:
        add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
                               k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
                               k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
                               service_component=component_scn)
        db_access.saveDomainObject(add_evt_ack)
        return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
    except Exception as e:
        msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
        DTIProcessor.logger.warning(msg)
        return _exception_result(component_scn, e, what)


def _notify_one_pod(item_pod_url, dti_event, what):
    """PUT the event to one pod URL and return the single result message for that pod."""
    try:
        DTIProcessor.logger.debug("running {}".format(what))
        response = requests.put(item_pod_url, json=dti_event, timeout=50)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
            item_pod_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return "exception {}: {!s} running {}".format(type(e).__name__, e, what)

    try:
        event_ack_info = response.json()
    except Exception as e:
        msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
            item_pod_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return "exception {}: {!s} running {}".format(type(e).__name__, e, what)

    if not event_ack_info:
        msg = "stateful set proxy service returned bad data"
        DTIProcessor.logger.error(msg)
        return "no acknowledgement - running {}".format(what)

    return "ran {}, got: {!s}".format(what, event_ack_info)


def notify_pods(args_tuple):
    """
    notify event handler
    event notification executor inside a thread pool to communicate with k8s DTIH proxy nodeport service
    uses REST API client to call k8s services

    :param args_tuple: (dti_event, res_tuple).  res_tuple is read positionally: [0] cluster,
        [1] namespace, [2] service name, [3] number of replicas, [4] service port.
    :return: list of (pod id or "ERROR", result message) tuples, exactly one entry per pod.

    Previously a failed pod got an error entry *and* an unconditional "ran ..." entry (which won
    when the caller folded the entries into a dict), and the acknowledgement of one replica could
    be reported for the next replica.  Results were also appended to a module-level list instead
    of being returned; see the note on notify_response_arr.
    """
    results = []
    (dti_event, res_tuple) = args_tuple
    try:
        cluster = res_tuple[0]
        port = K8S_CLUSTER_PROXY_NODE_PORT
        namespace = res_tuple[1]
        svc_name = res_tuple[2]
        svc_port = res_tuple[4]
        replicas = res_tuple[3]

        for replica in range(replicas):
            pod_id = "sts-{}-{}".format(svc_name, replica)
            item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
                                                                                        pod_id, svc_name,
                                                                                        svc_port)
            what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
            results.append((pod_id, _notify_one_pod(item_pod_url, dti_event, what)))
    except Exception as e:
        results.append(
            ("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e)))
    return results


def notify_k8s_pod(args_tuple):
    """
    update event handler
    event notification executor inside a thread pool to communicate with k8s DTIH proxy service
    uses REST API client to call k8s services

    :param args_tuple: (dti_event, db_access, ack_item); ack_item is the stored event ack record
        of the pod (its k8s_* attributes and service_component are read)
    :return: tuple (service component name, result message)

    After a successful PUT the ack record is deleted when the action is 'delete', otherwise its
    action is set to 'update' and it is saved.
    """
    item_pod_url = ''
    component_scn = ''
    (dti_event, db_access, ack_item) = args_tuple
    # call ingress proxy to dispatch delete event

    action = dti_event.get('dcae_service_action')
    what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
        action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
    try:
        DTIProcessor.logger.debug("running {}".format(what))
        item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
            ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
            ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
        component_scn = ack_item.service_component
        response = requests.put(item_pod_url, json=dti_event, timeout=50)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
            item_pod_url, type(e).__name__, e)
        DTIProcessor.logger.error(msg)
        return (component_scn, "ran {}, got: {!s}".format(what, msg))

    if action == 'delete':
        try:
            db_access.deleteDomainObject(ack_item)
        except Exception as e:
            msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            return _exception_result(component_scn, e, what)
    else:
        try:
            ack_item.update_action('update')
            db_access.saveDomainObject(ack_item)
        except Exception as e:
            msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            return _exception_result(component_scn, e, what)

    # a non-JSON body used to raise out of the worker after the DB was already updated
    try:
        ack_body = response.json()
    except Exception:
        ack_body = response.text
    return (component_scn, "ran {}, got: {!s}".format(what, ack_body))


class DTIProcessor(object):
    """
    Main event processing class that encapsulates all the logic of this handler application!
    An instance of this class is created per incoming client request.

    Generates input data by querying platform services - cloudify, consul, postgresSql

    It creates two pools of worker threads (multiprocessing.dummy.Pool) and offloads the
    notification functions of this module to them, one task per target component.
    All processing happens inside __init__(); the outcome is read back with get_result().
    """

    logger = logging.getLogger("oti_handler.dti_processor")
    K8S_CLUSTER_PROXY_NODE_PORT = K8S_CLUSTER_PROXY_NODE_PORT
    db_access = None
    docker_pool = None
    k8s_pool = None

    # The only method names dispatcher() may call.  The action string comes from the incoming
    # event, so an unrestricted getattr() could invoke e.g. "dispatcher" (endless recursion)
    # or a non-callable attribute such as "event".
    _EVENT_HANDLERS = frozenset(("add", "add_replay", "delete", "deploy", "notify", "undeploy", "update"))

    def __init__(self, dti_event, send_notification=True):
        """
        Process one event completely.

        :param dti_event: dict; reads dcae_service_action (required), dcae_target_name,
            dcae_target_type and dcae_service_location
        :param send_notification: when False the handlers skip notifying components
        :raises: whatever the selected handler raises; the thread pools are shut down first
        """
        self._result = {}
        self.event = dti_event
        self.is_notify = send_notification
        self.action = dti_event.get('dcae_service_action').lower()
        self.target_name = dti_event.get('dcae_target_name')
        self.target_type = dti_event.get('dcae_target_type', '').lower()
        self.event_clli = dti_event.get('dcae_service_location')
        res_dict = None
        try:
            self.docker_pool = ThreadPool(8)
            self.k8s_pool = ThreadPool(8)
        except Exception as e:
            msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.error(msg)
            self._result['ERROR'] = msg
            # do not leak the first pool when creating the second one failed
            self._shutdown_pools()
            raise

        try:
            # inside the try so the pools are also released when the DB client cannot be created
            self.db_access = EventDbAccess()
            self.prim_db_event = None
            res_dict = self.dispatcher()
        finally:
            self._shutdown_pools()

        if res_dict:
            try:
                utils.update_dict(self._result, res_dict)
            except Exception as e:
                msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
                    type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                self._result['ERROR'] = msg

        DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")

    def _shutdown_pools(self):
        """Close, then join, every pool that was created; a failure on one pool does not skip the other."""
        pools = [pool for pool in (self.docker_pool, self.k8s_pool) if pool is not None]
        for step in ("close", "join"):
            for pool in pools:
                try:
                    getattr(pool, step)()
                except Exception as e:
                    msg = "DTIProcessor.__init__() running pool.{}() got exception {}: {!s}".format(
                        step, type(e).__name__, e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

    def dispatcher(self):
        """
        dispatch method to execute specific method based on event type

        :return: the handler's return value, or the string "Invalid action" when the action
            does not name one of the event handlers
        """
        arg = str(self.action)
        if arg not in DTIProcessor._EVENT_HANDLERS:
            return "Invalid action"
        return getattr(self, arg)()

    def undeploy(self):
        """
        delete event from consul KV store, this functionality will be retired as events are stored
        in postgresSql oti database
        """
        # local variable: this used to be published as a module-level global
        key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
        try:
            # update Consul KV store with DTI Event - storing them in a folder for all components
            result = ConsulClient.delete_key(key)
        except Exception as e:
            msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg
        else:
            if not result:
                msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
                DTIProcessor.logger.warning(msg)
                self._result['WARNING'] = msg

    def deploy(self):
        """
        add event to consul KV store, this functionality will be retired as events are stored
        in postgresSql oti database
        """
        dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
        try:
            # update Consul KV store with DTI Event - storing them in a folder for all components
            ConsulClient.store_kvs({dep_key: self.event})
        except Exception as e:
            # the message used to reference the undefined name `key`, turning any Consul
            # failure into a NameError
            msg = "trying to store Consul history key {}, got exception {}: {!s}".format(
                dep_key, type(e).__name__, e)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg

    def _notify_components_as_add(self):
        """Force the event action to 'add' and run notify_svc for every component Cloudify reports."""
        # force the action to add, to avoid bad things later
        self.event.update({"dcae_service_action": "add"})
        components = CfyClient().iter_components(self.target_type, dcae_service_location=self.event_clli)
        return dict(self.docker_pool.map(
            notify_svc, ((self.event, self.db_access, self.prim_db_event, tp) for tp in components)))

    def add(self):
        """
        process DTI event that contains a new VNF instance that has to be configured in the collector microservices

        :return: dict of component name -> result message, or None when nothing was notified
        :raises Exception: when the event cannot be stored in the database
        """
        res_dict = None
        try:
            msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
            DTIProcessor.logger.debug(msg)
            # insert add event into dtih_event table
            self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
                                       location_clli=self.event_clli)
            self.db_access.saveDomainObject(self.prim_db_event)
        except Exception as e:
            msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['ERROR'] = msg
            raise Exception(msg)

        if self.is_notify:
            try:
                res_dict = self._notify_components_as_add()
            except Exception as e:
                msg = "DTIProcessor.add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                self._result['ERROR'] = msg
        return res_dict

    def add_replay(self):
        """
        convert an update event flow and replay as an add event type since the event acknowledgement is missing
        from application database

        :return: dict of component name -> result message, or None when notifying failed
        """
        res_dict = None
        try:
            res_dict = self._notify_components_as_add()
        except Exception as e:
            msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
            DTIProcessor.logger.error(msg)
            self._result['ERROR'] = msg
        return res_dict

    def delete(self):
        """
        process DTI event that indicates a VNF instance has to be removed from the collector microservices

        :return: dict of component name -> result message (docker and k8s results merged)
        """
        res_dict = {}
        res_dict_k8s = {}
        res_dict_docker = {}
        try:
            self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
            if self.is_notify:
                try:
                    msg = "processing delete event for {}/{} to relate with any docker hosts".format(
                        self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    docker_components = CfyClient().iter_components_for_docker(
                        self.target_type, dcae_service_location=self.event_clli)
                    res_dict_docker = dict(self.docker_pool.map(
                        notify_svc,
                        ((self.event, self.db_access, self.prim_db_event, tp) for tp in docker_components)))
                except Exception as e:
                    msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(
                        type(e).__name__, e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

                try:
                    msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
                        self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    if self.prim_db_event is not None:
                        result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
                        res_dict_k8s = dict(self.k8s_pool.map(
                            notify_k8s_pod, ((self.event, self.db_access, ack_item) for ack_item in result)))
                except Exception as e:
                    msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(
                        type(e).__name__, e)
                    DTIProcessor.logger.error(msg)
                    self._result['ERROR'] = msg

            try:
                if self.prim_db_event is not None:
                    self.db_access.deleteDomainObject(self.prim_db_event)
            except Exception as e:
                msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
                DTIProcessor.logger.warning(msg)
                self._result['ERROR'] = msg
        except Exception as e:
            msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['ERROR'] = msg

        if res_dict_k8s is not None:
            utils.update_dict(res_dict, res_dict_k8s)

        if res_dict_docker is not None:
            utils.update_dict(res_dict, res_dict_docker)

        return res_dict

    def update(self):
        """
        process DTI event that indicates VNF instance has to be updated in the collector microservices

        :return: dict of component name -> result message
        """
        res_dict = {}
        res_dict_k8s = {}
        res_dict_docker = {}

        if self.is_notify:
            try:
                self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
                if self.prim_db_event is not None:
                    self.db_access.update_event_item(self.event, self.target_type, self.target_name)
                    result = self.db_access.query_event_data(self.target_type, self.target_name)
                    if len(result) == 0:
                        msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
                              "replaying this event to cluster if required". \
                            format(self.target_type, self.target_name)
                        DTIProcessor.logger.warning(msg)
                        self._result['WARNING'] = msg
                        # add_replay() returns None on failure; keep res_dict a dict for the merge below
                        res_dict = self.add_replay() or {}
                    else:
                        msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and " \
                              "identify new vs update cases".format(self.target_type, self.target_name)
                        DTIProcessor.logger.debug(msg)
                        try:
                            tpl_arr = CfyClient().iter_components(self.target_type,
                                                                  dcae_service_location=self.event_clli)
                            res_dict_docker = dict(self.docker_pool.map(
                                notify_svc,
                                ((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr)))
                        except Exception as e:
                            msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
                                type(e).__name__, e)
                            DTIProcessor.logger.error(msg)
                            self._result['ERROR'] = msg
                else:
                    # event is new for the handler
                    msg = "processing update event for {}/{}, but current event info is not found in database, " \
                          "executing add event".format(self.target_type, self.target_name)
                    DTIProcessor.logger.warning(msg)
                    self._result['WARNING'] = msg
                    # add() returns None when nothing was notified
                    res_dict = self.add() or {}
            except Exception as e:
                msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
                DTIProcessor.logger.error(msg)
                self._result['ERROR'] = msg

        if res_dict_k8s is not None:
            utils.update_dict(res_dict, res_dict_k8s)

        if res_dict_docker is not None:
            utils.update_dict(res_dict, res_dict_docker)

        return res_dict

    def notify(self):
        """
        event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
        This notification is meant for the cluster failover.

        :return: dict of pod id -> result message for the pods notified by *this* request
        """
        res_dict = {}
        try:
            self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
            if self.prim_db_event is not None:
                self.db_access.update_event_item(self.event, self.target_type, self.target_name)
            else:
                self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
                                           location_clli=self.event_clli)
                self.db_access.saveDomainObject(self.prim_db_event)
        except Exception as e:
            msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['ERROR'] = msg

        try:
            k8_components = CfyClient().query_k8_components(self.target_name)
            # use the values returned by the workers rather than a shared module-level list,
            # so results of other (earlier or concurrent) requests are never mixed in
            for pod_results in self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in k8_components)):
                for k, v in pod_results or []:
                    res_dict[k] = v
        except Exception as e:
            msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
            DTIProcessor.logger.warning(msg)
            self._result['WARNING'] = msg

        return res_dict

    def get_result(self):
        """Return the result dict accumulated while the event was processed in __init__()."""
        return self._result

    @staticmethod
    def _group_events_by_type(results):
        """Group event rows into {target_type: {target_name: event}} in a single pass."""
        outer_dict = {}
        for evnt in results:
            outer_dict.setdefault(evnt.target_type, {})[evnt.target_name] = evnt.event
        return outer_dict

    @classmethod
    def get_k8_raw_events(cls, pod, cluster, namespace):
        """
        Get DTI events for a k8 stateful set pod container

        :param pod: required
            k8s stateful set pod ID that was configured with a specific set of DTI Events
        :param cluster: required
            k8s cluster FQDN where the mS was deployed
        :param namespace: required
            k8s namespace where the stateful set was deployed in that namespace
        :return:
            Dictionary of DTI event(s).
            DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
        """
        db_access = EventDbAccess()
        results = db_access.query_raw_k8_events(cluster, pod, namespace)
        return cls._group_events_by_type(results)

    @classmethod
    def get_docker_raw_events(cls, service_name, service_location):
        """
        Get DTI events for docker container.

        Parameters
        ----------
        service_name : string
            required. The service component name assigned by dockerplugin to the component that is unique to the
            cloudify node instance and used in its Consul key(s).
        service_location : string
            optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
            in service_location.
            If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
            TAGs if service_name is provided,
            otherwise results are not location filtered.

        Returns
        -------
        dict
            Dictionary of DTI event(s).
            DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
            Empty when no supported event types can be determined for service_name.

        """

        r_dict = {}

        want_locs = []
        if service_location:
            want_locs = service_location.split(',')

        give_types = []
        if service_name:
            if not want_locs:  # default to TAGs of container's dockerhost or k8s cluster master node
                try:
                    node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
                    if node_name:
                        services = ConsulClient.lookup_node(node_name).get("Services")
                        if services:
                            for node_svc in list(services.keys()):
                                if "-component-dockerhost-" in node_svc:
                                    want_locs = services[node_svc].get("Tags", [])
                                    break
                except Exception as e:
                    # best effort: without the TAGs the query below is simply not location filtered
                    cls.logger.debug("looking up default locations for {} got exception {}: {!s}".format(
                        service_name, type(e).__name__, e))

            try:
                supported_types = ConsulClient.get_value(service_name + ":dti")
            except Exception as e:
                cls.logger.debug("reading supported event types for {} got exception {}: {!s}".format(
                    service_name, type(e).__name__, e))
                return r_dict

            if supported_types:
                supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
            give_types = supported_types
        if not give_types or (len(give_types) == 1 and give_types[0] == ''):
            return r_dict

        db_access = EventDbAccess()
        results = db_access.query_raw_docker_events(give_types, want_locs)
        return cls._group_events_by_type(results)


# ---- end of oti/event-handler/otihandler/dti_processor.py ----
# The remaining text of this span is the diff header and the first header lines of the next
# file in the commit; it is preserved here verbatim (as comments):
#
# diff --git a/oti/event-handler/otihandler/onap/CommonLogger.py b/oti/event-handler/otihandler/onap/CommonLogger.py
# new file mode 100644
# index 0000000..644534d
# --- /dev/null
# +++ b/oti/event-handler/otihandler/onap/CommonLogger.py
# @@ -0,0 +1,958 @@
# +#!/usr/bin/python
# +# -*- indent-tabs-mode: nil -*- vi: set expandtab:
# +
# +# ================================================================================
# +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

""" Common Logging library in Python.

CommonLogger.py

Original Written by:    Terry Schmalzried
Date written:           October 1, 2015
Last updated:           December 1, 2016

version 0.8
"""

import os
import logging
import logging.handlers
import re
import socket
import sys
import threading
import time


class CommonLogger:
    """ Common Logging object.

    Writes pipe-delimited log records in one of four styles (error, debug,
    audit, metrics) to a file, a stream, a socket or nowhere, as selected by
    a "key = value" configuration file.  A daemon thread re-reads the
    configuration file whenever its modification time changes.

    Public methods:
    __init__
    setFields
    debug
    info
    warn
    error
    fatal
    setStartRecordEvent
    getStartRecordEvent

    Log record fields.  Every field may be given a default through __init__
    or setFields, and may be overridden per call through the keyword
    arguments of debug/info/warn/error/fatal.  Annotations show which record
    styles emit the field: d=debug, a=audit, m=metrics, e=error.
    style                       -- the log file format (style) to use when writing
                                   log messages, one of CommonLogger.ErrorFile,
                                   CommonLogger.DebugFile, CommonLogger.AuditFile and
                                   CommonLogger.MetricsFile, or one of the strings
                                   "error", "debug", "audit" or "metrics".
    requestID (dame)            -- optional default value for this log record field.
    serviceInstanceID (am)      -- optional default value for this log record field.
    threadID (am)               -- optional default value for this log record field.
    serverName (am)             -- optional default value for this log record field.
    serviceName (am)            -- optional default value for this log record field.
    instanceUUID (am)           -- optional default value for this log record field.
    severity (am)               -- optional default value for this log record field.
    serverIPAddress (am)        -- optional default value for this log record field.
    server (am)                 -- optional default value for this log record field.
    IPAddress (am)              -- optional default value for this log record field.
    className (am)              -- optional default value for this log record field.
    timer (am)                  -- (ElapsedTime) optional default value for this log record field.
    partnerName (ame)           -- optional default value for this log record field.
    targetEntity (me)           -- optional default value for this log record field.
    targetServiceName (me)      -- optional default value for this log record field.
    statusCode (am)             -- optional default value for this log record field.
    responseCode (am)           -- optional default value for this log record field.
    responseDescription (am)    -- optional default value for this log record field.
    processKey (am)             -- optional default value for this log record field.
    targetVirtualEntity (m)     -- optional default value for this log record field.
    customField1 (am)           -- optional default value for this log record field.
    customField2 (am)           -- optional default value for this log record field.
    customField3 (am)           -- optional default value for this log record field.
    customField4 (am)           -- optional default value for this log record field.
    errorCategory (e)           -- optional default value for this log record field.
    errorCode (e)               -- optional default value for this log record field.
    errorDescription (e)        -- optional default value for this log record field.

    Note:  the pipe '|' character is not allowed in any log record field;
    it is replaced by a space when the record is written.
    """

    UnknownFile = -1
    ErrorFile = 0
    DebugFile = 1
    AuditFile = 2
    MetricsFile = 3
    DateFmt = '%Y-%m-%dT%H:%M:%S'
    verbose = False

    # Field names accepted as defaults by __init__ and setFields.
    _FIELD_NAMES = (
        'style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName',
        'instanceUUID', 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className',
        'timer', 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode',
        'responseCode', 'responseDescription', 'processKey', 'targetVirtualEntity',
        'customField1', 'customField2', 'customField3', 'customField4', 'errorCategory',
        'errorCode', 'errorDescription')

    # Style names; the position of a name is the numeric style constant.
    _STYLE_NAMES = ('error', 'debug', 'audit', 'metrics')
    _STYLE_LABELS = ('ErrorFile', 'DebugFile', 'AuditFile', 'MetricsFile')

    # Values accepted from the configuration file.
    _ROTATE_METHODS = ('time', 'size', 'none')
    _ROTATE_INTERVAL_TYPES = ('S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight')
    _LOGGER_TYPES = ('filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger')
    _LOG_LEVELS = {'FATAL': 50, 'ERROR': 40, 'WARN': 30, 'INFO': 20, 'DEBUG': 10}

    # Field order of each record style.  '@level' stands for the upper-cased
    # log level and '@unused' for an always-empty column; the detail message
    # is appended as the last column of every record.
    _RECORD_LAYOUTS = {
        ErrorFile: (
            'requestID', 'threadID', 'serviceName', 'partnerName', 'targetEntity',
            'targetServiceName', 'errorCategory', 'errorCode', 'errorDescription'),
        DebugFile: (
            'requestID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', '@level',
            'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer'),
        AuditFile: (
            'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName',
            'partnerName', 'statusCode', 'responseCode', 'responseDescription',
            'instanceUUID', '@level', 'severity', 'serverIPAddress', 'timer', 'server',
            'IPAddress', 'className', '@unused', 'processKey', 'customField1',
            'customField2', 'customField3', 'customField4'),
        MetricsFile: (
            'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName',
            'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode',
            'responseDescription', 'instanceUUID', '@level', 'severity', 'serverIPAddress',
            'timer', 'server', 'IPAddress', 'className', '@unused', 'processKey',
            'targetVirtualEntity', 'customField1', 'customField2', 'customField3',
            'customField4'),
    }

    def __init__(self, configFile, logKey, **kwargs):
        """Construct a Common Logger for one Log File.

        Arguments:
        configFile              -- configuration filename.
        logKey                  -- the keyword in configFile that identifies the log filename.

        Keyword arguments:  any log record field listed in the class
        docstring, used as the default value of that field.  "style" may
        also be set in the config file using a field named <logKey>Style
        (where <logKey> is the value of the logKey parameter); the keyword
        value overrides the value in the config file.

        Exits the process with status 2 when the configuration file cannot
        be read or does not define the log file for logKey.
        """

        self._monitorFlag = False
        self._monitorThread = None
        self._logHandler = None

        # Get configuration parameters
        self._logKey = str(logKey)
        self._configFile = str(configFile)
        self._rotateMethod = 'time'
        self._timeRotateIntervalType = 'midnight'
        self._timeRotateInterval = 1
        self._sizeMaxBytes = 0
        self._sizeRotateMode = 'a'
        self._socketHost = None
        self._socketPort = 0
        self._typeLogger = 'filelogger'
        self._backupCount = 6
        self._logLevelThreshold = self._intLogLevel('')
        self._logFile = None
        self._begTime = None
        self._begMsec = 0
        self._fields = {}
        self._fields["style"] = CommonLogger.UnknownFile
        try:
            self._configFileModified = os.path.getmtime(self._configFile)
            self._readConfigFile()
        except Exception as x:
            print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)))
            sys.exit(2)

        if self._logFile is None:
            print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey))
            sys.exit(2)

        # initialize default log fields
        # timestamp will automatically be generated
        for key in self._FIELD_NAMES:
            if key in kwargs and kwargs[key] is not None:
                self._fields[key] = kwargs[key]

        self._resetStyleField()

        # Set up logger
        self._logLock = threading.Lock()
        with self._logLock:
            self._logger = logging.getLogger(self._logKey)
            self._logger.propagate = False
            self._createLogger()

        self._defaultServerInfo()

        # spawn a thread to monitor configFile for logLevel and logFile changes
        self._monitorFlag = True
        self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
        self._monitorThread.daemon = True
        self._monitorThread.start()

    def _setIfChanged(self, attr, newValue):
        """Assign newValue to the named attribute; return True if it differed."""
        if getattr(self, attr) == newValue:
            return False
        setattr(self, attr, newValue)
        return True

    def _applySetting(self, key, value):
        """Apply one "key = value" configuration entry.

        Unknown keys and out-of-range values are ignored.  Returns True when
        the entry changed something the log handler or its formatter was
        built from, i.e. when the handler has to be re-created.
        """
        prefix = self._logKey
        if key == 'rotateMethod' and value.lower() in self._ROTATE_METHODS:
            return self._setIfChanged('_rotateMethod', value.lower())
        elif key == 'timeRotateIntervalType' and value in self._ROTATE_INTERVAL_TYPES:
            return self._setIfChanged('_timeRotateIntervalType', value)
        elif key == 'timeRotateInterval' and int( value ) > 0:
            return self._setIfChanged('_timeRotateInterval', int( value ))
        elif key == 'sizeMaxBytes' and int( value ) >= 0:
            return self._setIfChanged('_sizeMaxBytes', int( value ))
        elif key == 'sizeRotateMode' and value in ['a']:
            return self._setIfChanged('_sizeRotateMode', value)
        elif key == 'backupCount' and int( value ) >= 0:
            return self._setIfChanged('_backupCount', int( value ))
        elif key == prefix + 'SocketHost':
            return self._setIfChanged('_socketHost', value)
        elif key == prefix + 'SocketPort' and int( value ) > 0:
            # only a real (positive) port number is accepted
            return self._setIfChanged('_socketPort', int( value ))
        elif key == prefix + 'LogType' and value.lower() in self._LOGGER_TYPES:
            return self._setIfChanged('_typeLogger', value.lower())
        elif key == prefix + 'LogLevel':
            # the threshold is checked on every _log call; no handler rebuild needed
            self._logLevelThreshold = self._intLogLevel(value.upper())
        elif key == prefix + 'Style':
            previous = self._fields.get('style')
            self._fields["style"] = value
            self._resetStyleField()
            # the formatter depends on the style, so a change needs a rebuild
            return self._fields["style"] != previous
        elif key == prefix:
            return self._setIfChanged('_logFile', value)
        return False

    def _readConfigFile(self):
        """Parse the configuration file and apply every recognized entry.

        Text after '#' on a line is a comment.  Returns True when at least
        one entry requires the log handler to be re-created.  Exceptions
        from opening the file or from non-numeric values propagate.
        """
        reopen = False
        with open(self._configFile) as fp:
            for line in fp:
                line = line.split('#',1)[0] # remove comments
                if '=' not in line:
                    continue
                key, value = [x.strip() for x in line.split('=',1)]
                if self._applySetting(key, value):
                    reopen = True
        return reopen

    def _createLogger(self):
        """Build the handler and formatter for the current settings and attach them to the logger."""
        if self._typeLogger == 'filelogger':
            self._mkdir_p(self._logFile)
            if self._rotateMethod == 'time':
                handler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
                        when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
                        backupCount=self._backupCount, encoding=None, delay=False, utc=True)
            elif self._rotateMethod == 'size':
                handler = logging.handlers.RotatingFileHandler(self._logFile, \
                        mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
                        backupCount=self._backupCount, encoding=None, delay=False)
            else:
                handler = logging.handlers.WatchedFileHandler(self._logFile, \
                        mode=self._sizeRotateMode, \
                        encoding=None, delay=False)
        elif self._typeLogger == 'stderrlogger':
            # StreamHandler and NullHandler live in logging, not logging.handlers
            handler = logging.StreamHandler(sys.stderr)
        elif self._typeLogger == 'stdoutlogger':
            handler = logging.StreamHandler(sys.stdout)
        elif self._typeLogger == 'socketlogger':
            handler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
        else:
            # 'nulllogger'
            handler = logging.NullHandler()

        style = self._fields.get("style")
        if style == CommonLogger.AuditFile or style == CommonLogger.MetricsFile:
            # audit/metrics records carry explicit begin and end timestamps (see _log)
            formatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
        else:
            formatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
        formatter.converter = time.gmtime
        handler.setFormatter(formatter)

        self._logFormatter = formatter
        self._logHandler = handler
        self._logger.addHandler(handler)

    @staticmethod
    def _normalizeStyle(style):
        """Map a style name or numeric string to its integer constant.

        Values that are neither a known name nor an integer string are
        returned unchanged.
        """
        if isinstance(style, str):
            text = style.strip().lower()
            if text in CommonLogger._STYLE_NAMES:
                return CommonLogger._STYLE_NAMES.index(text)
            try:
                return int(text)
            except ValueError:
                return style
        return style

    def _resetStyleField(self):
        """Convert a textual default style ("error", "debug", ...) to its integer constant."""
        if 'style' in self._fields:
            self._fields['style'] = self._normalizeStyle(self._fields['style'])

    def __del__(self):
        # getattr: __init__ may have exited before these attributes existed
        if not getattr(self, '_monitorFlag', False):
            return

        self._monitorFlag = False

        thread = getattr(self, '_monitorThread', None)
        # a thread cannot join itself
        if thread is not None and thread is not threading.current_thread() and thread.is_alive():
            thread.join()

        self._monitorThread = None

    def _defaultServerInfo(self):
        """Fill in the 'server' and 'serverIPAddress' defaults from the local host when unset."""

        # If not set or purposely set = None, then set default
        if self._fields.get('server') is None:
            try:
                self._fields['server'] = socket.getfqdn()
            except Exception:
                try:
                    self._fields['server'] = socket.gethostname()
                except Exception:
                    self._fields['server'] = ""

        # If not set or purposely set = None, then set default
        if self._fields.get('serverIPAddress') is None:
            try:
                self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
            except Exception:
                self._fields['serverIPAddress'] = ""

    def _monitorConfigFile(self):
        """Thread body: poll the configuration file every 5 seconds and apply changes."""
        while self._monitorFlag:
            try:
                fileTime = os.path.getmtime(self._configFile)
                if fileTime > self._configFileModified:
                    self._configFileModified = fileTime
                    if self._readConfigFile():
                        with self._logLock:
                            oldHandler = self._logHandler
                            # build the replacement first so that a failure
                            # leaves the previous handler in place
                            self._createLogger()
                            if oldHandler is not None:
                                self._logger.removeHandler(oldHandler)
                                oldHandler.close()  # release the file/socket
            except Exception as err:
                # best effort: a bad or half-written config must not stop logging
                if CommonLogger.verbose: print("error monitoring '%s' configuration file: %s" %(self._configFile, str(err)))

            time.sleep(5)

    def setFields(self, **kwargs):
        """Set default values for log fields.

        Keyword arguments:  any log record field listed in the class
        docstring.  A value of None removes the stored default for that
        field ('server' and 'serverIPAddress' are then re-derived from the
        local host).
        """

        for key in self._FIELD_NAMES:
            if key in kwargs:
                if kwargs[key] is not None:
                    self._fields[key] = kwargs[key]
                elif key in self._fields:
                    del self._fields[key]

        # accept style names here exactly as __init__ does
        self._resetStyleField()
        self._defaultServerInfo()

    def debug(self, message, **kwargs):
        """Write a DEBUG level message to the log file.

        Arguments:
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        errorCategory defaults to 'DEBUG' when not given.
        """

        kwargs.setdefault('errorCategory', 'DEBUG')
        self._log('DEBUG', message, **kwargs)

    def info(self, message, **kwargs):
        """Write an INFO level message to the log file.

        Arguments:
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        errorCategory defaults to 'INFO' when not given.
        """

        kwargs.setdefault('errorCategory', 'INFO')
        self._log('INFO', message, **kwargs)

    def warn(self, message, **kwargs):
        """Write a WARN level message to the log file.

        Arguments:
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        errorCategory defaults to 'WARN' when not given.
        """

        kwargs.setdefault('errorCategory', 'WARN')
        self._log('WARN', message, **kwargs)

    def error(self, message, **kwargs):
        """Write an ERROR level message to the log file.

        Arguments:
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        errorCategory defaults to 'ERROR' when not given.
        """

        kwargs.setdefault('errorCategory', 'ERROR')
        self._log('ERROR', message, **kwargs)

    def fatal(self, message, **kwargs):
        """Write a FATAL level message to the log file.

        Arguments:
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        errorCategory defaults to 'FATAL' when not given.
        """

        kwargs.setdefault('errorCategory', 'FATAL')
        self._log('FATAL', message, **kwargs)

    def _timingExtra(self, nbegTime):
        """Build the begin/end timestamp dict consumed by the audit/metrics formatter.

        The begin time is, in order of preference, the begTime dict passed to
        the logging call (as returned by getStartRecordEvent), the time saved
        by setStartRecordEvent, or the end time itself.  The saved start time
        is consumed by this call.
        """
        endTime, endMsec = self._getTime()
        if isinstance(nbegTime, dict) and 'begTime' in nbegTime and 'begMsec' in nbegTime:
            begTime = self._noSep(nbegTime['begTime'])
            begMsec = float(self._noSep(nbegTime['begMsec']))
        elif self._begTime is not None:
            begTime, begMsec = self._begTime, self._begMsec
        else:
            begTime, begMsec = endTime, endMsec
        self._begTime = None
        return { 'begtime': begTime, 'begmsecs': begMsec, 'endtime': endTime, 'endmsecs': endMsec }

    def _log(self, logLevel, message, **kwargs):
        """Write a message to the log file.

        Arguments:
        logLevel                -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
        message                 -- value for the last log record field.

        Keyword arguments:  per-record values for the log record fields
        listed in the class docstring, plus
        begTime (am)            -- optional starting time for this audit/metrics log record.

        Empty messages and messages below the configured log level are
        dropped.  A record whose style is not one of the four known styles
        is not written; a diagnostic is printed instead.
        """

        # timestamp will automatically be inserted
        detailMessage = self._noSep(message)
        if bool(re.match(r" *$", detailMessage)):
            return  # don't log empty messages

        upperLogLevel = self._noSep(logLevel.upper())

        # per-call style wins over the default; names and numbers both work
        style = self._normalizeStyle(self._getArg('style', self._fields.get('style'), **kwargs))
        if isinstance(style, bool) or not isinstance(style, int):
            style = CommonLogger.UnknownFile
        if CommonLogger.verbose: print("logger STYLE=%s" % style)

        if self._intLogLevel(upperLogLevel) < self._logLevelThreshold:
            if CommonLogger.verbose: print("skipping because of level")
            return

        layout = self._RECORD_LAYOUTS.get(style)
        if layout is None:
            print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields.get("style"))
            return
        if CommonLogger.verbose: print("using CommonLogger.%s" % self._STYLE_LABELS[style])

        columns = []
        for name in layout:
            if name == '@level':
                columns.append(upperLogLevel)
            elif name == '@unused':
                columns.append('')
            elif name == 'threadID':
                columns.append(self._getVal('threadID', threading.current_thread().name, **kwargs))
            else:
                columns.append(self._getVal(name, '', **kwargs))
        columns.append(detailMessage)
        record = '|'.join(columns)

        # Records are always emitted at level 50: filtering was done above
        # against this logger's own threshold.
        with self._logLock:
            if style == CommonLogger.AuditFile or style == CommonLogger.MetricsFile:
                nbegTime = self._getArg('begTime', {}, **kwargs)
                self._logger.log(50, record, extra=self._timingExtra(nbegTime))
            else:
                self._logger.log(50, record)

    def _getTime(self):
        """Return the current time as (formatted UTC timestamp, milliseconds part)."""
        ct  = time.time()
        # UTC, because the record format appends a literal "+00:00" offset
        ut  = time.gmtime(ct)
        return (time.strftime(CommonLogger.DateFmt, ut), (ct - int(ct)) * 1000)

    def setStartRecordEvent(self):
        """
        Set the start time to be saved for both audit and metrics records
        """
        self._begTime, self._begMsec = self._getTime()

    def getStartRecordEvent(self):
        """
        Retrieve the start time to be used for either audit and metrics records
        """
        begTime, begMsec = self._getTime()
        return {'begTime':begTime, 'begMsec':begMsec}

    def _getVal(self, key, default, **kwargs):
        """Return the separator-free string value of a field: keyword, else stored default, else default."""
        val = self._fields.get(key)
        if key in kwargs: val = kwargs[key]
        if val is None: val = default
        return self._noSep(val)

    def _getArg(self, key, default, **kwargs):
        """Return kwargs[key] unchanged, or default when it is missing or None."""
        val = None
        if key in kwargs: val = kwargs[key]
        if val is None: val = default
        return val

    def _noSep(self, message):
        """Return str(message) with field separators and line breaks replaced by spaces."""
        if message is None:  return ''
        return re.sub(r'[\|\r\n]', ' ', str(message))

    def _intLogLevel(self, logLevel):
        """Return the numeric value of a level name; unknown names map to 0."""
        return self._LOG_LEVELS.get(logLevel, 0)

    def _mkdir_p(self, filename):
        """Create missing directories from a full filename path like mkdir -p"""

        if filename is None:
            return

        folder=os.path.dirname(filename)

        if folder == "":
            return

        if not os.path.exists(folder):
            try:
                # exist_ok: another thread or process may create it first
                os.makedirs(folder, exist_ok=True)
            except OSError as err:
                print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror))
                sys.exit(2)
            except Exception as err:
                print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)))
                sys.exit(2)
re.match(format, line) + if not m: + print("ERROR: time string did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + return 0 + + def __checkTwoTimes(line, different): + format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]' + m = re.match(format, line) + if not m: + print("ERROR: time strings did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + second1 = int(m.group(1)) + msec1 = int(m.group(2)) + second2 = int(m.group(3)) + msec2 = int(m.group(4)) + if second1 > second2: second2 += 60 + t1 = second1 * 1000 + msec1 + t2 = second2 * 1000 + msec2 + diff = t2 - t1 + # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff)) + if different: + if diff < 500: + print("ERROR: times did not differ enough: %s" % line) + return 1 + else: + if diff > 10: + print("ERROR: times were too far apart: %s" % line) + return 1 + return 0 + + def __checkBegTime(line): + format = "begTime should be ([-0-9T:]+)" + # print("checkBegTime(%s)" % line) + strt = 'begTime should be ' + i = line.index(strt) + rest = line[i+len(strt):].rstrip() + if not line.startswith(rest + ","): + print("ERROR: line %s should start with %s" % (line,rest)) + return 1 + return 0 + + def __checkLog(logfile, numLines, numFields): + lineCount = 0 + errorCount = 0 + with open(logfile, "r") as fp: + for line in fp: + # print("saw line %s" % line) + lineCount += 1 + c = line.count('|') + if c != numFields: + print("ERROR: wrong number of fields. 
Expected %d, got %d: %s" % (numFields, c, line)) + errorCount += 1 + if re.search("should not appear", line): + print("ERROR: a line appeared that should not have appeared, %s" % line) + errorCount += 1 + elif re.search("single time", line): + errorCount += __checkOneTime(line) + elif re.search("time should be the same", line): + errorCount += __checkTwoTimes(line, different=False) + elif re.search("time should be ", line): + errorCount += __checkTwoTimes(line, different=True) + elif re.search("begTime should be ", line): + errorCount += __checkBegTime(line) + else: + print("ERROR: an unknown message appeared, %s" % line) + errorCount += 1 + + if lineCount != numLines: + print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount)) + errorCount += 1 + return errorCount + + import os, argparse + parser = argparse.ArgumentParser(description="test the CommonLogger functions") + parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true") + parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true") + args = parser.parse_args() + + spid = str(os.getpid()) + if args.keeplogs: + spid = "" + logcfg = "/tmp/cl.log" + spid + ".cfg" + errorLog = "/tmp/cl.error" + spid + ".log" + metricsLog = "/tmp/cl.metrics" + spid + ".log" + auditLog = "/tmp/cl.audit" + spid + ".log" + debugLog = "/tmp/cl.debug" + spid + ".log" + if args.verbose: CommonLogger.verbose = True + + import atexit + def cleanupTmps(): + for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]: + try: + os.remove(f) + except: + pass + if not args.keeplogs: + atexit.register(cleanupTmps) + + with open(logcfg, "w") as o: + o.write("error = " + errorLog + "\n" + + "errorLogLevel = WARN\n" + + "metrics = " + metricsLog + "\n" + + "metricsLogLevel = INFO\n" + + "audit = " + auditLog + "\n" + + "auditLogLevel = INFO\n" + + "debug = " + debugLog + "\n" + + "debugLogLevel = DEBUG\n") + + import uuid + instanceUUID = 
uuid.uuid1() + serviceName = "testharness" + errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName) + debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName) + auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName) + metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName) + + testsRun = 0 + errorCount = 0 + errorLogger.debug("error calling debug (should not appear)") + errorLogger.info("error calling info (should not appear)") + errorLogger.warn("error calling warn (single time)") + errorLogger.error("error calling error (single time)") + errorLogger.setStartRecordEvent() + time.sleep(1) + errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)") + testsRun += 6 + errorCount += __checkLog(errorLog, 3, 10) + + auditLogger.debug("audit calling debug (should not appear)") + auditLogger.info("audit calling info (time should be the same)") + auditLogger.warn("audit calling warn (time should be the same)") + auditLogger.error("audit calling error (time should be the same)") + bt = auditLogger.getStartRecordEvent() + # print("bt=%s" % bt) + time.sleep(1) + auditLogger.setStartRecordEvent() + time.sleep(1) + auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)") + time.sleep(1) + auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 7 + errorCount += __checkLog(auditLog, 5, 25) + + debugLogger.debug("debug calling debug (single time)") + debugLogger.info("debug calling info (single time)") + debugLogger.warn("debug calling warn (single time)") + debugLogger.setStartRecordEvent() + time.sleep(1) + debugLogger.error("debug calling 
error, after SetStartRecordEvent and sleep (start should be ignored, single time)") + debugLogger.fatal("debug calling fatal (single time)") + errorCount += __checkLog(debugLog, 5, 13) + testsRun += 6 + + metricsLogger.debug("metrics calling debug (should not appear)") + metricsLogger.info("metrics calling info (time should be the same)") + metricsLogger.warn("metrics calling warn (time should be the same)") + bt = metricsLogger.getStartRecordEvent() + time.sleep(1) + metricsLogger.setStartRecordEvent() + time.sleep(1) + metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different") + metricsLogger.fatal("metrics calling fatal (time should be the same)") + time.sleep(1) + metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 6 + errorCount += __checkLog(metricsLog, 5, 28) + + print("%d tests run, %d errors found" % (testsRun, errorCount)) diff --git a/oti/event-handler/otihandler/onap/__init__.py b/oti/event-handler/otihandler/onap/__init__.py new file mode 100644 index 0000000..87cf002 --- /dev/null +++ b/oti/event-handler/otihandler/onap/__init__.py @@ -0,0 +1,15 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============LICENSE_END========================================================= diff --git a/oti/event-handler/otihandler/onap/audit.py b/oti/event-handler/otihandler/onap/audit.py new file mode 100644 index 0000000..8cd16cf --- /dev/null +++ b/oti/event-handler/otihandler/onap/audit.py @@ -0,0 +1,375 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# ============LICENSE_END=========================================================

"""generic class to keep track of request handling
 from receiving it through response and log all the activities

 call Audit.init("component-name", "component-version", "path/to/config_file")
 to init the loggers before any requests

 start each outside request with creation of the Audit object
 audit = Audit(request_id=None, req_message=None, headers=None)
"""

import os
import sys
import json
import uuid
import time
import copy
from datetime import datetime
from threading import Lock
from enum import Enum

from .CommonLogger import CommonLogger
from .health import Health

# names of the incoming http headers / environment variables that are read
REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
REQUEST_REMOTE_ADDR = "Remote-Addr"
REQUEST_HOST = "Host"
HOSTNAME = "HOSTNAME"

# keys placed into the kwargs that are handed to the loggers
AUDIT_REQUESTID = 'requestID'
AUDIT_IPADDRESS = 'IPAddress'
AUDIT_SERVER = 'server'
AUDIT_TARGET_ENTITY = 'targetEntity'

# lower-cased header names whose values are masked by Audit.hide_secrets
HEADER_CLIENTAUTH = "clientauth"
HEADER_AUTHORIZATION = "authorization"


class AuditHttpCode(Enum):
    """audit http codes"""
    HTTP_OK = 200
    PERMISSION_UNAUTHORIZED_ERROR = 401
    PERMISSION_FORBIDDEN_ERROR = 403
    RESPONSE_ERROR = 400
    DATA_NOT_FOUND_ERROR = 404
    SERVER_INTERNAL_ERROR = 500
    SERVICE_UNAVAILABLE_ERROR = 503
    DATA_ERROR = 1030
    SCHEMA_ERROR = 1040


class AuditResponseCode(Enum):
    """audit response codes"""
    SUCCESS = 0
    PERMISSION_ERROR = 100
    AVAILABILITY_ERROR = 200
    DATA_ERROR = 300
    SCHEMA_ERROR = 400
    BUSINESS_PROCESS_ERROR = 500
    UNKNOWN_ERROR = 900

    @staticmethod
    def get_response_code(http_status_code):
        """calculates the response_code from max_http_status_code

        :http_status_code: integer status, compared against AuditHttpCode values

        returns the matching AuditResponseCode member,
        UNKNOWN_ERROR when no rule matches
        """
        # NOTE(review): any status in 201..399 falls through to UNKNOWN_ERROR,
        # only values <= 200 are treated as success - confirm this is intended
        response_code = AuditResponseCode.UNKNOWN_ERROR
        if http_status_code <= AuditHttpCode.HTTP_OK.value:
            response_code = AuditResponseCode.SUCCESS

        elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
                                  AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
            response_code = AuditResponseCode.PERMISSION_ERROR
        elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
            response_code = AuditResponseCode.AVAILABILITY_ERROR
        elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
            response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
        elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
                                  AuditHttpCode.RESPONSE_ERROR.value,
                                  AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
            response_code = AuditResponseCode.DATA_ERROR
        elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
            response_code = AuditResponseCode.SCHEMA_ERROR

        return response_code

    @staticmethod
    def get_human_text(response_code):
        """convert enum name into human readable text

        returns "unknown" for a falsy response_code,
        otherwise the member name lower-cased with "_" replaced by " "
        """
        if not response_code:
            return "unknown"
        return response_code.name.lower().replace("_", " ")


class Audit(object):
    """put the audit object on stack per each initiating request in the system

    :request_id: is the X-ECOMP-RequestID for tracing

    :req_message: is the request message string for logging

    :aud_parent: is the parent request - used for sub-query metrics to other systems

    :kwargs: - put any request related params into kwargs
    """
    # class-level invariants shared by all Audit objects - set by Audit.init()
    _service_name = ""
    _service_version = ""
    _service_instance_uuid = str(uuid.uuid4())
    # datetime of class creation - reported by health();
    # each instance additionally keeps its own float self._started from time.time()
    _started = datetime.now()
    _logger_debug = None
    _logger_error = None
    _logger_metrics = None
    _logger_audit = None
    _health = Health()
    _py_ver = sys.version.replace("\n", "")

    @staticmethod
    def init(service_name, service_version, config_file_path):
        """init static invariants and loggers

        :service_name: name reported by the loggers and by health()
        :service_version: version reported by health()
        :config_file_path: path of the config file passed to each CommonLogger
        """
        Audit._service_name = service_name
        Audit._service_version = service_version
        Audit._logger_debug = CommonLogger(
            config_file_path, "debug",
            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
        Audit._logger_error = CommonLogger(
            config_file_path, "error",
            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
        Audit._logger_metrics = CommonLogger(
            config_file_path, "metrics",
            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
        Audit._logger_audit = CommonLogger(
            config_file_path, "audit",
            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)

    @staticmethod
    def health():
        """returns json for health check"""
        now = datetime.now()
        return {
            "service_name" : Audit._service_name,
            "service_version" : Audit._service_version,
            "service_instance_UUID" : Audit._service_instance_uuid,
            "python" : Audit._py_ver,
            "started" : str(Audit._started),
            "now" : str(now),
            "uptime" : str(now - Audit._started),
            "stats" : Audit._health.dump(),
            "packages" : "N/A" # Audit._packages
        }

    def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
        """create audit object per each request in the system

        :request_id: is the X-ECOMP-RequestID for tracing
        :req_message: is the request message string for logging
        :aud_parent: is the parent Audit - used for sub-query metrics to other systems
        :kwargs: - put any request related params into kwargs

        Audit.init() must have been called before - the audit and metrics
        loggers are used here to capture the start events
        """
        self.request_id = request_id
        self.req_message = req_message or ""
        self.aud_parent = aud_parent
        self.kwargs = kwargs or {}

        self.retry_get_config = False
        self.max_http_status_code = 0
        self._lock = Lock()

        if self.aud_parent:
            # child audit inherits whatever it was not given from the parent
            if not self.request_id:
                self.request_id = self.aud_parent.request_id
            if not self.req_message:
                self.req_message = self.aud_parent.req_message
            self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
        else:
            # top-level audit takes the missing fields from the http headers
            headers = self.kwargs.get("headers", {})
            if headers:
                if not self.request_id:
                    self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
                if AUDIT_IPADDRESS not in self.kwargs:
                    self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
                if AUDIT_SERVER not in self.kwargs:
                    self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)

        if AUDIT_SERVER not in self.kwargs:
            self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)

        created_req = ""
        if not self.request_id:
            created_req = " with new"
            self.request_id = str(uuid.uuid4())

        self.kwargs[AUDIT_REQUESTID] = self.request_id

        self._started = time.time()
        self._start_event = Audit._logger_audit.getStartRecordEvent()
        self.metrics_start()

        if not self.aud_parent:
            # kwargs may carry the request headers - mask the clientauth and
            # authorization values instead of writing them to the log
            self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"
                      .format(created_req, self.request_id, self.req_message,
                              Audit.log_json_dumps(self.kwargs)))

    def merge_all_kwargs(self, **kwargs):
        """returns the merge of copy of self.kwargs with the param kwargs"""
        all_kwargs = self.kwargs.copy()
        if kwargs:
            all_kwargs.update(kwargs)
        return all_kwargs

    def set_http_status_code(self, http_status_code):
        """accumulate the highest(worst) http status code

        once the stored value reached SERVER_INTERNAL_ERROR (500) or above
        it is no longer changed
        """
        # "with" releases the lock even when the comparison raises
        with self._lock:
            if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
                self.max_http_status_code = max(http_status_code, self.max_http_status_code)

    def get_max_http_status_code(self):
        """returns the highest(worst) http status code"""
        with self._lock:
            return self.max_http_status_code

    @staticmethod
    def get_status_code(success):
        """COMPLETE versus ERROR"""
        if success:
            return 'COMPLETE'
        return 'ERROR'

    @staticmethod
    def hide_secrets(obj):
        """hides the known secret field values of the dictionary

        replaces in place the values of the clientauth and authorization keys
        (case-insensitive) with "*", recursing into nested dicts.
        non-dict obj is returned unchanged
        """
        if not isinstance(obj, dict):
            return obj

        for key in obj:
            if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
                obj[key] = "*"
            elif isinstance(obj[key], dict):
                obj[key] = Audit.hide_secrets(obj[key])

        return obj

    @staticmethod
    def log_json_dumps(obj, **kwargs):
        """hide the known secret field values of the dictionary and return json.dumps

        works on a deep copy - the passed obj is not modified
        """
        if not isinstance(obj, dict):
            return json.dumps(obj, **kwargs)

        return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)

    def is_serious_error(self, status_code):
        """returns whether status_code maps to a permission error
        or the worst http status code recorded so far is 500 or above
        """
        return AuditResponseCode.PERMISSION_ERROR.value \
            == AuditResponseCode.get_response_code(status_code).value \
            or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value

    def _get_response_status(self):
        """calculates the response status fields from max_http_status_code

        returns tuple (success, max_http_status_code, response_code, response_description)
        """
        max_http_status_code = self.get_max_http_status_code()
        response_code = AuditResponseCode.get_response_code(max_http_status_code)
        success = (response_code.value == AuditResponseCode.SUCCESS.value)
        response_description = AuditResponseCode.get_human_text(response_code)
        return success, max_http_status_code, response_code, response_description

    def is_success(self):
        """returns whether the response_code is success"""
        success, _, _, _ = self._get_response_status()
        return success

    def debug(self, log_line, **kwargs):
        """debug - the debug=lowest level of logging"""
        Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))

    def info(self, log_line, **kwargs):
        """debug - the info level of logging, written to the debug logger"""
        Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))

    def info_requested(self, result=None, **kwargs):
        """info "requested ..." - the info level of logging"""
        # info() merges kwargs with self.kwargs - no need to merge here as well
        self.info("requested {0} {1}".format(self.req_message, result or ""), **kwargs)

    def warn(self, log_line, **kwargs):
        """debug+error - the warn level of logging"""
        all_kwargs = self.merge_all_kwargs(**kwargs)
        Audit._logger_debug.warn(log_line, **all_kwargs)
        Audit._logger_error.warn(log_line, **all_kwargs)

    def error(self, log_line, **kwargs):
        """debug+error - the error level of logging"""
        all_kwargs = self.merge_all_kwargs(**kwargs)
        Audit._logger_debug.error(log_line, **all_kwargs)
        Audit._logger_error.error(log_line, **all_kwargs)

    def fatal(self, log_line, **kwargs):
        """debug+error - the fatal level of logging"""
        all_kwargs = self.merge_all_kwargs(**kwargs)
        Audit._logger_debug.fatal(log_line, **all_kwargs)
        Audit._logger_error.fatal(log_line, **all_kwargs)

    @staticmethod
    def get_elapsed_time(started):
        """returns the elapsed time since started in milliseconds"""
        return int(round(1000 * (time.time() - started)))

    def metrics_start(self, log_line=None, **kwargs):
        """reset metrics timing"""
        self._metrics_started = time.time()
        self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
        if log_line:
            # info() merges kwargs with self.kwargs - no need to merge here as well
            self.info(log_line, **kwargs)

    def metrics(self, log_line, **kwargs):
        """debug+metrics - the metrics=sub-audit level of logging

        logs the outcome accumulated in max_http_status_code, records it in the
        health stats and restarts the metrics timer

        returns tuple (success, max_http_status_code, response_description)
        """
        all_kwargs = self.merge_all_kwargs(**kwargs)
        success, max_http_status_code, response_code, response_description = \
            self._get_response_status()
        metrics_func = None
        timer = Audit.get_elapsed_time(self._metrics_started)
        if success:
            log_line = "done: {0}".format(log_line)
            self.info(log_line, **all_kwargs)
            metrics_func = Audit._logger_metrics.info
            Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
        else:
            log_line = "failed: {0}".format(log_line)
            self.error(log_line, errorCode=response_code.value,
                       errorDescription=response_description, **all_kwargs)
            metrics_func = Audit._logger_metrics.error
            Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)

        metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
                     statusCode=Audit.get_status_code(success),
                     responseCode=response_code.value,
                     responseDescription=response_description,
                     **all_kwargs)

        self.metrics_start()
        return (success, max_http_status_code, response_description)

    def audit_done(self, result=None, **kwargs):
        """debug+audit - the audit=top level of logging

        logs the outcome of the whole request timed from the creation of this
        Audit object and records it in the health stats

        returns tuple (success, max_http_status_code, response_description)
        """
        all_kwargs = self.merge_all_kwargs(**kwargs)
        success, max_http_status_code, response_code, response_description = \
            self._get_response_status()
        log_line = "{0} {1}".format(self.req_message, result or "").strip()
        audit_func = None
        timer = Audit.get_elapsed_time(self._started)
        if success:
            log_line = "done: {0}".format(log_line)
            self.info(log_line, **all_kwargs)
            audit_func = Audit._logger_audit.info
            Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
        else:
            log_line = "failed: {0}".format(log_line)
            self.error(log_line, errorCode=response_code.value,
                       errorDescription=response_description, **all_kwargs)
            audit_func = Audit._logger_audit.error
            Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)

        audit_func(log_line, begTime=self._start_event, timer=timer,
                   statusCode=Audit.get_status_code(success),
                   responseCode=response_code.value,
                   responseDescription=response_description,
                   **all_kwargs)

        return (success, max_http_status_code, response_description)
        # this line added to test

# diff --git a/oti/event-handler/otihandler/onap/health.py b/oti/event-handler/otihandler/onap/health.py
# new file mode 100644
# index 0000000..39edc0d
# --- /dev/null
# +++ b/oti/event-handler/otihandler/onap/health.py
# @@ -0,0 +1,102 @@
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

"""generic class to keep track of app health"""

import uuid
from datetime import datetime
from threading import Lock


class HealthStats(object):
    """keep track of stats for calls"""
    def __init__(self, name):
        """keep track of stats for metrics calls

        :name: name of the stats, a generated "stats_<uuid4>" is used when falsy
        """
        self._name = name or "stats_" + str(uuid.uuid4())
        self._lock = Lock()
        self._call_count = 0
        self._error_count = 0
        self._longest_timer = 0
        self._total_timer = 0
        self._last_success = None
        self._last_error = None

    def dump(self):
        """returns dict of stats

        the timestamps are converted with str(), so a timestamp that was never
        set is reported as the string "None"
        """
        with self._lock:
            return {
                "call_count" : self._call_count,
                "error_count" : self._error_count,
                "last_success" : str(self._last_success),
                "last_error" : str(self._last_error),
                "longest_timer_millisecs" : self._longest_timer,
                # average over all calls, 0 when nothing was recorded yet
                "ave_timer_millisecs" : (float(self._total_timer)/self._call_count
                                         if self._call_count else 0)
            }

    def _record(self, timer, is_error):
        """shared bookkeeping for success() and error() done under the lock"""
        with self._lock:
            self._call_count += 1
            if is_error:
                self._error_count += 1
                self._last_error = datetime.now()
            else:
                self._last_success = datetime.now()
            self._total_timer += timer
            if not self._longest_timer or self._longest_timer < timer:
                self._longest_timer = timer

    def success(self, timer):
        """records the successful execution

        :timer: duration of the call, reported by dump() as milliseconds
        """
        self._record(timer, is_error=False)

    def error(self, timer):
        """records the errored execution

        :timer: duration of the call, reported by dump() as milliseconds
        """
        self._record(timer, is_error=True)


class Health(object):
    """Health stats for multiple requests"""
    def __init__(self):
        """Health stats for application"""
        self._all_stats = {}
        self._lock = Lock()

    def _add_or_get_stats(self, stats_name):
        """add to or get from the ever growing dict of HealthStats"""
        with self._lock:
            stats = self._all_stats.get(stats_name)
            if not stats:
                self._all_stats[stats_name] = stats = HealthStats(stats_name)
        return stats

    def success(self, stats_name, timer):
        """records the successful execution on stats_name"""
        self._add_or_get_stats(stats_name).success(timer)

    def error(self, stats_name, timer):
        """records the error execution on stats_name"""
        self._add_or_get_stats(stats_name).error(timer)

    def dump(self):
        """returns dict of stats keyed by stats_name"""
        with self._lock:
            return {name: stats.dump() for (name, stats) in self._all_stats.items()}

# diff --git a/oti/event-handler/otihandler/utils.py b/oti/event-handler/otihandler/utils.py
# new file mode 100644
# index 0000000..4f9dbda
# --- /dev/null
# +++ b/oti/event-handler/otihandler/utils.py
# @@ -0,0 +1,83 @@
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

import base64
import collections
import collections.abc
import copy
import os

from Crypto import Random
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA


def update_dict(d, u):
    """Recursively updates dict

    Update dict d with dict u

    Values of u that are mappings are merged into the value already stored in d
    under the same key; every other value of u overwrites the value in d.
    d is modified in place and also returned.
    """
    for k, v in u.items():
        # the ABCs live in collections.abc; the collections.Mapping alias
        # was removed in Python 3.10
        if isinstance(v, collections.abc.Mapping):
            current = d.get(k)
            if not isinstance(current, collections.abc.Mapping):
                # nothing mergeable stored under k yet (missing, None, scalar, ...)
                current = {}
            d[k] = update_dict(current, v)
        else:
            d[k] = v
    return d

def replace_token(configure_content):
    """Returns a deep copy of configure_content with the token of "default-user" replaced

    The token is the first line of /opt/app/config-map/dcae-k8s-cluster-token.
    Best effort: on any failure (unreadable file, unexpected structure of
    configure_content) the original configure_content is returned unchanged.
    """
    try:
        with open("/opt/app/config-map/dcae-k8s-cluster-token",'r') as fh:
            dcae_token = fh.readline().rstrip('\n')

        new_config = copy.deepcopy(configure_content)

        # override the default-user token
        for ix, user in enumerate(new_config['users']):
            if user['name'] == "default-user":
                new_config['users'][ix] = {
                    "name": "default-user",
                    "user": {
                        "token": dcae_token
                    }
                }

        return new_config

    except Exception:
        return configure_content

def decrypt(b64_ciphertext):
    """returns decrypted b64_ciphertext that was encoded like this:

    echo "cleartext" | openssl pkeyutl -encrypt -pubin -inkey rsa.pub | base64 --wrap=0

    requires private key in environment variable EOMUSER_PRIVATE

    Values of 30 characters or less are returned as is, and so is the input
    when decoding or loading the key raises.
    """

    if len(b64_ciphertext) <= 30:  # For transition, assume short values are not encrypted
        return b64_ciphertext

    try:
        ciphertext = base64.b64decode(b64_ciphertext)
        key = RSA.importKey(os.getenv('EOMUSER_PRIVATE'))
        # the second argument is the sentinel handed back by decrypt()
        # when the padding is invalid
        # NOTE(review): on success this presumably returns bytes while every
        # fallback path returns the caller's input unchanged, and a failed
        # decryption yields the random sentinel - confirm callers handle both
        cleartext = PKCS1_v1_5.new(key).decrypt(ciphertext, Random.new().read(15+SHA.digest_size))
    except Exception:
        return b64_ciphertext

    return cleartext

# diff --git a/oti/event-handler/otihandler/web_server.py b/oti/event-handler/otihandler/web_server.py
# new file mode 100644
# index 0000000..f3eb071
# --- /dev/null
# +++ b/oti/event-handler/otihandler/web_server.py
# @@ -0,0 +1,603 @@
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+# ============LICENSE_END========================================================= + +"""web-service for oti_handler""" + +import json +import logging +import os +import time +from datetime import datetime + +import cherrypy + +from otihandler.cbs_rest import CBSRest +from otihandler.config import Config +from otihandler.dti_processor import DTIProcessor +from otihandler.onap.audit import Audit + + +class DTIWeb(object): + """run REST API of OTI Handler""" + + logger = logging.getLogger("oti_handler.web_server") + HOST_INADDR_ANY = ".".join("0"*4) + + @staticmethod + def run_forever(audit): + """run the web-server of OTI Handler forever""" + + cherrypy.config.update({"server.socket_host": DTIWeb.HOST_INADDR_ANY, + "server.socket_port": Config.wservice_port}) + + protocol = "http" + tls_info = "" + if Config.tls_server_cert_file and Config.tls_private_key_file: + tm_cert = os.path.getmtime(Config.tls_server_cert_file) + tm_key = os.path.getmtime(Config.tls_private_key_file) + #cherrypy.server.ssl_module = 'builtin' + cherrypy.server.ssl_module = 'pyOpenSSL' + cherrypy.server.ssl_certificate = Config.tls_server_cert_file + cherrypy.server.ssl_private_key = Config.tls_private_key_file + if Config.tls_server_ca_chain_file: + cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file + protocol = "https" + tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file, + Config.tls_private_key_file, + Config.tls_server_ca_chain_file) + + cherrypy.tree.mount(_DTIWeb(), '/') + + DTIWeb.logger.info( + "%s with config: %s", audit.info("running oti_handler as {}://{}:{} {}".format( + protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)), + json.dumps(cherrypy.config)) + cherrypy.engine.start() + + # If HTTPS server certificate changes, exit to let kubernetes restart us + if Config.tls_server_cert_file and Config.tls_private_key_file: + while True: + time.sleep(600) + c_tm_cert = os.path.getmtime(Config.tls_server_cert_file) + c_tm_key = 
os.path.getmtime(Config.tls_private_key_file) + if c_tm_cert > tm_cert or c_tm_key > tm_key: + DTIWeb.logger.info("cert or key file updated") + cherrypy.engine.stop() + cherrypy.engine.exit() + break + + +class _DTIWeb(object): + """REST API of DTI Handler""" + + VALID_EVENT_TYPES = ['deploy', 'undeploy', 'add', 'delete', 'update', 'notify'] + + @staticmethod + def _get_request_info(request): + """Returns info about the http request.""" + + return "{} {}{}".format(request.method, request.script_name, request.path_info) + + + #----- Common endpoint methods + + @cherrypy.expose + @cherrypy.tools.json_out() + def healthcheck(self): + """Returns healthcheck results.""" + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + DTIWeb.logger.info("%s", req_info) + + result = Audit.health() + + DTIWeb.logger.info("healthcheck %s: result=%s", req_info, json.dumps(result)) + + audit.audit_done(result=json.dumps(result)) + return result + + @cherrypy.expose + def shutdown(self): + """Shutdown the web server.""" + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + DTIWeb.logger.info("%s: --- stopping REST API of DTI Handler ---", req_info) + + cherrypy.engine.exit() + + health = json.dumps(Audit.health()) + audit.info("oti_handler health: {}".format(health)) + DTIWeb.logger.info("oti_handler health: %s", health) + DTIWeb.logger.info("%s: --------- the end -----------", req_info) + result = str(datetime.now()) + audit.info_requested(result) + return "goodbye! 
shutdown requested {}".format(result) + + # ----- DTI Handler mock endpoint methods + @cherrypy.expose + @cherrypy.tools.json_out() + @cherrypy.tools.json_in() + def mockevents(self): + """Returns a fixed, hard-coded sample of kubernetes pod attributes (mock endpoint).""" + + result = {"KubeNamespace":"com-my-dcae-test", "KubePod":"pod-0", "KubeServiceName":"pod-0.service.local", "KubeServicePort":"8880", "KubeClusterFqdn":"fqdn-1"} + + return result + + #----- DTI Handler endpoint methods + + @cherrypy.expose + @cherrypy.tools.json_out() + @cherrypy.tools.json_in() + def events(self, notify="y"): + """ + Run dti reconfig script in service component instances configured to accept the DTI Event. + + POST /events < <dcae_event> + + POST /events?notify=n < <dcae_event> + + where <dcae_event> is the entire DTI Event passed as a JSON object and contains at least these top-level keys: + dcae_service_action : string + required, case-insensitive, one of VALID_EVENT_TYPES: 'deploy', 'undeploy', 'add', 'delete', 'update', 'notify' + dcae_target_name : string + required, VNF Instance ID + dcae_target_type : string + required, VNF Type of the VNF Instance + dcae_service_location : string + optional, CLLI location. Not provided or '' infers all locations. + + Parameters + ---------- + notify : string + optional, default "y", any of these (compared case-insensitively) will not notify components: [ "f", "false", "n", "no" ] + When "n" will **not** notify components of this DTI Event update to Consul. + + Returns + ------- + dict + JSON object containing success or error of executing the dti reconfig script on + each component instance's docker container, keyed by service_component_name. + + Raises + ------ + cherrypy.HTTPError + 404 when the request method is not POST; + 400 when dcae_service_action is missing or invalid, or when dcae_target_name or dcae_target_type is missing.
+ + """ + + if cherrypy.request.method != "POST": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + dti_event = cherrypy.request.json or {} + str_dti_event = json.dumps(dti_event) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message="{}: {}".format(req_info, str_dti_event), \ + headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: dti_event=%s headers=%s", \ + req_info, str_dti_event, json.dumps(cherrypy.request.headers)) + + dcae_service_action = dti_event.get('dcae_service_action') + if not dcae_service_action: + msg = 'dcae_service_action is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + elif dcae_service_action.lower() not in self.VALID_EVENT_TYPES: + msg = 'dcae_service_action is invalid' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400,msg) + + dcae_target_name = dti_event.get('dcae_target_name') + if not dcae_target_name: + msg = 'dcae_target_name is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + + dcae_target_type = dti_event.get('dcae_target_type', '') + if not dcae_target_type: + msg = 'dcae_target_type is missing' + DTIWeb.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + + send_notification = True + if (isinstance(notify, bool) and not notify) or \ + (isinstance(notify, str) and notify.lower() in [ "f", "false", "n", "no" ]): + send_notification = False + + prc = DTIProcessor(dti_event, send_notification=send_notification) + result = prc.get_result() + + DTIWeb.logger.info("%s: dti_event=%s result=%s", \ + req_info, str_dti_event, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + def get_docker_events(self, request, service, location): + """ + common routine for dti_docker_events and oti_docker_events + + :param request: HTTP GET request + :param service: HTTP request 
query parameter for service name + :param location: HTTP request query parameter for location CLLI + :return: result of DTIProcessor.get_docker_raw_events(service, location) + :raises cherrypy.HTTPError: 404 when the request method is not GET + """ + + if request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) + + req_info = _DTIWeb._get_request_info(request) + audit = Audit(req_message=req_info, headers=request.headers) + + return DTIProcessor.get_docker_raw_events(service, location) + + def get_k8s_events(self, request, **params): + """ + common routine for dti_k8s_events and oti_k8s_events + + :param request: HTTP GET request; request.params must contain 'pod', 'namespace' and 'cluster' + :param params: HTTP request query parameters (not read here; the values are taken from request.params) + :return: result of DTIProcessor.get_k8_raw_events(pod, cluster, namespace) + :raises cherrypy.HTTPError: 404 when the request method is not GET + """ + if request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(request.method)) + + req_info = _DTIWeb._get_request_info(request) + audit = Audit(req_message=req_info, headers=request.headers) + + pod = request.params['pod'] + namespace = request.params['namespace'] + cluster = request.params['cluster'] + + return DTIProcessor.get_k8_raw_events(pod, cluster, namespace) + + @cherrypy.expose + @cherrypy.tools.json_out() + def oti_k8s_events(self, **params): + """ + Retrieve raw JSON events from application events database + + GET /oti_k8s_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> + + Parameters + ---------- + pod : string + POD ID of the stateful set POD + namespace: string + kubernetes namespace + cluster: string + kubernetes cluster FQDN + + Returns + ------- + dict + JSON object containing the fully-bound configuration.
+ + """ + + return self.get_k8s_events(cherrypy.request, params) + + @cherrypy.expose + @cherrypy.tools.json_out() + def dti_k8s_events(self, **params): + """ + Retrieve raw JSON events from application events database + + GET /dti_k8_events?pod=<sts-1>&namespace=<ns1>&cluster=<cluster1> + + Parameters + ---------- + pod ID : string + POD ID of the stateful set POD + namespace: string + kubernetes namespace + cluster: string + kubernetes cluster FQDN + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_k8s_events(cherrypy.request, params) + + @cherrypy.expose + @cherrypy.tools.json_out() + def oti_docker_events(self, service, location=None): + """ + Retrieve raw JSON events from application events database related to docker deployments + + GET /oti_docker_events?service=<svc>&location=<location> + + Parameters + ---------- + service : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + location : string + optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + JSON object containing the fully-bound configuration. 
+ + """ + + return self.get_docker_events(cherrypy.request, service, location) + + @cherrypy.expose + @cherrypy.tools.json_out() + def dti_docker_events(self, service, location=None): + """ + Retrieve raw JSON events from application events database related to docker deployments + + GET /dti_docker_events?service=<svc>&location=<location> + + Parameters + ---------- + service : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + location : string + optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location in service_location. + If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul TAGs if service_name is provided, + otherwise results are not location filtered. + + Returns + ------- + dict + JSON object containing the fully-bound configuration. + + """ + + return self.get_docker_events(cherrypy.request, service, location) + + #----- Config Binding Service (CBS) endpoint methods + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def service_component(self, service_name): + """ + Retrieve fully-bound configuration for service_name from Consul KVs. + + GET /service_component/<service_name> + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + + Returns + ------- + dict + JSON object containing the fully-bound configuration. 
+ + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_service_component(service_name) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def service_component_all(self, service_name, service_location=None, policy_ids="y"): + """ + Retrieve all information for service_name (config, dti, dti_events, and policies) from Consul KVs. + + GET /service_component_all/<service_name> + + GET /service_component_all/<service_name>?service_location=<service_location> + + GET /service_component_all/<service_name>?service_location=<service_location>;policy_ids=n + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + service_location : string + optional, allows multiple values separated by commas. + Filters DTI events with dcae_service_location in service_location. + policy_ids : string + optional, default "y", any of these will unset: [ "f", "false", "False", "FALSE", "n", "no" ] + When unset, formats policies items as a list (without policy_ids) rather than as an object indexed by policy_id. 
+ + Returns + ------- + dict + JSON object containing all information for component service_name. + The top-level keys may include the following: + config : dict + The cloudify node's application_config property from when the start workflow was executed. + dti : dict + Keys are VNF Types that the component currently is assigned to monitor. Policy can change them. + dti_events : dict + The latest deploy DTI events, keyed by VNF Type and sub-keyed by VNF Instance ID. + policies : dict + event : dict + Contains information about when the policies folder was last written. + items : dict + Contains all policy bodies for the service_name component, keyed by policy_id. + + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + policies_as_list = False + if (isinstance(policy_ids, bool) and not policy_ids) or \ + (isinstance(policy_ids, str) and policy_ids.lower() in [ "f", "false", "n", "no" ]): + policies_as_list = True + try: + result = CBSRest.get_service_component_all(service_name, service_location=service_location, policies_as_list=policies_as_list) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def dti(self, service_name=None, vnf_type=None, vnf_id=None, service_location=None): + """ + Retrieve current (latest, 
not undeployed) DTI events from Consul KVs. + + GET /dti/<service_name> + + GET /dti/<service_name>?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> + + GET /dti + + GET /dti?vnf_type=<vnf_type>;vnf_id=<vnf_id>;service_location=<service_location> + + Parameters + ---------- + service_name : string + optional. The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + vnf_type : string + optional, allows multiple values separated by commas. Gets DTI events for these vnf_type(s). + vnf_id : string + optional. Requires vnf_type also. Gets DTI event for this vnf_id. + service_location : string + optional, allows multiple values separated by commas. + Filters DTI events with dcae_service_location in service_location. + + Returns + ------- + dict + Dictionary of DTI event(s). + If one vnf_type and vnf_id are both specified, then object returned will be just the one DTI event. + If one vnf_type is specified but not vnf_id, then DTI events will be keyed by vnf_id. + Otherwise the DTI events will be keyed by vnf_type, sub-keyed by vnf_id. 
+ + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_dti(service_name=service_name, vnf_type=vnf_type, vnf_id=vnf_id, service_location=service_location) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result + + @cherrypy.expose + @cherrypy.popargs('service_name') + @cherrypy.tools.json_out() + def policies(self, service_name, policy_id=None): + """ + Retrieve policies for service_name from Consul KVs. + + GET /policies/<service_name> + + GET /policies/<service_name>?policy_id=<policy_id> + + Parameters + ---------- + service_name : string + The service component name assigned by dockerplugin to the component + that is unique to the cloudify node instance and used in its Consul key(s). + policy_id : string + optional. Limits returned policy to this policy_id. + + Returns + ------- + dict + JSON object containing policy bodies for the service_name component. + If policy_id is specified, then object returned will be just the one policy body. + If policy_id is not specified, then object will contain all policy bodies, keyed by policy_id. 
+ + """ + + if cherrypy.request.method != "GET": + raise cherrypy.HTTPError(404, "unexpected method {}".format(cherrypy.request.method)) + + req_info = _DTIWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + DTIWeb.logger.info("%s: service_name=%s headers=%s", \ + req_info, service_name, json.dumps(cherrypy.request.headers)) + + try: + result = CBSRest.get_policies(service_name, policy_id=policy_id) + except Exception as e: + result = {"ERROR": "exception {}: {!s}".format(type(e).__name__, e)} + audit.set_http_status_code(404) + + DTIWeb.logger.info("%s: service_name=%s result=%s", \ + req_info, service_name, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result diff --git a/oti/event-handler/pom.xml b/oti/event-handler/pom.xml new file mode 100644 index 0000000..962d742 --- /dev/null +++ b/oti/event-handler/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +============LICENSE_START======================================================= +org.onap.dcae +================================================================================ +Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+============LICENSE_END========================================================= +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- This file is used only for Sonar scans --> + <modelVersion>4.0.0</modelVersion> + <groupId>org.onap.dcaegen2.platform.oti</groupId> + <artifactId>dcaegen2-platform-oti-event-handler</artifactId> + <version>1.0.0</version> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <sonar.sources>.</sonar.sources> + <sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath> + <sonar.python.coverage.reportPath>coverage.xml</sonar.python.coverage.reportPath> + <sonar.language>py</sonar.language> + <sonar.pluginname>python</sonar.pluginname> + <sonar.inclusions>**/*.py</sonar.inclusions> + <sonar.exclusions>**/tests/**,**/setup.py</sonar.exclusions> + </properties> +</project> diff --git a/oti/event-handler/requirements.txt b/oti/event-handler/requirements.txt new file mode 100644 index 0000000..e46ed5e --- /dev/null +++ b/oti/event-handler/requirements.txt @@ -0,0 +1,10 @@ +CherryPy>=18.0.0 +docker==4.1.0 +enum34>=1.1.6 +kubernetes==4.0.0 +requests>=2.18.4,<3.0.0 +SQLAlchemy==1.3.6 +psycopg2-binary==2.8.4 +pyOpenSSL==19.1.0 +pycrypto==2.6.1 +uuid==1.30 diff --git a/oti/event-handler/run.sh b/oti/event-handler/run.sh new file mode 100644 index 0000000..e5a340a --- /dev/null +++ b/oti/event-handler/run.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +mkdir -p logs +LOG_FILE=logs/oti_handler.log +exec &>> >(tee -a ${LOG_FILE}) +echo "---------------------------------------------" +STARTED=$(date +%Y-%m-%d_%T.%N) +echo "${STARTED}: running ${BASH_SOURCE[0]}" +export APP_VER=$(python setup.py --version) +echo "APP_VER=${APP_VER}" +echo "HOSTNAME=${HOSTNAME}" +echo "CONSUL_URL=${CONSUL_URL}" +(pwd; uname -a; id; echo "ls -lanR:"; ls -lanR; echo "/etc/hosts:"; cat /etc/hosts; openssl version -a) +echo "---------------------------------------------" + +export REQUESTS_CA_BUNDLE="/etc/ssl/certs/ca-certificates.crt" + +# create the database tables +export PGPASSWORD=$postgres_password +psql -h $postgres_ip -U $postgres_user $postgres_db_name -f /tmp/create_schema.sql + +python -m otihandler 2>&1 & +PID=$! 
+ +function finish { + echo "killing oti_handler ${PID}" $(date +%Y_%m%d-%H:%M:%S.%N) + kill -9 ${PID} + echo "killed oti_handler ${PID}" $(date +%Y_%m%d-%H:%M:%S.%N) +} +trap finish SIGHUP SIGINT SIGTERM + +echo "running oti_handler as" ${PID} "log" ${LOG_FILE} +#(free -h; df -h; ps afxvw; ss -aepi) + +wait ${PID} +exec &>> >(tee -a ${LOG_FILE}) +echo "---------------------------------------------" +rm ${LOG_FILE}.2[0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]_[0-9][0-9][0-9][0-9][0-9][0-9] +mv ${LOG_FILE} ${LOG_FILE}.$(date +%Y-%m-%d_%H%M%S) diff --git a/oti/event-handler/setup.py b/oti/event-handler/setup.py new file mode 100644 index 0000000..b2dd1a2 --- /dev/null +++ b/oti/event-handler/setup.py @@ -0,0 +1,46 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============LICENSE_END========================================================= + +"""DCAE-Controller dti_handler""" + +from setuptools import setup + +setup( + name='otihandler', + description='DCAE-Controller OTI Handler', + version="1.0.0", + author=[''], + packages=['otihandler'], + zip_safe=False, + install_requires=[ + "CherryPy>=18.0.0", + "docker==4.1.0", + "enum34>=1.1.6", + "kubernetes==4.0.0", + "requests>=2.18.4,<3.0.0", + "SQLAlchemy==1.3.6", + "psycopg2-binary==2.8.4", + "pyOpenSSL==19.1.0", + "pycrypto==2.6.1", + "uuid==1.30" + ], + keywords='oti dcae controller', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 3.6' + ] +) diff --git a/oti/event-handler/tox.ini b/oti/event-handler/tox.ini new file mode 100644 index 0000000..2532eb9 --- /dev/null +++ b/oti/event-handler/tox.ini @@ -0,0 +1,4 @@ +[tox] +envlist = py36 + +[testenv] |