summaryrefslogtreecommitdiffstats
path: root/extra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py
diff options
context:
space:
mode:
Diffstat (limited to 'extra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py')
-rwxr-xr-xextra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/extra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py b/extra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py
new file mode 100755
index 0000000..bd6caec
--- /dev/null
+++ b/extra/docker/elk/tools/DMaaPServiceMocker/ds_mocker.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python3
+import os
+import json
+import copy
+import random
+import requests
+import uuid
+import time
+from datetime import datetime
+
+def luck(n=2):
+ """ gives 1 chance out of n (default: 2) to return True """
+ assert n > 1
+ return bool(random.randint(0, n-1))
+def now_dmaap_timestamp():
+ return str(datetime.now().timestamp()).replace(".","")[:13]
+def now_notification_time():
+ return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
+
+CONTROL_LOOP_NAMES = [
+ 'CL-vCPE-d925ed73',
+ 'CL-vCPE-37b1c91e',
+ 'CL-vCPE-c2597657',
+ 'CL-vCPE-a11318ba',
+ 'CL-vCPE-5321c558',
+]
+
+TEMPLATES = {
+ 'event_abated' :'event_abated.json',
+ 'event_onset' :'event_onset.json',
+ 'notification_active' :'notification_active.json',
+ 'notification_final_failed' :'notification_final_failed.json',
+ 'notification_final_open' :'notification_final_open.json',
+ 'notification_final_success' :'notification_final_success.json',
+ 'notification_operation_failure' :'notification_operation_failure.json',
+ 'notification_operation' :'notification_operation.json',
+ 'notification_operation_success' :'notification_operation_success.json',
+ 'notification_rejected_disabled' :'notification_rejected_disabled.json',
+ 'notification_rejected_missing' :'notification_rejected_missing.json',
+}
+
+ERROR_MESSAGES = [
+ ('APPC', 'APPC1 : timeout on restart','RESTART'),
+ ('APPC', 'APPC2 : cannot restart','RESTART'),
+ ('SO', 'SO1 : scale up failed', 'SCALEUP'),
+ ('SO', 'SO2 : scale down failed', 'SCALEDOWN'),
+]
+
+for key in TEMPLATES:
+ with open(TEMPLATES[key]) as f:
+ content = f.read()
+ TEMPLATES[key] = json.loads(content)
+
+
+class DMaaPMessage(dict):
+
+ dmaap_host_url = "http://dmaap.host.url:9200/"
+ dmaap_username = None
+ dmaap_password = None
+
+ @classmethod
+ def from_template(cls, tmpl, **kwargs):
+ obj = cls()
+ obj.update(copy.deepcopy(TEMPLATES[tmpl]))
+ for keys,value in kwargs.items():
+ current_node = obj
+ keys = keys.split(".")
+ key = keys[0]
+ for i in range(len(keys) - 1):
+ current_node = current_node[keys[i]]
+ key = keys[i]
+ current_node[key] = value
+ return obj
+
+ def publish(self, topic):
+ url = "%s/events/%s" % (self.dmaap_host_url, topic)
+ auth = None
+ if self.dmaap_username and self.dmaap_password:
+ auth = (self.dmaap_username, self.dmaap_password)
+ response = requests.post(url, data=json.dumps(self), auth=auth)
+ return response.status_code
+
+class Event(DMaaPMessage):
+
+ topic = "DCAE-CL-EVENT"
+
+ @staticmethod
+ def abated(**kwargs):
+ return Event.from_template('event_abated', **kwargs)
+
+ @staticmethod
+ def onset(**kwargs):
+ return Event.from_template('event_onset', **kwargs)
+
+ def publish(self):
+ return super().publish(self.topic)
+
+
+class Notification(DMaaPMessage):
+
+ topic = "POLICY-CL-MGT"
+
+ @classmethod
+ def from_template(cls, tmpl, **kwargs):
+ kwargs['notificationTime'] = now_notification_time()
+ return super().from_template(tmpl, **kwargs)
+
+ @staticmethod
+ def active(**kwargs):
+ return Notification.from_template('notification_active', **kwargs)
+
+ @staticmethod
+ def final(**kwargs):
+ class FinalNotification(Notification):
+ @staticmethod
+ def success(**kwargs):
+ return FinalNotification.from_template('notification_final_success', **kwargs)
+ @staticmethod
+ def failed(**kwargs):
+ msg = FinalNotification.from_template('notification_final_failed', **kwargs)
+ error = ERROR_MESSAGES[random.randint(0, len(ERROR_MESSAGES) - 1)]
+ h = msg['history'][-1]
+ h['actor'],h['message'],h['operation'] = error[0],error[1],error[2]
+ return msg
+ @staticmethod
+ def open(**kwargs):
+ return FinalNotification.from_template('notification_final_open', **kwargs)
+ return FinalNotification
+
+ @staticmethod
+ def operation(**kwargs):
+ class OperationNotification(Notification):
+ @staticmethod
+ def success(**kwargs):
+ return OperationNotification.from_template('notification_operation_success', **kwargs)
+ @staticmethod
+ def failure(**kwargs):
+ return OperationNotification.from_template('notification_operation_failure', **kwargs)
+ return OperationNotification.from_template('notification_operation', **kwargs)
+
+ @staticmethod
+ def rejected(**kwargs):
+ class RejectedNotification(Notification):
+ @staticmethod
+ def disabled(**kwargs):
+ return RejectedNotification.from_template('notification_rejected_disabled', **kwargs)
+ @staticmethod
+ def missing_fields(**kwargs):
+ return RejectedNotification.from_template('notification_rejected_missing', **kwargs)
+
+ return RejectedNotification
+
+ def publish(self):
+ return super().publish(self.topic)
+
+
+
+class CLStatus(object):
+
+ def __init__(self, dmaap_url=None,
+ missing=None, disabled=None, op_failure=None):
+ self._stopped = False
+ def maybe(thing, ):
+ if thing is None:
+ thing = not luck(10)
+ return thing
+ self._missing = maybe(missing)
+ self._disabled = maybe(disabled)
+ self._op_failure = maybe(op_failure)
+ self._config = dict(
+ requestID=str(uuid.uuid4()),
+ closedLoopControlName=CONTROL_LOOP_NAMES[random.randint(0, len(CONTROL_LOOP_NAMES) - 1)]
+ )
+
+ def __iter__(self):
+ return next(self)
+
+ def __next__(self):
+ if self._stopped:
+ raise StopIteration()
+ config = self._config
+ config.update(dict(closedLoopAlarmStart=now_dmaap_timestamp()))
+ yield Event.onset(**config)
+ if self._missing:
+ self._stopped = True
+ yield Notification.rejected().missing_fields(**config)
+ raise StopIteration()
+ elif self._disabled:
+ self._stopped = True
+ yield Notification.rejected().disabled(**config)
+ raise StopIteration()
+
+ yield Notification.active(**config)
+ yield Notification.operation(**config)
+
+ config['closedLoopAlarmEnd'] = now_dmaap_timestamp()
+ if self._op_failure:
+ yield Notification.operation().failure(**config)
+ self._stopped = True
+ yield Notification.final().failed(**config)
+ else:
+ yield Notification.operation().success(**config)
+ yield Event.abated(**config)
+ self._stopped = True
+ yield Notification.final().success(**config)
+ raise StopIteration()
+
+def print_usage():
+ print("""
+ ./ds_mocker.py <DMAAP_URL> <EVENT_TOPIC> [NOTIFICATION_TOPIC [REQUEST_TOPIC]]
+ """)
+ exit()
+
+def push(test_datas):
+ for current_i, status in enumerate(test_datas):
+ time.sleep(random.randint(0,3))
+ for s in status:
+ # print(s)
+ status_code = s.publish()
+ if status_code != 200:
+ print("Error when publishing : status_code={}".format(status_code))
+ exit(1)
+ time.sleep(random.randint(0,3))
+ print("%03d,missing:%5s,disabled:%5s,op_failure:%5s - %s" % (current_i, status._missing, status._disabled, status._op_failure, status._config))
+
+
+
+def generate_dataset_1():
+ test_datas = [CLStatus(missing=False, disabled=False, op_failure=False) for i in range(300)] \
+ + [CLStatus(missing=True, disabled=False, op_failure=False) for i in range(5)] \
+ + [CLStatus(missing=False, disabled=True, op_failure=False) for i in range(6)] \
+ + [CLStatus(missing=False, disabled=False, op_failure=True) for i in range(12)]
+ random.shuffle(test_datas)
+ return test_datas
+
+def generate_error_dataset_1():
+ test_datas = [CLStatus(missing=False, disabled=False, op_failure=True) for i in range(60)]
+ random.shuffle(test_datas)
+ return test_datas
+
+
+DATASETS = {
+ 'dataset_1': generate_dataset_1,
+ 'op_failure_1': generate_error_dataset_1,
+}
+
+if __name__ == "__main__":
+ import sys
+ if len(sys.argv) < 3:
+ print_usage()
+
+ DMaaPMessage.dmaap_host_url = sys.argv[1]
+ Event.topic = sys.argv[2]
+ Notification.topic = len(sys.argv) > 3 and sys.argv[3] or sys.argv[2]
+ # Request.topic = len(sys.argv) > 4 or Notification.topic
+ #push(DATASETS['op_failure_1']())
+ push(DATASETS['dataset_1']())