summaryrefslogtreecommitdiffstats
path: root/dcae-cli/dcae_cli/util/cdap_util.py
blob: a38f5304c5027b29454f4269470b5a2692433861 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# -*- coding: utf-8 -*-
"""
Provides utilities for cdap components
"""
import logging
import json
import requests
import six

from dcae_cli.util.logger import get_logger
from dcae_cli.util.exc import DcaeException
from dcae_cli.util import discovery

# Module-wide logger, obtained from the project's get_logger helper under the name 'cdap-utils'.
_logger = get_logger('cdap-utils')
# Emit everything from DEBUG upwards on this logger.
_logger.setLevel(logging.DEBUG)

#HELPER FUNCTIONS
def _merge_spec_config_into_broker_put(jar, config, spec, params, templated_conf):
    """
    The purpose of this function is to form the CDAP Broker PUT from the CDAP component jar, spec, config, and params, where:
        - jar is a URL
        - config is the CDAP "auxiliary file"
        - spec is the CDAP component specification
        - params contains the subkeys "app_config", "app_preferences", "program_preferences" from the parameters config specification
           - (this last one isn't REALLY needed because it is a subset of "spec", but some preprocessing has already been done, specifically "normalize_cdap_params")
        - templated_conf supplies "services_calls", "streams_publishes" and "streams_subscribes", which are copied into "app_config"

    The CDAP Broker API existed well before the component spec, so there is overlap with different naming.
    In the future, if this component spec becomes production and everyone follows it,
      I will change the broker API to use the same keys so that this mapping becomes unnecessary.
    However, while this is still a moving project, I am simply going to do a horrible mapping here.

    The dict built and returned here looks as follows:
        {
            "cdap_application_type" : "program-flowlet",
            "service_component_type" : ...,
            "jar_url" : ...,
            "artifact_name" : ...,
            "artifact_version" : ...,
            "programs": ...,
            "streamname" : ...,
            "services" : [{"service_name" : ..., "service_endpoint" : ..., "endpoint_method" : ...}, ...],
            "namespace" : ...,            (only when config has it)
            "app_config" : ...,           (from params, plus the three templated_conf keys)
            "app_preferences" : ...,      (from params)
            "program_preferences": ...    (from params)
        }
    Every key of params is merged in last, so a key in params overrides the same key built from config/spec.

    The caller's params (including params["app_config"]) is left unmodified; "app_config" in the result is a shallow copy.

    Raises KeyError if a required key is missing from config, spec or templated_conf.

    "So you cooked up a story and dropped the six of us into a *meat grinder*" - Arnold Schwarzenegger, Predator.

    #RE: Streams/consumes: this is used in the designer for validation but does not lead to anything in the CDAP developers configuration.
    """

    #map services/provides into the broker's "services" list; an empty "provides" simply yields []
    service_endpoints = [{"service_name" : s["service_name"],
                          "service_endpoint" : s["service_endpoint"],
                          "endpoint_method" : s["verb"]}
                         for s in spec["services"]["provides"]]

    broker_put = {
        "cdap_application_type" : "program-flowlet", #TODO! Fix this once Hydrator apps is integrated into this CLI tool.
        "service_component_type" : spec["self"]["component_type"],
        "jar_url" : jar,
        "artifact_name" : config["artifact_name"],
        "artifact_version" : config["artifact_version"],
        "programs": config["programs"],
        "streamname" : config["streamname"],
        "services" : service_endpoints,
    }

    #"namespace" is the only optional key taken from config
    if "namespace" in config:
        broker_put["namespace"] = config["namespace"]

    broker_put.update(params)

    # NOTE: app_config comes from params. Extend a shallow copy so that the dict owned by the
    # caller is not silently modified by building the PUT body.
    app_config = dict(broker_put.get("app_config") or {})
    app_config["services_calls"] = templated_conf["services_calls"]
    app_config["streams_publishes"] = templated_conf["streams_publishes"]
    app_config["streams_subscribes"] = templated_conf["streams_subscribes"]
    broker_put["app_config"] = app_config

    return broker_put

def _get_broker_url_from_profile(profile):
    """
    Gets the broker URL from profile

    Asks the Consul catalog at profile.consul_host (port 8500) for the service named
    profile.cdap_broker and returns "http://<address>:<port>" built from the first entry returned.

    Raises requests.exceptions.HTTPError if Consul answers with a non-2xx status, and
    DcaeException if the catalog holds no entry for the broker.
    """
    response = requests.get("http://{0}:8500/v1/catalog/service/{1}".format(profile.consul_host, profile.cdap_broker))
    response.raise_for_status() #bomb if not 2xx, instead of failing later while parsing the body

    entries = response.json()
    if not entries:
        #Consul answers with an empty list for an unknown service; say so rather than die on an IndexError
        raise DcaeException("The CDAP broker '{0}' is not registered in Consul at {1}".format(profile.cdap_broker, profile.consul_host))

    broker = entries[0]
    #ServiceAddress is empty when the service registered without its own address; the node Address applies then
    ip = broker["ServiceAddress"] or broker["Address"]
    return "http://{ip}:{port}".format(ip=ip, port=broker["ServicePort"])

