1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
###
# ============LICENSE_START=======================================================
# Simulator
# ================================================================================
# Copyright (C) 2019 Nokia. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
###
import sysrepo as sr
import sys
import json
import time
import logging
from kafka import KafkaProducer
from enum import Enum
# Send all log records (DEBUG and above) to the file
# 'netopeer_change_saver.log' (relative path).
logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG)
# Module-level Kafka producer. Stays None until the __main__ block assigns
# the result of create_producer(); read by send_message().
kafka_producer = None
# Kafka topic that send_message() publishes to. The __main__ block replaces
# this default with the third command-line argument.
topic = "config"
class OperationType(Enum):
    """Named wrappers around the sysrepo change-operation codes.

    Each member's value is the matching ``sr.SR_OP_*`` constant, so
    ``OperationType(change.oper()).name`` (as used in ``process_change``)
    turns the code reported for a change into its name as a string.
    """
    CREATED = sr.SR_OP_CREATED
    DELETED = sr.SR_OP_DELETED
    MODIFIED = sr.SR_OP_MODIFIED
    MOVED = sr.SR_OP_MOVED
def module_change_callback(session, name, event, private_ctx):
    """Process every change of a module once the change has been applied.

    Passed to ``module_change_subscribe`` in the ``__main__`` block.

    Args:
        session: sysrepo session used to iterate over the pending changes.
        name: name of the module whose configuration changed; used to build
            the ``/<name>:*`` change path.
        event: sysrepo event code. Only ``sr.SR_EV_APPLY`` is acted upon;
            any other event is ignored.
        private_ctx: not used by this callback.

    Returns:
        ``sr.SR_ERR_OK``.
    """
    if event == sr.SR_EV_APPLY:
        change_path = "/{}:*".format(name)
        changes = session.get_changes_iter(change_path)
        change = session.get_change_next(changes)
        while change:
            try:
                process_change(change)
            except Exception:
                # A change that cannot be processed is logged and skipped so
                # that the remaining changes are still handled.
                logging.exception("Exception occured")
            try:
                # Always advance the iterator, even after a failure above.
                # Advancing only on success would make the loop retry the
                # same change forever.
                change = session.get_change_next(changes)
            except Exception:
                # Without a next element the loop cannot make progress.
                logging.exception("Exception occured")
                break
    return sr.SR_ERR_OK
def process_change(change):
    """Convert one sysrepo change into a message and send it.

    The message always carries ``"type"`` (the ``OperationType`` name of
    ``change.oper()``). ``"old"`` and/or ``"new"`` are added when the change
    has the corresponding value; each holds the value's ``"path"`` (xpath)
    and ``"value"`` (string form). A falsy ``change`` is ignored.

    Args:
        change: sysrepo change object exposing ``oper()``, ``old_val()``
            and ``new_val()``.
    """
    if not change:
        return

    payload = {"type": OperationType(change.oper()).name}
    # Handle the previous and the new value in the same way.
    for key, accessor in (("old", change.old_val), ("new", change.new_val)):
        value = accessor()
        if value:
            payload[key] = {
                "path": value.xpath(),
                "value": value.val_to_string(),
            }
    send_message(payload)
def send_message(message):
    """Publish ``message`` to the configured topic and log the result.

    Uses the module-level ``kafka_producer`` and ``topic``. Waits up to
    90 seconds for the result of the send and logs it at INFO level.

    Args:
        message: payload handed to the producer's ``send``.
    """
    logging.debug("Message to kafka : %s", message)
    pending = kafka_producer.send(topic, message)
    # Block until the send completes (or the timeout expires).
    outcome = pending.get(timeout=90)
    logging.info(outcome)
def create_producer(server, attempts=10, retry_delay=15):
    """Create a JSON-serializing Kafka producer, retrying on failure.

    Args:
        server: value passed as ``bootstrap_servers`` to ``KafkaProducer``.
        attempts: maximum number of connection attempts (default 10).
        retry_delay: seconds to wait between two attempts (default 15).

    Returns:
        A ``KafkaProducer`` whose values are serialized as UTF-8 encoded JSON.

    Raises:
        Exception: if no attempt succeeded.
    """
    for attempt in range(1, attempts + 1):
        try:
            return KafkaProducer(
                bootstrap_servers=server,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        except Exception:
            # Record why the attempt failed instead of swallowing it silently.
            logging.exception(
                "Kafka connection attempt %d/%d failed", attempt, attempts)
            # No point in waiting after the last attempt; fail right away.
            if attempt < attempts:
                time.sleep(retry_delay)
    raise Exception("Could not connect to kafka server")
def print_current_config(kafka_session, module):
    """Log every item currently stored for ``module``.

    Args:
        kafka_session: session object providing ``get_items(xpath)``; the
            ``__main__`` block passes the sysrepo session.
        module: module name; all items matching ``/<module>:*//*`` are
            retrieved.
    """
    name = "/{}:*//*".format(module)
    logging.info("Retrieving current config for %s module", name)
    values = kafka_session.get_items(name)
    if values is None:
        # NOTE(review): get_items appears to return None when nothing
        # matches the xpath - confirm against the sysrepo bindings. Guarding
        # here keeps an empty module from raising AttributeError.
        return
    # The values container is accessed by index (val_cnt / val).
    for i in range(values.val_cnt()):
        logging.info(values.val(i).to_string())
# Script entry point.
# Usage: <script> <module_name> <bootstrap_servers> <topic>
if __name__ == "__main__":
    try:
        module_name = sys.argv[1]
        bootstrap_servers = sys.argv[2]
        topic = sys.argv[3]
        # Create the producer before subscribing: send_message() reads the
        # module-level kafka_producer, which is None until this assignment,
        # so a change callback arriving earlier would fail on it.
        kafka_producer = create_producer(bootstrap_servers)
        connection = sr.Connection("example_application2")
        session = sr.Session(connection)
        subscribe = sr.Subscribe(session)
        subscribe.module_change_subscribe(module_name, module_change_callback)
        print_current_config(session, module_name)
        sr.global_loop()
    except Exception:
        logging.exception("Exception occured")
        # Bare raise re-raises the active exception with its original traceback.
        raise
|