aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--miss_htbt_service/htbtworker.py149
1 files changed, 42 insertions, 107 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 3bad8c9..347dbd6 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -41,6 +41,8 @@ mr_url = 'http://mrrouter.onap.org:3904'
pol_url = 'http://mrrouter.onap.org:3904'
intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
+nfc = "vVNF"
+cl_loop = 'ControlLoopEvent1'
periodic_scheduler = None
# Checks for heartbeat event on periodic basis
@@ -58,56 +60,17 @@ class PeriodicScheduler(object):
def stop(self):
list(map(self.scheduler.cancel, self.scheduler.queue))
-# Formats collector uri from config files of heat template
-def get_collector_uri():
- """
- This method waterfalls reads an envioronmental variable called COLLECTOR_HOST
- If that doesn't work, it raises an Exception
- """
- with open('/tmp/config/coll_ip.txt', 'r') as myfile:
- coll_ip=myfile.read().replace('\n', '')
- myfile.close()
- with open('/tmp/config/coll_port.txt', 'r') as myfile2:
- coll_port=myfile2.read().replace('\n', '')
- myfile2.close()
- if coll_ip and coll_port:
- # WARNING! TODO! Currently the env file does not include the port.
- # But some other people think that the port should be a part of that.
- # For now, I'm hardcoding 8500 until this gets resolved.
- return "http://{0}:{1}".format(coll_ip, coll_port)
- else:
- raise BadEnviornmentENVNotFound("COLLECTOR_HOST")
-
-# Formats Policy uri from config files of heat template
-def get_policy_uri():
- """
- This method waterfalls reads an envioronmental variable called POLICY_HOST
- If that doesn't work, it raises an Exception
- """
- with open('/tmp/config/coll_ip.txt', 'r') as myfile:
- pol_ip=myfile.read().replace('\n', '')
- myfile.close()
- with open('/tmp/config/coll_port.txt', 'r') as myfile2:
- pol_port=myfile2.read().replace('\n', '')
- myfile2.close()
- if pol_ip and pol_port :
- # WARNING! TODO! Currently the env file does not include the port.
- # But some other people think that the port should be a part of that.
- # For now, I'm hardcoding 8500 until this gets resolved.
- return "http://{0}:{1}".format(pol_ip,pol_port)
- else:
- raise BadEnviornmentENVNotFound("POLICY_HOST")
-
# Process the heartbeat event on input topic
def periodic_event():
global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
+ global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
ret = 0
#print("Args are :", locals())
- print("Checking..." , datetime.datetime.now())
+ print("{0} Checking...".format(datetime.datetime.now()))
#Read heartbeat
- get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
+ #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
+ get_url = mr_url+'/DefaultGroup/1?timeout=15000'
print("Getting :"+get_url)
try:
res = requests.get(get_url)
@@ -124,6 +87,8 @@ def periodic_event():
print("Line:"+line)
jobj = json.loads(line)
#print(jobj)
+ if( nfc != jobj['event']['commonEventHeader']['nfNamingCode']) :
+ continue
srcid = (jobj['event']['commonEventHeader']['sourceId'])
lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec'])
seqnum = (jobj['event']['commonEventHeader']['sequence'])
@@ -139,35 +104,19 @@ def periodic_event():
heartflag[srcid] = sdiff;
heartmsg[srcid] = jobj;
else:
- payload = json.dumps({"event": {
- "commonEventHeader": {
- "reportingEntityName": "VNFVM",
- "reportingEntityName": "VNFVM",
- "startEpochMicrosec": 1508641592248000,
- "lastEpochMicrosec": 1508641592248000,
- "eventId": "VNFVM_heartbeat",
- "sequence": 1,
- "priority": "Normal",
- "sourceName": "VNFVM",
- "domain": "heartbeat",
- "eventName": "Heartbeat_Vnf",
- "internalHeaderFields": {
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": "ControlLoopEvent1",
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- },
- "reportingEntityId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8",
- "version": 3,
- "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8"
- }
- }
- })
+ jobj["internalHeaderFields"] = json.dumps({
+ "closedLoopFlag": "True",
+ "eventTag": "hp.Heartbeat Service.20171022.8447964515",
+ "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
+ "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
+ "closedLoopControlName": cl_loop,
+ "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
+ });
+ heartmsg[srcid] = jobj;
payload = heartmsg[srcid]
print(payload)
- psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
+ #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
+ psend_url = pol_url+'/DefaultGroup/1?timeout=15000'
print(psend_url)
print("Heartbeat Dead raising alarm event "+psend_url)
#Send response for policy on output topic
@@ -191,34 +140,17 @@ def periodic_event():
print("Heartbeat Dead raise alarm event"+key)
chkeys.append( key )
#print payload
- payload = json.dumps({"event": {
- "commonEventHeader": {
- "reportingEntityName": "VNFVM",
- "startEpochMicrosec": 1508641592248000,
- "lastEpochMicrosec": 1508641592248000,
- "eventId": "VNFVM_heartbeat",
- "sequence": 1,
- "priority": "Normal",
- "sourceName": "VNFVM",
- "domain": "heartbeat",
- "eventName": "Heartbeat_Vnf",
- "internalHeaderFields": {
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": "ControlLoopEvent1",
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- },
- "reportingEntityId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8",
- "version": 3,
- "sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8"
- }
- }
- })
+ heartmsg[key]["internalHeaderFields"] = json.dumps({
+ "closedLoopFlag": "True",
+ "eventTag": "hp.Heartbeat Service.20171022.8447964515",
+ "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
+ "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
+ "closedLoopControlName": cl_loop,
+ "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
+ })
payload = heartmsg[key]
print(payload)
- send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
+ send_url = pol_url+'/DefaultGroup/1?timeout=15000'
print(send_url)
r = requests.post(send_url, data=payload)
print(r.status_code, r.reason)
@@ -236,12 +168,12 @@ def periodic_event():
#test setup for coverage
def test_setup(args):
global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
- mr_url = get_collector_uri()
- pol_url = get_policy_uri()
missing_htbt = float(int(args[2]))
intvl = float(int(args[3]))
intopic = args[4]
outopic = args[5]
+ mr_url = get_collector_uri()+'/events/'+intopic
+ pol_url = get_policy_uri()+'/events/'+outopic
print ("Message router url %s " % mr_url)
print ("Policy url %s " % pol_url)
print ("Interval %s " % intvl)
@@ -252,23 +184,26 @@ def test_setup(args):
#Main invocation
def main(args):
global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
+ global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
#mr_url = get_collector_uri()
#pol_url = get_policy_uri()
mr_url = args[0]
intopic = args[1]
pol_url = args[2]
outopic = args[3]
- missing_htbt = int(args[4])
- intvl = int(args[5])
+ nfc = args[4]
+ missing_htbt = int(args[5])
+ intvl = int(args[6])
+ cl_loop = args[7]
print ("Message router url %s " % mr_url)
print ("Policy router url %s " % pol_url)
print ("Interval %s " % intvl)
- #intvl = 60 # every second
- #Start periodic scheduler runs every interval
- periodic_scheduler = PeriodicScheduler()
- periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once
- periodic_scheduler.run() # it starts the scheduler
+ if( cl_loop != "internal_test") :
+ #intvl = 60 # every second
+ #Start periodic scheduler runs every interval
+ periodic_scheduler = PeriodicScheduler()
+ periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once
+ periodic_scheduler.run() # it starts the scheduler
if __name__ == "__main__":
total = len(sys.argv)