summaryrefslogtreecommitdiffstats
path: root/catalog/pub/msapi/sdc_controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'catalog/pub/msapi/sdc_controller.py')
-rw-r--r--catalog/pub/msapi/sdc_controller.py97
1 files changed, 73 insertions, 24 deletions
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py
index 74a5ae8..8bad857 100644
--- a/catalog/pub/msapi/sdc_controller.py
+++ b/catalog/pub/msapi/sdc_controller.py
@@ -7,7 +7,15 @@ import traceback
import uuid
from threading import Thread
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from catalog.pub.database.models import NSPackageModel, VnfPackageModel
+from catalog.pub.exceptions import CatalogException
+from catalog.packages.biz import sdc_vnf_package
+from catalog.pub.utils import fileutil
+from catalog.packages.biz.ns_descriptor import NsDescriptor
+from catalog.pub.utils.jobutil import JobUtil
+from catalog.pub.utils.values import ignore_case_get
from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
@@ -24,12 +32,12 @@ class SDCController(Thread):
def __init__(self):
super(SDCController, self).__init__()
self.identity = IdentityClient(DMAAP_MR_BASE_URL)
- self.scheduler = Scheduler(standalone=True)
+ self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
self.notification_topic = ''
self.status_topic = ''
self.consumer = ''
- @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+ @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
def fetch_task():
self.fetch_notification()
@@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is
def process_notification(msg):
logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
- service_artifacts = msg['serviceArtifacts']
- for artifact in service_artifacts:
- if artifact['artifactType'] == 'TOSCA_CSAR':
- csar_id = artifact['artifactUUID']
- download_url = artifact['artifactURL']
- localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
- local_path = os.path.join(ns_csar_base, msg['distributionID'])
- file_name = artifact['artifactName']
- csar_version = artifact['artifactVersion']
- sdc.download_artifacts(download_url, local_path, file_name)
- # call ns package upload
- data = {
- 'nsPackageId': csar_id,
- 'nsPackageVersion': csar_version,
- 'csarName': file_name,
- 'csarDir': local_path
- }
- jobid = uuid.uuid4()
- # NsPackageParser(data, jobid).start()
- logger.debug(data, jobid)
+ try:
+ ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
+ # check if the related resources exist
+ resources = ns.get('resources', None)
+ job_array = []
+ resource_threads = []
+ if resources:
+ for resource in resources:
+ if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
+ logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
+ # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
+ logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
+ csar_id = resource['resourceUUID']
+ vim_ids = ignore_case_get(resource, "vimIds")
+ lab_vim_id = ignore_case_get(resource, "labVimId")
+ job_id = str(uuid.uuid4())
+ job_array.append(job_id)
+ resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
+ # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
+ else:
+ logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
+ for resource_thread in resource_threads:
+ resource_thread.start()
+ for resource_thread in resource_threads:
+ resource_thread.join()
+ for jobID in job_array:
+ job_status = JobUtil.query_job_status(jobID)
+ if job_status[0].status == 'error':
+ raise CatalogException("VF resource fail to distributed.")
+
+ service_artifacts = msg['serviceArtifacts']
+ for artifact in service_artifacts:
+ if artifact['artifactType'] == 'TOSCA_CSAR':
+ csar_id = artifact['artifactUUID']
+ if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
+ download_url = artifact['artifactURL']
+ localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+ local_path = os.path.join(ns_csar_base, msg['distributionID'])
+ file_name = artifact['artifactName']
+ # csar_version = artifact['artifactVersion']
+
+ # download csar package
+ local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
+ if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
+ artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
+ "Artifacts/Deployment/OTHER/ns.csar")
+ if os.path.exists(artifact_vnf_file):
+ local_file_name = artifact_vnf_file
+
+ data = {
+ 'userDefinedData': {}
+ }
+ nsd = NsDescriptor()
+ nsd.create(data, csar_id)
+ nsd.parse_nsd_and_save(csar_id, local_file_name)
+ logger.debug("CSAR(%s) distriuted successfully.", csar_id)
+ else:
+ logger.debug("CSAR(%s) has been distriuted", csar_id)
+ except CatalogException as e:
+ logger.error("Failed to download the resource")
+ logger.error(str(e))