diff options
author | Edyta Krukowska <edyta.krukowska@nokia.com> | 2021-04-21 15:23:13 +0200 |
---|---|---|
committer | Edyta Krukowska <edyta.krukowska@nokia.com> | 2021-04-22 06:42:11 +0200 |
commit | 6182d0fbd371250e43b5c0e741f56ab05b476fe6 (patch) | |
tree | 99c376e2a6316ad11bd35201fd989e7e561e130a | |
parent | 4677ead091aee5f2a9a75fd2b296c6a723ea7f6f (diff) |
Add retry mechanism when trying to connect to kafka1.0.1
Issue-ID: INT-1869
Signed-off-by: Edyta Krukowska <edyta.krukowska@nokia.com>
Change-Id: I926344788343b946dfdfc39c1e663a03531cbfe6
-rw-r--r-- | Changelog.md | 4 | ||||
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | src/python/netconf_change_listener_application.py | 9 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_change_listener_factory.py | 26 | ||||
-rw-r--r-- | version.properties | 2 |
5 files changed, 29 insertions, 14 deletions
diff --git a/Changelog.md b/Changelog.md index 8bccd3e..a8bc977 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,5 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). + +## [1.0.1] - 22/04/2021 + - [INT-1869] (https://jira.onap.org/browse/INT-1869) Add retry mechanism when trying to connect to kafka + ## [1.0.0] - 10/03/2021 - [INT-1869] (https://jira.onap.org/browse/INT-1869) Create netconf-server @@ -31,7 +31,7 @@ <groupId>org.onap.integration.simulators.nf-simulator.netconf-server</groupId> <artifactId>netconfserver</artifactId> - <version>1.0.0-SNAPSHOT</version> + <version>1.0.1-SNAPSHOT</version> <name>netconfserver</name> <properties> diff --git a/src/python/netconf_change_listener_application.py b/src/python/netconf_change_listener_application.py index 400ff3d..97b2130 100644 --- a/src/python/netconf_change_listener_application.py +++ b/src/python/netconf_change_listener_application.py @@ -20,12 +20,12 @@ import asyncio import sys import logging - +from retry import retry from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_change_listener import NetconfChangeListener -from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory +from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory, NetconfChangeListenerException from netconf_server.sysrepo_configuration.sysrepo_configuration_loader import SysrepoConfigurationLoader, \ ConfigLoadingException from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient @@ -44,6 +44,11 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen def create_change_listener(app_configuration: NetconfAppConfiguration) -> NetconfChangeListener: configuration = SysrepoConfigurationLoader.load_configuration(app_configuration.module_configuration_file_path) + return subscribe_to_kafka(app_configuration, configuration) + + +@retry(NetconfChangeListenerException, tries=10, delay=10) +def subscribe_to_kafka(app_configuration, configuration): return NetconfChangeListenerFactory(configuration.models_to_subscribe_to, app_configuration).create() diff --git a/src/python/netconf_server/netconf_change_listener_factory.py b/src/python/netconf_server/netconf_change_listener_factory.py index 9b9ff24..f023542 100644 --- a/src/python/netconf_server/netconf_change_listener_factory.py +++ b/src/python/netconf_server/netconf_change_listener_factory.py @@ -19,7 +19,6 @@ ### import logging - from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_change_listener import NetconfChangeListener from netconf_server.netconf_kafka_client import NetconfKafkaClient, provide_configured_kafka_client @@ -35,18 +34,25 @@ class NetconfChangeListenerFactory(object): self.app_configuration = app_configuration def create(self) -> NetconfChangeListener: - subscriptions = list() - for module_name in self.modules_to_subscribe_names: - subscriptions.append( - ConfigChangeSubscriber(module_name) + try: + subscriptions = list() + for module_name in self.modules_to_subscribe_names: + subscriptions.append( + ConfigChangeSubscriber(module_name) + ) + kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client( + self.app_configuration.kafka_host_name, + self.app_configuration.kafka_port ) - kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client( - self.app_configuration.kafka_host_name, - self.app_configuration.kafka_port - ) - return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic) + return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic) + except Exception as e: + raise NetconfChangeListenerException(e) @staticmethod def _try_to_create_kafka_client(kafka_host_name: str, kafka_port: int): return provide_configured_kafka_client(kafka_host_name, kafka_port) # type: NetconfKafkaClient + + +class NetconfChangeListenerException(Exception): + pass diff --git a/version.properties b/version.properties index 2ddebb3..0f1f46a 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=0 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |