aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMars chen <chenzihao@bupt.edu.cn>2020-09-18 10:26:56 +0800
committerdengyh <dengyuanhong@chinamobile.com>2020-09-18 15:00:54 +0800
commit4123780df595fd99883286e351a936349709ef68 (patch)
tree42d6b8eaf17a308f0124b7c7c5c71b9f18ec9334
parentacc879458b00c2d78d859ca79d683c1ec79ff554 (diff)
Process notifications for SDC and stores NS and related resources
Issue-ID: MODELING-335 Change-Id: Ie8836c865d21fb4695b85f07f1098d0d4617ac0c Signed-off-by: Mars chen <chenzihao@bupt.edu.cn>
-rw-r--r--catalog/pub/Dmaap_lib/dmaap/publisher.py8
-rw-r--r--catalog/pub/msapi/sdc_controller.py97
-rw-r--r--docs/release-notes.rst16
-rw-r--r--requirements.txt2
4 files changed, 86 insertions, 37 deletions
diff --git a/catalog/pub/Dmaap_lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py
index 7ebbca0..23a2351 100644
--- a/catalog/pub/Dmaap_lib/dmaap/publisher.py
+++ b/catalog/pub/Dmaap_lib/dmaap/publisher.py
@@ -21,8 +21,7 @@ import time
from hashlib import sha1
import requests
-from apscheduler.scheduler import Scheduler
-
+from apscheduler.schedulers.background import BackgroundScheduler
from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException
logger = logging.getLogger(__name__)
@@ -40,9 +39,10 @@ class BatchPublisherClient:
self.pending = []
self.closed = False
self.dont_send_until_ms = 0
- self.scheduler = Scheduler(standalone=False)
+ self.scheduler = BackgroundScheduler()
- @self.scheduler.interval_schedule(second=1)
+ # @self.scheduler.interval_schedule(second=1)
+ @self.scheduler.scheduled_job(second=1)
def crawl_job():
self.send_message(False)
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py
index 74a5ae8..8bad857 100644
--- a/catalog/pub/msapi/sdc_controller.py
+++ b/catalog/pub/msapi/sdc_controller.py
@@ -7,7 +7,15 @@ import traceback
import uuid
from threading import Thread
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from catalog.pub.database.models import NSPackageModel, VnfPackageModel
+from catalog.pub.exceptions import CatalogException
+from catalog.packages.biz import sdc_vnf_package
+from catalog.pub.utils import fileutil
+from catalog.packages.biz.ns_descriptor import NsDescriptor
+from catalog.pub.utils.jobutil import JobUtil
+from catalog.pub.utils.values import ignore_case_get
from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
@@ -24,12 +32,12 @@ class SDCController(Thread):
def __init__(self):
super(SDCController, self).__init__()
self.identity = IdentityClient(DMAAP_MR_BASE_URL)
- self.scheduler = Scheduler(standalone=True)
+ self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
self.notification_topic = ''
self.status_topic = ''
self.consumer = ''
- @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+ @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
def fetch_task():
self.fetch_notification()
@@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is
def process_notification(msg):
logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
- service_artifacts = msg['serviceArtifacts']
- for artifact in service_artifacts:
- if artifact['artifactType'] == 'TOSCA_CSAR':
- csar_id = artifact['artifactUUID']
- download_url = artifact['artifactURL']
- localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
- local_path = os.path.join(ns_csar_base, msg['distributionID'])
- file_name = artifact['artifactName']
- csar_version = artifact['artifactVersion']
- sdc.download_artifacts(download_url, local_path, file_name)
- # call ns package upload
- data = {
- 'nsPackageId': csar_id,
- 'nsPackageVersion': csar_version,
- 'csarName': file_name,
- 'csarDir': local_path
- }
- jobid = uuid.uuid4()
- # NsPackageParser(data, jobid).start()
- logger.debug(data, jobid)
+ try:
+ ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
+ # check if the related resources exist
+ resources = ns.get('resources', None)
+ job_array = []
+ resource_threads = []
+ if resources:
+ for resource in resources:
+ if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
+ logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
+ # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
+ logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
+ csar_id = resource['resourceUUID']
+ vim_ids = ignore_case_get(resource, "vimIds")
+ lab_vim_id = ignore_case_get(resource, "labVimId")
+ job_id = str(uuid.uuid4())
+ job_array.append(job_id)
+ resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
+ # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
+ else:
+ logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
+ for resource_thread in resource_threads:
+ resource_thread.start()
+ for resource_thread in resource_threads:
+ resource_thread.join()
+ for jobID in job_array:
+ job_status = JobUtil.query_job_status(jobID)
+ if job_status[0].status == 'error':
+ raise CatalogException("VF resource fail to distributed.")
+
+ service_artifacts = msg['serviceArtifacts']
+ for artifact in service_artifacts:
+ if artifact['artifactType'] == 'TOSCA_CSAR':
+ csar_id = artifact['artifactUUID']
+ if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
+ download_url = artifact['artifactURL']
+ localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+ local_path = os.path.join(ns_csar_base, msg['distributionID'])
+ file_name = artifact['artifactName']
+ # csar_version = artifact['artifactVersion']
+
+ # download csar package
+ local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
+ if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
+ artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
+ "Artifacts/Deployment/OTHER/ns.csar")
+ if os.path.exists(artifact_vnf_file):
+ local_file_name = artifact_vnf_file
+
+ data = {
+ 'userDefinedData': {}
+ }
+ nsd = NsDescriptor()
+ nsd.create(data, csar_id)
+ nsd.parse_nsd_and_save(csar_id, local_file_name)
+ logger.debug("CSAR(%s) distriuted successfully.", csar_id)
+ else:
+ logger.debug("CSAR(%s) has been distriuted", csar_id)
+ except CatalogException as e:
+ logger.error("Failed to download the resource")
+ logger.error(str(e))
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 6a6a42e..dfc2137 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -13,27 +13,27 @@ Service.
Version: 1.0.7
--------------
-:Release Date: 2020-05-11
+:Release Date: 2020-09-09
**New Features**
-- optimize the docker image
+- Optimize the docker image
+- Remove the mandatory dependency on MSB
Released components:
- - etsicatalog 1.0.6
+ - etsicatalog 1.0.7
-Version: 1.0.7
+Version: 1.0.6
--------------
-:Release Date: 2020-09-09
+:Release Date: 2020-05-11
**New Features**
-- Optimize the docker image
-- Remove the mandatory dependency on MSB
+- optimize the docker image
Released components:
- - etsicatalog 1.0.7
+ - etsicatalog 1.0.6
Version: 1.0.5
--------------
diff --git a/requirements.txt b/requirements.txt
index 8404b6f..240f812 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -37,4 +37,4 @@ onappylog==1.0.9
# uwsgi for parallel processing
uwsgi
-apscheduler==2.1.2 \ No newline at end of file
+apscheduler==3.6.3 \ No newline at end of file