summaryrefslogtreecommitdiffstats
path: root/mock-dmaap/app/app.py
diff options
context:
space:
mode:
Diffstat (limited to 'mock-dmaap/app/app.py')
-rw-r--r--mock-dmaap/app/app.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/mock-dmaap/app/app.py b/mock-dmaap/app/app.py
new file mode 100644
index 0000000..ffaaa63
--- /dev/null
+++ b/mock-dmaap/app/app.py
@@ -0,0 +1,98 @@
+"""
+ Copyright 2023 Deutsche Telekom AG, Nokia, Orange
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+
+import json
+import logging as sys_logging
+from event_storage import EventStorage
+from sdc import Sdc
+from ves import Ves
+
+from flask import Flask, request, logging, Response
+
+app = Flask(__name__)
+
+sys_logging.basicConfig(level=sys_logging.DEBUG)
+logger = logging.create_logger(app)
+apiKey = {"key": "test_key", "secret": "test_secret"}
+event_storage = EventStorage()
+sdc = Sdc(logger, event_storage)
+ves = Ves(logger, event_storage)
+
+
+@app.route("/apiKeys/create", methods=['POST'])
+def create_api_key():
+ resp = Response(json.dumps(apiKey))
+ return resp
+
+
+@app.route("/apiKeys/<path:key>", methods=['DELETE'])
+def delete_api_key(key):
+ if key == apiKey["key"]:
+ return {}, 200
+ return {"no such key"}, 404
+
+
+@app.route("/reset", methods=['GET'])
+def reset_events():
+ event_storage.clear()
+ return event_storage
+
+
+@app.route("/events", methods=['GET'])
+def get_events():
+ resp = Response(json.dumps(event_storage))
+ resp.headers['Content-Type'] = 'application/json'
+ return resp
+
+
+@app.route("/events/<path:topic>/<path:consumer_group>/<path:consumer_id>", methods=['GET'])
+def get_events_from_topic(topic, consumer_group, consumer_id):
+ events = event_storage.get_events_from_topic(topic)
+ resp = Response(json.dumps(events))
+ event_storage.delete_events(topic)
+ resp.headers['Content-Type'] = 'application/json'
+ return resp
+
+
+@app.route("/events/<path:topic>", methods=['POST'])
+def post_msg_to_topic(topic):
+ receive_msg = request.data.decode("utf-8")
+ if sdc.is_event_from_sdc(receive_msg):
+ return sdc.post_msg_to_topic(topic, receive_msg)
+ else:
+ return ves.handle_new_event(topic, receive_msg)
+
+
+@app.route("/events/<path:topic>/add", methods=['POST'])
+def add_msg_to_topic(topic):
+ receive_msg = request.data.decode("utf-8")
+ return sdc.add_msg_to_topic(topic, receive_msg)
+
+
+@app.route("/topics", methods=['GET'])
+def get_topics():
+ topics = {
+ 'topics': ['org.onap.dmaap.mr.PNF_REGISTRATION', 'SDC-DISTR-STATUS-TOPIC-AUTO', 'SDC-DISTR-NOTIF-TOPIC-AUTO',
+ 'org.onap.dmaap.mr.PNF_READY', 'POLICY-PDP-PAP', 'POLICY-NOTIFICATION',
+ 'unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT', '__consumer_offsets',
+ 'org.onap.dmaap.mr.mirrormakeragent']}
+ resp = Response(json.dumps(topics))
+ resp.headers['Content-Type'] = 'application/json'
+ return resp
+
+
+if __name__ == "__main__":
+ app.run(host='0.0.0.0', port=3904)