aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2019-01-31 16:07:48 -0500
committerAlex Shatov <alexs@att.com>2019-01-31 16:07:48 -0500
commitebc1a062328e53e97e4d24ed111534cfc567a809 (patch)
treeb0721077df349f2cee5d1a7426f4de0acc1855cb
parenta39f4e82cef0414f510cf20e25864ac04cc8f055 (diff)
4.6.0 policy-handler - active-passive
DCAEGEN2-931: - exposed POST /reconfigure endpoint on the web-server that initiates the reconfigure process right away DCAEGEN2-932: - mode_of_operation: active or passive = active is as before this change = in passive mode the policy-handler * closes the web-socket to PDP * skips the periodic catch_ups * still periodically checks for reconfigure * still allows usig the web-server to retrieve policies from PDP - default is active - when mode_of_operation changes from passive to active, the policy-handler invokes the catch_up right away - config-kv contains the optional override field mode_of_operation = changing the mode_of_operation in config-kv and invoking POST /reconfigure will bring the new value and change the mode of operation of the policy-handler if no service_activator section is provided in consul-kv record - if config-kv contains the service_activator section, = the policy-handler registers with service_activator - untested = and receives the mode_of_operation - untested = service_activator can POST-notify the policy-handler to initiate the /reconfigure - reduced the default web-socket ping interval from 180 to 30 seconds because PDP changed its default timeout on the web-socket from 400 seconds to 50 seconds Change-Id: If7dd21c008d9906aca97939be65dfa9c2f007535 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-931 Issue-ID: DCAEGEN2-932
-rw-r--r--LICENSE.txt2
-rw-r--r--policyhandler/__main__.py7
-rw-r--r--policyhandler/config.py4
-rw-r--r--policyhandler/onap/audit.py13
-rw-r--r--policyhandler/policy_consts.py3
-rw-r--r--policyhandler/policy_receiver.py102
-rw-r--r--policyhandler/policy_updater.py62
-rw-r--r--policyhandler/service_activator.py217
-rw-r--r--policyhandler/web_server.py21
-rw-r--r--pom.xml4
-rw-r--r--setup.py4
-rw-r--r--tests/mock_settings.py5
-rw-r--r--version.properties2
13 files changed, 383 insertions, 63 deletions
diff --git a/LICENSE.txt b/LICENSE.txt
index 14cb17c..c142ce1 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,7 +1,7 @@
/*
* ============LICENSE_START==========================================
* ===================================================================
-* Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+* Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
* ===================================================================
*
* Unless otherwise specified, all software contained herein is licensed
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py
index 63dc5da..798a2e1 100644
--- a/policyhandler/__main__.py
+++ b/policyhandler/__main__.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ from policyhandler import LogWriter
from policyhandler.config import Config
from policyhandler.onap.audit import Audit
from policyhandler.policy_receiver import PolicyReceiver
+from policyhandler.service_activator import ServiceActivator
from policyhandler.web_server import PolicyWeb
@@ -47,7 +48,9 @@ def run_policy_handler():
audit = Audit(req_message="start policy handler")
Config.discover(audit)
- logger.info("starting policy_handler with config: %s", Config.discovered_config)
+ ServiceActivator.determine_mode_of_operation(audit)
+ logger.info(audit.info(
+ "starting policy_handler with config: {}".format(Config.discovered_config)))
PolicyReceiver.run(audit)
PolicyWeb.run_forever(audit)
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 5184f7f..e6e74cc 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -152,6 +152,8 @@ class Config(object):
CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs"
WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs"
DEFAULT_TIMEOUT_IN_SECS = 60
+ SERVICE_ACTIVATOR = "service_activator"
+ MODE_OF_OPERATION = "mode_of_operation"
system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index d63d0b2..db4498a 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -185,12 +185,15 @@ class _Audit(object):
@staticmethod
- def register_item_health(health_name, health_getter):
+ def register_item_health(health_name, health_getter=None):
"""
register the health-checker for the additional item
by its health_name and the function health_getter that returns its health status as json
"""
- _Audit._health_checkers[health_name] = health_getter
+ if health_getter:
+ _Audit._health_checkers[health_name] = health_getter
+ elif health_name in _Audit._health_checkers:
+ del _Audit._health_checkers[health_name]
def health(self, full=False):
"""returns json for health check"""
@@ -401,8 +404,8 @@ class Audit(_Audit):
self._started = time.time()
self._start_event = Audit._logger_audit.getStartRecordEvent()
- self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
- .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
def audit_done(self, result=None, **kwargs):
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index cde4551..8f3ec76 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ POLICY_CONFIG = 'config'
CATCH_UP = "catch_up"
AUTO_CATCH_UP = "auto catch_up"
+AUTO_RECONFIGURE = "auto reconfigure"
LATEST_POLICIES = "latest_policies"
REMOVED_POLICIES = "removed_policies"
ERRORED_POLICIES = "errored_policies"
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 249c1f7..7cf1869 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ from .onap.audit import Audit
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
from .policy_utils import Utils
+from .service_activator import ServiceActivator
LOADED_POLICIES = 'loadedPolicies'
REMOVED_POLICIES = 'removedPolicies'
@@ -58,12 +59,14 @@ class _PolicyReceiver(Thread):
WS_MESSAGE_COUNT = "message_count"
WS_MESSAGE_TIMESTAMP = "message_timestamp"
WS_STATUS = "status"
- WS_PING_INTERVAL_DEFAULT = 180
+ WS_PING_INTERVAL_DEFAULT = 30
+ WEB_SOCKET_HEALTH = "web_socket_health"
- def __init__(self, audit):
+ def __init__(self, audit, policy_updater):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
Thread.__init__(self, name="policy_receiver", daemon=True)
+ self._policy_updater = policy_updater
self._lock = Lock()
self._keep_running = True
self._settings = Settings(Config.FIELD_POLICY_ENGINE)
@@ -83,12 +86,10 @@ class _PolicyReceiver(Thread):
_PolicyReceiver.WS_STATUS: "created"
}
- Audit.register_item_health("web_socket_health", self._get_health)
- self._reconfigure(audit)
+ Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health)
+ self.reconfigure(audit)
- self._policy_updater = PolicyUpdater(self._reconfigure)
-
- def _reconfigure(self, audit):
+ def reconfigure(self, audit):
"""configure and reconfigure the web-socket"""
with self._lock:
_PolicyReceiver._logger.info(audit.info("web_socket_health {}".format(
@@ -150,7 +151,6 @@ class _PolicyReceiver(Thread):
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
- self._policy_updater.start()
_PolicyReceiver._logger.info("starting policy_receiver...")
websocket.enableTrace(True)
restarting = False
@@ -178,8 +178,9 @@ class _PolicyReceiver(Thread):
ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
_PolicyReceiver._logger.info(
- "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)",
- web_socket_url, json.dumps(sslopt), tls_wss_ca_mode)
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
+ " ws_ping_interval_in_secs(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
self._web_socket = websocket.WebSocketApp(
web_socket_url,
@@ -194,6 +195,7 @@ class _PolicyReceiver(Thread):
self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
restarting = True
+ Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH)
_PolicyReceiver._logger.info("exit policy-receiver")
def _get_keep_running(self):
@@ -207,7 +209,7 @@ class _PolicyReceiver(Thread):
with self._lock:
if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
self._web_socket.close()
- _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
+ _PolicyReceiver._logger.info("stopped receiving notifications from PDP")
def _on_pdp_message(self, *args):
"""received the notification from PDP"""
@@ -270,7 +272,7 @@ class _PolicyReceiver(Thread):
self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed"
self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1
_PolicyReceiver._logger.info(
- "lost connection(%s, %s) to PDP - restarting... web_socket_health %s",
+ "lost connection(%s, %s) to PDP web_socket_health %s",
code, reason, json.dumps(self._get_health(), sort_keys=True))
def _on_ws_open(self):
@@ -291,6 +293,7 @@ class _PolicyReceiver(Thread):
def _get_health(self):
"""returns the healthcheck of the web-socket as json"""
web_socket_health = copy.deepcopy(self._web_socket_health)
+ web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
started = web_socket_health.get(_PolicyReceiver.WS_STARTED)
if started:
web_socket_health[_PolicyReceiver.WS_STARTED] = str(started)
@@ -300,7 +303,7 @@ class _PolicyReceiver(Thread):
def shutdown(self, audit):
"""Shutdown the policy-receiver"""
- _PolicyReceiver._logger.info("shutdown policy-receiver")
+ _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver"))
with self._lock:
self._keep_running = False
@@ -309,39 +312,74 @@ class _PolicyReceiver(Thread):
if self.is_alive():
self.join()
- self._policy_updater.shutdown(audit)
-
- def catch_up(self, audit):
- """need to bring the latest policies to DCAE-Controller"""
- self._policy_updater.catch_up(audit)
-
- def is_running(self):
- """check whether the policy-receiver and policy-updater are running"""
- return self.is_alive() and self._policy_updater.is_alive()
class PolicyReceiver(object):
- """policy-receiver - static singleton wrapper"""
+ """
+ policy-receiver - static singleton wrapper around two threads
+ policy_updater - master thread for all scheduled actions
+ policy_receiver - listens to policy-engine through web-socket
+ """
+ _policy_updater = None
_policy_receiver = None
@staticmethod
def is_running():
"""check whether the policy-receiver runs"""
- return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running()
+ return (PolicyReceiver._policy_receiver
+ and PolicyReceiver._policy_receiver.is_alive()
+ and PolicyReceiver._policy_updater
+ and PolicyReceiver._policy_updater.is_alive())
+
+ @staticmethod
+ def _close_receiver(audit):
+ """stop the notification-handler"""
+ if PolicyReceiver._policy_receiver:
+ policy_receiver = PolicyReceiver._policy_receiver
+ PolicyReceiver._policy_receiver = None
+ policy_receiver.shutdown(audit)
@staticmethod
def shutdown(audit):
- """Shutdown the notification-handler"""
- PolicyReceiver._policy_receiver.shutdown(audit)
+ """shutdown the notification-handler and policy-updater"""
+ PolicyReceiver._close_receiver(audit)
+ PolicyReceiver._policy_updater.shutdown(audit)
@staticmethod
def catch_up(audit):
- """bring the latest policies from policy-engine"""
- PolicyReceiver._policy_receiver.catch_up(audit)
+ """request to bring the latest policies to DCAE"""
+ PolicyReceiver._policy_updater.catch_up(audit)
+
+ @staticmethod
+ def reconfigure(audit):
+ """request to reconfigure the updated config for policy-handler"""
+ PolicyReceiver._policy_updater.reconfigure(audit)
+
+ @staticmethod
+ def _on_reconfigure(audit):
+ """act on reconfiguration event"""
+ active = ServiceActivator.is_active_mode_of_operation(audit)
+
+ if not PolicyReceiver._policy_receiver:
+ if active:
+ PolicyReceiver._policy_receiver = _PolicyReceiver(audit,
+ PolicyReceiver._policy_updater)
+ PolicyReceiver._policy_receiver.start()
+ return
+
+ if not active:
+ PolicyReceiver._close_receiver(audit)
+ return
+
+ PolicyReceiver._policy_receiver.reconfigure(audit)
+
@staticmethod
def run(audit):
- """Using policy-engine client to talk to policy engine"""
- PolicyReceiver._policy_receiver = _PolicyReceiver(audit)
- PolicyReceiver._policy_receiver.start()
+ """run policy_updater and policy_receiver"""
+ PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure)
+
+ PolicyReceiver._on_reconfigure(audit)
+
+ PolicyReceiver._policy_updater.start()
PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index fb6c8b6..af1ea4b 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,11 +25,13 @@ from threading import Event, Lock, Thread
from .config import Config, Settings
from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
-from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
- POLICY_NAME, POLICY_NAMES, POLICY_VERSION)
+from .policy_consts import (AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP,
+ POLICY_BODY, POLICY_ID, POLICY_NAME, POLICY_NAMES,
+ POLICY_VERSION)
from .policy_matcher import PolicyMatcher
from .policy_rest import PolicyRest
from .policy_utils import PolicyUtils
+from .service_activator import ServiceActivator
from .step_timer import StepTimer
@@ -172,11 +174,11 @@ class PolicyUpdater(Thread):
)
self._run.set()
- def _reconfigure(self):
+ def reconfigure(self, audit=None):
"""job to check for and bring in the updated config for policy-handler"""
with self._lock:
if not self._aud_reconfigure:
- self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
+ self._aud_reconfigure = audit or Audit(req_message=AUTO_RECONFIGURE)
PolicyUpdater._logger.info(
"%s request_id %s",
self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
@@ -251,7 +253,7 @@ class PolicyUpdater(Thread):
self._reconfigure_timer = StepTimer(
"reconfigure_timer",
self._reconfigure_interval,
- PolicyUpdater._reconfigure,
+ PolicyUpdater.reconfigure,
PolicyUpdater._logger,
self
)
@@ -293,30 +295,52 @@ class PolicyUpdater(Thread):
log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id)
reconfigure_result = ""
try:
+ need_to_catch_up = False
PolicyUpdater._logger.info(log_line)
+
+ active_prev = ServiceActivator.is_active_mode_of_operation(aud_reconfigure)
Config.discover(aud_reconfigure)
+
if not Config.discovered_config.is_changed():
+ active = ServiceActivator.determine_mode_of_operation(aud_reconfigure)
reconfigure_result = " -- config not changed"
else:
- reconfigure_result = " -- config changed for:"
+ changed_configs = []
+
+ if ServiceActivator.reconfigure(aud_reconfigure):
+ changed_configs.append(Config.SERVICE_ACTIVATOR)
+ active = ServiceActivator.determine_mode_of_operation(aud_reconfigure)
+
if self._set_timer_intervals():
- reconfigure_result += " timer_intervals"
+ changed_configs.append("timer_intervals")
if PolicyRest.reconfigure():
- reconfigure_result += " " + Config.FIELD_POLICY_ENGINE
+ need_to_catch_up = True
+ changed_configs.append(Config.FIELD_POLICY_ENGINE)
if DeployHandler.reconfigure(aud_reconfigure):
- reconfigure_result += " " + Config.DEPLOY_HANDLER
+ need_to_catch_up = True
+ changed_configs.append(Config.DEPLOY_HANDLER)
if self._reconfigure_receiver(aud_reconfigure):
- reconfigure_result += " web-socket"
+ need_to_catch_up = True
+ changed_configs.append("web-socket")
+
+ reconfigure_result = " -- config changed on {} changes: {}".format(
+ json.dumps(changed_configs), Config.discovered_config)
- reconfigure_result += " -- change: {}".format(Config.discovered_config)
+ need_to_catch_up = need_to_catch_up or (active and not active_prev)
+ if need_to_catch_up:
+ reconfigure_result += " -- going to catch_up"
Config.discovered_config.commit_change()
aud_reconfigure.audit_done(result=reconfigure_result)
PolicyUpdater._logger.info(log_line + reconfigure_result)
+ if need_to_catch_up:
+ self._pause_catch_up_timer()
+ self.catch_up()
+
except Exception as ex:
error_msg = "crash {} {}{}: {}: {}".format(
"_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))
@@ -344,6 +368,20 @@ class PolicyUpdater(Thread):
if not aud_catch_up:
return False
+ if not ServiceActivator.is_active_mode_of_operation(aud_catch_up):
+ catch_up_result = "passive -- skip catch_up {0} request_id {1}".format(
+ aud_catch_up.req_message, aud_catch_up.request_id
+ )
+ self._pause_catch_up_timer()
+ aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(catch_up_result)
+ self._run_catch_up_timer()
+
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_catch_up.health(full=True)))
+ PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+ return False
+
log_line = "catch_up {0} request_id {1}".format(
aud_catch_up.req_message, aud_catch_up.request_id
)
diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py
new file mode 100644
index 0000000..d51d11c
--- /dev/null
+++ b/policyhandler/service_activator.py
@@ -0,0 +1,217 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""
+ ask service_activator for the mode_of_operation
+ that is whether the current site/cluster is active versus passive
+
+ active is the default and expects the polisy-handler
+ to receive the push notifications from policy-engine
+ as well as to periodically run the catch_up process
+
+ passive expects the polisy-handler
+ to stop listening for the policy-updates from the policy-engine
+ and to stop doing the periodic catch_up
+"""
+
+import json
+import logging
+from copy import deepcopy
+from urllib.parse import urljoin
+
+import requests
+
+from .config import Config, Settings
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
+from .policy_consts import TARGET_ENTITY
+
+
+class ServiceActivator(object):
+ """calling the service_activator web api to determine the mode_of_operation"""
+ _logger = logging.getLogger("policy_handler.service_activator")
+ DEFAULT_TARGET_ENTITY = "service_activator"
+ DEFAULT_TIMEOUT_IN_SECS = 10
+ MODE_OF_OPERATION_ACTIVE = "active"
+
+ _lazy_inited = False
+ _settings = Settings(Config.MODE_OF_OPERATION, Config.SERVICE_ACTIVATOR)
+
+ _mode_of_operation = None
+ _url = None
+ _url_register = None
+ _post_register = {}
+ _target_entity = None
+ _custom_kwargs = {}
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
+
+
+ @staticmethod
+ def _init(audit):
+ """
+ initialize service-activator client config based on discovered config:
+
+ "mode_of_operation" : "active",
+ "service_activator" : {
+ "target_entity" : "service_activator",
+ "url" : "https://service-activator-service:123",
+ "path_register" : "/register",
+ "tls_ca_mode" : "cert_directory",
+ "timeout_in_secs": 20,
+ "post_register" : {
+ "component_name" : "policy_handler",
+ "reconfigure_path" : "/reconfigure",
+ "http_protocol" : "http"
+ }
+ }
+ """
+ ServiceActivator._custom_kwargs = {}
+ ServiceActivator._url = ServiceActivator._url_register = ""
+
+ try:
+ _, ServiceActivator._mode_of_operation = ServiceActivator._settings.get_by_key(
+ Config.MODE_OF_OPERATION, ServiceActivator._mode_of_operation)
+
+ _, config_sa = ServiceActivator._settings.get_by_key(Config.SERVICE_ACTIVATOR)
+ if config_sa and isinstance(config_sa, dict):
+ ServiceActivator._target_entity = config_sa.get(
+ TARGET_ENTITY, ServiceActivator.DEFAULT_TARGET_ENTITY)
+ ServiceActivator._url = config_sa.get("url", "")
+ if ServiceActivator._url:
+ ServiceActivator._url_register = urljoin(ServiceActivator._url,
+ config_sa.get("path_register", ""))
+ ServiceActivator._post_register = deepcopy(config_sa.get("post_register", {}))
+ tls_ca_mode = config_sa.get(Config.TLS_CA_MODE)
+ ServiceActivator._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+
+ ServiceActivator._logger.info(audit.info(
+ "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)",
+ ServiceActivator._target_entity, ServiceActivator._url_register,
+ tls_ca_mode, json.dumps(ServiceActivator._custom_kwargs)))
+
+ ServiceActivator._timeout_in_secs = config_sa.get(Config.TIMEOUT_IN_SECS)
+ if not ServiceActivator._timeout_in_secs or ServiceActivator._timeout_in_secs < 1:
+ ServiceActivator._timeout_in_secs = ServiceActivator.DEFAULT_TIMEOUT_IN_SECS
+
+ ServiceActivator._settings.commit_change()
+ except Exception:
+ pass
+ ServiceActivator._lazy_inited = True
+
+ @staticmethod
+ def reconfigure(audit):
+ """reconfigure"""
+ ServiceActivator._settings.set_config(Config.discovered_config)
+ if not ServiceActivator._settings.is_changed():
+ ServiceActivator._settings.commit_change()
+ return False
+
+ ServiceActivator._lazy_inited = False
+ ServiceActivator._init(audit)
+ return True
+
+ @staticmethod
+ def _lazy_init(audit):
+ """set config"""
+ if ServiceActivator._lazy_inited:
+ return
+
+ ServiceActivator._settings.set_config(Config.discovered_config)
+ ServiceActivator._init(audit)
+
+ @staticmethod
+ def is_active_mode_of_operation(audit):
+ """
+ mode_of_operation - whether the service is
+ active == True or passive == False
+ based on the current value of the mode_of_operation
+ """
+ active = (ServiceActivator._mode_of_operation is None
+ or ServiceActivator._mode_of_operation
+ == ServiceActivator.MODE_OF_OPERATION_ACTIVE)
+ ServiceActivator._logger.info(audit.info(
+ "mode_of_operation = {} active = {}".format(
+ ServiceActivator._mode_of_operation, active)))
+ return active
+
+ @staticmethod
+ def determine_mode_of_operation(audit):
+ """retrieves the mode_of_operation from service_activator"""
+ try:
+ ServiceActivator._lazy_init(audit)
+
+ target_entity = ServiceActivator._target_entity
+
+ if not ServiceActivator._url:
+ ServiceActivator._logger.info(audit.info(
+ "no url found for {}".format(target_entity)))
+ return ServiceActivator.is_active_mode_of_operation(audit)
+
+ url = ServiceActivator._url_register
+ json_body = deepcopy(ServiceActivator._post_register)
+ timeout_in_secs = ServiceActivator._timeout_in_secs
+ custom_kwargs = deepcopy(ServiceActivator._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit,
+ targetEntity="{} determine_mode_of_operation".format(target_entity),
+ targetServiceName=url)
+ headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+
+ log_action = "post to {} at {}".format(target_entity, url)
+ log_data = "headers={}, json_body={}, timeout_in_secs={}, custom_kwargs({})".format(
+ json.dumps(headers), json.dumps(json_body), timeout_in_secs,
+ json.dumps(custom_kwargs))
+ log_line = log_action + " " + log_data
+
+ ServiceActivator._logger.info(log_line)
+ metrics.metrics_start(log_line)
+
+ res = None
+ try:
+ res = requests.post(url, json=json_body, headers=headers, timeout=timeout_in_secs,
+ **custom_kwargs)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = "failed to {} {}: {} {}".format(
+ log_action, type(ex).__name__, str(ex), log_data)
+ ServiceActivator._logger.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return ServiceActivator.is_active_mode_of_operation(audit)
+
+ metrics.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+
+ log_line = "response {} from {}: text={} {}".format(
+ res.status_code, log_action, res.text, log_data)
+ metrics.metrics(log_line)
+
+ if res.status_code != requests.codes.ok:
+ ServiceActivator._logger.error(log_line)
+ return ServiceActivator.is_active_mode_of_operation(audit)
+
+ result = res.json() or {}
+
+ ServiceActivator._mode_of_operation = (result.get(Config.MODE_OF_OPERATION)
+ or ServiceActivator._mode_of_operation)
+ return ServiceActivator.is_active_mode_of_operation(audit)
+
+ except Exception as ex:
+ return ServiceActivator.is_active_mode_of_operation(audit)
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 73e7fbc..dc76353 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -208,7 +208,24 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
PolicyReceiver.catch_up(audit)
- res = {"catch-up requested": started}
+ res = {"catch-up requested": started, "request_id": audit.request_id}
+ PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res))
+ audit.info_requested(started)
+ return res
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def reconfigure(self):
+ """schedule reconfigure"""
+ started = str(datetime.utcnow())
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="reconfigure", req_message=req_info,
+ headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+ PolicyReceiver.reconfigure(audit)
+
+ res = {"reconfigure requested": started, "request_id": audit.request_id}
PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res))
audit.info_requested(started)
return res
diff --git a/pom.xml b/pom.xml
index 97fc4e4..5b7f7a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<!--
================================================================================
-Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform</groupId>
<artifactId>policy-handler</artifactId>
<name>dcaegen2-platform-policy-handler</name>
- <version>4.5.0-SNAPSHOT</version>
+ <version>4.6.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/setup.py b/setup.py
index ff1aa4e..3807bc6 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='policyhandler',
description='DCAE-Controller policy-handler to communicate with policy-engine',
- version="4.5.0",
+ version="4.6.0",
author='Alex Shatov',
packages=['policyhandler'],
zip_safe=False,
diff --git a/tests/mock_settings.py b/tests/mock_settings.py
index 80a003e..8dec8e5 100644
--- a/tests/mock_settings.py
+++ b/tests/mock_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,13 +22,13 @@ import json
import logging
import sys
import uuid
-from datetime import datetime
from functools import wraps
from policyhandler import LogWriter
from policyhandler.config import Config
from policyhandler.discovery import DiscoveryClient
from policyhandler.onap.audit import Audit
+from policyhandler.service_activator import ServiceActivator
def _fix_discover_config(func):
@@ -95,6 +95,7 @@ class Settings(object):
audit = Audit(req_message="rediscover_config")
Config.discover(audit)
+ ServiceActivator.determine_mode_of_operation(audit)
Settings.logger.info("testing policy_handler with config: %s", Config.discovered_config)
diff --git a/version.properties b/version.properties
index aae9468..314217b 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
major=4
-minor=5
+minor=6
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}