diff options
-rw-r--r-- | README.md | 5 | ||||
-rw-r--r-- | src/python/netconf_rest_application.py | 7 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_change_listener.py | 15 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_kafka_client.py | 4 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_rest_server.py | 30 |
5 files changed, 41 insertions, 20 deletions
@@ -82,6 +82,11 @@ Example of that custom config change subscription can be found in `example` dire ### REST API Netconf server provides REST interface, with enpoints: - *GET* `/healthcheck` returns 200 "UP" if server is up and running +- *POST* `/readiness` return 200 "Ready" if server is ready, if not, returns 503 "Not Ready" +- *POST* `/change_config/<path:module_name>` changes configuration ad returns 202 "Accepted" +- *GET* `/change_history` returns 200 and change history as json +- *GET* `/get_config/<path:module_name>` returns 200 and current configuration + ### logging Netconf server print all logs on to the console. diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py index b13a084..8376fd8 100644 --- a/src/python/netconf_rest_application.py +++ b/src/python/netconf_rest_application.py @@ -21,7 +21,6 @@ import sys import logging from netconf_server.netconf_app_configuration import NetconfAppConfiguration -from netconf_server.netconf_kafka_client import provide_configured_kafka_client from netconf_server.netconf_rest_server import NetconfRestServer from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager @@ -36,11 +35,7 @@ logger = logging.getLogger("netconf_rest_application") def start_rest_server(session, connection, server_rest: NetconfRestServer, netconf_app_configuration: NetconfAppConfiguration): sysrepo_cfg_manager = create_conf_manager(session, connection) - kafka_client = provide_configured_kafka_client( - netconf_app_configuration.kafka_host_name, - netconf_app_configuration.kafka_port - ) - server_rest.start(sysrepo_cfg_manager, kafka_client, netconf_app_configuration.kafka_topic) + server_rest.start(sysrepo_cfg_manager, netconf_app_configuration) def create_rest_server() -> NetconfRestServer: diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py index 628f757..cdb5457 100644 --- a/src/python/netconf_server/netconf_change_listener.py +++ b/src/python/netconf_server/netconf_change_listener.py @@ -21,7 +21,6 @@ import logging from kafka.producer.future import FutureRecordMetadata - from netconf_server.netconf_kafka_client import NetconfKafkaClient from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData @@ -53,11 +52,19 @@ class NetconfChangeListener(object): kafka_message = NetconfChangeListener._create_kafka_message(change) logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic)) response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata - logging.info("Response from Kafka: {}".format(response.get(timeout=1))) - + self.set_up_callbacks_for_kafka_request(response) + logging.info("Module changes sent to Kafka") except Exception as e: logger.error("Exception occurred during handling of sysrepo config change", e) - logger.info("Module changes sent to Kafka. Operation finished.") + + @staticmethod + def set_up_callbacks_for_kafka_request(response): + response.add_callback( + lambda val: logging.info("Response from Kafka: {}".format(val)) + ) + response.add_errback( + lambda exc: logging.error("Exception from Kafka: {}".format(exc)) + ) @staticmethod def _create_kafka_message(change): diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py index 027bde1..1f21604 100644 --- a/src/python/netconf_server/netconf_kafka_client.py +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -48,6 +48,10 @@ class NetconfKafkaClient(object): server = "{}:{}".format(host, port) producer = KafkaProducer( bootstrap_servers=server, + request_timeout_ms=15000, + retry_backoff_ms=1000, + max_in_flight_requests_per_connection=1, + retries=3, value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8) ) diff --git a/src/python/netconf_server/netconf_rest_server.py b/src/python/netconf_server/netconf_rest_server.py index edaa6e8..5dbe6cd 100644 --- a/src/python/netconf_server/netconf_rest_server.py +++ b/src/python/netconf_server/netconf_rest_server.py @@ -20,7 +20,8 @@ from flask import Flask, logging, make_response, Response, request, jsonify -from netconf_server.netconf_kafka_client import NetconfKafkaClient +from netconf_server.netconf_app_configuration import NetconfAppConfiguration +from netconf_server.netconf_kafka_client import provide_configured_kafka_client from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager @@ -28,8 +29,7 @@ class NetconfRestServer: _rest_server: Flask = Flask("server") logger = logging.create_logger(_rest_server) _configuration_manager: SysrepoConfigurationManager - _kafka_topic: str - _kafka_client: NetconfKafkaClient + _app_configuration: NetconfAppConfiguration def __init__(self, host='0.0.0.0', port=6555): self._host = host @@ -37,11 +37,9 @@ class NetconfRestServer: def start(self, configuration_manager: SysrepoConfigurationManager, - kafka_client: NetconfKafkaClient, - kafka_topic: str): + netconf_app_configuration: NetconfAppConfiguration): NetconfRestServer._configuration_manager = configuration_manager - NetconfRestServer._kafka_client = kafka_client - NetconfRestServer._kafka_topic = kafka_topic + NetconfRestServer._app_configuration = netconf_app_configuration Flask.run( NetconfRestServer._rest_server, host=self._host, @@ -56,11 +54,22 @@ class NetconfRestServer: @staticmethod @_rest_server.route("/readiness") def _readiness_check(): - if NetconfRestServer._kafka_client: + try: + NetconfRestServer.__try_connect_to_kafka() return Response('Ready', status=200) - else: + except Exception as e: + NetconfRestServer.logger.error("Unable to create a Kafka client", e) return Response('Not Ready', status=503) + # if Kafka is up & running and hostname with port is proper, then client will be created; otherwise + # an error will be reported + @staticmethod + def __try_connect_to_kafka(): + return provide_configured_kafka_client( + NetconfRestServer._app_configuration.kafka_host_name, + NetconfRestServer._app_configuration.kafka_port + ) + @staticmethod @_rest_server.route("/change_config/<path:module_name>", methods=['POST']) def _change_config(module_name): @@ -71,7 +80,8 @@ class NetconfRestServer: @staticmethod @_rest_server.route("/change_history") def _change_history(): - history = NetconfRestServer._kafka_client.get_all_messages_from(NetconfRestServer._kafka_topic) + history = NetconfRestServer.__try_connect_to_kafka()\ + .get_all_messages_from(NetconfRestServer._app_configuration.kafka_topic) return jsonify(history), 200 @staticmethod |