From d62a90a858728fa84acf2eade65f9909f67b7513 Mon Sep 17 00:00:00 2001 From: yangyan Date: Mon, 21 Oct 2019 15:10:29 +0800 Subject: Integrate Dmaap-lib to catalog Change-Id: I693a41b9f423028cc1b125be2ce6457aece10389 Issue-ID: VFC-1557 Signed-off-by: yangyan --- .../nfvo-dmaap-nfvo-v4.0-devel-ns/README.md | 2 + .../client/__init__.py | 10 ++ .../client/dmaap/__init__.py | 10 ++ .../client/dmaap/consumer.py | 87 +++++++++++ .../client/dmaap/identity.py | 46 ++++++ .../client/dmaap/publisher.py | 172 +++++++++++++++++++++ .../client/pub/__init__.py | 10 ++ .../client/pub/exceptions.py | 15 ++ .../nfvo-dmaap-nfvo-v4.0-devel-ns/setup.py | 21 +++ .../nfvo-dmaap-python-lib/pax_global_header | 1 + requirements.txt | 5 +- 11 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/README.md create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/__init__.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/__init__.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/consumer.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/identity.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/publisher.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/__init__.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/exceptions.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/setup.py create mode 100644 catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/pax_global_header diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/README.md b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/README.md new file mode 100644 index 00000000..67e1a1c3 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/README.md @@ -0,0 +1,2 @@ +# nfvo-dmaap + diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/__init__.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/__init__.py new file mode 100644 index 00000000..0c1e8e15 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/__init__.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/__init__.py new file mode 100644 index 00000000..0c1e8e15 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/consumer.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/consumer.py new file mode 100644 index 00000000..e5677d8c --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/consumer.py @@ -0,0 +1,87 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import hmac +import json +import logging +import datetime +import requests +from hashlib import sha1 + +from client.pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class ConsumerClient: + def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter=''): + self.host = host + self.topic = topic + self.group = consumer_group + self.comsumer_id = consumer_id + self.timeout_ms = timeout_ms + self.limit = limit + self.filter = filter + + def set_api_credentials(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + + def create_url(self): + url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) + add_url = "" + if self.timeout_ms > -1: + add_url += "timeout=%s" % self.timeout_ms + if self.limit > -1: + if add_url: + add_url += "&" + add_url += "limit=%s" % self.limit + if self.filter: + if add_url: + add_url += "&" + add_url += "filter=%s" % self.filter.encode("utf-8") + if add_url: + url = url + "?" + add_url + + return url + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + return headers + + def fetch(self): + try: + msgs = [] + url = self.create_url() + if self.api_key: + headers = self.create_headers() + ret = requests.get(url=url, headers=headers) + else: + ret = requests.get(url) + logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) + data = ret.json() + for msg in data: + msg = json.loads(msg) + msgs.append(msg) + return msgs + except Exception as e: + raise DmaapClientException(e.message) diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/identity.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/identity.py new file mode 100644 index 00000000..0c8b90db --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/identity.py @@ -0,0 +1,46 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import logging +import requests + +from client.pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class IdentityClient: + def __init__(self, host): + self.host = host + + def create_apikey(self, email, description): + try: + headers = {'content-type': 'application/json;charset=UTF-8'} + data = { + 'email': email, + 'description': description + } + data = json.JSONEncoder().encode(data) + url = "http://%s/apiKeys/create" % (self.host) + ret = requests.post(url=url, data=data, headers=headers) + logger.info('create apiKey, response status_code: %s, body: %s', ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException(ret.json()) + ret = ret.json() + resp_data = { + 'apiKey': ret.get('key', ''), + 'apiSecret': ret.get('secret', '') + } + return resp_data + except Exception as e: + raise DmaapClientException('create apikey from dmaap failed: ' + e.message) diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/publisher.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/publisher.py new file mode 100644 index 00000000..e5d19871 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/dmaap/publisher.py @@ -0,0 +1,172 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import datetime +import hmac +import json +import logging +import time +from hashlib import sha1 + +import requests +from apscheduler.scheduler import Scheduler + +from client.pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class BatchPublisherClient: + def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): + self.host = host + self.topic = topic + self.partition = partition + self.contenttype = contenttype + self.max_batch_size = max_batch_size + self.max_batch_age_ms = max_batch_age_ms + self.pending = [] + self.closed = False + self.dont_send_until_ms = 0 + self.scheduler = Scheduler(standalone=False) + + @self.scheduler.interval_schedule(second=1) + def crawl_job(): + self.send_message(False) + self.scheduler.start() + + def set_api_credentials(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + + def send(self, partition, msg): + try: + if self.closed: + raise DmaapClientException("The publisher was closed.") + message = Message(partition, msg) + self.pending.append(message) + return len(self.pending) + except Exception as e: + raise DmaapClientException("append message failed: " + e.message) + + def send_message(self, force): + if force or self.should_send_now(): + if not self.send_batch(): + logger.error("Send failed, %s message to send.", len(self.pending)) + + def should_send_now(self): + should_send = False + if len(self.pending) > 0: + now_ms = int(time.time() * 1000) + should_send = len(self.pending) >= self.max_batch_size + if not should_send: + send_at_ms = self.pending[0].timestamp_ms + should_send = send_at_ms <= now_ms + + should_send = should_send and now_ms >= self.dont_send_until_ms + + return should_send + + def send_batch(self): + if len(self.pending) < 1: + return True + now_ms = int(time.time() * 1000) + url = self.create_url() + logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, + str(now_ms - self.pending[0].timestamp_ms)) + try: + str_msg = '' + if self.contenttype == "application/json": + str_msg = self.parse_json() + elif self.contenttype == "text/plain": + for m in self.pending: + str_msg += m.msg + str_msg += '\n' + elif self.contenttype == "application/cambria": + for m in self.pending: + str_msg += str(len(m.partition)) + str_msg += '.' + str_msg += str(len(m.msg)) + str_msg += '.' + str_msg += m.partition + str_msg += m.msg + str_msg += '\n' + else: + for m in self.pending: + str_msg += m.msg + msg = bytearray(str_msg) + + start_ms = int(time.time() * 1000) + if self.api_key: + headers = self.create_headers() + else: + headers = {'content-type': self.contenttype} + ret = requests.post(url=url, data=msg, headers=headers) + if ret.status_code < 200 or ret.status_code > 299: + return False + logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) + self.pending = [] + return True + + except Exception as e: + logger.error(e.message) + return False + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth, + 'content-type': self.contenttype + } + return headers + + def create_url(self): + url = "http://%s/events/%s" % (self.host, self.topic) + if self.partition: + url = url + "?partitionKey=" + self.partition + return url + + def parse_json(self): + data = [] + for message in self.pending: + msg = json.loads(message.msg) + for m in msg: + data.append(m) + return json.dumps(data) + + def close(self, timeout): + try: + self.closed = True + self.scheduler.shutdown() + now_ms = int(time.time() * 1000) + wait_in_ms = now_ms + timeout * 1000 + + while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: + self.send_message(True) + time.sleep(0.25) + return self.pending + except Exception as e: + raise DmaapClientException("send message failed: " + e.message) + + +class Message: + def __init__(self, partition, msg): + if not partition: + self.partition = "" + else: + self.partition = partition + self.msg = msg + self.timestamp_ms = int(time.time() * 1000) diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/__init__.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/__init__.py new file mode 100644 index 00000000..0c1e8e15 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/exceptions.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/exceptions.py new file mode 100644 index 00000000..6b65fcf8 --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/client/pub/exceptions.py @@ -0,0 +1,15 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class DmaapClientException(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/setup.py b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/setup.py new file mode 100644 index 00000000..7f7b2ccd --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/nfvo-dmaap-nfvo-v4.0-devel-ns/setup.py @@ -0,0 +1,21 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from distutils.core import setup + +setup( + name="dmaapclient", + version="1.0", + author="cmcc", + author_email="chinamobile.com", + url="", + packages=["client", "client.dmaap", "client.pub"], requires=['requests', 'apscheduler'] +) diff --git a/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/pax_global_header b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/pax_global_header new file mode 100644 index 00000000..e0b3646a --- /dev/null +++ b/catalog/pub/Dmaap-lib/nfvo-dmaap-python-lib/pax_global_header @@ -0,0 +1 @@ +52 comment=0388c8ff7aa45457a50bb1701d9ff5ed05553f5c diff --git a/requirements.txt b/requirements.txt index b8b5e85d..d9592616 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,4 +35,7 @@ swagger-spec-validator>=2.1.0 onappylog==1.0.9 # uwsgi for parallel processing -# uwsgi \ No newline at end of file +# uwsgi + +# Demaap-lib +apscheduler \ No newline at end of file -- cgit 1.2.3-korg