aboutsummaryrefslogtreecommitdiffstats
path: root/netconfsimulator/netconf/netopeer_change_saver.py
blob: 92f88463dc26e6b43f85ae18f6b91797d074249a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
###
# ============LICENSE_START=======================================================
# Simulator
# ================================================================================
# Copyright (C) 2019 Nokia. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
###

import sysrepo as sr
import sys
import json
import time
import logging
from kafka import KafkaProducer
from enum import Enum

# Send all log records (DEBUG and above) to a log file given by a relative path.
logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG)

# Kafka producer used by send_message(); it is assigned in the __main__ block.
kafka_producer = None
# Kafka topic messages are published to; overwritten from sys.argv[3] in the __main__ block.
topic = "config"


class OperationType(Enum):
    """Symbolic names for the sysrepo change-operation constants.

    process_change() looks a member up by the value returned from a change's
    ``oper()`` and publishes the member's ``name`` as the message "type".
    """
    CREATED = sr.SR_OP_CREATED
    DELETED = sr.SR_OP_DELETED
    MODIFIED = sr.SR_OP_MODIFIED
    MOVED = sr.SR_OP_MOVED


def module_change_callback(session, name, event, private_ctx):
    """Forward every change of a module to process_change() on the apply event.

    Args:
        session: Session object used to iterate over the changes.
        name: Name of the changed module; used to build the "/<name>:*" path.
        event: Event kind; only sr.SR_EV_APPLY is handled, others are ignored.
        private_ctx: Unused; present to match the callback signature.

    Returns:
        Always sr.SR_ERR_OK. A failure while handling a single change is
        logged and does not stop the remaining changes from being handled.
    """
    if sr.SR_EV_APPLY != event:
        return sr.SR_ERR_OK

    change_path = "/{}:*".format(name)
    changes = session.get_changes_iter(change_path)
    change = session.get_change_next(changes)
    while change:
        try:
            process_change(change)
        except Exception:
            logging.exception("Exception occured")

        # Advance outside the try block above: if the advance is skipped when
        # process_change() fails, the same change is retried forever.
        try:
            change = session.get_change_next(changes)
        except Exception:
            # The iterator itself is broken, so there is nothing left to walk.
            logging.exception("Exception occured")
            break

    return sr.SR_ERR_OK


def process_change(change):
    """Build a message describing one change and pass it to send_message().

    The message always has a "type" entry (the OperationType name of the
    change's operation) and gets an "old" and/or "new" entry, each holding
    "path" and "value", when the corresponding value is truthy.
    A falsy ``change`` is ignored.
    """
    if not change:
        return

    message = {"type": OperationType(change.oper()).name}
    # Same handling for both sides of the change; "old" is filled in before "new".
    for key, fetch in (("old", change.old_val), ("new", change.new_val)):
        if fetch():
            message[key] = {
                "path": fetch().xpath(),
                "value": fetch().val_to_string(),
            }
    send_message(message)

def send_message(message):
    """Publish ``message`` to the module-level Kafka topic and log the outcome.

    Blocks for up to 90 seconds waiting for the result of the send; whatever
    that wait returns is logged at INFO level.
    """
    logging.debug("Message to kafka : %s", message)
    pending = kafka_producer.send(topic, message)
    outcome = pending.get(timeout=90)
    logging.info(outcome)


def create_producer(server, attempts=10, delay_seconds=15):
    """Create a KafkaProducer that JSON-encodes message values, retrying on failure.

    Args:
        server: Value passed to KafkaProducer as ``bootstrap_servers``.
        attempts: Maximum number of creation attempts.
        delay_seconds: Pause between two consecutive attempts, in seconds.

    Returns:
        The KafkaProducer instance from the first successful attempt.

    Raises:
        Exception: If every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        except Exception:
            # Keep the reason for each failure visible instead of dropping it.
            logging.warning("Kafka connection attempt %d/%d failed", attempt, attempts, exc_info=True)
            # Waiting after the final attempt only delays the error below.
            if attempt < attempts:
                time.sleep(delay_seconds)
    raise Exception("Could not connect to kafka server")


def print_current_config(kafka_session, module):
    """Log every item currently stored under the given module.

    Args:
        kafka_session: Session object providing ``get_items(xpath)``.
        module: Module name; items are selected with "/<module>:*//*".
    """
    name = "/{}:*//*".format(module)
    logging.info("Retrieving current config for %s module", name)
    values = kafka_session.get_items(name)
    # NOTE(review): get_items() appears to return None rather than an empty
    # collection when nothing matches - confirm against the sysrepo bindings.
    if values is None:
        return
    for i in range(values.val_cnt()):
        logging.info(values.val(i).to_string())


if __name__ == "__main__":
    # Arguments: <module_name> <kafka_bootstrap_servers> <kafka_topic>
    try:
        module_name = sys.argv[1]
        bootstrap_servers = sys.argv[2]
        topic = sys.argv[3]
        connection = sr.Connection("example_application2")
        session = sr.Session(connection)

        # Create the producer before subscribing, so module_change_callback
        # can never run while kafka_producer is still None.
        # NOTE(review): assumes callbacks may be delivered as soon as the
        # subscription exists, i.e. before sr.global_loop() - confirm.
        kafka_producer = create_producer(bootstrap_servers)

        subscribe = sr.Subscribe(session)
        subscribe.module_change_subscribe(module_name, module_change_callback)

        # Kept after the subscription, as in the original ordering.
        print_current_config(session, module_name)

        sr.global_loop()
    except Exception:
        logging.exception("Exception occured")
        # Bare raise re-raises the active exception with its traceback intact.
        raise