From 0619cb3772a12e0a5a3aecc09b96eeb7df20a000 Mon Sep 17 00:00:00 2001 From: Jack Lucas Date: Fri, 1 Sep 2017 13:48:08 +0000 Subject: Add seed code for DMaaP plugin Change-Id: I8c7a9c432badd3052a571ed87b9b580760b376e6 Issue-Id: CCSDK-65 Signed-off-by: Jack Lucas --- dmaap/consulif/consulif.py | 120 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 dmaap/consulif/consulif.py (limited to 'dmaap/consulif/consulif.py') diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py new file mode 100644 index 0000000..e742895 --- /dev/null +++ b/dmaap/consulif/consulif.py @@ -0,0 +1,120 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import consul +import json +from urlparse import urlparse + +class ConsulHandle(object): + ''' + Provide access to Consul KV store and service discovery + ''' + + def __init__(self, api_url, user, password, logger): + ''' + Constructor + ''' + u = urlparse(api_url) + self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme) + + def get_config(self, key): + ''' + Get configuration information from Consul using the provided key. + It should be in JSON form. Convert it to a dictionary + ''' + (index, val) = self.ch.kv.get(key) + config = json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate + return config + + def get_service(self,service_name): + ''' + Look up the service named service_name in Consul. + Return the service address and port. + ''' + (index, val) = self.ch.catalog.service(service_name) + if len(val) > 0: # catalog.service returns an empty array if service not found + service = val[0] # Could be multiple listings, but we take the first + if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0): + service_address = service['ServiceAddress'] # Most services should have this + else: + service_address = service['Address'] # "External" services will have this only + service_port = service['ServicePort'] + else: + raise Exception('Could not find service information for "{0}"'.format(service_name)) + + return service_address, service_port + + def add_to_entry(self, key, add_name, add_value): + ''' + Find 'key' in consul. + Treat its value as a JSON string representing a dict. + Extend the dict by adding an entry with key 'add_name' and value 'add_value'. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under 'key'. + Watch out for conflicting concurrent updates. + + Example: + Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' + add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) + should result in the value for key 'xyz:dmaap' in consul being updated to + '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' + ''' + + while True: # do until update succeeds + (index, val) = self.ch.kv.get(key) # index gives version of key retrieved + + if val is None: # no key yet + vstring = '{}' + mod_index = 0 # Use 0 as the cas index for initial insertion of the key + else: + vstring = val['Value'] + mod_index = val['ModifyIndex'] + + # Build the updated dict + # Exceptions just propagate + v = json.loads(vstring) + v[add_name] = add_value + new_vstring = json.dumps(v) + + updated = self.ch.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false + if updated: + break + + + def delete_entry(self,entry_name): + ''' + Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store. + + Note that the kv.delete() operation always returns True, + whether there's an entry with key 'entry_name' exists or not. This doesn't seem like + a great design, but it means it's safe to try to delete the same entry repeatedly. + + Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and + feeds we've stored into the 'component_name:dmaap' entry. + + Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire + 'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or + subscribes relationship. The first unlink will actually remove the entry, the subsequent ones + will harmlessly try to remove it again. + + The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches + the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting + entry back into Consul. It would be very similar to add_from_entry. When there's nothing left + in the entry, then the entire entry would be deleted. + ''' + self.ch.kv.delete(entry_name) -- cgit 1.2.3-korg