diff options
Diffstat (limited to 'src/python/netconf_server/netconf_change_listener.py')
-rw-r--r-- | src/python/netconf_server/netconf_change_listener.py | 33 |
1 files changed, 29 insertions, 4 deletions
diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py index 44910d1..628f757 100644 --- a/src/python/netconf_server/netconf_change_listener.py +++ b/src/python/netconf_server/netconf_change_listener.py @@ -19,21 +19,46 @@ ### import logging +from kafka.producer.future import FutureRecordMetadata + + +from netconf_server.netconf_kafka_client import NetconfKafkaClient +from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData +from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage logger = logging.getLogger(__name__) class NetconfChangeListener(object): - def __init__(self, subscriptions: list): + def __init__(self, subscriptions: list, kafka_client: NetconfKafkaClient, topic: str): self.subscriptions = subscriptions + self.kafka_client = kafka_client + self.topic = topic def run(self, session): for subscription in self.subscriptions: - subscription.callback_function = self.__on_module_configuration_change + subscription.callback_function = self._on_module_configuration_change subscription.subscribe_on_model_change(session) + def _on_module_configuration_change(self, config_change_data: ConfigChangeData): + logger.info("Received module changed: {} , {} ".format(config_change_data.event, config_change_data.changes)) + if config_change_data.event != "done": + self._send_change_to_kafka(config_change_data) + + def _send_change_to_kafka(self, config_change_data): + for change in config_change_data.changes: + try: + kafka_message = NetconfChangeListener._create_kafka_message(change) + logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic)) + response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata + logging.info("Response from Kafka: {}".format(response.get(timeout=1))) + + except Exception as e: + logger.error("Exception occurred during handling of sysrepo config change", e) + logger.info("Module changes sent to Kafka. Operation finished.") + @staticmethod - def __on_module_configuration_change(config_change_data: ConfigChangeData): - logger.info("Received module changed: %s , %s " % (config_change_data.event, config_change_data.changes)) + def _create_kafka_message(change): + return NetconfKafkaMessageFactory.create(SysrepoMessage(change)) |