diff options
Diffstat (limited to 'test/mocks/pnfsimulator/netconfsimulator/netconf/netopeer_change_saver.py')
-rw-r--r-- | test/mocks/pnfsimulator/netconfsimulator/netconf/netopeer_change_saver.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/test/mocks/pnfsimulator/netconfsimulator/netconf/netopeer_change_saver.py b/test/mocks/pnfsimulator/netconfsimulator/netconf/netopeer_change_saver.py new file mode 100644 index 000000000..0a9e841d2 --- /dev/null +++ b/test/mocks/pnfsimulator/netconfsimulator/netconf/netopeer_change_saver.py @@ -0,0 +1,107 @@ +### +# ============LICENSE_START======================================================= +# Simulator +# ================================================================================ +# Copyright (C) 2019 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +import sysrepo as sr +import sys +import json +import time +import logging +from kafka import KafkaProducer +from enum import Enum + +logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG) + +kafka_producer = None +topic = "config" + + +class OperationType(Enum): + CREATED = sr.SR_OP_CREATED + DELETED = sr.SR_OP_DELETED + MODIFIED = sr.SR_OP_MODIFIED + MOVED = sr.SR_OP_MOVED + + +def module_change_callback(session, name, event, private_ctx): + if sr.SR_EV_APPLY == event: + change_path = "/{}:*".format(name) + changes = session.get_changes_iter(change_path) + change = session.get_change_next(changes) + while change: + try: + process_change(change) + change = session.get_change_next(changes) + except Exception: + logging.exception("Exception occured") + + return sr.SR_ERR_OK + + +def process_change(change): + if change: + message = {"type": OperationType(change.oper()).name} + if change.old_val(): + message["old"] = {"path": change.old_val().xpath(), "value": change.old_val().val_to_string()} + if change.new_val(): + message["new"] = {"path": change.new_val().xpath(), "value": change.new_val().val_to_string()} + send_message(message) + + +def send_message(message): + logging.debug("Message to kafka : %s", message) + response = kafka_producer.send(topic, message) + logging.info(response.get(timeout=90)) + + +def create_producer(server): + for i in range(10): + try: + return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) + except Exception: + time.sleep(15) + raise Exception("Could not connect to kafka server") + + +def print_current_config(kafka_session, module): + name = "/{}:*//*".format(module) + logging.info("Retrieving current config for %s module", name) + values = kafka_session.get_items(name) + for i in range(values.val_cnt()): + logging.info(values.val(i).to_string()) + + +if __name__ == "__main__": + try: + module_name = sys.argv[1] + bootstrap_servers = sys.argv[2] + topic = sys.argv[3] + connection = sr.Connection("example_application2") + session = sr.Session(connection) + subscribe = sr.Subscribe(session) + subscribe.module_change_subscribe(module_name, module_change_callback) + + print_current_config(session, module_name) + + kafka_producer = create_producer(bootstrap_servers) + + sr.global_loop() + except Exception as e: + logging.exception("Exception occured") + raise e |