diff options
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r-- | miss_htbt_service/htbtworker.py | 149 |
1 files changed, 42 insertions, 107 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 3bad8c9..347dbd6 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 # Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -41,6 +41,8 @@ mr_url = 'http://mrrouter.onap.org:3904' pol_url = 'http://mrrouter.onap.org:3904' intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' +nfc = "vVNF" +cl_loop = 'ControlLoopEvent1' periodic_scheduler = None # Checks for heartbeat event on periodic basis @@ -58,56 +60,17 @@ class PeriodicScheduler(object): def stop(self): list(map(self.scheduler.cancel, self.scheduler.queue)) -# Formats collector uri from config files of heat template -def get_collector_uri(): - """ - This method waterfalls reads an envioronmental variable called COLLECTOR_HOST - If that doesn't work, it raises an Exception - """ - with open('/tmp/config/coll_ip.txt', 'r') as myfile: - coll_ip=myfile.read().replace('\n', '') - myfile.close() - with open('/tmp/config/coll_port.txt', 'r') as myfile2: - coll_port=myfile2.read().replace('\n', '') - myfile2.close() - if coll_ip and coll_port: - # WARNING! TODO! Currently the env file does not include the port. - # But some other people think that the port should be a part of that. - # For now, I'm hardcoding 8500 until this gets resolved. - return "http://{0}:{1}".format(coll_ip, coll_port) - else: - raise BadEnviornmentENVNotFound("COLLECTOR_HOST") - -# Formats Policy uri from config files of heat template -def get_policy_uri(): - """ - This method waterfalls reads an envioronmental variable called POLICY_HOST - If that doesn't work, it raises an Exception - """ - with open('/tmp/config/coll_ip.txt', 'r') as myfile: - pol_ip=myfile.read().replace('\n', '') - myfile.close() - with open('/tmp/config/coll_port.txt', 'r') as myfile2: - pol_port=myfile2.read().replace('\n', '') - myfile2.close() - if pol_ip and pol_port : - # WARNING! TODO! Currently the env file does not include the port. - # But some other people think that the port should be a part of that. - # For now, I'm hardcoding 8500 until this gets resolved. - return "http://{0}:{1}".format(pol_ip,pol_port) - else: - raise BadEnviornmentENVNotFound("POLICY_HOST") - # Process the heartbeat event on input topic def periodic_event(): global periodic_scheduler - global mr_url, pol_url, missing_htbt, intvl, intopic, outopic + global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop ret = 0 #print("Args are :", locals()) - print("Checking..." , datetime.datetime.now()) + print("{0} Checking...".format(datetime.datetime.now())) #Read heartbeat - get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' + #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' + get_url = mr_url+'/DefaultGroup/1?timeout=15000' print("Getting :"+get_url) try: res = requests.get(get_url) @@ -124,6 +87,8 @@ def periodic_event(): print("Line:"+line) jobj = json.loads(line) #print(jobj) + if( nfc != jobj['event']['commonEventHeader']['nfNamingCode']) : + continue srcid = (jobj['event']['commonEventHeader']['sourceId']) lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec']) seqnum = (jobj['event']['commonEventHeader']['sequence']) @@ -139,35 +104,19 @@ def periodic_event(): heartflag[srcid] = sdiff; heartmsg[srcid] = jobj; else: - payload = json.dumps({"event": { - "commonEventHeader": { - "reportingEntityName": "VNFVM", - "reportingEntityName": "VNFVM", - "startEpochMicrosec": 1508641592248000, - "lastEpochMicrosec": 1508641592248000, - "eventId": "VNFVM_heartbeat", - "sequence": 1, - "priority": "Normal", - "sourceName": "VNFVM", - "domain": "heartbeat", - "eventName": "Heartbeat_Vnf", - "internalHeaderFields": { - "closedLoopFlag": "True", - "eventTag": "hp.Heartbeat Service.20171022.8447964515", - "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", - "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", - "closedLoopControlName": "ControlLoopEvent1", - "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" - }, - "reportingEntityId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8", - "version": 3, - "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8" - } - } - }) + jobj["internalHeaderFields"] = json.dumps({ + "closedLoopFlag": "True", + "eventTag": "hp.Heartbeat Service.20171022.8447964515", + "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", + "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", + "closedLoopControlName": cl_loop, + "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" + }); + heartmsg[srcid] = jobj; payload = heartmsg[srcid] print(payload) - psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' + #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' + psend_url = pol_url+'/DefaultGroup/1?timeout=15000' print(psend_url) print("Heartbeat Dead raising alarm event "+psend_url) #Send response for policy on output topic @@ -191,34 +140,17 @@ def periodic_event(): print("Heartbeat Dead raise alarm event"+key) chkeys.append( key ) #print payload - payload = json.dumps({"event": { - "commonEventHeader": { - "reportingEntityName": "VNFVM", - "startEpochMicrosec": 1508641592248000, - "lastEpochMicrosec": 1508641592248000, - "eventId": "VNFVM_heartbeat", - "sequence": 1, - "priority": "Normal", - "sourceName": "VNFVM", - "domain": "heartbeat", - "eventName": "Heartbeat_Vnf", - "internalHeaderFields": { - "closedLoopFlag": "True", - "eventTag": "hp.Heartbeat Service.20171022.8447964515", - "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", - "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", - "closedLoopControlName": "ControlLoopEvent1", - "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" - }, - "reportingEntityId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8", - "version": 3, - "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8" - } - } - }) + heartmsg[key]["internalHeaderFields"] = json.dumps({ + "closedLoopFlag": "True", + "eventTag": "hp.Heartbeat Service.20171022.8447964515", + "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", + "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", + "closedLoopControlName": cl_loop, + "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" + }) payload = heartmsg[key] print(payload) - send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' + send_url = pol_url+'/DefaultGroup/1?timeout=15000' print(send_url) r = requests.post(send_url, data=payload) print(r.status_code, r.reason) @@ -236,12 +168,12 @@ def periodic_event(): #test setup for coverage def test_setup(args): global mr_url, pol_url, missing_htbt, intvl, intopic, outopic - mr_url = get_collector_uri() - pol_url = get_policy_uri() missing_htbt = float(int(args[2])) intvl = float(int(args[3])) intopic = args[4] outopic = args[5] + mr_url = get_collector_uri()+'/events/'+intopic + pol_url = get_policy_uri()+'/events/'+outopic print ("Message router url %s " % mr_url) print ("Policy url %s " % pol_url) print ("Interval %s " % intvl) @@ -252,23 +184,26 @@ def test_setup(args): #Main invocation def main(args): global periodic_scheduler - global mr_url, pol_url, missing_htbt, intvl, intopic, outopic + global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop #mr_url = get_collector_uri() #pol_url = get_policy_uri() mr_url = args[0] intopic = args[1] pol_url = args[2] outopic = args[3] - missing_htbt = int(args[4]) - intvl = int(args[5]) + nfc = args[4] + missing_htbt = int(args[5]) + intvl = int(args[6]) + cl_loop = args[7] print ("Message router url %s " % mr_url) print ("Policy router url %s " % pol_url) print ("Interval %s " % intvl) - #intvl = 60 # every second - #Start periodic scheduler runs every interval - periodic_scheduler = PeriodicScheduler() - periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once - periodic_scheduler.run() # it starts the scheduler + if( cl_loop != "internal_test") : + #intvl = 60 # every second + #Start periodic scheduler runs every interval + periodic_scheduler = PeriodicScheduler() + periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once + periodic_scheduler.run() # it starts the scheduler if __name__ == "__main__": total = len(sys.argv) |