diff options
Diffstat (limited to 'catalog/pub/Dmaap-lib/dmaap')
-rw-r--r-- | catalog/pub/Dmaap-lib/dmaap/__init__.py | 10 | ||||
-rw-r--r-- | catalog/pub/Dmaap-lib/dmaap/consumer.py | 91 | ||||
-rw-r--r-- | catalog/pub/Dmaap-lib/dmaap/identity.py | 46 | ||||
-rw-r--r-- | catalog/pub/Dmaap-lib/dmaap/publisher.py | 174 |
4 files changed, 321 insertions, 0 deletions
diff --git a/catalog/pub/Dmaap-lib/dmaap/__init__.py b/catalog/pub/Dmaap-lib/dmaap/__init__.py new file mode 100644 index 0000000..0c1e8e1 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/dmaap/consumer.py b/catalog/pub/Dmaap-lib/dmaap/consumer.py new file mode 100644 index 0000000..054791c --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/consumer.py @@ -0,0 +1,91 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import hmac +import json +import logging +import datetime +import requests +from hashlib import sha1 + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class ConsumerClient: + def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='', + api_key='', api_secret=''): + self.host = host + self.topic = topic + self.group = consumer_group + self.comsumer_id = consumer_id + self.timeout_ms = timeout_ms + self.limit = limit + self.filter = filter + self.api_key = api_key + self.api_secret = api_secret + + def set_api_credentials(self, api_key, api_secret): + + self.api_key = api_key + self.api_secret = api_secret + + def create_url(self): + url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) + add_url = "" + if self.timeout_ms > -1: + add_url += "timeout=%s" % self.timeout_ms + if self.limit > -1: + if add_url: + add_url += "&" + add_url += "limit=%s" % self.limit + if self.filter: + if add_url: + add_url += "&" + add_url += "filter=%s" % self.filter.encode("utf-8") + if add_url: + url = url + "?" + add_url + + return url + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + return headers + + def fetch(self): + try: + msgs = [] + url = self.create_url() + if self.api_key: + headers = self.create_headers() + ret = requests.get(url=url, headers=headers) + else: + ret = requests.get(url) + logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) + data = ret.json() + for msg in data: + msg = json.loads(msg) + msgs.append(msg) + return msgs + except Exception as e: + raise DmaapClientException(e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/identity.py b/catalog/pub/Dmaap-lib/dmaap/identity.py new file mode 100644 index 0000000..1dcaad8 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/identity.py @@ -0,0 +1,46 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import logging +import requests + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class IdentityClient: + def __init__(self, host): + self.host = host + + def create_apikey(self, email, description): + try: + headers = {'content-type': 'application/json;charset=UTF-8'} + data = { + 'email': email, + 'description': description + } + data = json.JSONEncoder().encode(data) + url = "http://%s/apiKeys/create" % (self.host) + ret = requests.post(url=url, data=data, headers=headers) + logger.info('create apiKey, response status_code: %s, body: %s', ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException(ret.json()) + ret = ret.json() + resp_data = { + 'apiKey': ret.get('key', ''), + 'apiSecret': ret.get('secret', '') + } + return resp_data + except Exception as e: + raise DmaapClientException('create apikey from dmaap failed: ' + e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/publisher.py b/catalog/pub/Dmaap-lib/dmaap/publisher.py new file mode 100644 index 0000000..643ba90 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/publisher.py @@ -0,0 +1,174 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import datetime +import hmac +import json +import logging +import time +from hashlib import sha1 + +import requests +from apscheduler.scheduler import Scheduler + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class BatchPublisherClient: + def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): + self.host = host + self.topic = topic + self.partition = partition + self.contenttype = contenttype + self.max_batch_size = max_batch_size + self.max_batch_age_ms = max_batch_age_ms + self.pending = [] + self.closed = False + self.dont_send_until_ms = 0 + self.scheduler = Scheduler(standalone=False) + self.api_key = '', + self.api_secret = '' + + @self.scheduler.interval_schedule(second=1) + def crawl_job(): + self.send_message(False) + self.scheduler.start() + + def set_api_credentials(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + + def send(self, partition, msg): + try: + if self.closed: + raise DmaapClientException("The publisher was closed.") + message = Message(partition, msg) + self.pending.append(message) + return len(self.pending) + except Exception as e: + raise DmaapClientException("append message failed: " + e.message) + + def send_message(self, force): + if force or self.should_send_now(): + if not self.send_batch(): + logger.error("Send failed, %s message to send.", len(self.pending)) + + def should_send_now(self): + should_send = False + if len(self.pending) > 0: + now_ms = int(time.time() * 1000) + should_send = len(self.pending) >= self.max_batch_size + if not should_send: + send_at_ms = self.pending[0].timestamp_ms + should_send = send_at_ms <= now_ms + + should_send = should_send and now_ms >= self.dont_send_until_ms + + return should_send + + def send_batch(self): + if len(self.pending) < 1: + return True + now_ms = int(time.time() * 1000) + url = self.create_url() + logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, + str(now_ms - self.pending[0].timestamp_ms)) + try: + str_msg = '' + if self.contenttype == "application/json": + str_msg = self.parse_json() + elif self.contenttype == "text/plain": + for m in self.pending: + str_msg += m.msg + str_msg += '\n' + elif self.contenttype == "application/cambria": + for m in self.pending: + str_msg += str(len(m.partition)) + str_msg += '.' + str_msg += str(len(m.msg)) + str_msg += '.' + str_msg += m.partition + str_msg += m.msg + str_msg += '\n' + else: + for m in self.pending: + str_msg += m.msg + msg = bytearray(str_msg) + + start_ms = int(time.time() * 1000) + if self.api_key: + headers = self.create_headers() + else: + headers = {'content-type': self.contenttype} + ret = requests.post(url=url, data=msg, headers=headers) + if ret.status_code < 200 or ret.status_code > 299: + return False + logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) + self.pending = [] + return True + + except Exception as e: + logger.error(e.message) + return False + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth, + 'content-type': self.contenttype + } + return headers + + def create_url(self): + url = "http://%s/events/%s" % (self.host, self.topic) + if self.partition: + url = url + "?partitionKey=" + self.partition + return url + + def parse_json(self): + data = [] + for message in self.pending: + msg = json.loads(message.msg) + for m in msg: + data.append(m) + return json.dumps(data) + + def close(self, timeout): + try: + self.closed = True + self.scheduler.shutdown() + now_ms = int(time.time() * 1000) + wait_in_ms = now_ms + timeout * 1000 + + while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: + self.send_message(True) + time.sleep(0.25) + return self.pending + except Exception as e: + raise DmaapClientException("send message failed: " + e.message) + + +class Message: + def __init__(self, partition, msg): + if not partition: + self.partition = "" + else: + self.partition = partition + self.msg = msg + self.timestamp_ms = int(time.time() * 1000) |