#PUBLIC 
def run_component(catalog, params, instance_name, profile, jar, config, spec, templated_conf):
    """
    Runs a CDAP Component

    By the time this function is called, the instance_name and instance_name:rel have already been pushed into consul by this parent function
    instance_name will be overwritten by the broker and the rels key will be used by the broker to call the CBS

    Parameters:
        - catalog is not used by this function; it is kept so that the signature stays unchanged for callers
        - params, jar, config, spec, templated_conf are merged into the broker PUT body (see _merge_spec_config_into_broker_put)
        - instance_name is the application name used in the broker URL
        - profile is used to look the broker up (see _get_broker_url_from_profile)

    Returns nothing; progress is reported through the module logger.

    Raises DcaeException if the broker rejects the deployment PUT. Failures of the informational
    requests made after a successful deployment propagate as raised.
    """
    broker_url = _get_broker_url_from_profile(profile)

    #register with the broker
    broker_put = _merge_spec_config_into_broker_put(jar, config, spec, params, templated_conf)

    #helps the component developer debug their spec if CDAP throws a 400
    _logger.info("Your (unbound, bound will be shown if deployment completes) app_config is being sent as")
    _logger.info(json.dumps(broker_put["app_config"]))

    _logger.info("Your app_preferences are being sent as")
    _logger.info(json.dumps(broker_put["app_preferences"]))

    _logger.info("Your program_preferences are being sent as")
    _logger.info(json.dumps(broker_put["program_preferences"]))

    response = requests.put("{brokerurl}/application/{appname}".format(brokerurl=broker_url, appname=instance_name),
                            json = broker_put,
                            headers = {'content-type':'application/json'})

    try:
        response.raise_for_status() #bomb if not 2xx
    except Exception:
        #need this to raise a dirty status code for tests to work, so not just logging
        #(Exception rather than a bare except, so that KeyboardInterrupt/SystemExit are not converted)
        raise DcaeException("A Deployment Error Occured. Broker Response: {0}, Broker Response Text: {1}".format(response.status_code, response.text))

    #Deployment succeeded from here on; the failure path above always raises.

    #TODO: not sure what this error handling looks like, should never happen that a deploy succeeds but this get fails
    #Get the cluster URL to tell the user to go check their application
    broker_info_response = requests.get(broker_url)
    broker_info_response.raise_for_status() #bomb if not 2xx
    broker_info = broker_info_response.json() #parsed once, used for both the cluster URL and the GUI port
    cdap_cluster = broker_info["managed cdap url"]

    #Fetch the Application's AppConfig to show them what the bound config looks like:
    #TODO: This should be an endpoint in the broker. I filed an issue in the broker. For now, do the horrendous special character mapping here.
    #TODO: This only fetches AppConfig, add AppPreferences
    ns = broker_put.get("namespace", "default")
    mapped_appname = ''.join(e for e in instance_name if e.isalnum())
    app_info = requests.get("{0}/v3/namespaces/{1}/apps/{2}".format(cdap_cluster, ns, mapped_appname)).json()
    #kept under its own name so that the "config" argument is not shadowed
    bound_config = app_info["configuration"]

    _logger.info("Deployment Complete!")
    _logger.info("The CDAP cluster API is at {0}. The *GUI* Port is {1}. You may now go check your application there to confirm it is running correctly.".format(cdap_cluster, broker_info["cdap GUI port"]))
    _logger.info("Your instance name is: {0}. In CDAP, this will appear as: {1}".format(instance_name, mapped_appname))
    _logger.info("The bound Configuration for this application is: {0}".format(bound_config))

    #TODO: Should we tell the user about metrics and healthcheck to try those too?

def normalize_cdap_params(spec):
    """
    Builds the parameter structure the broker expects from the "parameters" section of a CDAP component specification.

    The returned dict always has exactly these keys:
        - "app_preferences": {name : value} built from parameters/app_preferences, or {} when that key is absent
        - "app_config": {name : value} built from parameters/app_config, or {} when that key is absent
        - "program_preferences": one {"program_id", "program_type", "program_pref"} dict per entry of
          parameters/program_preferences, with "program_pref" collapsed to {name : value}; [] when that key is absent
    """
    declared = spec["parameters"]

    def flatten(entries):
        #collapse [{"name" : n, "value" : v}, ...] into {n : v, ...}
        return dict((entry["name"], entry["value"]) for entry in entries)

    normalized = {}

    #app preferences, then app config: both share the same name/value layout
    for section in ("app_preferences", "app_config"):
        normalized[section] = flatten(declared[section]) if section in declared else {}

    #program preferences: an absent key falls back to the broker default of an empty list
    programs = declared["program_preferences"] if "program_preferences" in declared else []
    normalized["program_preferences"] = [
        {"program_id" : prog["program_id"],
         "program_type" : prog["program_type"],
         "program_pref" : flatten(prog["program_pref"])}
        for prog in programs
    ]
    return normalized

def undeploy_component(profile, instance_name):
    """
    Undeploys a CDAP Component, which in CDAP terms means stop and delete

    Sends a DELETE for the application named instance_name to the broker found through profile.
    Returns True when the broker answers with a 2xx status; otherwise logs the error and returns False.
    """
    delete_url = "{brokerurl}/application/{appname}".format(
        brokerurl=_get_broker_url_from_profile(profile),
        appname=instance_name)

    #call the delete
    response = requests.delete(delete_url)

    undeployed = True
    try:
        response.raise_for_status() #bomb if not 2xx
        _logger.info("Undeploy complete.")
    except Exception as e:
        _logger.error("An undeploy Error Occured: {2}. Broker Response: {0}, Broker Response Text: {1}".format(response.status_code, response.text, e))
        undeployed = False
    return undeployed