From 286ec745cc0ef412b450d7c5c07d735707f9418b Mon Sep 17 00:00:00 2001 From: Gokul Singaraju Date: Tue, 27 Mar 2018 15:31:12 -0400 Subject: Added tests for heartbeat coverage Issue-ID: DCAEGEN2-276 Change-Id: Ib0fa11fc5978f47854056f3c198347120b3873a8 Signed-off-by: Gokul Singaraju --- miss_htbt_service/htbtworker.py | 132 +++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 42 deletions(-) (limited to 'miss_htbt_service/htbtworker.py') diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index b8eadb4..b81deae 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -12,6 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# Author Gokul Singaraju gs244f@att.com +# Simple Microservice +# Tracks Heartbeat messages on input topic in DMaaP +# and generates Missing Heartbeat signal for Policy Engine import requests import math @@ -21,6 +26,7 @@ import string import sys +# Initialise tracking hash tables intvl = 60 missing_htbt = 2 #tracks last epoch time @@ -31,10 +37,11 @@ heartstate = {} heartflag = {} #saves heartbeat message for policy heartmsg = {} -mr_url = 'http://mrrouter.att.com:3904' -pol_url = 'http://mrrouter.att.com:3904' +mr_url = 'http://mrrouter.onap.org:3904' +pol_url = 'http://mrrouter.onap.org:3904' intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' +periodic_scheduler = None # Checks for heartbeat event on periodic basis class PeriodicScheduler(object): @@ -45,19 +52,22 @@ class PeriodicScheduler(object): action(*actionargs) self.scheduler.enter(interval, 1, self.setup, (interval, action, actionargs)) - def run(self): self.scheduler.run() + def stop(self): + list(map(self.scheduler.cancel, self.scheduler.queue)) + +# Formats collector uri from config files of heat template def get_collector_uri(): """ This method waterfalls reads an envioronmental variable called COLLECTOR_HOST If that doesn't work, it raises an Exception """ - with open('/opt/config/coll_ip.txt', 'r') as myfile: + with open('/tmp/config/coll_ip.txt', 'r') as myfile: coll_ip=myfile.read().replace('\n', '') myfile.close() - with open('/opt/config/coll_port.txt', 'r') as myfile2: + with open('/tmp/config/coll_port.txt', 'r') as myfile2: coll_port=myfile2.read().replace('\n', '') myfile2.close() if coll_ip and coll_port: @@ -68,15 +78,16 @@ def get_collector_uri(): else: raise BadEnviornmentENVNotFound("COLLECTOR_HOST") +# Formats Policy uri from config files of heat template def get_policy_uri(): """ This method waterfalls reads an envioronmental variable called POLICY_HOST If that doesn't work, it raises an Exception """ - with open('/opt/config/coll_ip.txt', 'r') as myfile: + with open('/tmp/config/coll_ip.txt', 'r') as myfile: pol_ip=myfile.read().replace('\n', '') myfile.close() - with open('/opt/config/coll_port.txt', 'r') as myfile2: + with open('/tmp/config/coll_port.txt', 'r') as myfile2: pol_port=myfile2.read().replace('\n', '') myfile2.close() if pol_ip and pol_port : @@ -90,21 +101,25 @@ def get_policy_uri(): # Process the heartbeat event on input topic def periodic_event(): - print("Checking... ") - print(datetime.datetime.now()) + global periodic_scheduler + global mr_url, pol_url, missing_htbt, intvl, intopic, outopic + ret = 0 + print("Checking..." , datetime.datetime.now()) #Read heartbeat get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' - print(get_url) + print("Getting :"+get_url) res = requests.get(get_url) #print(res) #print(res.headers) print(res.text) #print(res.json) inputString = res.text - jlist = json.loads(inputString); + #jlist = json.loads(inputString) + jlist = inputString.split('\n'); #print("List:"+jlist[0]) + # Process the DMaaP input message retreived for line in jlist: - #print(line) + print("Line:"+line) jobj = json.loads(line) #print(jobj) srcid = (jobj['event']['commonEventHeader']['sourceId']) @@ -122,9 +137,7 @@ def periodic_event(): heartflag[srcid] = sdiff; heartmsg[srcid] = jobj; else: - print("Heartbeat Dead raising alarm event") - #payload = {'Event': 'Heartbeat Failure', 'Host': srcid, 'LastTimestamp': hearttrack[srcid], 'Sequence': heartstate[srcid]} - payload = {"event": { + payload = json.dumps({"event": { "commonEventHeader": { "reportingEntityName": "VNFVM", "reportingEntityName": "VNFVM", @@ -149,14 +162,16 @@ def periodic_event(): "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8" } } - } + }) payload = heartmsg[srcid] print(payload) - send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' - print(send_url) + psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' + print(psend_url) + print("Heartbeat Dead raising alarm event "+psend_url) #Send response for policy on output topic - r = requests.post(send_url, data=payload) + r = requests.post(psend_url, data=payload) print(r.status_code, r.reason) + ret = r.status_code del heartstate[srcid] del hearttrack[srcid] del heartflag[srcid] @@ -166,15 +181,15 @@ def periodic_event(): heartstate[srcid] = seqnum heartflag[srcid] = 1 heartmsg[srcid] = jobj; + ret = 1 chkeys = [] - for key in heartstate.iterkeys(): + for key in heartstate.keys(): print(key,heartstate[key]) if( heartflag[key] == 0 ): print("Heartbeat Dead raise alarm event"+key) chkeys.append( key ) - #payload = {'Event': 'Heartbeat Failure', 'Host': key, 'LastTimestamp': hearttrack[key], 'Sequence': heartstate[key]} #print payload - payload = {"event": { + payload = json.dumps({"event": { "commonEventHeader": { "reportingEntityName": "VNFVM", "startEpochMicrosec": 1508641592248000, @@ -198,37 +213,70 @@ def periodic_event(): "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8" } } - } + }) payload = heartmsg[key] print(payload) send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' print(send_url) r = requests.post(send_url, data=payload) print(r.status_code, r.reason) + ret = r.status_code heartflag[key] = 0 for chkey in chkeys: print(chkey) del heartstate[chkey] del hearttrack[chkey] del heartflag[chkey] + return ret + +#test setup for coverage +def test_setup(args): + global mr_url, pol_url, missing_htbt, intvl, intopic, outopic + mr_url = get_collector_uri() + pol_url = get_policy_uri() + missing_htbt = float(int(args[2])) + intvl = float(int(args[3])) + intopic = args[4] + outopic = args[5] + print ("Message router url %s " % mr_url) + print ("Policy url %s " % pol_url) + print ("Interval %s " % intvl) + print ("Input topic %s " % intopic) + print ("Output topic %s " % outopic) + #intvl = 60 # every second + +#Main invocation +def main(args): + global periodic_scheduler + global mr_url, pol_url, missing_htbt, intvl, intopic, outopic + mr_url = get_collector_uri() + pol_url = get_policy_uri() + missing_htbt = int(args[2]) + intvl = int(args[3]) + intopic = args[4] + outopic = args[5] + print ("Message router url %s " % mr_url) + print ("Policy router url %s " % pol_url) + print ("Interval %s " % intvl) + #intvl = 60 # every second + #Start periodic scheduler runs every interval + periodic_scheduler = PeriodicScheduler() + periodic_scheduler.setup(intvl, periodic_event) # it executes the event just once + periodic_scheduler.run() # it starts the scheduler -total = len(sys.argv) -cmdargs = str(sys.argv) -print ("The total numbers of args passed to the script: %d " % total) -print ("Args list: %s " % cmdargs) -print ("Script name: %s" % str(sys.argv[0])) -for i in range(total): +if __name__ == "__main__": + total = len(sys.argv) + cmdargs = str(sys.argv) + print ("The total numbers of args passed to the script: %d " % total) + print ("Missing Heartbeat Args list: %s " % cmdargs) + print ("Script name: %s" % str(sys.argv[0])) + for i in range(total): print ("Argument # %d : %s" % (i, str(sys.argv[i]))) -if( total >= 6 ): - mr_url = get_collector_uri() - pol_url = get_policy_uri() - #mr_url = sys.argv[1] - missing_htbt = float(int(sys.argv[2])) - intvl = float(int(sys.argv[3])) - intopic = sys.argv[4] - outopic = sys.argv[5] -print ("Interval %s " % intvl) -#intvl = 60 # every second -periodic_scheduler = PeriodicScheduler() -periodic_scheduler.setup(intvl, periodic_event) # it executes the event just once -periodic_scheduler.run() # it starts the scheduler + main(sys.argv[1:]) + + +#force stop scheduler +def stop(): + global periodic_scheduler + if not periodic_scheduler is None: + periodic_scheduler.stop() -- cgit 1.2.3-korg