diff options
author | Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com> | 2020-04-08 09:31:13 +0200 |
---|---|---|
committer | Bogumil Zebek <bogumil.zebek@nokia.com> | 2020-04-08 09:43:31 +0000 |
commit | 3c494af52c476a86ae1389991b464914517774b8 (patch) | |
tree | e6d9b4f261eac5f7b3fd0f42e740840a106842e6 /netconfsimulator/netconf/netopeer_change_saver.py | |
parent | 75496bfc5b2f7e03e49ab4929d1f20962b39c992 (diff) |
Move PNF simulator from /test/mocks to new project
This code is a copy of pnfsimulator located in integration repository
(/test/mocks/pnfsimulator) with added profile "docker" in pom.xml
located in pnfsimulator and netconfsimulator subprojects
Issue-ID: INT-1517
Signed-off-by: Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Change-Id: I725fa0530c41b13cb12705979dee8b8b354dc1a1
Diffstat (limited to 'netconfsimulator/netconf/netopeer_change_saver.py')
-rw-r--r-- | netconfsimulator/netconf/netopeer_change_saver.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/netconfsimulator/netconf/netopeer_change_saver.py b/netconfsimulator/netconf/netopeer_change_saver.py new file mode 100644 index 0000000..92f8846 --- /dev/null +++ b/netconfsimulator/netconf/netopeer_change_saver.py @@ -0,0 +1,107 @@ +### +# ============LICENSE_START======================================================= +# Simulator +# ================================================================================ +# Copyright (C) 2019 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +import sysrepo as sr +import sys +import json +import time +import logging +from kafka import KafkaProducer +from enum import Enum + +logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG) + +kafka_producer = None +topic = "config" + + +class OperationType(Enum): + CREATED = sr.SR_OP_CREATED + DELETED = sr.SR_OP_DELETED + MODIFIED = sr.SR_OP_MODIFIED + MOVED = sr.SR_OP_MOVED + + +def module_change_callback(session, name, event, private_ctx): + if sr.SR_EV_APPLY == event: + change_path = "/{}:*".format(name) + changes = session.get_changes_iter(change_path) + change = session.get_change_next(changes) + while change: + try: + process_change(change) + change = session.get_change_next(changes) + except Exception: + logging.exception("Exception occured") + + return sr.SR_ERR_OK + + +def process_change(change): + if change: + message = {"type": OperationType(change.oper()).name} + if change.old_val(): + message["old"] = {"path": change.old_val().xpath(), "value": change.old_val().val_to_string()} + if change.new_val(): + message["new"] = {"path": change.new_val().xpath(), "value": change.new_val().val_to_string()} + send_message(message) + + +def send_message(message): + logging.debug("Message to kafka : %s", message) + response = kafka_producer.send(topic, message) + logging.info(response.get(timeout=90)) + + +def create_producer(server): + for i in range(10): # pylint: disable=W0612 + try: + return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) + except Exception: + time.sleep(15) + raise Exception("Could not connect to kafka server") + + +def print_current_config(kafka_session, module): + name = "/{}:*//*".format(module) + logging.info("Retrieving current config for %s module", name) + values = kafka_session.get_items(name) + for i in range(values.val_cnt()): + logging.info(values.val(i).to_string()) + + +if __name__ == "__main__": + try: + module_name = sys.argv[1] + bootstrap_servers = sys.argv[2] + topic = sys.argv[3] + connection = sr.Connection("example_application2") + session = sr.Session(connection) + subscribe = sr.Subscribe(session) + subscribe.module_change_subscribe(module_name, module_change_callback) + + print_current_config(session, module_name) + + kafka_producer = create_producer(bootstrap_servers) + + sr.global_loop() + except Exception as e: + logging.exception("Exception occured") + raise e |