1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import requests
import json
# Base URL of the Consul HTTP API that service lookups in this module are sent to.
CONSUL_HOST = "http://localhost:8500"
def _get_connection_info_from_consul(service_component_name, logger):
    """
    Look up a service's address and port in Consul's catalog.

    Queries ``/v1/catalog/service/<service_component_name>`` on CONSUL_HOST and
    returns the connection info of the first entry in the response.
    TODO: currently assumes there is only one service

    :param service_component_name: name the service is registered under in Consul
    :param logger: logger-like object; only ``info`` is called on it
    :returns: tuple ``(ServiceAddress, ServicePort)`` taken from the first catalog entry
    :raises requests.exceptions.HTTPError: if Consul answers with a non-2xx status
    :raises IndexError: if the catalog has no entry for the requested service
    """
    url = "{0}/v1/catalog/service/{1}".format(CONSUL_HOST, service_component_name)
    logger.info("Trying to query: {0}".format(url))
    res = requests.get(url)
    res.raise_for_status()
    services = res.json()
    if not services:
        # An empty catalog result would otherwise surface as a bare
        # "list index out of range"; name the missing service instead.
        # IndexError is kept so callers that already catch it keep working.
        raise IndexError(
            "No service named {0} is registered in Consul at {1}".format(
                service_component_name, CONSUL_HOST))
    first = services[0]
    return first["ServiceAddress"], first["ServicePort"]
def _get_broker_url(cdap_broker_name, service_component_name, logger):
    """
    Build the broker URL for one application.

    The broker's address and port are resolved through Consul using
    cdap_broker_name; the result has the form
    ``http://<ip>:<port>/application/<service_component_name>``.

    :param cdap_broker_name: name the broker is registered under in Consul
    :param service_component_name: application name appended to the URL path
    :param logger: logger-like object; only ``info`` is called on it
    :returns: the application URL as a string
    """
    host, port = _get_connection_info_from_consul(cdap_broker_name, logger)
    url = "http://{ip}:{port}/application/{appname}".format(
        ip=host,
        port=port,
        appname=service_component_name,
    )
    logger.info("Trying to connect to broker endpoint: {0}".format(url))
    return url
"""
public
"""
def put_broker(cdap_broker_name,
               service_component_name,
               namespace,
               streamname,
               jar_url,
               artifact_name,
               artifact_version,
               app_config,
               app_preferences,
               service_endpoints,
               programs,
               program_preferences,
               logger):
    """
    Register an application with the broker via HTTP PUT.
    Conforms to Broker API 4.X

    The arguments are sent unchanged as the JSON request body; note that
    service_endpoints is sent under the key "services" and that
    "cdap_application_type" is always "program-flowlet".

    :param cdap_broker_name: name the broker is registered under in Consul
    :param service_component_name: application name used in the broker URL
    :param logger: logger-like object; only ``info`` is called on it
    :raises requests.exceptions.HTTPError: if the broker answers with a non-2xx status
    """
    payload = {
        "cdap_application_type": "program-flowlet",
        "namespace": namespace,
        "streamname": streamname,
        "jar_url": jar_url,
        "artifact_name": artifact_name,
        "artifact_version": artifact_version,
        "app_config": app_config,
        "app_preferences": app_preferences,
        "services": service_endpoints,
        "programs": programs,
        "program_preferences": program_preferences,
    }

    # register with the broker
    endpoint = _get_broker_url(cdap_broker_name, service_component_name, logger)
    resp = requests.put(endpoint,
                        json=payload,
                        headers={'content-type': 'application/json'})
    logger.info((resp, resp.status_code, resp.text))
    resp.raise_for_status()  # bomb if not 2xx
def reconfigure_in_broker(cdap_broker_name,
                          service_component_name,
                          config,
                          reconfiguration_type,
                          logger):
    """
    Trigger a reconfiguration of a registered application via HTTP PUT.

    The request goes to ``<application URL>/reconfigure`` with a JSON body
    holding "reconfiguration_type" and "config".

    :param cdap_broker_name: name the broker is registered under in Consul
    :param service_component_name: application name used in the broker URL
    :param config: configuration payload, sent unchanged under "config"
    :param reconfiguration_type: sent unchanged under "reconfiguration_type"
    :param logger: logger-like object; only ``info`` is called on it
    :raises requests.exceptions.HTTPError: if the broker answers with a non-2xx status
    """
    # Broker API 4 standardized the reconfigure interface (a break from API 3),
    # so this single function is all that is needed here.
    reconfigure_url = "{u}/reconfigure".format(
        u=_get_broker_url(cdap_broker_name, service_component_name, logger))
    body = {"reconfiguration_type": reconfiguration_type,
            "config": config}
    resp = requests.put(reconfigure_url,
                        headers={'content-type': 'application/json'},
                        json=body)
    logger.info((resp, resp.status_code, resp.text))
    resp.raise_for_status()  # bomb if not 2xx
def delete_on_broker(cdap_broker_name, service_component_name, logger):
    """
    Deregister an application from the broker via HTTP DELETE.

    :param cdap_broker_name: name the broker is registered under in Consul
    :param service_component_name: application name used in the broker URL
    :param logger: logger-like object; only ``info`` is called on it
    :raises requests.exceptions.HTTPError: if the broker answers with a non-2xx status
    """
    app_url = _get_broker_url(cdap_broker_name, service_component_name, logger)
    resp = requests.delete(app_url)
    logger.info((resp, resp.status_code, resp.text))
    resp.raise_for_status()  # bomb if not 2xx
def delete_all_registered_apps(cdap_broker_name, logger):
    """
    Delete every application the broker currently lists.

    Fetches the list from ``<broker>/application`` and posts it back to
    ``<broker>/application/delete`` under the key "appnames".

    :param cdap_broker_name: name the broker is registered under in Consul
    :param logger: logger-like object; only ``info`` is called on it
    :raises requests.exceptions.HTTPError: if listing the applications returns a
        non-2xx status (the delete response itself is only logged)
    """
    # get the broker connection
    broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
    broker_url = "http://{ip}:{port}".format(ip=broker_ip, port=broker_port)

    # binge and purge
    logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url))
    listing = requests.get("{0}/application".format(broker_url))
    # Bomb if not 2xx: an error body must not be parsed and forwarded as "appnames".
    listing.raise_for_status()
    registered_apps = listing.json()  # should be proper list of strings (appnames)
    logger.info("Trying to delete: {0}".format(registered_apps))
    r = requests.post("{0}/application/delete".format(broker_url),
                      headers={'content-type': 'application/json'},
                      json={"appnames": registered_apps})
    # NOTE(review): unlike the other broker calls, a failed delete is only logged
    # here, not raised - possibly deliberate best-effort cleanup; confirm.
    logger.info("Response: {0}, Response Status: {1}".format(r.text, r.status_code))
|