1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# ============LICENSE_START====================================================
# org.onap.ccsdk
# =============================================================================
# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================
import consul
import json
from urlparse import urlparse
class ConsulHandle(object):
    '''
    Provide access to Consul KV store and service discovery
    '''

    def __init__(self, api_url, user, password, logger):
        '''
        Constructor

        api_url is split into scheme, host name and port, which are used to
        build the underlying consul.Consul client (kept in self.ch).
        user, password and logger are accepted as part of the signature
        but are not used by this constructor.
        '''
        u = urlparse(api_url)
        self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)

    def get_config(self, key):
        '''
        Get configuration information from Consul using the provided key.
        It should be in JSON form.  Convert it to a dictionary.

        Raises an Exception naming the key if there is no entry for 'key'.
        Raises ValueError if the stored value is not JSON (allowed to propagate).
        '''
        (index, val) = self.ch.kv.get(key)
        if val is None:
            # kv.get hands back None for the value when the key does not exist.
            # Fail with a message naming the key rather than an opaque
            # "'NoneType' object is not subscriptable" TypeError.
            raise Exception('Could not find configuration for key "{0}"'.format(key))
        return json.loads(val['Value'])

    def get_service(self, service_name):
        '''
        Look up the service named service_name in Consul.
        Return the service address and port as a tuple (address, port).

        Raises an Exception if Consul has no listing for service_name.
        '''
        (index, val) = self.ch.catalog.service(service_name)
        if not val:
            # catalog.service returns an empty array if service not found
            raise Exception('Could not find service information for "{0}"'.format(service_name))

        # Could be multiple listings, but we take the first
        service = val[0]

        # Most services should have a non-empty 'ServiceAddress';
        # "external" services will have 'Address' only.
        service_address = service.get('ServiceAddress') or service['Address']
        service_port = service['ServicePort']
        return service_address, service_port

    def add_to_entry(self, key, add_name, add_value):
        '''
        Find 'key' in consul.
        Treat its value as a JSON string representing a dict.
        Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
        Turn the resulting extended dict into a JSON string.
        Store the string back into Consul under 'key'.
        Watch out for conflicting concurrent updates: the write is a check-and-set
        against the retrieved ModifyIndex, and the whole read-modify-write cycle is
        retried until the write is accepted.

        Example:
        Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
        add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
        should result in the value for key 'xyz:dmaap' in consul being updated to
        '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'

        Exceptions from JSON parsing/serialization or from Consul just propagate.
        '''
        while True:     # do until update succeeds
            (index, val) = self.ch.kv.get(key)
            if val is None:
                # No key yet: start from an empty dict and
                # use 0 as the cas index for initial insertion of the key
                vstring = '{}'
                mod_index = 0
            else:
                vstring = val['Value']
                mod_index = val['ModifyIndex']  # version of the key we retrieved

            # Build the updated dict
            v = json.loads(vstring)
            v[add_name] = add_value
            new_vstring = json.dumps(v)

            # If the key has changed since retrieval, put returns False and we retry
            if self.ch.kv.put(key, new_vstring, cas=mod_index):
                break

    def delete_entry(self, entry_name):
        '''
        Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.

        Note that the kv.delete() operation always returns True,
        whether an entry with key 'entry_name' exists or not.  This doesn't seem like
        a great design, but it means it's safe to try to delete the same entry repeatedly.

        Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
        feeds we've stored into the 'component_name:dmaap' entry.

        Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
        'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
        subscribes relationship.  The first unlink will actually remove the entry, the subsequent ones
        will harmlessly try to remove it again.

        The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
        the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
        entry back into Consul.  It would be very similar to add_to_entry.  When there's nothing left
        in the entry, then the entire entry would be deleted.
        '''
        self.ch.kv.delete(entry_name)
|