summaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
authorGokul Singaraju <gs244f@att.com>2018-03-27 15:31:12 -0400
committerGokul Singaraju <gs244f@att.com>2018-03-27 15:32:55 -0400
commit286ec745cc0ef412b450d7c5c07d735707f9418b (patch)
treeb950f298867d1e04c93f69e16e28156e7b07ac86 /miss_htbt_service/htbtworker.py
parent5b1be8ea36f150c226cd75ec559bbafb2378e3cc (diff)
Added tests for heartbeat coverage
Issue-ID: DCAEGEN2-276 Change-Id: Ib0fa11fc5978f47854056f3c198347120b3873a8 Signed-off-by: Gokul Singaraju <gs244f@att.com>
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--miss_htbt_service/htbtworker.py132
1 files changed, 90 insertions, 42 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index b8eadb4..b81deae 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -12,6 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#
+# Author Gokul Singaraju gs244f@att.com
+# Simple Microservice
+# Tracks Heartbeat messages on input topic in DMaaP
+# and generates Missing Heartbeat signal for Policy Engine
import requests
import math
@@ -21,6 +26,7 @@ import string
import sys
+# Initialise tracking hash tables
intvl = 60
missing_htbt = 2
#tracks last epoch time
@@ -31,10 +37,11 @@ heartstate = {}
heartflag = {}
#saves heartbeat message for policy
heartmsg = {}
-mr_url = 'http://mrrouter.att.com:3904'
-pol_url = 'http://mrrouter.att.com:3904'
+mr_url = 'http://mrrouter.onap.org:3904'
+pol_url = 'http://mrrouter.onap.org:3904'
intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
+periodic_scheduler = None
# Checks for heartbeat event on periodic basis
class PeriodicScheduler(object):
@@ -45,19 +52,22 @@ class PeriodicScheduler(object):
action(*actionargs)
self.scheduler.enter(interval, 1, self.setup,
(interval, action, actionargs))
-
def run(self):
self.scheduler.run()
+ def stop(self):
+ list(map(self.scheduler.cancel, self.scheduler.queue))
+
+# Formats collector uri from config files of heat template
def get_collector_uri():
"""
This method waterfalls reads an envioronmental variable called COLLECTOR_HOST
If that doesn't work, it raises an Exception
"""
- with open('/opt/config/coll_ip.txt', 'r') as myfile:
+ with open('/tmp/config/coll_ip.txt', 'r') as myfile:
coll_ip=myfile.read().replace('\n', '')
myfile.close()
- with open('/opt/config/coll_port.txt', 'r') as myfile2:
+ with open('/tmp/config/coll_port.txt', 'r') as myfile2:
coll_port=myfile2.read().replace('\n', '')
myfile2.close()
if coll_ip and coll_port:
@@ -68,15 +78,16 @@ def get_collector_uri():
else:
raise BadEnviornmentENVNotFound("COLLECTOR_HOST")
+# Formats Policy uri from config files of heat template
def get_policy_uri():
"""
This method waterfalls reads an envioronmental variable called POLICY_HOST
If that doesn't work, it raises an Exception
"""
- with open('/opt/config/coll_ip.txt', 'r') as myfile:
+ with open('/tmp/config/coll_ip.txt', 'r') as myfile:
pol_ip=myfile.read().replace('\n', '')
myfile.close()
- with open('/opt/config/coll_port.txt', 'r') as myfile2:
+ with open('/tmp/config/coll_port.txt', 'r') as myfile2:
pol_port=myfile2.read().replace('\n', '')
myfile2.close()
if pol_ip and pol_port :
@@ -90,21 +101,25 @@ def get_policy_uri():
# Process the heartbeat event on input topic
def periodic_event():
- print("Checking... ")
- print(datetime.datetime.now())
+ global periodic_scheduler
+ global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
+ ret = 0
+ print("Checking..." , datetime.datetime.now())
#Read heartbeat
get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
- print(get_url)
+ print("Getting :"+get_url)
res = requests.get(get_url)
#print(res)
#print(res.headers)
print(res.text)
#print(res.json)
inputString = res.text
- jlist = json.loads(inputString);
+ #jlist = json.loads(inputString)
+ jlist = inputString.split('\n');
#print("List:"+jlist[0])
+ # Process the DMaaP input message retreived
for line in jlist:
- #print(line)
+ print("Line:"+line)
jobj = json.loads(line)
#print(jobj)
srcid = (jobj['event']['commonEventHeader']['sourceId'])
@@ -122,9 +137,7 @@ def periodic_event():
heartflag[srcid] = sdiff;
heartmsg[srcid] = jobj;
else:
- print("Heartbeat Dead raising alarm event")
- #payload = {'Event': 'Heartbeat Failure', 'Host': srcid, 'LastTimestamp': hearttrack[srcid], 'Sequence': heartstate[srcid]}
- payload = {"event": {
+ payload = json.dumps({"event": {
"commonEventHeader": {
"reportingEntityName": "VNFVM",
"reportingEntityName": "VNFVM",
@@ -149,14 +162,16 @@ def periodic_event():
"sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8"
}
}
- }
+ })
payload = heartmsg[srcid]
print(payload)
- send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
- print(send_url)
+ psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
+ print(psend_url)
+ print("Heartbeat Dead raising alarm event "+psend_url)
#Send response for policy on output topic
- r = requests.post(send_url, data=payload)
+ r = requests.post(psend_url, data=payload)
print(r.status_code, r.reason)
+ ret = r.status_code
del heartstate[srcid]
del hearttrack[srcid]
del heartflag[srcid]
@@ -166,15 +181,15 @@ def periodic_event():
heartstate[srcid] = seqnum
heartflag[srcid] = 1
heartmsg[srcid] = jobj;
+ ret = 1
chkeys = []
- for key in heartstate.iterkeys():
+ for key in heartstate.keys():
print(key,heartstate[key])
if( heartflag[key] == 0 ):
print("Heartbeat Dead raise alarm event"+key)
chkeys.append( key )
- #payload = {'Event': 'Heartbeat Failure', 'Host': key, 'LastTimestamp': hearttrack[key], 'Sequence': heartstate[key]}
#print payload
- payload = {"event": {
+ payload = json.dumps({"event": {
"commonEventHeader": {
"reportingEntityName": "VNFVM",
"startEpochMicrosec": 1508641592248000,
@@ -198,37 +213,70 @@ def periodic_event():
"sourceId": "cff8656d-0b42-4eda-ab5d-3d2b7f2d74c8"
}
}
- }
+ })
payload = heartmsg[key]
print(payload)
send_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
print(send_url)
r = requests.post(send_url, data=payload)
print(r.status_code, r.reason)
+ ret = r.status_code
heartflag[key] = 0
for chkey in chkeys:
print(chkey)
del heartstate[chkey]
del hearttrack[chkey]
del heartflag[chkey]
+ return ret
+
+#test setup for coverage
+def test_setup(args):
+ global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
+ mr_url = get_collector_uri()
+ pol_url = get_policy_uri()
+ missing_htbt = float(int(args[2]))
+ intvl = float(int(args[3]))
+ intopic = args[4]
+ outopic = args[5]
+ print ("Message router url %s " % mr_url)
+ print ("Policy url %s " % pol_url)
+ print ("Interval %s " % intvl)
+ print ("Input topic %s " % intopic)
+ print ("Output topic %s " % outopic)
+ #intvl = 60 # every second
+
+#Main invocation
+def main(args):
+ global periodic_scheduler
+ global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
+ mr_url = get_collector_uri()
+ pol_url = get_policy_uri()
+ missing_htbt = int(args[2])
+ intvl = int(args[3])
+ intopic = args[4]
+ outopic = args[5]
+ print ("Message router url %s " % mr_url)
+ print ("Policy router url %s " % pol_url)
+ print ("Interval %s " % intvl)
+ #intvl = 60 # every second
+ #Start periodic scheduler runs every interval
+ periodic_scheduler = PeriodicScheduler()
+ periodic_scheduler.setup(intvl, periodic_event) # it executes the event just once
+ periodic_scheduler.run() # it starts the scheduler
-total = len(sys.argv)
-cmdargs = str(sys.argv)
-print ("The total numbers of args passed to the script: %d " % total)
-print ("Args list: %s " % cmdargs)
-print ("Script name: %s" % str(sys.argv[0]))
-for i in range(total):
+if __name__ == "__main__":
+ total = len(sys.argv)
+ cmdargs = str(sys.argv)
+ print ("The total numbers of args passed to the script: %d " % total)
+ print ("Missing Heartbeat Args list: %s " % cmdargs)
+ print ("Script name: %s" % str(sys.argv[0]))
+ for i in range(total):
print ("Argument # %d : %s" % (i, str(sys.argv[i])))
-if( total >= 6 ):
- mr_url = get_collector_uri()
- pol_url = get_policy_uri()
- #mr_url = sys.argv[1]
- missing_htbt = float(int(sys.argv[2]))
- intvl = float(int(sys.argv[3]))
- intopic = sys.argv[4]
- outopic = sys.argv[5]
-print ("Interval %s " % intvl)
-#intvl = 60 # every second
-periodic_scheduler = PeriodicScheduler()
-periodic_scheduler.setup(intvl, periodic_event) # it executes the event just once
-periodic_scheduler.run() # it starts the scheduler
+ main(sys.argv[1:])
+
+
+#force stop scheduler
+def stop():
+ global periodic_scheduler
+ if not periodic_scheduler is None:
+ periodic_scheduler.stop()