aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md5
-rw-r--r--src/python/netconf_rest_application.py7
-rw-r--r--src/python/netconf_server/netconf_change_listener.py15
-rw-r--r--src/python/netconf_server/netconf_kafka_client.py4
-rw-r--r--src/python/netconf_server/netconf_rest_server.py30
5 files changed, 41 insertions, 20 deletions
diff --git a/README.md b/README.md
index 33703dd..cd45156 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,11 @@ Example of that custom config change subscription can be found in `example` dire
### REST API
Netconf server provides REST interface, with enpoints:
- *GET* `/healthcheck` returns 200 "UP" if server is up and running
+- *POST* `/readiness` return 200 "Ready" if server is ready, if not, returns 503 "Not Ready"
+- *POST* `/change_config/<path:module_name>` changes configuration ad returns 202 "Accepted"
+- *GET* `/change_history` returns 200 and change history as json
+- *GET* `/get_config/<path:module_name>` returns 200 and current configuration
+
### logging
Netconf server print all logs on to the console.
diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py
index b13a084..8376fd8 100644
--- a/src/python/netconf_rest_application.py
+++ b/src/python/netconf_rest_application.py
@@ -21,7 +21,6 @@ import sys
import logging
from netconf_server.netconf_app_configuration import NetconfAppConfiguration
-from netconf_server.netconf_kafka_client import provide_configured_kafka_client
from netconf_server.netconf_rest_server import NetconfRestServer
from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
@@ -36,11 +35,7 @@ logger = logging.getLogger("netconf_rest_application")
def start_rest_server(session, connection, server_rest: NetconfRestServer, netconf_app_configuration: NetconfAppConfiguration):
sysrepo_cfg_manager = create_conf_manager(session, connection)
- kafka_client = provide_configured_kafka_client(
- netconf_app_configuration.kafka_host_name,
- netconf_app_configuration.kafka_port
- )
- server_rest.start(sysrepo_cfg_manager, kafka_client, netconf_app_configuration.kafka_topic)
+ server_rest.start(sysrepo_cfg_manager, netconf_app_configuration)
def create_rest_server() -> NetconfRestServer:
diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py
index 628f757..cdb5457 100644
--- a/src/python/netconf_server/netconf_change_listener.py
+++ b/src/python/netconf_server/netconf_change_listener.py
@@ -21,7 +21,6 @@ import logging
from kafka.producer.future import FutureRecordMetadata
-
from netconf_server.netconf_kafka_client import NetconfKafkaClient
from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
@@ -53,11 +52,19 @@ class NetconfChangeListener(object):
kafka_message = NetconfChangeListener._create_kafka_message(change)
logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic))
response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata
- logging.info("Response from Kafka: {}".format(response.get(timeout=1)))
-
+ self.set_up_callbacks_for_kafka_request(response)
+ logging.info("Module changes sent to Kafka")
except Exception as e:
logger.error("Exception occurred during handling of sysrepo config change", e)
- logger.info("Module changes sent to Kafka. Operation finished.")
+
+ @staticmethod
+ def set_up_callbacks_for_kafka_request(response):
+ response.add_callback(
+ lambda val: logging.info("Response from Kafka: {}".format(val))
+ )
+ response.add_errback(
+ lambda exc: logging.error("Exception from Kafka: {}".format(exc))
+ )
@staticmethod
def _create_kafka_message(change):
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py
index 027bde1..1f21604 100644
--- a/src/python/netconf_server/netconf_kafka_client.py
+++ b/src/python/netconf_server/netconf_kafka_client.py
@@ -48,6 +48,10 @@ class NetconfKafkaClient(object):
server = "{}:{}".format(host, port)
producer = KafkaProducer(
bootstrap_servers=server,
+ request_timeout_ms=15000,
+ retry_backoff_ms=1000,
+ max_in_flight_requests_per_connection=1,
+ retries=3,
value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8)
)
diff --git a/src/python/netconf_server/netconf_rest_server.py b/src/python/netconf_server/netconf_rest_server.py
index edaa6e8..5dbe6cd 100644
--- a/src/python/netconf_server/netconf_rest_server.py
+++ b/src/python/netconf_server/netconf_rest_server.py
@@ -20,7 +20,8 @@
from flask import Flask, logging, make_response, Response, request, jsonify
-from netconf_server.netconf_kafka_client import NetconfKafkaClient
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
+from netconf_server.netconf_kafka_client import provide_configured_kafka_client
from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
@@ -28,8 +29,7 @@ class NetconfRestServer:
_rest_server: Flask = Flask("server")
logger = logging.create_logger(_rest_server)
_configuration_manager: SysrepoConfigurationManager
- _kafka_topic: str
- _kafka_client: NetconfKafkaClient
+ _app_configuration: NetconfAppConfiguration
def __init__(self, host='0.0.0.0', port=6555):
self._host = host
@@ -37,11 +37,9 @@ class NetconfRestServer:
def start(self,
configuration_manager: SysrepoConfigurationManager,
- kafka_client: NetconfKafkaClient,
- kafka_topic: str):
+ netconf_app_configuration: NetconfAppConfiguration):
NetconfRestServer._configuration_manager = configuration_manager
- NetconfRestServer._kafka_client = kafka_client
- NetconfRestServer._kafka_topic = kafka_topic
+ NetconfRestServer._app_configuration = netconf_app_configuration
Flask.run(
NetconfRestServer._rest_server,
host=self._host,
@@ -56,11 +54,22 @@ class NetconfRestServer:
@staticmethod
@_rest_server.route("/readiness")
def _readiness_check():
- if NetconfRestServer._kafka_client:
+ try:
+ NetconfRestServer.__try_connect_to_kafka()
return Response('Ready', status=200)
- else:
+ except Exception as e:
+ NetconfRestServer.logger.error("Unable to create a Kafka client", e)
return Response('Not Ready', status=503)
+ # if Kafka is up & running and hostname with port is proper, then client will be created; otherwise
+ # an error will be reported
+ @staticmethod
+ def __try_connect_to_kafka():
+ return provide_configured_kafka_client(
+ NetconfRestServer._app_configuration.kafka_host_name,
+ NetconfRestServer._app_configuration.kafka_port
+ )
+
@staticmethod
@_rest_server.route("/change_config/<path:module_name>", methods=['POST'])
def _change_config(module_name):
@@ -71,7 +80,8 @@ class NetconfRestServer:
@staticmethod
@_rest_server.route("/change_history")
def _change_history():
- history = NetconfRestServer._kafka_client.get_all_messages_from(NetconfRestServer._kafka_topic)
+ history = NetconfRestServer.__try_connect_to_kafka()\
+ .get_all_messages_from(NetconfRestServer._app_configuration.kafka_topic)
return jsonify(history), 200
@staticmethod