diff options
author | dyh <dengyuanhong@chinamobile.com> | 2020-05-27 10:36:10 +0800 |
---|---|---|
committer | dyh <dengyuanhong@chinamobile.com> | 2020-05-27 12:01:07 +0800 |
commit | fddda96911bdb3ec9841ac71e764cb6eb8fa08d5 (patch) | |
tree | 598d721cdcbf3793828c52134ae93c6061aa9a8f /catalog/pub/Dmaap_lib/dmaap/consumer.py | |
parent | 23785037bc77749a748c875712db84836fdf07e1 (diff) |
rename package name
add function for sdc subscribe and notification
Issue-ID: MODELING-366
Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc
Signed-off-by: dyh <dengyuanhong@chinamobile.com>
Diffstat (limited to 'catalog/pub/Dmaap_lib/dmaap/consumer.py')
-rw-r--r-- | catalog/pub/Dmaap_lib/dmaap/consumer.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/catalog/pub/Dmaap_lib/dmaap/consumer.py b/catalog/pub/Dmaap_lib/dmaap/consumer.py new file mode 100644 index 0000000..054791c --- /dev/null +++ b/catalog/pub/Dmaap_lib/dmaap/consumer.py @@ -0,0 +1,91 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import hmac +import json +import logging +import datetime +import requests +from hashlib import sha1 + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class ConsumerClient: + def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='', + api_key='', api_secret=''): + self.host = host + self.topic = topic + self.group = consumer_group + self.comsumer_id = consumer_id + self.timeout_ms = timeout_ms + self.limit = limit + self.filter = filter + self.api_key = api_key + self.api_secret = api_secret + + def set_api_credentials(self, api_key, api_secret): + + self.api_key = api_key + self.api_secret = api_secret + + def create_url(self): + url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) + add_url = "" + if self.timeout_ms > -1: + add_url += "timeout=%s" % self.timeout_ms + if self.limit > -1: + if add_url: + add_url += "&" + add_url += "limit=%s" % self.limit + if self.filter: + if add_url: + add_url += "&" + add_url += "filter=%s" % self.filter.encode("utf-8") + if add_url: + url = url + "?" + add_url + + return url + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + return headers + + def fetch(self): + try: + msgs = [] + url = self.create_url() + if self.api_key: + headers = self.create_headers() + ret = requests.get(url=url, headers=headers) + else: + ret = requests.get(url) + logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) + data = ret.json() + for msg in data: + msg = json.loads(msg) + msgs.append(msg) + return msgs + except Exception as e: + raise DmaapClientException(e.message) |