diff options
author | Alex Shatov <alexs@att.com> | 2020-02-27 12:45:54 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2020-02-27 12:45:54 -0500 |
commit | 78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb (patch) | |
tree | 5670dddc0e0cd9f793d419420b61ad0559639497 /policyhandler/pdp_api/dmaap_mr.py | |
parent | 715fc8a36ac1809cd3e36cbb6cfb7107ebb038ea (diff) |
5.1.0 policy-handler - policy-updates from new PDP5.1.0
DCAEGEN2-1851:
- policy-handler now supports the policy-update notification
from the new policy-engine thru DMaaP MR
= no policy-filters - only policy-id values
- see README for discoverable config settings of dmaap_mr client
= DMaaP MR client has the same flexibility as policy_engine
= set the query.timeout to high value like 15000 (default)
- requests to DMaaP MR go through a single blocking connection
- first catch-up only after draining the policy-updates from DMaaP MR
on the first loop
- safe parsing of messages from DMaaP MR
- policy-engine changed the data type for policy-version field
from int to string that is expected to have the semver value
- related change to deployment-handler (DCAEGEN2-2085) has to be
deployed to handle the non-numeric policyVersion
- on new PDP API: http /policy_latest and policy-updates
return the new data from the new PDP API with the following fields
added/renamed by the policy-handler to keep other policy related parts
intact in R4-R6 (see pdp_api/policy_utils.py)
* policyName = policy_id + "." + policyVersion.replace(".","-")
+ ".xml"
* policyVersion = str(metadata["policy-version"])
* "config" - is the renamed "properties" from the new PDP API response
- enabled the /catch_up and the periodic auto-catch-up for the new PDP
API
- enabled GET /policies_latest - returns the latest policies for the
deployed components
- POST /policies_latest - still disabled since no support for the
policy-filters is provided for the new PDP API
- fixed hiding the Authorization value on comparing the configs
- logging of secrets is now sha256 to see whether they changed
- added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID
- on policy-update process the removal first, then addition
- changed the pool_connections=1 (number of pools) on PDP and DH sides
== only a single destination is expected for each
- log the exception as fatal into error.log
- other minor fixes and refactoring
- unit-test coverage 74%
- integration testing is requested
DCAEGEN2-1976:
- policy-handler is enhanced to get user/password from env vars
for PDP and DMaaP MR clients and overwriting the Authorization field
in https headers received from the discoverable config
= to override the Authorization value on policy_engine,
set the environment vars $PDP_USER and $PDP_PWD in policy-handler
container
= to override the Authorization value on dmaap_mr,
if using https and user-password authentication,
set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in
policy-handler container
Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-1851
Issue-ID: DCAEGEN2-1976
Diffstat (limited to 'policyhandler/pdp_api/dmaap_mr.py')
-rw-r--r-- | policyhandler/pdp_api/dmaap_mr.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/policyhandler/pdp_api/dmaap_mr.py b/policyhandler/pdp_api/dmaap_mr.py new file mode 100644 index 0000000..2d4d468 --- /dev/null +++ b/policyhandler/pdp_api/dmaap_mr.py @@ -0,0 +1,202 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-client communicates with policy-engine thru REST API""" + +import copy +import json +from threading import Lock + +import requests + +from ..config import Config, Settings +from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics +from ..utils import Utils + +_LOGGER = Utils.get_logger(__file__) + +class DmaapMr(object): + """using the http API to policy-engine""" + _lazy_inited = False + DEFAULT_TIMEOUT_IN_SECS = 60 + + _lock = Lock() + _settings = Settings(Config.DMAAP_MR) + + _requests_session = None + _long_polling = False + _target_entity = None + _url = None + _query = {} + _headers = None + _custom_kwargs = {} + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS + + @staticmethod + def _init(audit): + """init static config""" + DmaapMr._custom_kwargs = {} + tls_ca_mode = None + + if not DmaapMr._requests_session: + DmaapMr._requests_session = requests.Session() + DmaapMr._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1, + pool_block=True)) + DmaapMr._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1, + pool_block=True)) + + + _, config = DmaapMr._settings.get_by_key(Config.DMAAP_MR) + if config: + DmaapMr._url = config.get("url") + DmaapMr._headers = config.get("headers", {}) + DmaapMr._query = copy.deepcopy(config.get("query", {})) + if DmaapMr._query.get(Config.QUERY_TIMEOUT, 0) < 1000: + DmaapMr._query[Config.QUERY_TIMEOUT] = 15000 + + DmaapMr._target_entity = config.get("target_entity", Config.DMAAP_MR) + + tls_ca_mode = config.get(Config.TLS_CA_MODE) + DmaapMr._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + DmaapMr._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not DmaapMr._timeout_in_secs or DmaapMr._timeout_in_secs < 1: + DmaapMr._timeout_in_secs = DmaapMr.DEFAULT_TIMEOUT_IN_SECS + + _LOGGER.info( + audit.info(("config DMaaP MR({}) url({}) query({}) headers({}) " + "tls_ca_mode({}) custom_kwargs({}) timeout_in_secs({}): {}").format( + DmaapMr._target_entity, DmaapMr._url, + Metrics.json_dumps(DmaapMr._query), + Metrics.json_dumps(DmaapMr._headers), tls_ca_mode, + json.dumps(DmaapMr._custom_kwargs), DmaapMr._timeout_in_secs, + DmaapMr._settings))) + + DmaapMr._settings.commit_change() + DmaapMr._lazy_inited = True + + @staticmethod + def reconfigure(audit): + """reconfigure""" + with DmaapMr._lock: + DmaapMr._settings.set_config(Config.discovered_config) + if not DmaapMr._settings.is_changed(): + DmaapMr._settings.commit_change() + return False + + DmaapMr._lazy_inited = False + DmaapMr._long_polling = False + DmaapMr._init(audit) + return True + + @staticmethod + def _lazy_init(audit): + """init static config""" + if DmaapMr._lazy_inited: + return + + with DmaapMr._lock: + if DmaapMr._lazy_inited: + return + + DmaapMr._settings.set_config(Config.discovered_config) + DmaapMr._long_polling = False + DmaapMr._init(audit) + + @staticmethod + def get_policy_updates(audit): + """ + get from DMaaP MR - returns json list of stringified messages + + example [ + "{\"deployed-policies\":[ + {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\", + \"policy-type-version\":\"1.0.0\", + \"policy-id\":\"onap.scaleout.tca\", + \"policy-version\":\"2.2.2\", + \"success-count\":3, + \"failure-count\":0 + }], + \"undeployed-policies\":[ + {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\", + \"policy-type-version\":\"1.0.0\", + \"policy-id\":\"onap.scaleout.tca\", + \"policy-version\":\"1.0.0\", + \"success-count\":3, + \"failure-count\":0 + }]}" + ] + """ + DmaapMr._lazy_init(audit) + + if not DmaapMr._url: + _LOGGER.error( + audit.error("no url for DMaaP MR", error_code=AuditResponseCode.AVAILABILITY_ERROR)) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + + with DmaapMr._lock: + target_entity = DmaapMr._target_entity + url = DmaapMr._url + params = copy.deepcopy(DmaapMr._query) if DmaapMr._long_polling else None + headers = copy.deepcopy(DmaapMr._headers) + timeout_in_secs = DmaapMr._timeout_in_secs + custom_kwargs = copy.deepcopy(DmaapMr._custom_kwargs) + DmaapMr._long_polling = True + + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) + + headers = metrics.put_request_id_into_headers(headers) + + log_line = ( + "get from {} at {} with params={}, headers={}, custom_kwargs({}) timeout_in_secs({})" + .format(target_entity, url, json.dumps(params), Metrics.json_dumps(headers), + json.dumps(custom_kwargs), timeout_in_secs)) + + _LOGGER.info(metrics.metrics_start(log_line)) + + res = None + try: + res = DmaapMr._requests_session.get(url, params=params, headers=headers, + timeout=timeout_in_secs, **custom_kwargs) + + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) + + _LOGGER.exception(metrics.fatal(error_msg)) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} from {}: text={} headers={}".format( + res.status_code, log_line, res.text, Metrics.json_dumps(dict(res.headers.items()))) + _LOGGER.info(log_line) + + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + metrics.metrics(log_line) + + policy_updates = None + if res.status_code == requests.codes.ok: + policy_updates = res.json() + + return policy_updates |