From fddda96911bdb3ec9841ac71e764cb6eb8fa08d5 Mon Sep 17 00:00:00 2001 From: dyh Date: Wed, 27 May 2020 10:36:10 +0800 Subject: rename package name add function for sdc subscribe and notification Issue-ID: MODELING-366 Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc Signed-off-by: dyh --- catalog/pub/Dmaap-lib/__init__.py | 13 -- catalog/pub/Dmaap-lib/dmaap/__init__.py | 10 -- catalog/pub/Dmaap-lib/dmaap/consumer.py | 91 ------------ catalog/pub/Dmaap-lib/dmaap/identity.py | 46 ------ catalog/pub/Dmaap-lib/dmaap/publisher.py | 174 ---------------------- catalog/pub/Dmaap-lib/pub/__init__.py | 10 -- catalog/pub/Dmaap-lib/pub/exceptions.py | 15 -- catalog/pub/Dmaap-lib/test/test_consumer.py | 81 ----------- catalog/pub/Dmaap-lib/test/test_identity.py | 38 ----- catalog/pub/Dmaap_lib/__init__.py | 13 ++ catalog/pub/Dmaap_lib/dmaap/__init__.py | 10 ++ catalog/pub/Dmaap_lib/dmaap/consumer.py | 91 ++++++++++++ catalog/pub/Dmaap_lib/dmaap/identity.py | 46 ++++++ catalog/pub/Dmaap_lib/dmaap/publisher.py | 174 ++++++++++++++++++++++ catalog/pub/Dmaap_lib/pub/__init__.py | 10 ++ catalog/pub/Dmaap_lib/pub/exceptions.py | 15 ++ catalog/pub/Dmaap_lib/test/test_consumer.py | 81 +++++++++++ catalog/pub/Dmaap_lib/test/test_identity.py | 38 +++++ catalog/pub/config/config.py | 12 +- catalog/pub/msapi/sdc.py | 38 +++++ catalog/pub/msapi/sdc_controller.py | 215 ++++++++++++++++++++++++++++ requirements.txt | 4 +- 22 files changed, 741 insertions(+), 484 deletions(-) delete mode 100644 catalog/pub/Dmaap-lib/__init__.py delete mode 100644 catalog/pub/Dmaap-lib/dmaap/__init__.py delete mode 100644 catalog/pub/Dmaap-lib/dmaap/consumer.py delete mode 100644 catalog/pub/Dmaap-lib/dmaap/identity.py delete mode 100644 catalog/pub/Dmaap-lib/dmaap/publisher.py delete mode 100644 catalog/pub/Dmaap-lib/pub/__init__.py delete mode 100644 catalog/pub/Dmaap-lib/pub/exceptions.py delete mode 100644 catalog/pub/Dmaap-lib/test/test_consumer.py delete mode 100644 catalog/pub/Dmaap-lib/test/test_identity.py create mode 100644 catalog/pub/Dmaap_lib/__init__.py create mode 100644 catalog/pub/Dmaap_lib/dmaap/__init__.py create mode 100644 catalog/pub/Dmaap_lib/dmaap/consumer.py create mode 100644 catalog/pub/Dmaap_lib/dmaap/identity.py create mode 100644 catalog/pub/Dmaap_lib/dmaap/publisher.py create mode 100644 catalog/pub/Dmaap_lib/pub/__init__.py create mode 100644 catalog/pub/Dmaap_lib/pub/exceptions.py create mode 100644 catalog/pub/Dmaap_lib/test/test_consumer.py create mode 100644 catalog/pub/Dmaap_lib/test/test_identity.py create mode 100644 catalog/pub/msapi/sdc_controller.py diff --git a/catalog/pub/Dmaap-lib/__init__.py b/catalog/pub/Dmaap-lib/__init__.py deleted file mode 100644 index 7ae04f0..0000000 --- a/catalog/pub/Dmaap-lib/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/dmaap/__init__.py b/catalog/pub/Dmaap-lib/dmaap/__init__.py deleted file mode 100644 index 0c1e8e1..0000000 --- a/catalog/pub/Dmaap-lib/dmaap/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/dmaap/consumer.py b/catalog/pub/Dmaap-lib/dmaap/consumer.py deleted file mode 100644 index 054791c..0000000 --- a/catalog/pub/Dmaap-lib/dmaap/consumer.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import base64 -import hmac -import json -import logging -import datetime -import requests -from hashlib import sha1 - -from ..pub.exceptions import DmaapClientException - -logger = logging.getLogger(__name__) - - -class ConsumerClient: - def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='', - api_key='', api_secret=''): - self.host = host - self.topic = topic - self.group = consumer_group - self.comsumer_id = consumer_id - self.timeout_ms = timeout_ms - self.limit = limit - self.filter = filter - self.api_key = api_key - self.api_secret = api_secret - - def set_api_credentials(self, api_key, api_secret): - - self.api_key = api_key - self.api_secret = api_secret - - def create_url(self): - url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) - add_url = "" - if self.timeout_ms > -1: - add_url += "timeout=%s" % self.timeout_ms - if self.limit > -1: - if add_url: - add_url += "&" - add_url += "limit=%s" % self.limit - if self.filter: - if add_url: - add_url += "&" - add_url += "filter=%s" % self.filter.encode("utf-8") - if add_url: - url = url + "?" + add_url - - return url - - def create_headers(self): - data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' - hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() - signature = base64.b64encode(hmac_code).decode() - auth = self.api_key + ':' + signature - headers = { - 'X-CambriaDate': data, - 'X-CambriaAuth': auth - } - return headers - - def fetch(self): - try: - msgs = [] - url = self.create_url() - if self.api_key: - headers = self.create_headers() - ret = requests.get(url=url, headers=headers) - else: - ret = requests.get(url) - logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) - if ret.status_code != 200: - raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) - data = ret.json() - for msg in data: - msg = json.loads(msg) - msgs.append(msg) - return msgs - except Exception as e: - raise DmaapClientException(e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/identity.py b/catalog/pub/Dmaap-lib/dmaap/identity.py deleted file mode 100644 index 1dcaad8..0000000 --- a/catalog/pub/Dmaap-lib/dmaap/identity.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import json -import logging -import requests - -from ..pub.exceptions import DmaapClientException - -logger = logging.getLogger(__name__) - - -class IdentityClient: - def __init__(self, host): - self.host = host - - def create_apikey(self, email, description): - try: - headers = {'content-type': 'application/json;charset=UTF-8'} - data = { - 'email': email, - 'description': description - } - data = json.JSONEncoder().encode(data) - url = "http://%s/apiKeys/create" % (self.host) - ret = requests.post(url=url, data=data, headers=headers) - logger.info('create apiKey, response status_code: %s, body: %s', ret.status_code, ret.json()) - if ret.status_code != 200: - raise DmaapClientException(ret.json()) - ret = ret.json() - resp_data = { - 'apiKey': ret.get('key', ''), - 'apiSecret': ret.get('secret', '') - } - return resp_data - except Exception as e: - raise DmaapClientException('create apikey from dmaap failed: ' + e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/publisher.py b/catalog/pub/Dmaap-lib/dmaap/publisher.py deleted file mode 100644 index 643ba90..0000000 --- a/catalog/pub/Dmaap-lib/dmaap/publisher.py +++ /dev/null @@ -1,174 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import base64 -import datetime -import hmac -import json -import logging -import time -from hashlib import sha1 - -import requests -from apscheduler.scheduler import Scheduler - -from ..pub.exceptions import DmaapClientException - -logger = logging.getLogger(__name__) - - -class BatchPublisherClient: - def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): - self.host = host - self.topic = topic - self.partition = partition - self.contenttype = contenttype - self.max_batch_size = max_batch_size - self.max_batch_age_ms = max_batch_age_ms - self.pending = [] - self.closed = False - self.dont_send_until_ms = 0 - self.scheduler = Scheduler(standalone=False) - self.api_key = '', - self.api_secret = '' - - @self.scheduler.interval_schedule(second=1) - def crawl_job(): - self.send_message(False) - self.scheduler.start() - - def set_api_credentials(self, api_key, api_secret): - self.api_key = api_key - self.api_secret = api_secret - - def send(self, partition, msg): - try: - if self.closed: - raise DmaapClientException("The publisher was closed.") - message = Message(partition, msg) - self.pending.append(message) - return len(self.pending) - except Exception as e: - raise DmaapClientException("append message failed: " + e.message) - - def send_message(self, force): - if force or self.should_send_now(): - if not self.send_batch(): - logger.error("Send failed, %s message to send.", len(self.pending)) - - def should_send_now(self): - should_send = False - if len(self.pending) > 0: - now_ms = int(time.time() * 1000) - should_send = len(self.pending) >= self.max_batch_size - if not should_send: - send_at_ms = self.pending[0].timestamp_ms - should_send = send_at_ms <= now_ms - - should_send = should_send and now_ms >= self.dont_send_until_ms - - return should_send - - def send_batch(self): - if len(self.pending) < 1: - return True - now_ms = int(time.time() * 1000) - url = self.create_url() - logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, - str(now_ms - self.pending[0].timestamp_ms)) - try: - str_msg = '' - if self.contenttype == "application/json": - str_msg = self.parse_json() - elif self.contenttype == "text/plain": - for m in self.pending: - str_msg += m.msg - str_msg += '\n' - elif self.contenttype == "application/cambria": - for m in self.pending: - str_msg += str(len(m.partition)) - str_msg += '.' - str_msg += str(len(m.msg)) - str_msg += '.' - str_msg += m.partition - str_msg += m.msg - str_msg += '\n' - else: - for m in self.pending: - str_msg += m.msg - msg = bytearray(str_msg) - - start_ms = int(time.time() * 1000) - if self.api_key: - headers = self.create_headers() - else: - headers = {'content-type': self.contenttype} - ret = requests.post(url=url, data=msg, headers=headers) - if ret.status_code < 200 or ret.status_code > 299: - return False - logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) - self.pending = [] - return True - - except Exception as e: - logger.error(e.message) - return False - - def create_headers(self): - data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' - hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() - signature = base64.b64encode(hmac_code).decode() - auth = self.api_key + ':' + signature - headers = { - 'X-CambriaDate': data, - 'X-CambriaAuth': auth, - 'content-type': self.contenttype - } - return headers - - def create_url(self): - url = "http://%s/events/%s" % (self.host, self.topic) - if self.partition: - url = url + "?partitionKey=" + self.partition - return url - - def parse_json(self): - data = [] - for message in self.pending: - msg = json.loads(message.msg) - for m in msg: - data.append(m) - return json.dumps(data) - - def close(self, timeout): - try: - self.closed = True - self.scheduler.shutdown() - now_ms = int(time.time() * 1000) - wait_in_ms = now_ms + timeout * 1000 - - while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: - self.send_message(True) - time.sleep(0.25) - return self.pending - except Exception as e: - raise DmaapClientException("send message failed: " + e.message) - - -class Message: - def __init__(self, partition, msg): - if not partition: - self.partition = "" - else: - self.partition = partition - self.msg = msg - self.timestamp_ms = int(time.time() * 1000) diff --git a/catalog/pub/Dmaap-lib/pub/__init__.py b/catalog/pub/Dmaap-lib/pub/__init__.py deleted file mode 100644 index 0c1e8e1..0000000 --- a/catalog/pub/Dmaap-lib/pub/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/pub/exceptions.py b/catalog/pub/Dmaap-lib/pub/exceptions.py deleted file mode 100644 index 6b65fcf..0000000 --- a/catalog/pub/Dmaap-lib/pub/exceptions.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class DmaapClientException(Exception): - def __init__(self, msg): - Exception.__init__(self, msg) diff --git a/catalog/pub/Dmaap-lib/test/test_consumer.py b/catalog/pub/Dmaap-lib/test/test_consumer.py deleted file mode 100644 index 1f89f65..0000000 --- a/catalog/pub/Dmaap-lib/test/test_consumer.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import datetime -import hmac -import unittest -from _sha1 import sha1 - -from ..dmaap.consumer import ConsumerClient - - -class CreateApiKeyTest(unittest.TestCase): - def setUp(self): - self.apiKey = "7TuwzpLJ4QfQs4O" - self.apiSecret = "7TuwzpLJ4QfQs4O" - self.host = '127.0.0.1' - self.topic = 'abc' - self.group = 'def' - self.comsumer_id = '123' - self.timeout_ms = 3 - self.limit = 3 - self.filter = 'test' - - def tearDown(self): - self.ret_url = "" - - def test_create_url(self): - exp_url = 'http://127.0.0.1/events/abc/def/123' - consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id) - ret_url = consumer.create_url() - self.assertEqual(exp_url, ret_url) - - def test_create_timeout_url(self): - exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3' - consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, self.timeout_ms) - ret_url = consumer.create_url() - self.assertEqual(exp_url, ret_url) - - def test_create_limit_url(self): - - exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3&limit=3' - consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, - self.timeout_ms, self.limit) - ret_url = consumer.create_url() - self.assertEqual(exp_url, ret_url) - - def test_create_filter_url(self): - - exp_url = "http://127.0.0.1/events/abc/def/123?timeout=3&limit=3&filter=b'test'" - consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, - self.timeout_ms, self.limit, self.filter) - ret_url = consumer.create_url() - self.assertEqual(exp_url, ret_url) - - def test_create_headers(self): - data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' - hmac_code = hmac.new(self.apiSecret.encode(), data.encode(), sha1).digest() - signature = base64.b64encode(hmac_code).decode() - auth = self.apiKey + ':' + signature - exp_headers = { - 'X-CambriaDate': data, - 'X-CambriaAuth': auth - } - - consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, - self.timeout_ms, self.limit, self.filter, self.apiKey, self.apiSecret) - consumer.set_api_credentials(self.apiKey, self.apiSecret) - rea_headers = consumer.create_headers() - self.assertEqual(exp_headers, rea_headers) diff --git a/catalog/pub/Dmaap-lib/test/test_identity.py b/catalog/pub/Dmaap-lib/test/test_identity.py deleted file mode 100644 index 0f88a5e..0000000 --- a/catalog/pub/Dmaap-lib/test/test_identity.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import mock - -from ..dmaap.identity import IdentityClient - - -class CreateApiKeyTest(unittest.TestCase): - def setUp(self): - self.apiKey = "7TuwzpLJ4QfQs4O" - self.apiSecret = "7TuwzpLJ4QfQs4O" - self.host = '127.0.0.1' - - def tearDown(self): - self.ret_url = "" - - @mock.patch.object(IdentityClient, 'create_apikey') - def test_create_apiKey(self, mock_create_apikey): - mock_create_apikey.return_value = { - 'apiKey': "7TuwzpLJ4QfQs4O", - 'apiSecret': "7TuwzpLJ4QfQs4O" - } - resp_data = IdentityClient(self.host).create_apikey('', 'description') - self.assertEqual(self.apiKey, resp_data.get("apiKey")) - self.assertEqual(self.apiSecret, resp_data.get("apiSecret")) diff --git a/catalog/pub/Dmaap_lib/__init__.py b/catalog/pub/Dmaap_lib/__init__.py new file mode 100644 index 0000000..7ae04f0 --- /dev/null +++ b/catalog/pub/Dmaap_lib/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap_lib/dmaap/__init__.py b/catalog/pub/Dmaap_lib/dmaap/__init__.py new file mode 100644 index 0000000..0c1e8e1 --- /dev/null +++ b/catalog/pub/Dmaap_lib/dmaap/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap_lib/dmaap/consumer.py b/catalog/pub/Dmaap_lib/dmaap/consumer.py new file mode 100644 index 0000000..054791c --- /dev/null +++ b/catalog/pub/Dmaap_lib/dmaap/consumer.py @@ -0,0 +1,91 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import hmac +import json +import logging +import datetime +import requests +from hashlib import sha1 + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class ConsumerClient: + def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='', + api_key='', api_secret=''): + self.host = host + self.topic = topic + self.group = consumer_group + self.comsumer_id = consumer_id + self.timeout_ms = timeout_ms + self.limit = limit + self.filter = filter + self.api_key = api_key + self.api_secret = api_secret + + def set_api_credentials(self, api_key, api_secret): + + self.api_key = api_key + self.api_secret = api_secret + + def create_url(self): + url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) + add_url = "" + if self.timeout_ms > -1: + add_url += "timeout=%s" % self.timeout_ms + if self.limit > -1: + if add_url: + add_url += "&" + add_url += "limit=%s" % self.limit + if self.filter: + if add_url: + add_url += "&" + add_url += "filter=%s" % self.filter.encode("utf-8") + if add_url: + url = url + "?" + add_url + + return url + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + return headers + + def fetch(self): + try: + msgs = [] + url = self.create_url() + if self.api_key: + headers = self.create_headers() + ret = requests.get(url=url, headers=headers) + else: + ret = requests.get(url) + logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) + data = ret.json() + for msg in data: + msg = json.loads(msg) + msgs.append(msg) + return msgs + except Exception as e: + raise DmaapClientException(e.message) diff --git a/catalog/pub/Dmaap_lib/dmaap/identity.py b/catalog/pub/Dmaap_lib/dmaap/identity.py new file mode 100644 index 0000000..1dcaad8 --- /dev/null +++ b/catalog/pub/Dmaap_lib/dmaap/identity.py @@ -0,0 +1,46 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import logging +import requests + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class IdentityClient: + def __init__(self, host): + self.host = host + + def create_apikey(self, email, description): + try: + headers = {'content-type': 'application/json;charset=UTF-8'} + data = { + 'email': email, + 'description': description + } + data = json.JSONEncoder().encode(data) + url = "http://%s/apiKeys/create" % (self.host) + ret = requests.post(url=url, data=data, headers=headers) + logger.info('create apiKey, response status_code: %s, body: %s', ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException(ret.json()) + ret = ret.json() + resp_data = { + 'apiKey': ret.get('key', ''), + 'apiSecret': ret.get('secret', '') + } + return resp_data + except Exception as e: + raise DmaapClientException('create apikey from dmaap failed: ' + e.message) diff --git a/catalog/pub/Dmaap_lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py new file mode 100644 index 0000000..643ba90 --- /dev/null +++ b/catalog/pub/Dmaap_lib/dmaap/publisher.py @@ -0,0 +1,174 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import datetime +import hmac +import json +import logging +import time +from hashlib import sha1 + +import requests +from apscheduler.scheduler import Scheduler + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class BatchPublisherClient: + def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): + self.host = host + self.topic = topic + self.partition = partition + self.contenttype = contenttype + self.max_batch_size = max_batch_size + self.max_batch_age_ms = max_batch_age_ms + self.pending = [] + self.closed = False + self.dont_send_until_ms = 0 + self.scheduler = Scheduler(standalone=False) + self.api_key = '', + self.api_secret = '' + + @self.scheduler.interval_schedule(second=1) + def crawl_job(): + self.send_message(False) + self.scheduler.start() + + def set_api_credentials(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + + def send(self, partition, msg): + try: + if self.closed: + raise DmaapClientException("The publisher was closed.") + message = Message(partition, msg) + self.pending.append(message) + return len(self.pending) + except Exception as e: + raise DmaapClientException("append message failed: " + e.message) + + def send_message(self, force): + if force or self.should_send_now(): + if not self.send_batch(): + logger.error("Send failed, %s message to send.", len(self.pending)) + + def should_send_now(self): + should_send = False + if len(self.pending) > 0: + now_ms = int(time.time() * 1000) + should_send = len(self.pending) >= self.max_batch_size + if not should_send: + send_at_ms = self.pending[0].timestamp_ms + should_send = send_at_ms <= now_ms + + should_send = should_send and now_ms >= self.dont_send_until_ms + + return should_send + + def send_batch(self): + if len(self.pending) < 1: + return True + now_ms = int(time.time() * 1000) + url = self.create_url() + logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, + str(now_ms - self.pending[0].timestamp_ms)) + try: + str_msg = '' + if self.contenttype == "application/json": + str_msg = self.parse_json() + elif self.contenttype == "text/plain": + for m in self.pending: + str_msg += m.msg + str_msg += '\n' + elif self.contenttype == "application/cambria": + for m in self.pending: + str_msg += str(len(m.partition)) + str_msg += '.' + str_msg += str(len(m.msg)) + str_msg += '.' + str_msg += m.partition + str_msg += m.msg + str_msg += '\n' + else: + for m in self.pending: + str_msg += m.msg + msg = bytearray(str_msg) + + start_ms = int(time.time() * 1000) + if self.api_key: + headers = self.create_headers() + else: + headers = {'content-type': self.contenttype} + ret = requests.post(url=url, data=msg, headers=headers) + if ret.status_code < 200 or ret.status_code > 299: + return False + logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) + self.pending = [] + return True + + except Exception as e: + logger.error(e.message) + return False + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth, + 'content-type': self.contenttype + } + return headers + + def create_url(self): + url = "http://%s/events/%s" % (self.host, self.topic) + if self.partition: + url = url + "?partitionKey=" + self.partition + return url + + def parse_json(self): + data = [] + for message in self.pending: + msg = json.loads(message.msg) + for m in msg: + data.append(m) + return json.dumps(data) + + def close(self, timeout): + try: + self.closed = True + self.scheduler.shutdown() + now_ms = int(time.time() * 1000) + wait_in_ms = now_ms + timeout * 1000 + + while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: + self.send_message(True) + time.sleep(0.25) + return self.pending + except Exception as e: + raise DmaapClientException("send message failed: " + e.message) + + +class Message: + def __init__(self, partition, msg): + if not partition: + self.partition = "" + else: + self.partition = partition + self.msg = msg + self.timestamp_ms = int(time.time() * 1000) diff --git a/catalog/pub/Dmaap_lib/pub/__init__.py b/catalog/pub/Dmaap_lib/pub/__init__.py new file mode 100644 index 0000000..0c1e8e1 --- /dev/null +++ b/catalog/pub/Dmaap_lib/pub/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap_lib/pub/exceptions.py b/catalog/pub/Dmaap_lib/pub/exceptions.py new file mode 100644 index 0000000..6b65fcf --- /dev/null +++ b/catalog/pub/Dmaap_lib/pub/exceptions.py @@ -0,0 +1,15 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class DmaapClientException(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) diff --git a/catalog/pub/Dmaap_lib/test/test_consumer.py b/catalog/pub/Dmaap_lib/test/test_consumer.py new file mode 100644 index 0000000..1f89f65 --- /dev/null +++ b/catalog/pub/Dmaap_lib/test/test_consumer.py @@ -0,0 +1,81 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import datetime +import hmac +import unittest +from _sha1 import sha1 + +from ..dmaap.consumer import ConsumerClient + + +class CreateApiKeyTest(unittest.TestCase): + def setUp(self): + self.apiKey = "7TuwzpLJ4QfQs4O" + self.apiSecret = "7TuwzpLJ4QfQs4O" + self.host = '127.0.0.1' + self.topic = 'abc' + self.group = 'def' + self.comsumer_id = '123' + self.timeout_ms = 3 + self.limit = 3 + self.filter = 'test' + + def tearDown(self): + self.ret_url = "" + + def test_create_url(self): + exp_url = 'http://127.0.0.1/events/abc/def/123' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_timeout_url(self): + exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, self.timeout_ms) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_limit_url(self): + + exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3&limit=3' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_filter_url(self): + + exp_url = "http://127.0.0.1/events/abc/def/123?timeout=3&limit=3&filter=b'test'" + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit, self.filter) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.apiSecret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.apiKey + ':' + signature + exp_headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit, self.filter, self.apiKey, self.apiSecret) + consumer.set_api_credentials(self.apiKey, self.apiSecret) + rea_headers = consumer.create_headers() + self.assertEqual(exp_headers, rea_headers) diff --git a/catalog/pub/Dmaap_lib/test/test_identity.py b/catalog/pub/Dmaap_lib/test/test_identity.py new file mode 100644 index 0000000..0f88a5e --- /dev/null +++ b/catalog/pub/Dmaap_lib/test/test_identity.py @@ -0,0 +1,38 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import mock + +from ..dmaap.identity import IdentityClient + + +class CreateApiKeyTest(unittest.TestCase): + def setUp(self): + self.apiKey = "7TuwzpLJ4QfQs4O" + self.apiSecret = "7TuwzpLJ4QfQs4O" + self.host = '127.0.0.1' + + def tearDown(self): + self.ret_url = "" + + @mock.patch.object(IdentityClient, 'create_apikey') + def test_create_apiKey(self, mock_create_apikey): + mock_create_apikey.return_value = { + 'apiKey': "7TuwzpLJ4QfQs4O", + 'apiSecret': "7TuwzpLJ4QfQs4O" + } + resp_data = IdentityClient(self.host).create_apikey('', 'description') + self.assertEqual(self.apiKey, resp_data.get("apiKey")) + self.assertEqual(self.apiSecret, resp_data.get("apiSecret")) diff --git a/catalog/pub/config/config.py b/catalog/pub/config/config.py index dcc6cc1..99932d7 100644 --- a/catalog/pub/config/config.py +++ b/catalog/pub/config/config.py @@ -18,11 +18,6 @@ MSB_SERVICE_IP = '127.0.0.1' MSB_SERVICE_PORT = '80' MSB_BASE_URL = "%s://%s:%s" % (MSB_SERVICE_PROTOCOL, MSB_SERVICE_IP, MSB_SERVICE_PORT) -# [REDIS] -# REDIS_HOST = '127.0.0.1' -# REDIS_PORT = '6379' -# REDIS_PASSWD = '' - # [mysql] DB_IP = "127.0.0.1" DB_PORT = 3306 @@ -97,4 +92,11 @@ SDC_BASE_URL = "https://msb-iag/api" SDC_USER = "modeling" SDC_PASSWD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U" +# [dmaap config] +DMAAP_MR_IP = '127.0.0.1' +DMAAP_MR_PORT = '3904' +CONSUMER_GROUP = "consumerGroup" +CONSUMER_ID = "consumerId" +POLLING_INTERVAL = 15 + VNFD_SCHEMA_VERSION_DEFAULT = "base" diff --git a/catalog/pub/msapi/sdc.py b/catalog/pub/msapi/sdc.py index 86930f0..498db8d 100644 --- a/catalog/pub/msapi/sdc.py +++ b/catalog/pub/msapi/sdc.py @@ -129,3 +129,41 @@ def download_artifacts(download_url, local_path, file_name): local_file.write(ret[1]) local_file.close() return local_file_name + + +def create_consumer(name, salt, password): + req_data = { + 'consumerName': name, + 'consumerSalt': salt, + 'consumerPassword': password + } + req_data = json.JSONEncoder().encode(req_data) + resource = '/sdc2/rest/v1/consumers' + headers = {'USER_ID': 'jh0003'} + ret = restcall.call_req(base_url=SDC_BASE_URL, + user="", + passwd="", + auth_type=restcall.rest_no_auth, + resource=resource, + method="POST", + content=req_data, + additional_headers=headers) + if ret[0] != 0: + logger.error("Status code is %s, detail is %s.", ret[2], ret[1]) + raise CatalogException("Failed to create consumer from sdc.") + + +def register_for_topics(key): + req_data = { + 'apiPublicKey': key, + 'distrEnvName': 'AUTO', + 'isConsumerToSdcDistrStatusTopic': False, + 'distEnvEndPoints': [] + } + req_data = json.JSONEncoder().encode(req_data) + url = '/sdc/v1/registerForDistribution' + ret = call_sdc(url, 'POST', req_data) + if ret[0] != 0: + logger.error("Status code is %s, detail is %s.", ret[2], ret[1]) + raise CatalogException("Failed to register from sdc.") + return json.JSONDecoder().decode(ret[1]) diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py new file mode 100644 index 0000000..454f3d1 --- /dev/null +++ b/catalog/pub/msapi/sdc_controller.py @@ -0,0 +1,215 @@ +# Copyright 2019 CMCC Technologies Co., Ltd. +import json +import logging +import os +import time +import traceback +import uuid +from threading import Thread + +from apscheduler.scheduler import Scheduler + +from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient +from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient +from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient +from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \ + DMAAP_MR_PORT +from catalog.pub.msapi import sdc + +logger = logging.getLogger(__name__) + +DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT) +ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"] + + +class SDCController(Thread): + def __init__(self): + super(SDCController, self).__init__() + self.identity = IdentityClient(DMAAP_MR_BASE_URL) + self.scheduler = Scheduler(standalone=True) + self.notification_topic = '' + self.status_topic = '' + self.consumer = '' + + @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + def fetch_task(): + self.fetch_notification() + + def run(self): + try: + description = 'nfvo catalog key for' + CONSUMER_ID + key = self.identity.create_apikey('', description) + topics = sdc.register_for_topics(key['apiKey']) + self.notification_topic = topics['distrNotificationTopicName'] + self.status_topic = topics['distrStatusTopicName'] + self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID) + self.consumer.set_api_credentials(key['apiKey'], key['apiSecret']) + self.scheduler.start() + except Exception as e: + logger.error('start sdc controller failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + def fetch_notification(self): + try: + logger.info('start to fetch message from dmaap.') + now_ms = int(time.time() * 1000) + notification_msgs = self.consumer.fetch() + logger.info('Receive a notification from dmaap: %s', notification_msgs) + for notification_msg in notification_msgs: + notification_callback = build_callback_notification(now_ms, notification_msg) + if is_activate_callback(notification_callback): + process_notification(notification_callback) + except Exception as e: + logger.error('fetch message from dmaap failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + +def is_activate_callback(notification_callback): + has_relevant_artifacts_in_resource = False + has_relevant_artifacts_in_service = False + if notification_callback['resources']: + has_relevant_artifacts_in_resource = True + if notification_callback['serviceArtifacts']: + has_relevant_artifacts_in_service = True + return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service + + +def build_callback_notification(now_ms, notification_msg): + # relevant_resource_instances = build_resource_instances(notification_msg, now_ms) + relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms, + notification_msg['serviceArtifacts']) + # notification_msg['resources'] = relevant_resource_instances + notification_msg['serviceArtifacts'] = relevant_service_artifacts + return notification_msg + + +def build_resource_instances(notification_msg, now_ms): + relevant_resource_instances = [] + resources = notification_msg['resources'] + for resource in resources: + artifacts = resource['artifacts'] + found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts) + if found_relevant_artifacts: + resources['artifacts'] = found_relevant_artifacts + relevant_resource_instances.append(resources) + return relevant_resource_instances + + +def handle_relevant_artifacts(notification_msg, now_ms, artifacts): + relevant_artifacts = [] + for artifact in artifacts: + artifact_type = artifact['artifactType'] + is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST + if is_artifact_relevant: + generated_from_uuid = artifact.get('generatedFromUUID', '') + if generated_from_uuid: + generated_from_artifact = None + for artifact_g in artifacts: + if generated_from_uuid == artifact_g['artifactUUID']: + generated_from_artifact = artifact_g + break + if generated_from_artifact: + is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST + else: + is_artifact_relevant = False + if is_artifact_relevant: + artifact = set_related_artifacts(artifact, notification_msg) + relevant_artifacts.append(artifact) + + # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant) + # if notification_status != 'SUCCESS': + # logger.error("Error failed to send notification status to Dmaap.") + + return relevant_artifacts + + +def set_related_artifacts(artifact, notification_msg): + related_artifacts_uuid = artifact.get('relatedArtifacts', '') + if related_artifacts_uuid: + related_artifacts = [] + for artifact_uuid in related_artifacts_uuid: + related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid)) + artifact['relatedArtifactsInfo'] = related_artifacts + return artifact + + +def get_artifact_metadata(notification_msg, uuid): + service_artifacts = notification_msg['serviceArtifacts'] + ret = None + for artifact in service_artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + resources = notification_msg['resources'] + if (not ret) and resources: + for resource in resources: + artifacts = resource['artifacts'] + for artifact in artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + if ret: + break + return ret + + +def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant): + logger.info('start to send notification status') + status = 'FAIL' + if is_artifact_relevant: + notification_status = 'NOTIFIED' + else: + notification_status = 'NOT_NOTIFIED' + request = { + 'distributionID': distribution_id, + 'consumerID': CONSUMER_ID, + 'timestamp': now_ms, + 'artifactURL': artifact['artifactURL'], + 'status': notification_status + } + request_json = json.JSONEncoder().encode(request) + pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria') + logger.info('try to send notification status: %s', request_json) + + try: + pub.send('MyPartitionKey', request_json) + time.sleep(1) + stuck = pub.close(10) + if not stuck: + status = 'SUCCESS' + logger.info('send notification status success.') + else: + logger.error('failed to send notification status, %s messages unsent', len(stuck)) + except Exception as e: + logger.error('failed to send notification status.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + return status + + +def process_notification(msg): + logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + csar_version = artifact['artifactVersion'] + sdc.download_artifacts(download_url, local_path, file_name) + # call ns package upload + data = { + 'nsPackageId': csar_id, + 'nsPackageVersion': csar_version, + 'csarName': file_name, + 'csarDir': local_path + } + jobid = uuid.uuid4() + # NsPackageParser(data, jobid).start() + logger.debug(data, jobid) diff --git a/requirements.txt b/requirements.txt index 6e8880a..65354d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,4 +35,6 @@ swagger-spec-validator>=2.1.0 onappylog==1.0.9 # uwsgi for parallel processing -# uwsgi \ No newline at end of file +# uwsgi + +apscheduler==2.1.2 \ No newline at end of file -- cgit 1.2.3-korg