aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py
blob: edeec8cf762e1d6389232fc64800d8d696ddcd59 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import re
import time
from http.server import BaseHTTPRequestHandler
import httpServerLib


# Module-level buffers shared by both request handlers. Each one holds a raw
# request body (bytes) and starts out as an empty JSON list.

# Written by DmaapSetup.do_PUT, served by DMaaPHandler.do_GET.
received_event_to_get_method = b'[]'
# Written by DMaaPHandler.do_POST, served by DmaapSetup.do_GET.
posted_event_from_bbs = b'[]'


class DmaapSetup(BaseHTTPRequestHandler):
    """
    Handler used by the test harness to prepare and inspect the buffers.

    PUT  /set_get_event        stores the request body in the module-level
                               ``received_event_to_get_method`` buffer.
    GET  /events/dcaeClOutput  returns the module-level
                               ``posted_event_from_bbs`` buffer.
    POST /reset                resets both buffers to ``b'[]'``.

    Paths are matched with ``re.search``, so the fragment may appear anywhere
    in the request path. Any other path is answered with 404 instead of
    returning without a response.
    """

    def _read_request_body(self):
        """
        Read the request body announced by the Content-Length header.

        Returns the body as bytes, or None when the header is missing (411 is
        sent) or is not a non-negative integer (400 is sent). In the None case
        the error response has already been written.
        """
        raw_length = self.headers['Content-Length']
        if raw_length is None:
            self.send_error(411, 'Content-Length header is required')
            return None
        try:
            content_length = int(raw_length)
        except ValueError:
            self.send_error(400, 'Invalid Content-Length header')
            return None
        if content_length < 0:
            # A negative size would make rfile.read() read until EOF.
            self.send_error(400, 'Invalid Content-Length header')
            return None
        return self.rfile.read(content_length)

    def do_PUT(self):
        """Store the event sent by the test harness in the received event buffer."""
        global received_event_to_get_method
        if not re.search('/set_get_event', self.path):
            self.send_error(404, 'Unknown path')
            return

        body = self._read_request_body()
        if body is None:
            return
        received_event_to_get_method = body
        # Helper from httpServerLib (not visible here); by its name it sends
        # the 200 status and JSON headers.
        httpServerLib.header_200_and_json(self)

    def do_GET(self):
        """Return the posted event buffer so the harness can read the policy trigger."""
        if not re.search('/events/dcaeClOutput', self.path):
            self.send_error(404, 'Unknown path')
            return

        httpServerLib.header_200_and_json(self)
        self.wfile.write(posted_event_from_bbs)

    def do_POST(self):
        """Reset both buffers to an empty JSON list."""
        global posted_event_from_bbs
        global received_event_to_get_method
        if not re.search('/reset', self.path):
            self.send_error(404, 'Unknown path')
            return

        posted_event_from_bbs = b'[]'
        received_event_to_get_method = b'[]'
        httpServerLib.header_200_and_json(self)


class DMaaPHandler(BaseHTTPRequestHandler):
    """
    Handler the BBS uS connects to.

    GET  on the PNF_UPDATE or CPE_AUTHENTICATION topic paths returns the
         module-level ``received_event_to_get_method`` buffer, which the test
         harness fills beforehand through the setup handler.
    POST on the DCAE_CL_OUTPUT topic path stores the request body (the policy
         trigger) in the module-level ``posted_event_from_bbs`` buffer, which
         the test harness reads back through the setup handler.

    Paths are matched with ``re.search``, so the fragment may appear anywhere
    in the request path. Any other path is answered with 404 instead of
    returning without a response.
    """

    # Topic paths served by do_GET.
    # NOTE(review): '.' is an unescaped regex wildcard in these patterns; they
    # are kept unchanged to preserve the existing matching.
    # NOTE(review): the second pattern uses '_' after "unauthenticated" where
    # the other topics use '.' - confirm against the BBS configuration.
    _GET_TOPIC_PATTERNS = (
        '/events/unauthenticated.PNF_UPDATE',
        '/events/unauthenticated_CPE_AUTHENTICATION/OpenDcae-c12/c12',
    )

    def _read_request_body(self):
        """
        Read the request body announced by the Content-Length header.

        Returns the body as bytes, or None when the header is missing (411 is
        sent) or is not a non-negative integer (400 is sent). In the None case
        the error response has already been written.
        """
        raw_length = self.headers['Content-Length']
        if raw_length is None:
            self.send_error(411, 'Content-Length header is required')
            return None
        try:
            content_length = int(raw_length)
        except ValueError:
            self.send_error(400, 'Invalid Content-Length header')
            return None
        if content_length < 0:
            # A negative size would make rfile.read() read until EOF.
            self.send_error(400, 'Invalid Content-Length header')
            return None
        return self.rfile.read(content_length)

    def do_POST(self):
        """Store the policy trigger posted by the BBS uS in the posted event buffer."""
        global posted_event_from_bbs
        if not re.search('/events/unauthenticated.DCAE_CL_OUTPUT', self.path):
            self.send_error(404, 'Unknown path')
            return

        body = self._read_request_body()
        if body is None:
            return
        posted_event_from_bbs = body
        # Helper from httpServerLib (not visible here); by its name it sends
        # the 200 status and JSON headers.
        httpServerLib.header_200_and_json(self)

    def do_GET(self):
        """Return the received event buffer for the VES / PNF update topics."""
        if not any(re.search(pattern, self.path)
                   for pattern in self._GET_TOPIC_PATTERNS):
            self.send_error(404, 'Unknown path')
            return

        httpServerLib.header_200_and_json(self)
        self.wfile.write(received_event_to_get_method)


def _main_(handler_class=DMaaPHandler, protocol="HTTP/1.0"):
    """
    Start the simulator endpoints and keep the process alive.

    handler_class: request handler served on ports 2222 (HTTP) and 2223
                   (HTTPS); its ``protocol_version`` is set to ``protocol``.
    protocol:      HTTP protocol version string assigned to ``handler_class``.

    The setup handler (DmaapSetup) is always served on port 2224.
    This function never returns.
    """
    handler_class.protocol_version = protocol
    # Serve the handler that was passed in; the endpoints were previously
    # started with DMaaPHandler regardless of handler_class.
    httpServerLib.start_http_endpoint(2222, handler_class)
    httpServerLib.start_https_endpoint(
        2223,
        handler_class,
        keyfile="certs/org.onap.dmaap-bc.key",
        certfile="certs/dmaap_bc_topic_mgr_dmaap_bc.onap.org.cer",
        ca_certs="certs/ca_local_0.cer",
    )
    httpServerLib.start_http_endpoint(2224, DmaapSetup)
    # Presumably the start_* helpers return immediately and serve in the
    # background (httpServerLib is not visible here); sleep so the process
    # stays alive.
    while True:
        time.sleep(10)


# Start the simulator endpoints only when this file is run as a script,
# not when it is imported.
if __name__ == '__main__':
    _main_()