diff options
author | 2020-05-27 10:36:10 +0800 | |
---|---|---|
committer | 2020-05-27 12:01:07 +0800 | |
commit | fddda96911bdb3ec9841ac71e764cb6eb8fa08d5 (patch) | |
tree | 598d721cdcbf3793828c52134ae93c6061aa9a8f /catalog/pub/msapi/sdc_controller.py | |
parent | 23785037bc77749a748c875712db84836fdf07e1 (diff) |
rename package name
add function for sdc subscribe and notification
Issue-ID: MODELING-366
Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc
Signed-off-by: dyh <dengyuanhong@chinamobile.com>
Diffstat (limited to 'catalog/pub/msapi/sdc_controller.py')
-rw-r--r-- | catalog/pub/msapi/sdc_controller.py | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py new file mode 100644 index 0000000..454f3d1 --- /dev/null +++ b/catalog/pub/msapi/sdc_controller.py @@ -0,0 +1,215 @@ +# Copyright 2019 CMCC Technologies Co., Ltd. +import json +import logging +import os +import time +import traceback +import uuid +from threading import Thread + +from apscheduler.scheduler import Scheduler + +from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient +from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient +from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient +from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \ + DMAAP_MR_PORT +from catalog.pub.msapi import sdc + +logger = logging.getLogger(__name__) + +DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT) +ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"] + + +class SDCController(Thread): + def __init__(self): + super(SDCController, self).__init__() + self.identity = IdentityClient(DMAAP_MR_BASE_URL) + self.scheduler = Scheduler(standalone=True) + self.notification_topic = '' + self.status_topic = '' + self.consumer = '' + + @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + def fetch_task(): + self.fetch_notification() + + def run(self): + try: + description = 'nfvo catalog key for' + CONSUMER_ID + key = self.identity.create_apikey('', description) + topics = sdc.register_for_topics(key['apiKey']) + self.notification_topic = topics['distrNotificationTopicName'] + self.status_topic = topics['distrStatusTopicName'] + self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID) + self.consumer.set_api_credentials(key['apiKey'], key['apiSecret']) + self.scheduler.start() + except Exception as e: + logger.error('start sdc controller failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + def fetch_notification(self): + try: + logger.info('start to fetch message from dmaap.') + now_ms = int(time.time() * 1000) + notification_msgs = self.consumer.fetch() + logger.info('Receive a notification from dmaap: %s', notification_msgs) + for notification_msg in notification_msgs: + notification_callback = build_callback_notification(now_ms, notification_msg) + if is_activate_callback(notification_callback): + process_notification(notification_callback) + except Exception as e: + logger.error('fetch message from dmaap failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + +def is_activate_callback(notification_callback): + has_relevant_artifacts_in_resource = False + has_relevant_artifacts_in_service = False + if notification_callback['resources']: + has_relevant_artifacts_in_resource = True + if notification_callback['serviceArtifacts']: + has_relevant_artifacts_in_service = True + return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service + + +def build_callback_notification(now_ms, notification_msg): + # relevant_resource_instances = build_resource_instances(notification_msg, now_ms) + relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms, + notification_msg['serviceArtifacts']) + # notification_msg['resources'] = relevant_resource_instances + notification_msg['serviceArtifacts'] = relevant_service_artifacts + return notification_msg + + +def build_resource_instances(notification_msg, now_ms): + relevant_resource_instances = [] + resources = notification_msg['resources'] + for resource in resources: + artifacts = resource['artifacts'] + found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts) + if found_relevant_artifacts: + resources['artifacts'] = found_relevant_artifacts + relevant_resource_instances.append(resources) + return relevant_resource_instances + + +def handle_relevant_artifacts(notification_msg, now_ms, artifacts): + relevant_artifacts = [] + for artifact in artifacts: + artifact_type = artifact['artifactType'] + is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST + if is_artifact_relevant: + generated_from_uuid = artifact.get('generatedFromUUID', '') + if generated_from_uuid: + generated_from_artifact = None + for artifact_g in artifacts: + if generated_from_uuid == artifact_g['artifactUUID']: + generated_from_artifact = artifact_g + break + if generated_from_artifact: + is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST + else: + is_artifact_relevant = False + if is_artifact_relevant: + artifact = set_related_artifacts(artifact, notification_msg) + relevant_artifacts.append(artifact) + + # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant) + # if notification_status != 'SUCCESS': + # logger.error("Error failed to send notification status to Dmaap.") + + return relevant_artifacts + + +def set_related_artifacts(artifact, notification_msg): + related_artifacts_uuid = artifact.get('relatedArtifacts', '') + if related_artifacts_uuid: + related_artifacts = [] + for artifact_uuid in related_artifacts_uuid: + related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid)) + artifact['relatedArtifactsInfo'] = related_artifacts + return artifact + + +def get_artifact_metadata(notification_msg, uuid): + service_artifacts = notification_msg['serviceArtifacts'] + ret = None + for artifact in service_artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + resources = notification_msg['resources'] + if (not ret) and resources: + for resource in resources: + artifacts = resource['artifacts'] + for artifact in artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + if ret: + break + return ret + + +def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant): + logger.info('start to send notification status') + status = 'FAIL' + if is_artifact_relevant: + notification_status = 'NOTIFIED' + else: + notification_status = 'NOT_NOTIFIED' + request = { + 'distributionID': distribution_id, + 'consumerID': CONSUMER_ID, + 'timestamp': now_ms, + 'artifactURL': artifact['artifactURL'], + 'status': notification_status + } + request_json = json.JSONEncoder().encode(request) + pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria') + logger.info('try to send notification status: %s', request_json) + + try: + pub.send('MyPartitionKey', request_json) + time.sleep(1) + stuck = pub.close(10) + if not stuck: + status = 'SUCCESS' + logger.info('send notification status success.') + else: + logger.error('failed to send notification status, %s messages unsent', len(stuck)) + except Exception as e: + logger.error('failed to send notification status.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + return status + + +def process_notification(msg): + logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + csar_version = artifact['artifactVersion'] + sdc.download_artifacts(download_url, local_path, file_name) + # call ns package upload + data = { + 'nsPackageId': csar_id, + 'nsPackageVersion': csar_version, + 'csarName': file_name, + 'csarDir': local_path + } + jobid = uuid.uuid4() + # NsPackageParser(data, jobid).start() + logger.debug(data, jobid) |