summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin/cdapcloudify/discovery.py
blob: c654cbb3747cf0c9241e760323dd01d824494a26 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#      http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

import requests
import json

CONSUL_HOST = "http://localhost:8500"

def _get_connection_info_from_consul(service_component_name, logger):
    """
    Call consul's catalog
    TODO: currently assumes there is only one service
    """
    url = "{0}/v1/catalog/service/{1}".format(CONSUL_HOST, service_component_name)
    logger.info("Trying to query: {0}".format(url))
    res = requests.get(url)
    res.raise_for_status()
    services = res.json()
    return services[0]["ServiceAddress"], services[0]["ServicePort"]

def _get_broker_url(cdap_broker_name, service_component_name, logger):
    """
    fetch the broker connection information from Consul
    """
    broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
    broker_url = "http://{ip}:{port}/application/{appname}".format(ip=broker_ip, port=broker_port, appname=service_component_name)
    logger.info("Trying to connect to broker endpoint: {0}".format(broker_url))
    return broker_url

"""
public
"""
def put_broker(cdap_broker_name, 
               service_component_name, 
               namespace, 
               streamname, 
               jar_url,
               artifact_name, 
               artifact_version,
               app_config,
               app_preferences, 
               service_endpoints, 
               programs, 
               program_preferences, 
               logger):
    """
    Conforms to Broker API 4.X
    """

    data = dict()
    data["cdap_application_type"] = "program-flowlet"
    data["namespace"] = namespace
    data["streamname"] = streamname
    data["jar_url"] = jar_url
    data["artifact_name"] = artifact_name
    data["artifact_version"] = artifact_version
    data["app_config"] = app_config
    data["app_preferences"] = app_preferences
    data["services"] = service_endpoints
    data["programs"] = programs
    data["program_preferences"] = program_preferences
    
    #register with the broker
    response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
                            json = data, 
                            headers = {'content-type':'application/json'})
    logger.info((response, response.status_code, response.text))
    response.raise_for_status() #bomb if not 2xx

def reconfigure_in_broker(cdap_broker_name, 
                          service_component_name, 
                          config,
                          reconfiguration_type,
                          logger):
    #trigger a reconfiguration with the broker
    #man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here
    response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
                            headers = {'content-type':'application/json'},
                            json = {"reconfiguration_type" : reconfiguration_type, 
                                    "config" : config})
    logger.info((response, response.status_code, response.text))
    response.raise_for_status() #bomb if not 2xx

def delete_on_broker(cdap_broker_name, service_component_name, logger):
    #deregister with the broker
    response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
    logger.info((response, response.status_code, response.text))
    response.raise_for_status() #bomb if not 2xx

def delete_all_registered_apps(cdap_broker_name, logger):
    #get the broker connection
    broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
    broker_url = "http://{ip}:{port}".format(ip=broker_ip, port=broker_port)

    #binge and purge
    logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url))
    registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames)
    logger.info("Trying to delete: {0}".format(registered_apps))
    r = requests.post("{0}/application/delete".format(broker_url),
                 headers = {'content-type':'application/json'},
                 json = {"appnames" : registered_apps})
    logger.info("Response: {0}, Response Status: {1}".format(r.text, r.status_code))