From 535e0dc1340ce57c0bfeed8d1ce530111cf41063 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 22 Mar 2018 15:52:14 -0400 Subject: 2.4.1 better step-timer + audit + unit-tests - improved step-timer due to unit tests -- fixed events -- better logging - audit - collect list of package thru subprocess pip freeze - unit tests coverage 76% Change-Id: Ib1cb5f687612ecf18aa7414b1ff7dbf5774345b4 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-389 --- policyhandler/onap/audit.py | 20 ++-- policyhandler/policy_updater.py | 5 +- policyhandler/step_timer.py | 105 ++++++++++++++++++-- pom.xml | 2 +- setup.py | 2 +- tests/test_policyhandler.py | 117 +++++++++++++++++------ tests/test_step_timer.py | 206 ++++++++++++++++++++++++++++++++++++++++ version.properties | 2 +- 8 files changed, 410 insertions(+), 49 deletions(-) create mode 100644 tests/test_step_timer.py diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 109e3ff..c615d63 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -28,6 +28,7 @@ import copy import json import os +import subprocess import sys import time import uuid @@ -116,7 +117,7 @@ class Audit(object): """ _service_name = "" _service_version = "" - _service_instance_UUID = str(uuid.uuid4()) + _service_instance_uuid = str(uuid.uuid4()) _started = datetime.now() _logger_debug = None _logger_error = None @@ -124,6 +125,10 @@ class Audit(object): _logger_audit = None _health = Health() _py_ver = sys.version.replace("\n", "") + try: + _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines()) + except subprocess.CalledProcessError: + _packages = [] @staticmethod def init(service_name, service_version, config_file_path): @@ -131,13 +136,13 @@ class Audit(object): Audit._service_name = service_name Audit._service_version = service_version Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) @staticmethod def health(): @@ -146,12 +151,13 @@ class Audit(object): return { "service_name" : Audit._service_name, "service_version" : Audit._service_version, - "service_instance_UUID" : Audit._service_instance_UUID, + "service_instance_uuid" : Audit._service_instance_uuid, "python" : Audit._py_ver, "started" : str(Audit._started), "now" : str(now), "uptime" : str(now - Audit._started), - "stats" : Audit._health.dump() + "stats" : Audit._health.dump(), + "packages" : Audit._packages } def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 2c2f729..0b9e227 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -139,19 +139,20 @@ class PolicyUpdater(Thread): "catch_up_timer", self._catch_up_interval, PolicyUpdater.catch_up, + PolicyUpdater._logger, self ) self._catch_up_timer.start() self._logger.info("started catch_up_timer in %s", self._catch_up_interval) def _pause_catch_up_timer(self): - """stop catch_up_timer""" + """pause catch_up_timer""" if self._catch_up_timer: self._logger.info("pause catch_up_timer") self._catch_up_timer.pause() def _stop_catch_up_timer(self): - """stop catch_up_timer""" + """stop and destroy the catch_up_timer""" if self._catch_up_timer: self._logger.info("stopping catch_up_timer") self._catch_up_timer.stop() diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 6e2b3f6..ad77c85 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -18,46 +18,131 @@ """periodically callback""" -from threading import Event, Thread +from datetime import datetime +from threading import Event, Lock, Thread class StepTimer(Thread): """call on_time after interval number of seconds, then wait to continue""" - def __init__(self, name, interval, on_time, *args, **kwargs): + INIT = "init" + NEXT = "next" + STARTED = "started" + PAUSED = "paused" + STOPPING = "stopping" + STOPPED = "stopped" + + def __init__(self, name, interval, on_time, logger, *args, **kwargs): + """create step timer with controlled start. next step and pause""" Thread.__init__(self, name=name) self._interval = interval self._on_time = on_time + self._logger = logger self._args = args self._kwargs = kwargs + self._lock = Lock() + self._timeout = Event() self._paused = Event() - self._continue = Event() + self._next = Event() self._finished = Event() + self._event = StepTimer.INIT + self._event_counter = 0 + self._event_time = 0 + self._event_ts = datetime.now() + + self._substep = None + self._substep_time = 0 + self._substep_ts = datetime.now() + + def get_status(self): + """returns status of events""" + with self._lock: + return "{0}[{1}] {2}: timeout({3}), paused({4}), next({5}), finished({6})".format( + self._event, + self._event_counter, + self._substep, + self._timeout.is_set(), + self._paused.is_set(), + self._next.is_set(), + self._finished.is_set(), + ) + def next(self): """continue with the next timeout""" self._paused.clear() - self._continue.set() + self._next.set() + self._timeout.set() + self._set_timer_event(StepTimer.NEXT) def pause(self): """pause the timer""" self._paused.set() + self._next.clear() + self._set_timer_event(StepTimer.PAUSED) def stop(self): """stop the timer if it hasn't finished yet""" self._finished.set() self._timeout.set() - self._continue.set() + self._next.set() + self._set_timer_event(StepTimer.STOPPING) + + def _set_timer_event(self, event): + """set the event on the timer""" + with self._lock: + if event in [StepTimer.NEXT, StepTimer.STARTED]: + self._event_counter += 1 + + self._event = event + now = datetime.now() + self._event_time = (now - self._event_ts).total_seconds() + self._event_ts = now + self._logger.info("[{0}] {1} {2}".format( + self._event_time, self.name, self.get_status())) + + def _timer_substep(self, substep): + """log exe step""" + with self._lock: + self._substep = substep + now = datetime.now() + self._substep_time = (now - self._substep_ts).total_seconds() + self._substep_ts = now + self._logger.info("[{0}] {1}".format(self._substep_time, self.get_status())) def run(self): - """loop until stopped=finished""" + """loop one step a time until stopped=finished""" + self._set_timer_event(StepTimer.STARTED) while True: + self._timer_substep("waiting for timeout {0}...".format(self._interval)) self._timeout.wait(self._interval) + self._timer_substep("woke up after timeout") + if self._finished.is_set(): + self._timer_substep("finished") break - self._timeout.clear() - self._continue.clear() - if not self._paused.is_set(): + + if self._next.is_set(): + self._next.clear() + self._timeout.clear() + self._timer_substep("restart timer") + continue + + if self._paused.is_set(): + self._timer_substep("paused - skip on_time event") + else: + self._timer_substep("on_time event") self._on_time(*self._args, **self._kwargs) - self._continue.wait() + + self._timer_substep("waiting for next...") + self._next.wait() + self._next.clear() + self._timeout.clear() + self._timer_substep("woke up on next") + + if self._finished.is_set(): + self._timer_substep("finished") + break + + self._set_timer_event(StepTimer.STOPPED) diff --git a/pom.xml b/pom.xml index f53ca45..65e8ec1 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - 2.4.0-SNAPSHOT + 2.4.1-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/setup.py b/setup.py index 3886070..386e578 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="2.4.0", + version="2.4.1", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index 307a355..72cb857 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -22,13 +22,13 @@ import copy import json import logging import re +import subprocess import sys import time import uuid from datetime import datetime import pytest - import cherrypy from cherrypy.test.helper import CPWebCase @@ -48,7 +48,10 @@ from policyhandler.policy_rest import PolicyRest from policyhandler.policy_utils import PolicyUtils, Utils from policyhandler.web_server import _PolicyWeb -POLICY_HANDLER_VERSION = "2.4.0" +try: + POLICY_HANDLER_VERSION = subprocess.check_output(["python", "setup.py", "--version"]).strip() +except subprocess.CalledProcessError: + POLICY_HANDLER_VERSION = "2.4.1" class MonkeyHttpResponse(object): """Monkey http reposne""" @@ -98,10 +101,11 @@ class Settings(object): logger = None RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z' dicovered_config = None + deploy_handler_instance_uuid = str(uuid.uuid4()) @staticmethod def init(): - """init locals""" + """init configs""" Config.load_from_file() with open("etc_upload/config.json", 'r') as config_json: @@ -109,6 +113,8 @@ class Settings(object): Config.load_from_file("etc_upload/config.json") + Config.config["catch_up"] = {"interval" : 10, "max_skips" : 2} + Settings.logger = logging.getLogger("policy_handler.unit_test") sys.stdout = LogWriter(Settings.logger.info) sys.stderr = LogWriter(Settings.logger.error) @@ -237,7 +243,10 @@ def fix_pdp_post(monkeypatch): def monkeyed_deploy_handler(full_path, json=None, headers=None): """monkeypatch for deploy_handler""" - return MonkeyedResponse(full_path, {}, json, headers) + return MonkeyedResponse(full_path, + {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, + json, headers + ) @pytest.fixture() def fix_deploy_handler(monkeypatch, fix_discovery): @@ -252,7 +261,7 @@ def fix_deploy_handler(monkeypatch, fix_discovery): def monkeyed_cherrypy_engine_exit(): """monkeypatch for deploy_handler""" - Settings.logger.info("monkeyed_cherrypy_engine_exit()") + Settings.logger.info("cherrypy_engine_exit()") @pytest.fixture() def fix_cherrypy_engine_exit(monkeypatch): @@ -307,10 +316,12 @@ class MonkeyedWebSocket(object): def run_forever(self): """forever in the loop""" + counter = 0 while self.sock.connected: - Settings.logger.info("MonkeyedWebSocket sleep...") + counter += 1 + Settings.logger.info("MonkeyedWebSocket sleep %s...", counter) time.sleep(5) - Settings.logger.info("MonkeyedWebSocket exit") + Settings.logger.info("MonkeyedWebSocket exit %s", counter) def close(self): """close socket""" @@ -343,7 +354,7 @@ def test_healthcheck(): time.sleep(0.1) audit.metrics("test /healthcheck") - health = Audit.health() or {} + health = Audit.health() audit.audit_done(result=json.dumps(health)) Settings.logger.info("healthcheck: %s", json.dumps(health)) @@ -364,7 +375,7 @@ def test_healthcheck_with_error(): audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) audit.metrics("test /healthcheck") - health = Audit.health() or {} + health = Audit.health() audit.audit_done(result=json.dumps(health)) Settings.logger.info("healthcheck: %s", json.dumps(health)) @@ -441,14 +452,79 @@ class WebServerTest(CPWebCase): Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) assert Utils.are_the_same(policies_latest, expected_policies) + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_policy_updates_and_catch_ups(self): + """test run policy handler with policy updates and catchups""" + Settings.logger.info("start policy_updates_and_catch_ups") + audit = Audit(req_message="start policy_updates_and_catch_ups") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 30 before shutdown...") + time.sleep(30) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_catch_up_on_deploy_handler_changed(self): + """test run policy handler with deployment-handler changed underneath""" + Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") + audit = Audit(req_message="start zzz_catch_up_on_deploy_handler_changed") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) + Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) + + MonkeyedWebSocket.send_notification([2, 4]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_get_catch_up(self): + """test /catch_up""" + Settings.logger.info("start /catch_up") + audit = Audit(req_message="start /catch_up") + PolicyReceiver.run(audit) + time.sleep(5) + result = self.getPage("/catch_up") + Settings.logger.info("catch_up result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got catch_up: %s", self.body) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + @pytest.mark.usefixtures( "fix_deploy_handler", "fix_policy_receiver_websocket", "fix_cherrypy_engine_exit") - def test_zzz_run_policy_handler(self): - """test run policy handler""" - Settings.logger.info("start policy handler") - audit = Audit(req_message="start policy handler") + def test_zzzzz_shutdown(self): + """test shutdown""" + Settings.logger.info("start shutdown") + audit = Audit(req_message="start shutdown") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") @@ -458,22 +534,9 @@ class WebServerTest(CPWebCase): Settings.logger.info("sleep after send_notification...") time.sleep(3) - Settings.logger.info("sleep before shutdown...") - time.sleep(1) + Settings.logger.info("shutdown...") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got shutdown: %s", self.body) time.sleep(1) - - # @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") - # def test_zzz_web_catch_up(self): - # """test /catch_up""" - # Settings.logger.info("start policy handler") - # audit = Audit(req_message="start policy handler") - # PolicyReceiver.run(audit) - # time.sleep(5) - # result = self.getPage("/catch_up") - # Settings.logger.info("catch_up result: %s", result) - # self.assertStatus('200 OK') - # Settings.logger.info("got catch_up: %s", self.body) diff --git a/tests/test_step_timer.py b/tests/test_step_timer.py new file mode 100644 index 0000000..fe2d1c1 --- /dev/null +++ b/tests/test_step_timer.py @@ -0,0 +1,206 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""test of the step_timer""" + +import json +import logging +import sys +import time +from datetime import datetime + +from policyhandler.config import Config +from policyhandler.step_timer import StepTimer + +Config.load_from_file() + +class MockTimer(object): + """testing step_timer""" + logger = logging.getLogger("policy_handler.unit_test.step_timer") + + INIT = "init" + NEXT = "next" + STARTED = "started" + PAUSED = "paused" + STOPPING = "stopping" + STOPPED = "stopped" + + def __init__(self, name, interval): + """step_timer test settings""" + self.name = name or "step_timer" + self.interval = interval or 5 + self.step_timer = None + self.status = None + self.run_counter = 0 + self.status_ts = datetime.now() + self.exe_ts = None + self.exe_interval = None + self.set_status(MockTimer.INIT) + + def __enter__(self): + """constructor""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """destructor""" + self.stop_timer() + + def on_time(self, *args, **kwargs): + """timer event""" + self.exe_ts = datetime.now() + self.exe_interval = (self.exe_ts - self.status_ts).total_seconds() + MockTimer.logger.info("run on_time[%s] (%s, %s) in %s for %s", + self.run_counter, json.dumps(args), json.dumps(kwargs), + self.exe_interval, self.get_status()) + time.sleep(3) + MockTimer.logger.info("done on_time[%s] (%s, %s) in %s for %s", + self.run_counter, json.dumps(args), json.dumps(kwargs), + self.exe_interval, self.get_status()) + + def verify_last_event(self): + """assertions needs to be in the main thread""" + if self.exe_interval is None: + MockTimer.logger.info("not executed: %s", self.get_status()) + return + + MockTimer.logger.info("verify exe %s for %s", self.exe_interval, self.get_status()) + assert self.exe_interval >= self.interval + assert self.exe_interval < 2 * self.interval + MockTimer.logger.info("success %s", self.get_status()) + + def run_timer(self): + """create and start the step_timer""" + if self.step_timer: + self.step_timer.next() + self.set_status(MockTimer.NEXT) + return + + self.step_timer = StepTimer( + self.name, self.interval, MockTimer.on_time, + MockTimer.logger, + self + ) + self.step_timer.start() + self.set_status(MockTimer.STARTED) + + def pause_timer(self): + """pause step_timer""" + if self.step_timer: + self.step_timer.pause() + self.set_status(MockTimer.PAUSED) + + def stop_timer(self): + """stop step_timer""" + if self.step_timer: + self.set_status(MockTimer.STOPPING) + self.step_timer.stop() + self.step_timer.join() + self.step_timer = None + self.set_status(MockTimer.STOPPED) + + def set_status(self, status): + """set the status of the timer""" + if status in [MockTimer.NEXT, MockTimer.STARTED]: + self.run_counter += 1 + + self.status = status + now = datetime.now() + time_step = (now - self.status_ts).total_seconds() + self.status_ts = now + MockTimer.logger.info("%s: %s", time_step, self.get_status()) + + def get_status(self): + """string representation""" + status = "{0}[{1}] {2} in {3} from {4} last exe {5}".format( + self.status, self.run_counter, self.name, self.interval, + str(self.status_ts), str(self.exe_ts) + ) + if self.step_timer: + return "{0}: {1}".format(status, self.step_timer.get_status()) + return status + +def test_step_timer(): + """test step_timer""" + MockTimer.logger.info("============ test_step_timer =========") + with MockTimer("step_timer", 5) as step_timer: + step_timer.run_timer() + time.sleep(1) + step_timer.verify_last_event() + + time.sleep(1 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() + +def test_interrupt_step_timer(): + """test step_timer""" + MockTimer.logger.info("============ test_interrupt_step_timer =========") + with MockTimer("step_timer", 5) as step_timer: + step_timer.run_timer() + time.sleep(1) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() diff --git a/version.properties b/version.properties index 50463f0..0f14334 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=2 minor=4 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg