aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordyh <dengyuanhong@chinamobile.com>2020-05-27 10:36:10 +0800
committerdyh <dengyuanhong@chinamobile.com>2020-05-27 12:01:07 +0800
commitfddda96911bdb3ec9841ac71e764cb6eb8fa08d5 (patch)
tree598d721cdcbf3793828c52134ae93c6061aa9a8f
parent23785037bc77749a748c875712db84836fdf07e1 (diff)
rename package name
add function for sdc subscribe and notification Issue-ID: MODELING-366 Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc Signed-off-by: dyh <dengyuanhong@chinamobile.com>
-rw-r--r--catalog/pub/Dmaap_lib/__init__.py (renamed from catalog/pub/Dmaap-lib/__init__.py)0
-rw-r--r--catalog/pub/Dmaap_lib/dmaap/__init__.py (renamed from catalog/pub/Dmaap-lib/dmaap/__init__.py)0
-rw-r--r--catalog/pub/Dmaap_lib/dmaap/consumer.py (renamed from catalog/pub/Dmaap-lib/dmaap/consumer.py)0
-rw-r--r--catalog/pub/Dmaap_lib/dmaap/identity.py (renamed from catalog/pub/Dmaap-lib/dmaap/identity.py)0
-rw-r--r--catalog/pub/Dmaap_lib/dmaap/publisher.py (renamed from catalog/pub/Dmaap-lib/dmaap/publisher.py)0
-rw-r--r--catalog/pub/Dmaap_lib/pub/__init__.py (renamed from catalog/pub/Dmaap-lib/pub/__init__.py)0
-rw-r--r--catalog/pub/Dmaap_lib/pub/exceptions.py (renamed from catalog/pub/Dmaap-lib/pub/exceptions.py)0
-rw-r--r--catalog/pub/Dmaap_lib/test/test_consumer.py (renamed from catalog/pub/Dmaap-lib/test/test_consumer.py)0
-rw-r--r--catalog/pub/Dmaap_lib/test/test_identity.py (renamed from catalog/pub/Dmaap-lib/test/test_identity.py)0
-rw-r--r--catalog/pub/config/config.py12
-rw-r--r--catalog/pub/msapi/sdc.py38
-rw-r--r--catalog/pub/msapi/sdc_controller.py215
-rw-r--r--requirements.txt4
13 files changed, 263 insertions, 6 deletions
diff --git a/catalog/pub/Dmaap-lib/__init__.py b/catalog/pub/Dmaap_lib/__init__.py
index 7ae04f0..7ae04f0 100644
--- a/catalog/pub/Dmaap-lib/__init__.py
+++ b/catalog/pub/Dmaap_lib/__init__.py
diff --git a/catalog/pub/Dmaap-lib/dmaap/__init__.py b/catalog/pub/Dmaap_lib/dmaap/__init__.py
index 0c1e8e1..0c1e8e1 100644
--- a/catalog/pub/Dmaap-lib/dmaap/__init__.py
+++ b/catalog/pub/Dmaap_lib/dmaap/__init__.py
diff --git a/catalog/pub/Dmaap-lib/dmaap/consumer.py b/catalog/pub/Dmaap_lib/dmaap/consumer.py
index 054791c..054791c 100644
--- a/catalog/pub/Dmaap-lib/dmaap/consumer.py
+++ b/catalog/pub/Dmaap_lib/dmaap/consumer.py
diff --git a/catalog/pub/Dmaap-lib/dmaap/identity.py b/catalog/pub/Dmaap_lib/dmaap/identity.py
index 1dcaad8..1dcaad8 100644
--- a/catalog/pub/Dmaap-lib/dmaap/identity.py
+++ b/catalog/pub/Dmaap_lib/dmaap/identity.py
diff --git a/catalog/pub/Dmaap-lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py
index 643ba90..643ba90 100644
--- a/catalog/pub/Dmaap-lib/dmaap/publisher.py
+++ b/catalog/pub/Dmaap_lib/dmaap/publisher.py
diff --git a/catalog/pub/Dmaap-lib/pub/__init__.py b/catalog/pub/Dmaap_lib/pub/__init__.py
index 0c1e8e1..0c1e8e1 100644
--- a/catalog/pub/Dmaap-lib/pub/__init__.py
+++ b/catalog/pub/Dmaap_lib/pub/__init__.py
diff --git a/catalog/pub/Dmaap-lib/pub/exceptions.py b/catalog/pub/Dmaap_lib/pub/exceptions.py
index 6b65fcf..6b65fcf 100644
--- a/catalog/pub/Dmaap-lib/pub/exceptions.py
+++ b/catalog/pub/Dmaap_lib/pub/exceptions.py
diff --git a/catalog/pub/Dmaap-lib/test/test_consumer.py b/catalog/pub/Dmaap_lib/test/test_consumer.py
index 1f89f65..1f89f65 100644
--- a/catalog/pub/Dmaap-lib/test/test_consumer.py
+++ b/catalog/pub/Dmaap_lib/test/test_consumer.py
diff --git a/catalog/pub/Dmaap-lib/test/test_identity.py b/catalog/pub/Dmaap_lib/test/test_identity.py
index 0f88a5e..0f88a5e 100644
--- a/catalog/pub/Dmaap-lib/test/test_identity.py
+++ b/catalog/pub/Dmaap_lib/test/test_identity.py
diff --git a/catalog/pub/config/config.py b/catalog/pub/config/config.py
index dcc6cc1..99932d7 100644
--- a/catalog/pub/config/config.py
+++ b/catalog/pub/config/config.py
@@ -18,11 +18,6 @@ MSB_SERVICE_IP = '127.0.0.1'
MSB_SERVICE_PORT = '80'
MSB_BASE_URL = "%s://%s:%s" % (MSB_SERVICE_PROTOCOL, MSB_SERVICE_IP, MSB_SERVICE_PORT)
-# [REDIS]
-# REDIS_HOST = '127.0.0.1'
-# REDIS_PORT = '6379'
-# REDIS_PASSWD = ''
-
# [mysql]
DB_IP = "127.0.0.1"
DB_PORT = 3306
@@ -97,4 +92,11 @@ SDC_BASE_URL = "https://msb-iag/api"
SDC_USER = "modeling"
SDC_PASSWD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"
+# [dmaap config]
+DMAAP_MR_IP = '127.0.0.1'
+DMAAP_MR_PORT = '3904'
+CONSUMER_GROUP = "consumerGroup"
+CONSUMER_ID = "consumerId"
+POLLING_INTERVAL = 15
+
VNFD_SCHEMA_VERSION_DEFAULT = "base"
diff --git a/catalog/pub/msapi/sdc.py b/catalog/pub/msapi/sdc.py
index 86930f0..498db8d 100644
--- a/catalog/pub/msapi/sdc.py
+++ b/catalog/pub/msapi/sdc.py
@@ -129,3 +129,41 @@ def download_artifacts(download_url, local_path, file_name):
local_file.write(ret[1])
local_file.close()
return local_file_name
+
+
+def create_consumer(name, salt, password):
+ req_data = {
+ 'consumerName': name,
+ 'consumerSalt': salt,
+ 'consumerPassword': password
+ }
+ req_data = json.JSONEncoder().encode(req_data)
+ resource = '/sdc2/rest/v1/consumers'
+ headers = {'USER_ID': 'jh0003'}
+ ret = restcall.call_req(base_url=SDC_BASE_URL,
+ user="",
+ passwd="",
+ auth_type=restcall.rest_no_auth,
+ resource=resource,
+ method="POST",
+ content=req_data,
+ additional_headers=headers)
+ if ret[0] != 0:
+ logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+ raise CatalogException("Failed to create consumer from sdc.")
+
+
+def register_for_topics(key):
+ req_data = {
+ 'apiPublicKey': key,
+ 'distrEnvName': 'AUTO',
+ 'isConsumerToSdcDistrStatusTopic': False,
+ 'distEnvEndPoints': []
+ }
+ req_data = json.JSONEncoder().encode(req_data)
+ url = '/sdc/v1/registerForDistribution'
+ ret = call_sdc(url, 'POST', req_data)
+ if ret[0] != 0:
+ logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+ raise CatalogException("Failed to register from sdc.")
+ return json.JSONDecoder().decode(ret[1])
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py
new file mode 100644
index 0000000..454f3d1
--- /dev/null
+++ b/catalog/pub/msapi/sdc_controller.py
@@ -0,0 +1,215 @@
+# Copyright 2019 CMCC Technologies Co., Ltd.
+import json
+import logging
+import os
+import time
+import traceback
+import uuid
+from threading import Thread
+
+from apscheduler.scheduler import Scheduler
+
+from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
+from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
+from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
+from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
+ DMAAP_MR_PORT
+from catalog.pub.msapi import sdc
+
+logger = logging.getLogger(__name__)
+
+DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT)
+ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
+
+
+class SDCController(Thread):
+ def __init__(self):
+ super(SDCController, self).__init__()
+ self.identity = IdentityClient(DMAAP_MR_BASE_URL)
+ self.scheduler = Scheduler(standalone=True)
+ self.notification_topic = ''
+ self.status_topic = ''
+ self.consumer = ''
+
+ @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+ def fetch_task():
+ self.fetch_notification()
+
+ def run(self):
+ try:
+ description = 'nfvo catalog key for' + CONSUMER_ID
+ key = self.identity.create_apikey('', description)
+ topics = sdc.register_for_topics(key['apiKey'])
+ self.notification_topic = topics['distrNotificationTopicName']
+ self.status_topic = topics['distrStatusTopicName']
+ self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
+ self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
+ self.scheduler.start()
+ except Exception as e:
+ logger.error('start sdc controller failed.')
+ logger.error(e.message)
+ logger.error(traceback.format_exc())
+
+ def fetch_notification(self):
+ try:
+ logger.info('start to fetch message from dmaap.')
+ now_ms = int(time.time() * 1000)
+ notification_msgs = self.consumer.fetch()
+ logger.info('Receive a notification from dmaap: %s', notification_msgs)
+ for notification_msg in notification_msgs:
+ notification_callback = build_callback_notification(now_ms, notification_msg)
+ if is_activate_callback(notification_callback):
+ process_notification(notification_callback)
+ except Exception as e:
+ logger.error('fetch message from dmaap failed.')
+ logger.error(e.message)
+ logger.error(traceback.format_exc())
+
+
+def is_activate_callback(notification_callback):
+ has_relevant_artifacts_in_resource = False
+ has_relevant_artifacts_in_service = False
+ if notification_callback['resources']:
+ has_relevant_artifacts_in_resource = True
+ if notification_callback['serviceArtifacts']:
+ has_relevant_artifacts_in_service = True
+ return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
+
+
+def build_callback_notification(now_ms, notification_msg):
+ # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
+ relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
+ notification_msg['serviceArtifacts'])
+ # notification_msg['resources'] = relevant_resource_instances
+ notification_msg['serviceArtifacts'] = relevant_service_artifacts
+ return notification_msg
+
+
+def build_resource_instances(notification_msg, now_ms):
+ relevant_resource_instances = []
+ resources = notification_msg['resources']
+ for resource in resources:
+ artifacts = resource['artifacts']
+ found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
+ if found_relevant_artifacts:
+ resources['artifacts'] = found_relevant_artifacts
+ relevant_resource_instances.append(resources)
+ return relevant_resource_instances
+
+
+def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
+ relevant_artifacts = []
+ for artifact in artifacts:
+ artifact_type = artifact['artifactType']
+ is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
+ if is_artifact_relevant:
+ generated_from_uuid = artifact.get('generatedFromUUID', '')
+ if generated_from_uuid:
+ generated_from_artifact = None
+ for artifact_g in artifacts:
+ if generated_from_uuid == artifact_g['artifactUUID']:
+ generated_from_artifact = artifact_g
+ break
+ if generated_from_artifact:
+ is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
+ else:
+ is_artifact_relevant = False
+ if is_artifact_relevant:
+ artifact = set_related_artifacts(artifact, notification_msg)
+ relevant_artifacts.append(artifact)
+
+ # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
+ # if notification_status != 'SUCCESS':
+ # logger.error("Error failed to send notification status to Dmaap.")
+
+ return relevant_artifacts
+
+
+def set_related_artifacts(artifact, notification_msg):
+ related_artifacts_uuid = artifact.get('relatedArtifacts', '')
+ if related_artifacts_uuid:
+ related_artifacts = []
+ for artifact_uuid in related_artifacts_uuid:
+ related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
+ artifact['relatedArtifactsInfo'] = related_artifacts
+ return artifact
+
+
+def get_artifact_metadata(notification_msg, uuid):
+ service_artifacts = notification_msg['serviceArtifacts']
+ ret = None
+ for artifact in service_artifacts:
+ if artifact['artifactUUID'] == uuid:
+ ret = artifact
+ break
+ resources = notification_msg['resources']
+ if (not ret) and resources:
+ for resource in resources:
+ artifacts = resource['artifacts']
+ for artifact in artifacts:
+ if artifact['artifactUUID'] == uuid:
+ ret = artifact
+ break
+ if ret:
+ break
+ return ret
+
+
+def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
+ logger.info('start to send notification status')
+ status = 'FAIL'
+ if is_artifact_relevant:
+ notification_status = 'NOTIFIED'
+ else:
+ notification_status = 'NOT_NOTIFIED'
+ request = {
+ 'distributionID': distribution_id,
+ 'consumerID': CONSUMER_ID,
+ 'timestamp': now_ms,
+ 'artifactURL': artifact['artifactURL'],
+ 'status': notification_status
+ }
+ request_json = json.JSONEncoder().encode(request)
+ pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
+ logger.info('try to send notification status: %s', request_json)
+
+ try:
+ pub.send('MyPartitionKey', request_json)
+ time.sleep(1)
+ stuck = pub.close(10)
+ if not stuck:
+ status = 'SUCCESS'
+ logger.info('send notification status success.')
+ else:
+ logger.error('failed to send notification status, %s messages unsent', len(stuck))
+ except Exception as e:
+ logger.error('failed to send notification status.')
+ logger.error(e.message)
+ logger.error(traceback.format_exc())
+
+ return status
+
+
+def process_notification(msg):
+ logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
+ service_artifacts = msg['serviceArtifacts']
+ for artifact in service_artifacts:
+ if artifact['artifactType'] == 'TOSCA_CSAR':
+ csar_id = artifact['artifactUUID']
+ download_url = artifact['artifactURL']
+ localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+ local_path = os.path.join(ns_csar_base, msg['distributionID'])
+ file_name = artifact['artifactName']
+ csar_version = artifact['artifactVersion']
+ sdc.download_artifacts(download_url, local_path, file_name)
+ # call ns package upload
+ data = {
+ 'nsPackageId': csar_id,
+ 'nsPackageVersion': csar_version,
+ 'csarName': file_name,
+ 'csarDir': local_path
+ }
+ jobid = uuid.uuid4()
+ # NsPackageParser(data, jobid).start()
+ logger.debug(data, jobid)
diff --git a/requirements.txt b/requirements.txt
index 6e8880a..65354d4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,4 +35,6 @@ swagger-spec-validator>=2.1.0
onappylog==1.0.9
# uwsgi for parallel processing
-# uwsgi \ No newline at end of file
+# uwsgi
+
+apscheduler==2.1.2 \ No newline at end of file