aboutsummaryrefslogtreecommitdiffstats
path: root/netconfsimulator/netconf/netopeer_change_saver.py
diff options
context:
space:
mode:
Diffstat (limited to 'netconfsimulator/netconf/netopeer_change_saver.py')
-rw-r--r--netconfsimulator/netconf/netopeer_change_saver.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/netconfsimulator/netconf/netopeer_change_saver.py b/netconfsimulator/netconf/netopeer_change_saver.py
new file mode 100644
index 0000000..92f8846
--- /dev/null
+++ b/netconfsimulator/netconf/netopeer_change_saver.py
@@ -0,0 +1,107 @@
+###
+# ============LICENSE_START=======================================================
+# Simulator
+# ================================================================================
+# Copyright (C) 2019 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+import sysrepo as sr
+import sys
+import json
+import time
+import logging
+from kafka import KafkaProducer
+from enum import Enum
+
+logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG)
+
+kafka_producer = None
+topic = "config"
+
+
+class OperationType(Enum):
+ CREATED = sr.SR_OP_CREATED
+ DELETED = sr.SR_OP_DELETED
+ MODIFIED = sr.SR_OP_MODIFIED
+ MOVED = sr.SR_OP_MOVED
+
+
+def module_change_callback(session, name, event, private_ctx):
+ if sr.SR_EV_APPLY == event:
+ change_path = "/{}:*".format(name)
+ changes = session.get_changes_iter(change_path)
+ change = session.get_change_next(changes)
+ while change:
+ try:
+ process_change(change)
+ change = session.get_change_next(changes)
+ except Exception:
+ logging.exception("Exception occured")
+
+ return sr.SR_ERR_OK
+
+
+def process_change(change):
+ if change:
+ message = {"type": OperationType(change.oper()).name}
+ if change.old_val():
+ message["old"] = {"path": change.old_val().xpath(), "value": change.old_val().val_to_string()}
+ if change.new_val():
+ message["new"] = {"path": change.new_val().xpath(), "value": change.new_val().val_to_string()}
+ send_message(message)
+
+
+def send_message(message):
+ logging.debug("Message to kafka : %s", message)
+ response = kafka_producer.send(topic, message)
+ logging.info(response.get(timeout=90))
+
+
+def create_producer(server):
+ for i in range(10): # pylint: disable=W0612
+ try:
+ return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ except Exception:
+ time.sleep(15)
+ raise Exception("Could not connect to kafka server")
+
+
+def print_current_config(kafka_session, module):
+ name = "/{}:*//*".format(module)
+ logging.info("Retrieving current config for %s module", name)
+ values = kafka_session.get_items(name)
+ for i in range(values.val_cnt()):
+ logging.info(values.val(i).to_string())
+
+
+if __name__ == "__main__":
+ try:
+ module_name = sys.argv[1]
+ bootstrap_servers = sys.argv[2]
+ topic = sys.argv[3]
+ connection = sr.Connection("example_application2")
+ session = sr.Session(connection)
+ subscribe = sr.Subscribe(session)
+ subscribe.module_change_subscribe(module_name, module_change_callback)
+
+ print_current_config(session, module_name)
+
+ kafka_producer = create_producer(bootstrap_servers)
+
+ sr.global_loop()
+ except Exception as e:
+ logging.exception("Exception occured")
+ raise e