From 647addf5d6c78b2b8c941cc9cd8c57a3eb9f30b4 Mon Sep 17 00:00:00 2001 From: Tommy Carpenter Date: Tue, 22 Aug 2017 18:07:40 -0400 Subject: [DCAEGEN2-42] Initial commit of broker Change-Id: I1c553c82d5b39a4c134c44e2320ac0e44785e0ef Signed-off-by: Tommy Carpenter --- .gitignore | 20 + .gitreview | 4 + Changelog.md | 107 +++ Dockerfile | 30 + LICENSE.txt | 31 + README.md | 131 +++ bin/build_and_check.sh | 5 + bin/build_and_run.sh | 8 + bin/build_and_test.sh | 12 + bin/build_and_unit_test.sh | 5 + config/sys.config | 86 ++ config/vm.args | 9 + config_template.json | 1 + doc/cdap_broker.png | Bin 0 -> 351432 bytes entry.sh | 20 + get_version.sh | 3 + rebar.config | 74 ++ src/application.hrl | 28 + src/cdap_interface.erl | 363 ++++++++ src/cdap_interface_tests.erl | 34 + src/cdapbroker.app.src | 23 + src/cdapbroker_app.erl | 94 ++ src/cdapbroker_sup.erl | 55 ++ src/consul_interface.erl | 139 +++ src/httpabs.erl | 118 +++ src/httpabs_tests.erl | 33 + src/logging.erl | 157 ++++ src/resource_handler.erl | 465 ++++++++++ src/util.erl | 192 ++++ src/util_tests.erl | 53 ++ src/workflows.erl | 324 +++++++ src/workflows_tests.erl | 27 + swagger/swagger.html | 1899 ++++++++++++++++++++++++++++++++++++++++ swagger/swagger.json | 560 ++++++++++++ swagger/swagger.yaml | 418 +++++++++ test/apitest/apitest_SUITE.erl | 826 +++++++++++++++++ 36 files changed, 6354 insertions(+) create mode 100644 .gitignore create mode 100644 .gitreview create mode 100644 Changelog.md create mode 100644 Dockerfile create mode 100644 LICENSE.txt create mode 100644 README.md create mode 100755 bin/build_and_check.sh create mode 100755 bin/build_and_run.sh create mode 100755 bin/build_and_test.sh create mode 100755 bin/build_and_unit_test.sh create mode 100644 config/sys.config create mode 100644 config/vm.args create mode 100644 config_template.json create mode 100644 doc/cdap_broker.png create mode 100755 entry.sh create mode 100755 get_version.sh create mode 100644 rebar.config create mode 100644 src/application.hrl create mode 100644 src/cdap_interface.erl create mode 100644 src/cdap_interface_tests.erl create mode 100644 src/cdapbroker.app.src create mode 100644 src/cdapbroker_app.erl create mode 100644 src/cdapbroker_sup.erl create mode 100644 src/consul_interface.erl create mode 100644 src/httpabs.erl create mode 100644 src/httpabs_tests.erl create mode 100644 src/logging.erl create mode 100644 src/resource_handler.erl create mode 100644 src/util.erl create mode 100644 src/util_tests.erl create mode 100644 src/workflows.erl create mode 100644 src/workflows_tests.erl create mode 100644 swagger/swagger.html create mode 100644 swagger/swagger.json create mode 100644 swagger/swagger.yaml create mode 100644 test/apitest/apitest_SUITE.erl diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27bdf03 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +build_and_cover.sh +Mnesia.nonode@nohost/ +rebar3.crashdump +.DS_Store +.rebar3 +rebar.lock +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build diff --git a/.gitreview b/.gitreview new file mode 100644 index 0000000..0699f50 --- /dev/null +++ b/.gitreview @@ -0,0 +1,4 @@ +[gerrit] +host=gerrit.onap.org +port=29418 +project=dcaegen2/platform/cdapbroker.git diff --git a/Changelog.md b/Changelog.md new file mode 100644 index 0000000..0794b83 --- /dev/null +++ b/Changelog.md @@ -0,0 +1,107 @@ +# Change Log +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + +## [4.0.2] - July 26 2017 +* Make testing magical so that it runs on localhost or in Docker the same +* Code cleanups in workflows thanks to abstraction provided by Garry + +## [4.0.1] - July 7 2017 +* Work with config binding service 1.0.0. + +## [4.0.0] - June 28 2017 +* Policy reconfiguration work. There are two new reconfiguration endpoints: smart and app-preferences. +* The smart interface allows you to send a JSON of keys that are either in app_preferences, app_config, or both. The broker figures out the overlaps, refreshes them in Consul, and makes the appropriate CDAP reconfiguration calls. The preferences API allows you to simply reset a CDAP app's application preferences (analagous to the call that was already in here for app_config). +* I broke the original app_config API to make all three reconfiguration APIs consistent. You give it "reconfiguration_type" and "config" for all three APIs. And hopefully for any future reconfiguration APIs, e.g., for hydrator pipelines. +* Hardcodes the cute short name "config_binding_service", now that DCAE is using short names. Obliterates usage of the env variable and removes passing it around all over the place. + +## [3.4.8] - May 16 2017 +* support JARs hosted at https webservers +* trim JAR URLs to prevent problems due to whitespace +* add a primitive get_version.sh script for the build team to use.. needs to be actually implemented instead of printing a fixed version right now.. + +## [3.4.7] - May 15 2017 +* EELF part 6... pretty much done at this point, some less-important logs could still be cleaned up maybe. +* All HTTP requests made from the broker to another HTTP API now include the XER. +* All functions in `src/workflows` are now EELF compliant, and produce detailed metrics logs. Workflows is the workhorse of all the major API calls so this tracks pretty much all requests the broker makes to to it's job. +* App Config reconfiguration is now a workflow + +## [3.4.6] - May 12 2017 +* All HTTP GETs issued by this broker now pass an/the XER... even if the downstream server is not expecting it #ExpectUs.. + +## [3.4.5] - May 10 2017 +* error EELF field implemented, three important Metrics EELF fields added +* src/workflows has no more lager (non EELF) statements... though it could use *more* EELF statements because part of it is not covered by logs.. + +## [3.4.4] - Apr 28 2017 +* four more EELF fields implemented, one ignored. + +## [3.4.3] - Apr 27 2017 +* Fixes audit records not being used at the end of API calls (they now are, and elapsed time is computed) +* Implements ISO8601 timestamps in all log records +* Implements a few more of the EELF fields, still more to go + +## [3.4.2] - Apr 26 2017 +* Towards Implementing the "ONAP logging requirements" aka "EELF": +* Logs are now written to /tmp/log/cdapbroker/... and are in three files: error.log, audit.log, metrics.log (logs are still also written to stdout) +* X-ECOMP-RequestID is now parsed and logged, and generated if missing, for all main broker APIs. It does not yet generate it's own for the call to the CBS. +* EELF logging functions added to a new source file `src/logging`. +* The field list formats and delimiters in each of those follow version 1.0 of the ONAP logging requirements. However, many required fields are left blank right now, this needs to be fixed. Parsing the fields should work though as each has the correct positional arguments and correct number of positions. +* Only functions in `src/resource_handler` are using the EELF logging functions right now, this needs to be fixed + +## [3.4.1] +* Move tests, update README + +## [3.4.0] +* Only healthcheck programs that are part of an intial program-flowlet PUT. The use case for this was a CDAP application that had flows that were purposely not wanting to be started by the component developer (whether that is valid is another question...) + The broker thought their application was in an unhealthy state because it was sourcing the programs to get the health of from CDAP itself, and not the intial PUT request. + This actually goes back to the way healthcheck used to work, but this now does it in a better way, because the application table is now used for multiple types of CDAP apps +* To accomplish the above, this creates a "supplementary" table for program-flowlet apps, that right now stores the programs that are in the intial PUT, but could be extended later with more properties. +* Paves the way for a "hydrator supplemental" table if that is ever needed +* Starts the painful process of adding function and type definitions so that the Erlang Dialyzer ("bolt on" static typing tool) is of some use; sick of huge positional tuples +* Fixes a bug where undeploy would not "plow through" if CDAP was totally unreachable (or gone) + +## [3.3.0] +* Broker now includes it's own API version in information endpoint + +## [3.2.1] +* Fix an error found in HTTP error reporting (a bad request type of error is now correctly caught) +* Fix errors in test cases found while testing a new version of CDAP. Tests are now more agnostic to json key ordering. + +## [3.2.0] +* Returns more information in the "infomation endpoint" ('/'). Specifically, now returns the CDAP GUI port and the CDAP cluster version. +This is mostly to aid component developers in testing that are hitting the broker via the DCAE CLI tool. +* Introduces the first unit test (eunit; the rest of the test suite is ran via common_test). + +## [3.1.0] +* Persistence. This is an important feature. This persists the MNesia database to disk so that you can stop and start the broker without losing state. NOTE: Catching Docker SIGTERM is not yet implemented. It turns out this is non-trivial to do in Erlang. So, in rare cases, if `docker stop` is called immediately after `delete`, the deletion may not persist to the database. +* Erlang 19.3 is supposed to handle SIGTERM at the BEAM level. That is not out yet as Docker (it was just announced a week ago.). +The proper way to shut down an Erlang node is to execute a command to the BEAM. +I don't know what Erlang developers did for 20 years, but to date the BEAM does not handle SIGTERM, so a wrapper script is needed if you want to kill the BEAM gracefully on a SIGTERM. +This was needed because this broker runs in Docker. With database persistence, the broker has a graceful stop function that ensures any transactions in cache (RAM) are persisted to disk before the shutdown. +I need this function called, so I need SIGTERM to be handled in a way that gracefully terminates the VM, which in turn gracefully shuts down my application, which in turn calls this function. Supposedly, this will be no longer necessary soon: http://www.erlang.org/news/110 + +## [3.0.0] +* Waterfall CDAP_CLUSTER_TO_MANAGE +* Handle CBS moving while broker is up +* For program-flowlet apps, return configuration when info requested +* make /application only return the appnames (API change) +* make testing more stable by not using order-dependent Expected JSONs +* Reconfiguring with an invalid request returns a 4xx instead of 5xx now (bugfix) +* Dead programs/flowlets now triggers a failing healthcheck (bugfix) +* Rename "reconfigure_app_config" to make way for reconfiguring preferences (API change) + +## [2.2.0] +* Implements healthchecking for Pipelines based on Terry S's healthCheck.py script + +## [2.1.0] +* Added a text file of some example commands that can be useful for demos and understanding this repo +* Fixed a bug in a workaround for https://issues.cask.co/browse/CDAP-7191?filter=-2 +* Added the ability to deploy Pipelines that contain custom JARs + +## [2.0.0] + +* Add support for primitive CDAP Pipelines. Required an API break because the API caller has to specify whether the application is a program-flowlet style app or a hydrator pipeline. +* Changelog did not exist before this... diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ef4f76e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +FROM erlang:19.2 +MAINTAINER tommy at research dot eh tee tee dot com + +ENV TERM=xterm + +#copy files into repo +ADD src /tmp/src +ADD test /tmp/test +ADD rebar.config /tmp +ADD entry.sh /tmp +ADD config /tmp/config + +WORKDIR /tmp + +#make sure there is no leftover release +RUN rm -rf _build/ + +#build +RUN rebar3 upgrade +RUN rebar3 release + +#fail the docker build if unit tests fail +RUN rebar3 eunit + +#set the broker test type so the integration test can be run inside Docker +ENV BROKER_TEST_TYPE=DOCKER + +#run +EXPOSE 7777 +ENTRYPOINT ["/tmp/entry.sh"] diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..7c1e63a --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,31 @@ +============LICENSE_START======================================================= +org.onap.dcae +================================================================================ +Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= + +ECOMP is a trademark and service mark of AT&T Intellectual Property. + +Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +=================================================================== +Licensed under the Creative Commons License, Attribution 4.0 Intl. (the "License"); +you may not use this documentation except in compliance with the License. +You may obtain a copy of the License at + https://creativecommons.org/licenses/by/4.0/ +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..14aa99c --- /dev/null +++ b/README.md @@ -0,0 +1,131 @@ +# cdapbroker +This repo is the thing in red: +![Alt text](doc/cdap_broker.png?raw=true) + +The high level workflow for how this broker is to be used in the DCAE platform is as follows +
    +
  1. CLoudify wants to deploy a new CDAP app. It deploys it via the broker.
  2. +
  3. This broker registers with Consul and lights up a metrics, health, and configuration change endpoint for the app
  4. +
  5. The broker pushes the app's configuration into Consul for the Ops view layer (documentation purposes) +
  6. At periodic intervals the health endpoints (plural when mutliple CDAP apps are registered), provided by this broker, are pinged by consul
  7. +
  8. At peridic intervals, some metrics system (TBD) pulls then pushes metrics from the apps
  9. +
  10. Occasionaly some higher power decides to update configuration for a running CDAP app. They make the change via the broker.
  11. +
  12. Finally on a service undeploy Cloudify deletes the CDAP app via the broker
  13. +
+ +# Purpose & Responsibilities +The purposes of this broker are as follows: + +1. To isolate CDAP developers from the complexity of the DCAE platform. The only thing CDAP developers should need to interface with are the CDAP APIs. They should not need to know about concepts like "service discovery" or "confiuration discovery". They should specify what external systems they should talk to in their configuration template and the rest should be hidden from them. + +2. To minimize the onboarding process for CDAP developers. Their *only* deliverables should be a JAR (with built in metrics), and their configuration template. + +3. Provide Consul interfacing, metrics, healthchecks, and configuration updates bewtween CDAP applications and the rest of the platform. Specifically, this has northbound interfaces for +
    +
  1. providing initial configuration of a CDAP app
  2. +
  3. changing configuration of a CDAP app
  4. +
  5. getting the custom metrics of a CDAP app
  6. +
  7. healthchecking a CDAP app
  8. +
+ And talks to the CDAP APIs in various ways southbound to achieve these objectives. + +# CDAP service discovery +When registering an application (`appname`) with the broker, the broker registeres *itself* as the Address and Port of that CDAP application. That is because a CDAP application can have multiple services behind it, so there is no way to register a CDAP application in Consul. + +The CDAP client library for collectors (or anything upstream of a CDAP app) will be required to do the following: + +1. do a consul service discovery GET on `appname` +2. Take that serviceaddress and port, and form `serviceaddress:port/application/appname` +3. Do a GET on that, which is the broker +4. Parse out `connectionurl` and `serviceendpoints`, which is input and output connection information for the CDAP app. See the swagger spec (in the swagger folder in this repo) for the schema of those. + +# Special charcters: hackaround +CDAP application names cannot have special characters in them. +When this broker talks southbound to CDAP, it maps `APPNAME` into `APPNAME_WITHOUT_SPECIAL_CHARACTERS`. So in CDAP you will see the latter. However, + when communicating with this broker API, you should still pass in `APPNAME`. The conversion is handled internally. + +# Logs +The broker conforms to ONAP/ECOMP logging requirements (aka EELF). The logs are written to `/tmp/log/cdapbroker/..`. If running in Docker, you'll want to mount this outside into a safe store. + +# Running and testing + +## Runtime Assumptions +In a real deployment setting, these assumptions are handled by the ONAP Docker plugin when this is launched. +When doing local testing, you must set up these assumptions yourself. + +1. `HOSTNAME` is set as an env variable in this runtime environment and is a Consul key that holds this broker's configuration. + +2. `CONSUL_HOST` is set as an env variable and is either an IP address or a resolvable name (Via real DNS whereever you are running this). It should not include the port, the broker currently hardcodes 8500. + +3. `CDAP_CLUSTER_TO_MANAGE` is an env variable, and cooresponds to a CDAP cluster registered in `CONSUL_HOST`. This is the CDAP cluster the broker will manage when it runs. The CDAP API port must be registered as well, that is, the broker does *not* hardcode "well known ports" and retrieves it from Consul. (Currently for CDAP 3 that port is 10000 and CDAP 4 it is 11011.) + +4. `CONFIG_BINDING_SERVICE` is set as an env variable, and cooresponds to the name that the CBS is registered in `CONSUL_HOST`. + +5. The user that runs this application needs write access to the directory '/var/mnesia'. This works in Docker, but on your local machine, this may not be owned by your local user.In this case, create the dictory and CHOWN it to your user. The broker writes it's database backup to that location. + +## The integration test suite +Bestides the unit tests, the broker comes with an integration suite that touches both Consul and CDAP (the ones pointed to via `CONSUL_HOST` and `CDAP_CLUSTER_TO_MANAGE`. It will actually deploy test apps, test configurations into Consul, and then delete them after the test suite is over. Please note that this is not a unit test, it touches the real systems the broker is connected to. It can be used to verify a signifigant portion of the DCAE, including CDAP, Consul, the broker, and the config binding service. + +The integration test suite requires several test artifacts to be loaded into a Nexus Raw Webserver. In the test suite, the root of this server is read from the env `NEXUS_RAW_ROOT`. Thus, in order to run the integration suite, in addition to the above runtime assumptions, this env must be set. +(TODO: Talk about the specific artifacts to be loaded in) + +## Docker + +### Build +Note: doing a Docker build *automatically* runs the unit tests. In fact, the docker container will fail to build if any unit tests fail. + + docker build --rm -t cdapbroker:VERSION . + +### Run + + docker run -v /tmp/log/cdapbroker/:/tmp/log/cdapbroker/ -d -e CDAP_CLUSTER_TO_MANAGE="cdap_cluster" -e HOSTNAME="cdap_broker" -e CONSUL_HOST="XXXX" -e CONFIG_BINDING_SERVICE="config_binding_service" -p 7777:7777 DOCKER_REGISTRY/cdapbroker:VERSION + +(Fill in `CONSUL_HOST` and `DOCKER_REGISTRY`) + +### Running the Integration Suite in Docker + + docker exec -it CONTAINER_ID_HERE /bin/bash + export NEXUS_RAW_ROOT=XXXX + rebar3 ct + +## Local Development +The below is for building, running, and testing on your local machine. This requires Erlang 19+ and rebar3 on your machine. These scripts require the runtime assumptions mentioned above. + +### Running locally +To use this script, you will have to modify `CONSUL_HOST` for your enviornment. + +./bin/build_and_run.sh + +### Running the integration test suite +To use this script, you will have to modify `CONSUL_HOST` and `NEXUS_RAW_ROOT` for your enviornment. + + ./bin/build_and_test.sh + +### Running the unit tests + + ./bin/build_and_unit_test.sh + +### Running the Dialyzer +This is a type checking tool that attempts to find errors based on types. +Warning, right now very few things are typed/specs, so the usefulness of this is limited: + + ./bin/build_and_check.sh + +# On "Leave No Trace" +The CDAP broker does not currently implement "leave no trace", because it is not really clear in DCAE when this should be done, or whether the orchestrator should be allowed to invoke such deadly operations, i.e., maybe this should be done by an operations team. +Leave No Trace here means deleting streams and datasets after an application is deleted from the broker. +The problem is that streams and datasets can be shared between applications, like a shared Kafka queue used by two CDAP applications. +Streams and datasets are created by the CDAP deployment API when an application that uses a streamname or a dataset name for the first time, however subsequent apps can be deployed using those, without re-creating them. +Moreover, subsequently deployed apps may be *expecting* that data already exists in some dataset. +Consider the case of deplyoying A1 that creates S1, then deploying A2 that uses S1, then wanting to undeploy A1. Deleting the stream or it's data on the deletion of A1 would clobber A2. + +This leaves the question of who/what/when/where/why/how streams and datasets should be cleaned up from CDAP clusters in DCAE. +For now, this is an open question. It can always be done on the CDAP management interface by an operations team in the meantime. + +# On Version Bumping (Development) +Currently the CDAP Broker Version is in four places in this repo.. +1. rebar.config +2. src/cdapbroker.app.src +3. get_version.sh (could be made smarter, parse rebar.config) +4. swagger spec +If you make a developmemt change, please bump in all places until this is resolved.. diff --git a/bin/build_and_check.sh b/bin/build_and_check.sh new file mode 100755 index 0000000..7d8b4e1 --- /dev/null +++ b/bin/build_and_check.sh @@ -0,0 +1,5 @@ +#!/usr/local/bin/fish +rm -rf _build/; +rebar3 upgrade; +rebar3 release; +dialyzer -r _build/default/lib/cdapbroker/ebin/ diff --git a/bin/build_and_run.sh b/bin/build_and_run.sh new file mode 100755 index 0000000..8d48716 --- /dev/null +++ b/bin/build_and_run.sh @@ -0,0 +1,8 @@ +#!/usr/local/bin/fish +rm -rf /tmp/log/cdapbroker/*; +rebar3 release; +set -x CDAP_CLUSTER_TO_MANAGE "cdap"; +set -x CONSUL_HOST "XXXX"; +set -x CONFIG_BINDING_SERVICE "config_binding_service"; +set -x HOSTNAME "cdap_broker"; +./_build/default/rel/cdapbroker/bin/cdapbroker diff --git a/bin/build_and_test.sh b/bin/build_and_test.sh new file mode 100755 index 0000000..b0456f4 --- /dev/null +++ b/bin/build_and_test.sh @@ -0,0 +1,12 @@ +#!/usr/local/bin/fish +rm -rf /tmp/log/cdapbroker/*; +rebar3 release; +set -x NEXUS_RAW_ROOT "XXXX"; +set -x CDAP_CLUSTER_TO_MANAGE "cdap"; +set -x CONSUL_HOST "XXXX"; +set -x CONFIG_BINDING_SERVICE "config_binding_service"; +set -x HOSTNAME "cdap_broker"; +set -x REBAR3_ERL_ARGS "-sname cdapbroker@localhost"; +rebar3 local install; +env DEBUG=1 ~/.cache/rebar3/bin/rebar3 ct; +rebar3 eunit diff --git a/bin/build_and_unit_test.sh b/bin/build_and_unit_test.sh new file mode 100755 index 0000000..7a2c757 --- /dev/null +++ b/bin/build_and_unit_test.sh @@ -0,0 +1,5 @@ +#!/usr/local/bin/fish +rm -rf _build/; +rebar3 upgrade; +rebar3 release; +rebar3 eunit diff --git a/config/sys.config b/config/sys.config new file mode 100644 index 0000000..3b8edc0 --- /dev/null +++ b/config/sys.config @@ -0,0 +1,86 @@ +[ + {sasl, [{utc_log, true}]}, + + %% {size, 10485760}, {date, "$D0"}, {count, 5} -> This tells lager to log error and above messages to error.log and to rotate the file at midnight or when it reaches 10mb, whichever comes first, and to keep 5 rotated logs in addition to the current one + + {lager, [ + {colored, true}, + {log_root, "/tmp/log/cdapbroker"}, + + %%Any logs just starting with lager: will go into stdout and cdapbroker.log. These are the default for non-EELF caught logs + {handlers, + [ + {lager_console_backend, info}, + {lager_file_backend, [{file, "cdapbroker.log"}, + {level, debug}, + {formatter, lager_default_formatter}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}, + %% Message will hold the delimited fields that the logging standard actually wants + {formatter_config, [message, "--", module ,":", function, ":", line, "\n"]} + ]} + ] + }, + %%EELF logs have special sink names: error:, audit:, metrics:, and optionally debug: + %%these sinks will log to their respective files, and for sanity reasons, stdout as well + {extra_sinks, + [ + {audit_lager_event, + [{handlers, + [ + {lager_console_backend, debug}, + {lager_file_backend, + [{file, "audit.log"}, + %regarding level, we want the lowest becaue audit:anything will show up in audit.log + {level, debug}, + {formatter, lager_default_formatter}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}, + %% Message will hold the delimited fields that the logging standard actually wants + {formatter_config, [message, "--", module ,":", function, ":", line, "\n"]}] + } + ] + }] + }, + {metrics_lager_event, + [{handlers, + [ + {lager_console_backend, debug}, + {lager_file_backend, + [{file, "metrics.log"}, + %regarding level, we want the lowest becaue metrics:anything will show up in metrics.log + {level, debug}, + {formatter, lager_default_formatter}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}, + %% Message will hold the delimited fields that the logging standard actually wants + {formatter_config, [message, "--", module ,":", function, ":", line, "\n"]}] + } + ] + }] + }, + {error_lager_event, + [{handlers, + [ + {lager_console_backend, debug}, + {lager_file_backend, + [{file, "error.log"}, + %regarding level, we want the lowest becaue error:anything will show up in error.log + {level, debug}, + {formatter, lager_default_formatter}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}, + %% Message will hold the delimited fields that the logging standard actually wants + {formatter_config, [message, "--", " [",severity,"] ", module ,":", function, ":", line, "\n"]}] + } + ] + }] + } + ] + } + ]} +]. diff --git a/config/vm.args b/config/vm.args new file mode 100644 index 0000000..cbe012c --- /dev/null +++ b/config/vm.args @@ -0,0 +1,9 @@ +# only show the programmed prompt +-noshell + +## Name of the node +-sname cdapbroker@localhost + +## Cookie for distributed erlang +-setcookie cdapbroker + diff --git a/config_template.json b/config_template.json new file mode 100644 index 0000000..1775b8f --- /dev/null +++ b/config_template.json @@ -0,0 +1 @@ +{"autoderegisterafter": "10m", "cdap_cluster_to_manage": "{{cdap_cluster}}", "bindingttw": 5, "hcinterval": "5s"} diff --git a/doc/cdap_broker.png b/doc/cdap_broker.png new file mode 100644 index 0000000..712cdc2 Binary files /dev/null and b/doc/cdap_broker.png differ diff --git a/entry.sh b/entry.sh new file mode 100755 index 0000000..2b60caf --- /dev/null +++ b/entry.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +echo "if testing locally send SIGTERM to $$" + +term_handler() { + echo "Stopping the Erlang VM gracefully" + #/usr/local/Cellar/erlang/19.1/lib/erlang/lib/erl_interface-3.9.1/bin/erl_call -c cdapbroker -s -a 'init stop' -n 'cdapbroker@localhost' + /usr/local/lib/erlang/lib/erl_interface-3.9.2/bin/erl_call -c cdapbroker -s -a 'init stop' -n 'cdapbroker@localhost' + echo "Erlang VM Stopped" +} + +trap term_handler SIGQUIT SIGINT SIGTERM + +./_build/default/rel/cdapbroker/bin/cdapbroker & +PID=$! + +echo "Erlang VM Started" +#wait $PID +while kill -0 $PID ; do wait $PID ; EXIT_STATUS=$? ; done +echo "Exiting Wrapper." +exit $EXIT_STATUS diff --git a/get_version.sh b/get_version.sh new file mode 100755 index 0000000..6883146 --- /dev/null +++ b/get_version.sh @@ -0,0 +1,3 @@ +#!/bin/sh +#todo... build this so that it parses automatically +echo "4.0.3 diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..b62c0da --- /dev/null +++ b/rebar.config @@ -0,0 +1,74 @@ +{relx, [ + {release, + {cdapbroker,"4.0.3"}, + [cdapbroker] + }, + %{extend_start_script,true}, + % + %for the following two fancyiness see https://www.rebar3.org/docs/releases + %Supply our own vm.args + {vm_args, "config/vm.args"}, + %supply our own application configuration + {sys_config, "config/sys.config"} + ]}. + +{deps, [ + {jiffy, ".*", {git, "git://github.com/davisp/jiffy.git", {branch, "master"}}}, + %{leptus, ".*", {git, "git://github.com/s1n4/leptus.git", {branch, "master"}}}, + {leptus, ".*", {git, "git://github.com/tommyjcarpenter/leptus.git", {branch, "version"}}}, + {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, + %generate RFC compliant UUIDs + {uuid, ".*", {git, "https://github.com/avtobiff/erlang-uuid.git", {branch, "master"}}}, + %generate ISO8601 timestamps + {iso8601, {git, "https://github.com/erlsci/iso8601.git", {tag, "1.2.3"}}} + ]}. + +%%% Require OTP 19.2 at a bare minimum +{minimum_otp_vsn, "19"}. + +%% Plugins +{shell, [ + {apps, [lager, sync, gen_rpc]} +]}. + +% Erlang options +{erl_opts, + [ + %%lager first + %%support lager introspecting module/line/function + {parse_transform, lager_transform}, + %%support EELF sinks + {lager_extra_sinks, [audit,metrics,error]}, + + %other opts + %warnings_as_errors, + debug_info, + {warn_format, 1}, + bin_opt_info, + inline_list_funcs, + report_warnings, + warn_export_vars, + warn_export_all, + warn_shadow_vars, + warn_obsolete_guard, + warn_unused_import, + warn_deprecated_function, + warn_unused_vars, + warn_unused_function, + warn_bif_clash, + warn_unused_record, + warn_exported_vars + ] +}. + +%% Code coverage +{cover_enabled, true}. +{cover_export_enabled, true}. +{cover_opts, [verbose]}. + +%% EUnit options +{eunit_compile_opts, [debug_info, export_all]}. +{eunit_opts, [verbose, + no_tty, + {report, {eunit_progress, [colored, profile]}} +]}. diff --git a/src/application.hrl b/src/application.hrl new file mode 100644 index 0000000..fc7c80e --- /dev/null +++ b/src/application.hrl @@ -0,0 +1,28 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-record(program, {type :: binary(), id :: binary()}). +-type lprogram() :: [#program{}]. +-type status_code() :: 100..101 | 200..206 | 300..307 | 400..417 | 500..505. %stolen from leptus +-type httpstat() :: {status_code(), string()}. +-record(application, {appname, apptype, namespace, healthcheckurl, metricsurl, url, connectionurl, serviceendpoints, creationtime}). +-record(prog_flow_supp, {appname, programs::lprogram()}). + + diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl new file mode 100644 index 0000000..7ce3b37 --- /dev/null +++ b/src/cdap_interface.erl @@ -0,0 +1,363 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(cdap_interface). +-export([get_app_metrics/4, + get_app_healthcheck/4, + push_down_config/5, + form_service_json_from_service_tuple/4, + form_stream_url_from_streamname/3, + exec_programs/6, + push_down_program_preferences/5, + push_down_app_preferences/5, + deploy_app/8, + deploy_pipeline/5, + delete_app/4, + %get_app_programs/3, + exec_pipeline/5, + exec_pipeline_workflow/5, + get_pipeline_healthcheck/5, + get_pipeline_metrics/3, + deploy_pipeline_dependencies/4, + deploy_pipeline_dependencies_properties/4, + create_namespace/3, + get_app_config/4, + get_app_preferences/4, + get_cdap_cluster_version/2, + get_cdap_gui_port_from_version/1 + ]). +-include("application.hrl"). +-define(SC(L), util:concat(L)). +-define(BAD_HEALTH_CODE, 400). %%not sure if this is the best status code for "unhealthy". I don't like 500 because I am able to complete the user's request (healthcheck) + +%helpful: https://robhirschfeld.com/2012/08/15/erlang-http-client-restful-api-post-example-code/ + +%%% +%%%INTERNAL +%%% +map_appname(Appname) -> + %CDAP APIs do not allow app names with any special characters. Here we will map the + %name to ensure it does not contain special characters using a regex whitelist + %see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string + re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]). + +get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) -> + URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]), + {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""), + case ReturnCode of + 200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch]; + _ -> 504 %must bubble this up + end. + +-spec get_app_healthcheck_program(string(), binary(), binary(), string(), #program{}) -> integer(). +get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) -> + %helper function: checks for a partocular program from an app + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]), + {RC, _} = httpabs:get(XER, URL), + case RC of + 200 -> + %Next make sure it's status is running + {RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])), + case RC2 of + 200 -> + S = jiffy:decode(RB2, [return_maps]), + case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of + true -> 200; %return 200 + false -> ?BAD_HEALTH_CODE %return + end; + _ -> ?BAD_HEALTH_CODE + end; + _ -> ?BAD_HEALTH_CODE + end. + +-spec exec_program(string(), binary(), binary(), string(), #program{}, string()) -> httpstat(). +exec_program(XER, Appname, Namespace, CDAPURL, P, Exec) -> + %Exec should be 'start' or 'stop' + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id, "/", Exec]), + httpabs:post(XER, URL, "application/json", ""). + +push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) -> + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", PT, "/", PI, "/", "preferences"]), + httpabs:put(XER, URL, "application/json", jiffy:encode(PP)). + +deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) -> + %deploys a single dependency + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + {200, JarBody} = httpabs:get(XER, ArtURL), + Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}], + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]), + httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody). + +deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) -> + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + case UIPropertiesURL of + none -> {200, ""}; %nothing to do + _ -> + {200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]), + httpabs:put(XER, URL, "application/json", PropertiesJSON) + end. + +%%% +%%%EXTERNAL +%%% +get_app_metrics(XER, Appname, Namespace, CDAPURL) -> + %Namespace should be a binary + MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb + case MetricsList of + 504 -> {504, []}; + %TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something + [] -> {200, []}; + _ -> + URL = ?SC([CDAPURL, "/v3/metrics/query"]), + Body = jiffy:encode( + {[{<<"appmetrics">>, + {[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}}, + {<<"metrics">>, MetricsList}]} + }]}), + {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it! + case ReturnCode of + 200 -> {200, jiffy:decode(RetBody)}; + 504 -> {504, []} + end + end. + +%THIS FUNCTION WAS ONCE NEEDED +%It gets the list of all programs in a running CDAP app. +%However this is no longer used due to a feature request where people want to start/healthcheck only *some* programs in their application +%So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state +%However, leaving it here because maybe one day it will be useful +%%%get_app_programs(Appname, Namespace, CDAPURL) -> +%%% %fetch the list of programs from a running CDAP app. +%%% %Parse this into a list of [{ProgramType, ProgramID}] tuples. +%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline. +%%% %get informaation about application +%%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), +%%% {ReturnCode, RetBody} = httpabs:http_get(URL), +%%% case ReturnCode of +%%% 504 -> 504; +%%% 404 -> 404; +%%% 400 -> 400; +%%% 200 -> +%%% Drb = jiffy:decode(RetBody, [return_maps]), +%%% Programs = maps:get(<<"programs">>, Drb), +%%% lists:map(fun(X) -> #program{ +%%% %stupid CDAP APIs require a different get call then what is returned as the Program Type. For example you need to get "flows" to get a program of type Flow. im filing a bug with them. see program-typeOne of flows, mapreduce, services, spark, workers, or workflows here: http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-deployed-application. I filed an issue: https://issues.cask.co/browse/CDAP-7191?filter=-2 +%%% %From http://docs.cask.co/cdap/current/en/developers-manual/building-blocks/program-lifecycle.html +%%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow +%%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program +%%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows +%%% program_type = case maps:get(<<"type">>, X) of +%%% <<"Flow">> -> <<"flows">>; %cdap api fail man +%%% <<"Mapreduce">> -> <<"mapreduce">>; +%%% <<"Service">> -> <<"services">>; +%%% <<"Spark">> -> <<"spark">>; +%%% <<"Worker">> -> <<"workers">>; +%%% <<"Workflow">> -> <<"workflows">> +%%% end, +%%% program_id = maps:get(<<"id">>, X) +%%% } +%%% end, Programs) +%%% end. +%%% +-spec get_app_healthcheck(string(), binary(), binary(), string()) -> integer(). +get_app_healthcheck(XER, Appname, Namespace, CDAPURL) -> + %cdap does not provide a simple "heathcheck" api for apps like it does metrics + %what I am using is: + % making sure the application is there + % making sure all programs (flows, services, etc) are there + %for now. From: http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-deployed-application + + Programs = util:get_programs_for_pfapp_from_db(Appname), + %check each program + M = lists:map(fun(X) -> get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, X) end, Programs), + case lists:foldl(fun(X, Y) -> X == 200 andalso Y end, true, M) of %check all 200s + true -> 200; + false -> ?BAD_HEALTH_CODE + end. + +push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) -> + %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]), + Body = jiffy:encode( + {[ + {<<"config">>, ConfigJson} + ]}), + httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody} + +push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPreferences) -> + FClosure = fun(X) -> push_down_program_preference(XER, Appname, Namespace, CDAPURL, X) end, + workflows:all_200s_else_showerror(FClosure, ParsedProgramPreferences). + +form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) -> + %transforms {SN, SE, EM} into {url: foo, method: bar} + URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])), + {[{<<"url">>, URL}, + {<<"method">>, EM} + ]}. + +form_stream_url_from_streamname(CDAPURL, Namespace, Streamname) -> + list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/streams/", Streamname])). + +-spec exec_programs(string(), binary(), binary(), string(), lprogram(), string()) -> httpstat(). +exec_programs(XER, Appname, Namespace, CDAPURL, Programs, Exec) -> + FClosure = fun(X) -> exec_program(XER, Appname, Namespace, CDAPURL, X, Exec) end, + workflows:all_200s_else_showerror(FClosure, Programs). + +push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) -> + %use app level preferences API if specified + %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/preferences.html + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]), + httpabs:put(XER, URL, "application/json", jiffy:encode(AppPreferences)). + +deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) -> + %Create Artifact + %Deploy App + + %post the artifact, OK if already exists, no check + %explicitly set artifact version becausme some JARS do not have it + httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody), + + %deploy the application + PutBody = jiffy:encode( + {[ + {<<"artifact">>, {[ + {<<"name">>, ArtifactName}, + {<<"version">>, ArtifactVersion}, + {<<"scope">>, <<"user">>} + ]}}, + {<<"config">>, AppConfig} + ]}), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + httpabs:put(XER, URL, "application/json", PutBody). + +delete_app(XER, Appname, Namespace, CDAPURL) -> + %delete an application; works for prog-flow and hydrator + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + httpabs:delete(XER, URL). + +deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) -> + %Deploy a hydrator pipeline, assumes namespace has already been set up + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + httpabs:put(XER, URL, "application/json", PipelineJson). + +exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) -> + %Exec assumed to be: "resume" or "suspend" + %TODO! REVISIT WHETHER datapipelineschedule is a parameter + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]), + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + +exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) -> + %Exec assumed to be: "stop" or + %TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]), + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + +get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) -> + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]), + {RC, RB} = httpabs:get(XER, URL), + case RC /= 200 of + true -> ?BAD_HEALTH_CODE; %failed to even hit the status. + false -> + Status = jiffy:decode(RB, [return_maps]), + case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of + false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good + true -> + %Next, check the last number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time. + %This logic came from Terry. + %His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation. + %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen. + %RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang + L = list_to_binary(integer_to_list(PipelineHealthLimit)), + {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])), + case RC2 /= 200 of + true -> RC2; %failed to even hit the status + false -> + LRST = lists:map(fun(S) -> + case maps:is_key(<<"status">>, S) of + false -> critical; + true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end + end end, jiffy:decode(RB2, [return_maps])), + %now process the transformed list + %check if any had failed or if the status JSONs were malformed, or check if more than 2 running at once + case lists:any(fun(X) -> X == critical end, LRST) orelse length(lists:filter(fun(X) -> X == running end, LRST)) > 1 of + true -> ?BAD_HEALTH_CODE; + false -> 200 %ALL TESTS PASS + end + end + end + end. + +get_pipeline_metrics(_Appname, _Namespace, _CDAPURL) -> + lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), + {200, []}. + +deploy_pipeline_dependencies(XER, Namespace, CDAPURL, ParsedDependencies) -> + FClosure = fun(X) -> deploy_pipeline_dependency(XER, Namespace, CDAPURL, X) end, + workflows:all_200s_else_showerror(FClosure, ParsedDependencies). + +deploy_pipeline_dependencies_properties(XER, Namespace, CDAPURL, ParsedDependencies) -> + FClosure = fun(X) -> deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, X) end, + workflows:all_200s_else_showerror(FClosure, ParsedDependencies). + +create_namespace(_XER, <<"default">>, _) -> {200, ""}; %no-op, already exists +create_namespace(XER, Namespace, CDAPURL) -> + httpabs:put(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace]), "", ""). + +get_app_config(XER, Appname, Namespace, CDAPURL) -> + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + {RC, RB} = httpabs:get(XER, URL), + case RC of + 200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))}; + _ -> {RC, RB} + end. + +get_app_preferences(XER, Appname, Namespace, CDAPURL) -> + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]), + {RC, RB} = httpabs:get(XER, URL), + case RC of + 200 -> {200, jiffy:decode(RB, [return_maps])}; + _ -> {RC, RB} + end. + +get_cdap_cluster_version(XER, CDAPURL) -> + %CDAP decided to change their port numbers between release 3 and 4. + %The broker works with both. + %In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to. + %The GUI information is a convinence function for component developers that hit the broker via the CLI tool. + URL = ?SC([CDAPURL, "/v3/version"]), + {RC, RB} = httpabs:get(XER, URL), + case RC of + 200 -> maps:get(<<"version">>, jiffy:decode(RB, [return_maps])); + _ -> <<"UNKNOWN CDAP VERSION">> + end. + +-spec get_cdap_gui_port_from_version(binary() | string()) -> 9999 | 11011 | binary(). +get_cdap_gui_port_from_version(Version) -> + %given the cdap clsuter version, return the GUI port + case re:run(Version, "3\.\[0-9]+\.[0-9]+") of + nomatch -> + case re:run(Version, "4\.\[0-9]+\.[0-9]+") of + nomatch -> <<"UNKNOWN CDAP VERSION">>; + _ -> 11011 + end; + _ -> 9999 + end. + diff --git a/src/cdap_interface_tests.erl b/src/cdap_interface_tests.erl new file mode 100644 index 0000000..37926a6 --- /dev/null +++ b/src/cdap_interface_tests.erl @@ -0,0 +1,34 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(cdap_interface_tests). +-include_lib("eunit/include/eunit.hrl"). + +get_cdap_gui_port_from_version_test() -> + ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.0")), + ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.0")), + ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.10")), + ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.10")), + ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.0">>)), + ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.0">>)), + ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.10">>)), + ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.10">>)), + ?assert(<<"UNKNOWN CDAP VERSION">> == cdap_interface:get_cdap_gui_port_from_version("5.0.0")). + diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src new file mode 100644 index 0000000..1d04330 --- /dev/null +++ b/src/cdapbroker.app.src @@ -0,0 +1,23 @@ +{application, cdapbroker, + [{description, "Interface between Consul and CDAP in DCAE"}, + {vsn, "4.0.3"}, + {registered, []}, + {mod, { cdapbroker_app, []}}, + {applications, + [kernel, + stdlib, + lager, + inets, + ssl, + jiffy, + mnesia, + leptus, + uuid, + iso8601 + ]}, + {env,[]}, + {modules, []}, + {maintainers, []}, + {licenses, []}, + {links, []} + ]}. diff --git a/src/cdapbroker_app.erl b/src/cdapbroker_app.erl new file mode 100644 index 0000000..13256b0 --- /dev/null +++ b/src/cdapbroker_app.erl @@ -0,0 +1,94 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +%%%------------------------------------------------------------------- +%% @doc cdapbroker public API +%% @end +%%%------------------------------------------------------------------- + +-module(cdapbroker_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%for application state +-include("application.hrl"). +-define(SC(L), util:concat(L)). + +%%==================================================================== +%% API +%%==================================================================== + +start(_StartType, _StartArgs) -> + %starting inets to make request calls + inets:start(), + %from the HTTPC page: + %"If the scheme https is used, the SSL application must be started. + ssl:start(), + + %EELF hello + audit:info("Audit log initlized"), + metrics:info("Metrics log initlized"), + error:info("Error log initlized"), + + %fail fast; check all failure conditions first + PE = util:get_platform_envs_and_config(), + case PE of + [] -> %crash and burn. Need to exit else supervisor will restart this endlessly. + exit("fatal error, either HOSTNAME or CONSUL_HOST or CONFIG_BINDING_SERVICE is not set as an env variable"); + [_, ConsulURL, CDAPUrl, BoundConfigMap] -> + try + S = dict:new(), + S1 = dict:store("consulurl", ConsulURL, S), + S2 = dict:store("configmap", BoundConfigMap, S1), + State = dict:store("cdapurl", CDAPUrl, S2), + + %initialize database + ok = util:initialize_database(), + + %print out currently registered apps at startup + lager:info("Currently installed apps: ~p~n", [util:get_all_appnames_from_db()]), + + %start the REST server + leptus:start_listener(http, [{'_', [{resource_handler, State}]}], [{port, 7777}, {ip, {0,0,0,0}}]), + + %start the supervisor + lager:info("Starting supervisor"), + cdapbroker_sup:start_link() + catch Class:Reason -> + lager:error("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})]), + exit(Reason) + end + end. + +%%-------------------------------------------------------------------- +stop(_State) -> + %need to make sure there are no pending transcations in RAM not written to disk yet on shutdown. + %Two hard problems in CS, this is one of then... + lager:info("Stop recieved."), + case mnesia:sync_log() of + ok -> ok; + {error, Reason} -> + lager:error(io_lib:format("While stopping, MNESIA could not by syncd due to: ~s. This means on bootup the database may be in a bad state!!", [Reason])), + notok + end. + diff --git a/src/cdapbroker_sup.erl b/src/cdapbroker_sup.erl new file mode 100644 index 0000000..b0894e8 --- /dev/null +++ b/src/cdapbroker_sup.erl @@ -0,0 +1,55 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +%%%------------------------------------------------------------------- +%% @doc cdapbroker top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(cdapbroker_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%==================================================================== +%% API functions +%%==================================================================== + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%==================================================================== +%% Supervisor callbacks +%%==================================================================== + +%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules} +init([]) -> + {ok, { {one_for_all, 0, 1}, []} }. + +%%==================================================================== +%% Internal functions +%%==================================================================== diff --git a/src/consul_interface.erl b/src/consul_interface.erl new file mode 100644 index 0000000..e52abd8 --- /dev/null +++ b/src/consul_interface.erl @@ -0,0 +1,139 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(consul_interface). +-export([consul_register/8, + consul_deregister/3, + consul_get_configuration/3, + consul_get_preferences/3, + consul_get_service_ip_port/3, + consul_push_config/4, + consul_push_preferences/4, + consul_bind_config/3, + consul_delete_config/3, + consul_delete_preferences/3 + ]). + +-define(SC(L), util:concat(L)). +-define(PrefKey(Appname), ?SC([Appname, ":preferences"])). + +%the Erlang library linked on the Consul webpage does not seem to be equivelent to python-consul. It does something else: https://github.com/undeadlabs/discovery +%and this one seems to be just elixer: https://github.com/undeadlabs/consul-ex +%so, for now, I'm just doing some HTTP calls directly against the REST API. + +consul_get_service(XER, Appname, ConsulURL) -> + %returns a list of maps + URL = ?SC([ConsulURL, "/v1/catalog/service/", Appname]), + {200, ReturnBody} = httpabs:get(XER, URL), + jiffy:decode(ReturnBody, [return_maps]). + +consul_write_kv(XER, Key, Value, ConsulURL) -> + %generic helper function to write a value into Consul + URL = ?SC([ConsulURL,"/v1/kv/", Key]), + httpabs:put(XER, URL, "application/json", Value). + +consul_read_kv(XER, Key, ConsulURL) -> + %does a get on the KV store, see https://www.consul.io/docs/agent/http/kv.html + %Appname MUST be a binary not a string! + URL = ?SC([ConsulURL, "/v1/kv/", Key]), + {RC, RB} = httpabs:get(XER, URL), + case RC of + 200 -> + [Drb] = jiffy:decode(RB, [return_maps]), + {200, base64:decode(maps:get(<<"Value">>, Drb))}; %NOTE! Does not do a JSON decode here in case you want to read non-JSON from Consul. Leaves the decode to the caller. + _ -> + {RC, RB} + end. + +consul_delete_kv(XER, Key, ConsulURL) -> + %the config has not been jiffy'd prior to this function + URL = ?SC([ConsulURL,"/v1/kv/", Key]), + httpabs:delete(XER, URL). + +%%%%%%%%%%%%%%%%% +%PUBLIC FUNCTIONS +%%%%%%%%%%%%%%%%% + +consul_register(XER, Appname, ConsulURL, SDIP, SDPort, HealthURL, HCInterval, AutoDeregisterAfter) -> + %uses the agent api to register this app as a service and it's healthcheck URL, which is this broker as a proxy. + %/v1/agent/service/register from https://www.consul.io/docs/agent/http/agent.html#agent_service_register + URL = ?SC([ConsulURL, "/v1/agent/service/register"]), + Body = jiffy:encode( + {[{<<"Name">>, Appname}, + {<<"Address">>, SDIP}, + {<<"Port">>, SDPort}, + {<<"Check">> , + {[{<<"HTTP">>, HealthURL}, {<<"Interval">>, HCInterval}, {<<"DeregisterCriticalServiceAfter">>, AutoDeregisterAfter}]} + } + ]}), + httpabs:put(XER, URL, "application/json", Body). + +consul_deregister(XER, Appname, ConsulURL) -> + %/v1/agent/service/deregister/ from https://www.consul.io/docs/agent/http/agent.html#agent_service_register + URL = ?SC([ConsulURL, "/v1/agent/service/deregister/", Appname]), + httpabs:put(XER, URL, "application/json", ""). %kinda weird that this isnt a DELETE + +consul_get_configuration(XER, Appname, ConsulURL) -> + %fetch configuration from consul, assumes it is a json + %returns it as a map. can be encoded again. + {RC, RB} = consul_read_kv(XER, Appname, ConsulURL), + case RC of + 200 -> {200, jiffy:decode(RB, [return_maps])}; %configuration is a JSON + _ -> {RC, RB} + end. + +consul_get_preferences(XER, Appname, ConsulURL) -> + %This function is currently only used in testing and is not used in resource_handler, but could be useful later + %returns it as a map. can be encoded again. + {RC, RB} = consul_read_kv(XER, ?PrefKey(Appname), ConsulURL), + case RC of + 200 -> {200, jiffy:decode(RB, [return_maps])}; %configuration is a JSON + _ -> {RC, RB} + end. + +consul_get_service_ip_port(XER, Appname, ConsulURL) -> + %use when you are expecting consul_get_service to return a list of exactly one service and all you want is ip:port + M = lists:nth(1, consul_get_service(XER, Appname, ConsulURL)), + {maps:get(<<"ServiceAddress">>, M), maps:get(<<"ServicePort">>, M)}. + +consul_push_config(XER, Appname, ConsulURL, Config) -> + %pushes Config into Consul under the key "Appname". + %TODO: Possibly this should be under "Appname:config" to be consistent with preferences but this came first and that's an invasive change. + %the config has not been jiffy'd prior to this function + consul_write_kv(XER, Appname, jiffy:encode(Config), ConsulURL). + +consul_push_preferences(XER, Appname, ConsulURL, Preferences) -> + %pushes the preferences into Consul under the key "Appname:preferences" + %the config has not been jiffy'd prior to this function + consul_write_kv(XER, ?PrefKey(Appname), jiffy:encode(Preferences), ConsulURL). + +consul_delete_config(XER, Appname, ConsulURL) -> + consul_delete_kv(XER, Appname, ConsulURL). + +consul_delete_preferences(XER, Appname, ConsulURL) -> + consul_delete_kv(XER, ?PrefKey(Appname), ConsulURL). + +consul_bind_config(XER, Appname, ConsulURL) -> + URL = ?SC([util:resolve_cbs(XER, ConsulURL), "/service_component/", Appname]), + {ReturnCode, ReturnBody} = httpabs:get(XER, URL), + case ReturnCode of + 200 -> {200, jiffy:decode(ReturnBody)}; + _ -> {ReturnCode, ReturnBody} %do not try to decode if not correct + end. diff --git a/src/httpabs.erl b/src/httpabs.erl new file mode 100644 index 0000000..bc9e068 --- /dev/null +++ b/src/httpabs.erl @@ -0,0 +1,118 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(httpabs). +-export([get/2, + post/4, %I miss python's default arguments.. + post/5, + put/4, + delete/2 + ]). +-include("application.hrl"). +-define(SC(L), util:concat(L)). + +%NOTE +%Consider the Erlang statement: +% +%{ok, {{"HTTP/1.1",ReturnCode, State}, Head, Body}} = httpc:get(URL). +%CDAP returns error messages in the “Body” field above. +% +%However, Consul: +%1) Always (in all HTTP failures I’ve tested) returns Body == “500\n” +%2) Returns the error message in the State field +% +%Example: +% +%{{"HTTP/1.0",404,"Client Error: Not Found for url: http://consul.[...].com:8500/v1/kv/hwtestYOUHAVEFAILEDME:rel"},[{"date","Mon, 14 Nov 2016 14:41:03 GMT"},{"server","Werkzeug/0.11.11 Python/3.5.1"},{"content-length","4"},{"content-type","application/json"}],"500\n"} +% +%This means that error handling in HTTP is not consistent across CDAP and Consul. +% +%Thus below, on a failure, I return the concatenation of State and Body + +%%% +%%%HELPER +%%% +-spec parse_response({error|ok, any()}, string()) -> httpstat(). +parse_response({Status, Response}, URL) -> + case Status of + error -> + lager:error("httpc error: cannot hit: ~s", [URL]), + case Response of + no_scheme -> {400, io_lib:format("ERROR: The following URL is malformed: ~s", [URL])}; + {bad_body, _} -> {400, "ERROR: The request Body is malformed"}; + {bad_body_generator,_} -> {400, "ERROR: The request Body is malformed"}; + _ -> + lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])), + {504, list_to_binary(io_lib:format("ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: ~s", [URL]))} %Are there other reasons other than bad body and unreachable that crash request? (Sneak peak: the answer is probably) + end; + ok -> + {{_, ReturnCode, State}, _Head, Body} = Response, + case ReturnCode of + 200 -> + {ReturnCode, Body}; + _ -> + lager:error("Error While hitting ~s, Non-200 status code returned. HTTP Code ~p, State ~s, ResponseBody ~s:", [URL, ReturnCode, State, Body]), + %see Note at the top of this file + RetBody = ?SC(["State: ", State, ". Return Body: ", Body]), + {ReturnCode, RetBody} + end + end. + +sanitize(URL) -> + %allow URL to look like "www.foo.com" or <<"www.foo.com">>, trim it + case is_binary(URL) of + true -> string:strip(binary_to_list(URL)); + false -> string:strip(URL) + end. + +%anywhere you see any() is essentially lazy typing.. fix these someday when time is abundant +-spec post(string(), string()|binary(), string(), any()) -> httpstat(). +post(XER, URL, ContentType, Body) -> + %post that sends the XER, no headers signature + Headers = [{"x-ecomp-requestid", XER}], + U = sanitize(URL), + parse_response(httpc:request(post, {U, Headers, ContentType, Body}, [],[]), U). + +-spec post(string(), string()|binary(), list(), string(), any()) -> httpstat(). +post(XER, URL, Headers, ContentType, Body) -> + %post that sends XER, appends the header onto the list of desired headers + U = sanitize(URL), + parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER} | Headers], ContentType, Body}, [],[]), U). + +-spec get(string(), string()|binary()) -> httpstat(). +get(XER, URL) -> + %http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day. + U = sanitize(URL), + Headers = [{"x-ecomp-requestid", XER}], + parse_response(httpc:request(get, {U, Headers}, [], []), U). + +-spec put(string(), string()|binary(), string(), any()) -> httpstat(). +put(XER, URL, ContentType, Body) -> + %http put that always sends the XER + U = sanitize(URL), + Headers = [{"x-ecomp-requestid", XER}], + parse_response(httpc:request(put, {U, Headers, ContentType, Body}, [],[]), U). + +-spec delete(string(), string()|binary()) -> httpstat(). +delete(XER, URL) -> + %http delete that always sends the XER + U = sanitize(URL), + Headers = [{"x-ecomp-requestid", XER}], + parse_response(httpc:request(delete, {U, Headers}, [],[]), U). diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl new file mode 100644 index 0000000..d8ad529 --- /dev/null +++ b/src/httpabs_tests.erl @@ -0,0 +1,33 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(httpabs_tests). +-include_lib("eunit/include/eunit.hrl"). +-import(httpabs, [ + sanitize/1 + ] + ). + +sanitize_test() -> + ?assert(sanitize(<<" www.foo.com ">>) == "www.foo.com"), + ?assert(sanitize(" www.foo.com ") == "www.foo.com"), + ?assert(sanitize(<<"www.foo.com">>) == "www.foo.com"). + + diff --git a/src/logging.erl b/src/logging.erl new file mode 100644 index 0000000..942637d --- /dev/null +++ b/src/logging.erl @@ -0,0 +1,157 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(logging). +-export([ + audit/3, + metrics/3, + err/2 + ]). + +-import(util, [iso/0, iso_elapsed/2, to_str/1, ip_to_str/1]). +%..lazy macros +-define(SC(L), util:concat(L)). +-define(PV(Name, PL), proplists:get_value(Name, PL, "")). + +%This module is intended to support the logging format standard for ONAP/ECOMP components. SOmetimes that is reffered to as "EELF". + +%levels are none | debug | info | notice | warning | error | critical | alert | emergency. + +%%% +%%%Helper functions +%%% +pid() -> pid_to_list(self()). +%they wanted milleseconds but they are getting seconds rounded to ms because I don't have an erlang BIF that gives me this +elapsed(Endtime, Starttime) -> to_str(iso_elapsed(Endtime, Starttime)*1000). + +start_end_elapsed(ArgPropl) -> + %returns start time, end time,... and elapsed time + EndT = iso(), + StartT = ?PV(bts, ArgPropl), + ElapT = elapsed(EndT, StartT), + {StartT, EndT, ElapT}. + +%things this logging class can compute based on the Req so need not be in every logging function +server_add(Req) -> + {MyUrl, _} = cowboy_req:host_url((leptus_req:get_req(Req))), + %discard HTTP portion using Erlang binary matching + <<_:7/binary, URL/binary>> = MyUrl, + URL. + +ip(Req) -> ip_to_str(leptus_req:peer(Req)). + +path(Req) -> + %get us the method and API path that was hit from the request + {Path, {_,_,_,_,_,Method,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_}} = cowboy_req:path(leptus_req:get_req(Req)), + ?SC([Method, " ", Path]). + +%%% +%%%External +%%% + +audit(Sev, Req, ArgPropl) -> + F = case Sev of + info -> fun(M)->audit:info(M) end; %cant use the shorthand "fun -> audit:info/2" because of parse_transform + warning -> fun(M)->audit:warning(M) end + end, + %The audit field list: + % + %1 BeginTimestamp Implemented (bts) + %2 EndTimestamp Auto Injected when this is called + %3 RequestID Implemented (xer) + %4 serviceInstanceID + %5 threadId Auto Injected... however this is a VM language and so this is the Erlang process PID, not the "OS level thread id". Unclear what they want here. + %6 physical/virtual server name  + %7 serviceName Implemented (from Req) + %8 PartnerName + %9 StatusCode + %10 ResponseCode Implemented (rcode) + %11 Response Description Will not implement. This says "human readable description of the *code*. They don't want the response here. I won't do that because this is a generic function that gets called no matter what the code is, so I can't do that. But since this is supposed to be human readable, they can look up the code in the swagger spec. + %12 instanceUUID + %13 Category log level Implemented (Sev) + %14 Severity + %15 Server IP address Implemented (from Req) + %16 ElapsedTime Auto Injected but THIS IS SO DUMB TO BE IN THE STANDARD WASTING DISK SPACE THIS IS DIRECTLY COMPUTABLE FROM 1,2 WTF + %17 Server + %18 ClientIPaddress Implemented (from Req) + %19 class name Implemented (mod), though docs say OOP, I am using the Erlang module here + %20 Unused Implemented.. + %21 ProcessKey + %22 CustomField1 + %23 CustomField2 + %24 CustomField3 + %25 CustomField4 + %26 detailMessage Implemented (msg) + + {StartT, EndT, ElapT} = start_end_elapsed(ArgPropl), + %compute message + Message = ?SC([StartT, "|", EndT, "|", ?PV(xer, ArgPropl), "||", pid(), "||", path(Req), "|||", to_str(?PV(rcode, ArgPropl)), "|see swagger spec||", to_str(Sev), "||", server_add(Req), "|", ElapT, "||", ip(Req), "|", ?PV(mod, ArgPropl), "|||||||", ?PV(msg, ArgPropl)]), + F(Message). + +metrics(Sev, Req, ArgPropl) -> + F = case Sev of + debug -> fun(M)->metrics:debug(M) end; + info -> fun(M)->metrics:info(M) end; + warning -> fun(M)->metrics:warning(M) end + end, + %The metrics field list: + %SAME AS METRICS 1-8 + %_____ + %9 TargetEntity Implemented (tgte) + %10 TargetServiceName Implemented (tgts) + %11 Status Code Implemented (tgtrsc) + %_____ + %SAME AS METRICS 9 -> + %total 29 fields + + {StartT, EndT, ElapT} = start_end_elapsed(ArgPropl), + %compute message + Message = ?SC([StartT, "|", EndT, "|", ?PV(xer, ArgPropl), "||", pid(), "||", path(Req), "||", ?PV(tgte, ArgPropl), "|", ?PV(tgts, ArgPropl), "|", to_str(?PV(tgtrsc, ArgPropl)), "|", to_str(?PV(rcode, ArgPropl)), "|see swagger spec||", to_str(Sev), "||", server_add(Req), "|", ElapT, "||", ip(Req), "|", ?PV(mod, ArgPropl), "||||||||", ?PV(msg, ArgPropl)]), + F(Message). + +err(Sev, ArgPropl) -> + F = case Sev of + warning -> fun(M)->error:warning(M) end; + error -> fun(M)->error:error(M) end; + critical -> fun(M)->error:critical(M) end; + alert -> fun(M)->error:alert(M) end; + emergency -> fun(M)->error:emergency(M) end + end, + SevInLog = case Sev of + warning -> "WARN"; + error -> "ERROR"; + _ -> "FATAL" + end, + %Error field list: + %1 Timestamp Auto Injected when this is called + %2 RequestID Implemented + %3 ThreadId Auto Injected... however this is a VM language and so this is the Erlang thread PID. Not the "OS level pid". Unclear what they want here. + %4 ServiceName Implemented + %5 PartnerName + %6 TargetEntity + %7 TargetServiceName + %8 ErrorCategory Implemented + %9 ErrorCode + %10 ErrorDescription This is what I'm using detailMessage for, these seem to be the same to me. + %11 detailMessage Implemented + + Message = ?SC([iso(), "|", ?PV(xer, ArgPropl), "|", pid(), "|", ?PV(servn, ArgPropl), "||||", SevInLog, "|||", ?PV(msg, ArgPropl)]), + F(Message). + diff --git a/src/resource_handler.erl b/src/resource_handler.erl new file mode 100644 index 0000000..a9703fc --- /dev/null +++ b/src/resource_handler.erl @@ -0,0 +1,465 @@ +-module(resource_handler). +-compile({parse_transform, leptus_pt}). + +%% leptus callbacks +-export([init/3]). +-export([terminate/4]). +-export([get/3, put/3, delete/3, post/3]). +-export([cross_domains/3]). + +%%for keeping state +%%The application record is defined in application.hrl +%%In Mnesia, the first element is the type of record and the second element is the key +%%http://erlang.org/doc/apps/mnesia/Mnesia_chap2.html +-include("application.hrl"). + +-define(CONSURL, dict:fetch("consulurl", State)). +-define(CDAPURL, dict:fetch("cdapurl", State)). +%below come from config map +-define(HCInterval, maps:get(<<"hcinterval">>, dict:fetch("configmap", State))). +-define(AutoDeregisterAfter, maps:get(<<"autoderegisterafter">>, dict:fetch("configmap", State))). +-define(PipelineHealthLimit, maps:get(<<"pipelinehealthlimit">>, dict:fetch("configmap", State))). +-define(PUBLICFIELDS, [<<"appname">>, <<"apptype">>, <<"namespace">>, <<"healthcheckurl">>, <<"metricsurl">>, <<"url">>, <<"connectionurl">>, <<"serviceendpoints">>]). + +%super lazy macros/imports... +-import(logging, [audit/3, metrics/3, err/2]). +-import(util, [iso/0, to_str/1]). +%lazy concat +-define(SC(L), util:concat(L)). +%lazy shorthand to write info audit records. man I miss defines in python. c ftw. +-define(AUDI(Req, Bts, XER, Rcode), audit(info, Req, [{bts, Bts}, {xer,XER}, {rcode, RCode}, {mod, mod()}])). + + +%%% +%%Helper functions +%%% +mod() -> to_str(?MODULE). + +get_request_id(Req) -> + %ECOMP request tracing + %see if we got a X-ECOMP-REQUESTID, or generate a new one if not + HXER = leptus_req:header(Req, <<"x-ecomp-requestid">>), + case HXER of + undefined -> + XER = util:gen_uuid(), + %LOL, use the client ip here to shame them into their request id + audit(warning, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, "Request is missing requestID. Assigned this one."}]), %eelf documentation says to log this message if requestid was missing + XER; + _ -> + binary_to_list(HXER) %httpc expects strings as headers, so this needs to be str for subsequent passing + end. + +%shared-code function initlization that creates a begining timestamp and gets or generates a request ID +init_api_call(Req) -> + Bts = iso(), %record begining timestamp + XER = get_request_id(Req), %get or generate XER + {Bts, XER}. + +lookup_application(Appname) -> + %do a lookup in mnesia of an appname + Ret = mnesia:transaction(fun() -> mnesia:match_object(application, {application, Appname, '_', '_', '_', '_', '_', '_', '_', '_'}, read) end), + case Ret of + {atomic, []} -> none; %no matches + {atomic, [Rec]} -> Rec + %fail hard if there was more than one result + end. + +appname_to_application_map(Appname) -> + %return a Map of an Mnesia record + Rec = lookup_application(Appname), + case Rec of + none -> none; + {application, Appname, AppType, Namespace, Healthcheckurl, Metricsurl, Url, Connectionurl, ServiceEndpoints, CreationTime} -> + #{<<"appname">> => Appname, + <<"apptype">> => AppType, + <<"namespace">> => Namespace, + <<"healthcheckurl">> => Healthcheckurl, + <<"metricsurl">> => Metricsurl, + <<"url">> => Url, + <<"connectionurl">> => Connectionurl, + <<"serviceendpoints">> => ServiceEndpoints, + <<"creationtime">> => CreationTime + } + end. + +appname_to_field_vals(Appname, FieldList) -> + %Return just a list of values of an application with fields FieldList + M = appname_to_application_map(Appname), + case M of + none -> none; + _ -> [maps:get(F, M) || F <- FieldList] + end. + +appname_to_application_http(XER, Appname, State) -> + %Return an HTTP response of an application record. If this is a program flowlet style app, additionally return it's bound and unbound config + A = appname_to_application_map(Appname), + case A of + none -> {404, "", State}; + _ -> + Body = maps:with(?PUBLICFIELDS, A), + case maps:get(<<"apptype">>, Body) of + %if program-flowlet style app, append the bound and unbound config into the return JSON + <<"program-flowlet">> -> + UB = case consul_interface:consul_get_configuration(XER, Appname, ?CONSURL) of + {200, Unbound} -> Unbound; + {_, _} -> <<"WARNING: COULD NOT FETCH CONFIG FROM CONSUL">> + end, + B = case cdap_interface:get_app_config(XER, Appname, maps:get(<<"namespace">>, Body), ?CDAPURL) of + {200, Bound} -> Bound; + {_, _} -> <<"WARNING: COULD NOT FETCH CONFIG FROM CDAP">> + end, + CM = #{<<"unbound_config">> => UB, + <<"bound_config">> => B}, + {200, {json, maps:merge(Body, CM)}, State}; + %TODO! can we do something for hydrator apps? + <<"hydrator-pipeline">> -> + {200, {json, Body}, State} + end + end. + +-spec parse_progflow_put_body_map(map()) -> + {binary(), binary(), string(), binary(), binary(), map(), map(), any(), lprogram(), any()}. %TODO! Spec parsedservices and parsedprogrampreferences so we don't have any() here... +parse_progflow_put_body_map(Body) -> + Namespace = maps:get(<<"namespace">>, Body), + Streamname = maps:get(<<"streamname">>, Body), + JarURL = maps:get(<<"jar_url">>, Body), + ArtifactName = maps:get(<<"artifact_name">>, Body), + ArtifactVersion = maps:get(<<"artifact_version">>, Body), + AppConfig = maps:get(<<"app_config">>, Body), + AppPreferences = maps:get(<<"app_preferences">>, Body), + ParsedServices = lists:map(fun(S) -> {maps:get(<<"service_name">>, S), + maps:get(<<"service_endpoint">>, S), + maps:get(<<"endpoint_method">>, S)} + end, maps:get(<<"services">>, Body)), + Programs = lists:map(fun(P) -> #program{type=maps:get(<<"program_type">>, P), + id= maps:get(<<"program_id">>, P)} + end, maps:get(<<"programs">>, Body)), + ParsedProgramPreferences = lists:map(fun(P) -> {maps:get(<<"program_type">>, P), + maps:get(<<"program_id">>, P), + maps:get(<<"program_pref">>, P)} + end, maps:get(<<"program_preferences">>, Body)), + {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}. + +parse_hydrator_pipeline_put_body_map(Body) -> + Namespace = maps:get(<<"namespace">>, Body), + Streamname = maps:get(<<"streamname">>, Body), + PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body), + + %Dependencies is optional. This function will normalize it's return with [] if the dependencies key was not passed in. + ParsedDependencies = case maps:is_key(<<"dependencies">>, Body) of + true -> + D = maps:get(<<"dependencies">>, Body), + %crash and let caller deal with it if not a list or if required keys are missing. Else parse it into + % {artifact-extends-header, artifact_name, artifact-version-header, artifact_url} + %tuples + % + %regarding the binart_to_lists: these all come in as binaries but they need to be "strings" (which are just lists of integers in erlang) + %for headers requiring strings, see http://stackoverflow.com/questions/28292576/setting-headers-in-a-httpc-post-request-in-erlang + % + lists:map(fun(X) -> {binary_to_list(maps:get(<<"artifact_extends_header">>, X)), + maps:get(<<"artifact_name">>, X), + binary_to_list(maps:get(<<"artifact_version_header">>, X)), + maps:get(<<"artifact_url">>, X), + %even if dependencies is specified, ui_properties is optional. This will normalize it's return with 'none' if not passed in + case maps:is_key(<<"ui_properties_url">>, X) of true -> maps:get(<<"ui_properties_url">>, X); false -> none end + } end, D); + false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in [] + end, + + {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}. + +parse_put_body(B) -> + Body = jiffy:decode(B, [return_maps]), + Type = maps:get(<<"cdap_application_type">>, Body), + case Type of + <<"program-flowlet">> -> + {pf, <<"program-flowlet">>, parse_progflow_put_body_map(Body)}; + <<"hydrator-pipeline">> -> + {hp, <<"hydrator-pipeline">>, parse_hydrator_pipeline_put_body_map(Body)}; + _ -> + unsupported + end. + +delete_app_helper(Appname, State, XER, Req) -> + %Helper because it is used by both delete and rollback on failed deploy + % + %%Internal Crisis Alert: + % + %I pondered this for some time. There are three points of state for this: the cdap cluster, consul, and the broker's internal database + %The question is, if something in the delete fails, do we: + %1) Tell the user to try again later + %2) Clean up as much as we can, log the error, and keep going + % + %I have decided for now on taking number 2). This is the "Cloudify" way of doing things where you don't raise a NonRecoerable in a Delete operation. + %This has the benefit that this delete operation can be used as the *rollback*, so if anything fails in the deploy, this delete function is called to clean up any dirty state. + % + %Number 1 is not so straitforward, because "putting back things the way they were" is difficult. For example, the deletion from CDAP succeeds, but Consul can't be reached. + %What happens? Do I *redeploy* the CDAP app to try to make their state as it was before the botched delete was called? + % + %My conclusion is that transactions across distributed systems is hard. It's much easier if it is all local (e.g., Transactions in a single Postgres DB) + % + %SO, as a result of this decision, the broker does *NOT* assert the status code of any delete operations to be 200. + %The only way this function does not return a 200 is if I can't even delete from my own database. + % + metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("Delete recieved for ~s", [Appname])}]), + case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "Tried to delete an application that was not registered", State}; + [AppType, Namespace] -> + try + case AppType of + <<"program-flowlet">> -> + ok = workflows:undeploy_cdap_app(Req, XER, Appname, ?CDAPURL, ?CONSURL, Namespace), + %delete from the program-flowlet supplementary table + {atomic, ok} = mnesia:transaction(fun() -> mnesia:delete(prog_flow_supp, Appname, write) end); + <<"hydrator-pipeline">> -> ok = workflows:undeploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, ?CONSURL) + end, + %delete from application table (shared between both types of apps) + {atomic, ok} = mnesia:transaction(fun() -> mnesia:delete(application, Appname, write) end), + {200, "", State} %Return + catch + %this is really bad, means I can't even delete from my own database. For now, log and pray. + %generic failure catch-all, catastrophic + Class:Reason -> + err(emergency, [{xer, XER}, {msg, io_lib:format("Catastrophic failure, can't delete ~s from my database. ~s:~s", [Appname, Class, Reason])}]), + err(error, [{xer, XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]), + {500, "Please report this error", State} + end + end. + +%%%CALLBACKS %%% +init(_Route, _Req, State) -> + {ok, State}. +terminate(_Reason, _Route, _Req, _State) -> + ok. +%%%FOR Cors support +%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55 +cross_domains(_Route, _Req, State) -> + {['_'], State}. + +%%%GET Methods +get("/", Req, State) -> + %The broker's "info" endpoint; returns some possibly useful information + {Bts, XER} = init_api_call(Req), + Apps = util:get_all_appnames_from_db(), + {UT, _} = statistics(wall_clock), + CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL), + RB = {[ + {<<"cdap cluster version">>, CDAPVer}, + {<<"managed cdap url">>, ?CDAPURL}, + {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)}, + {<<"number of applications registered">>, length(Apps)}, + {<<"uptime (s)">>, UT/1000}, + {<<"broker API version">>, util:get_my_version()} + ]}, + {RCode, RBody, RState} = {200, {json, RB}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application", Req, State) -> + %get a list of all registered apps + {Bts, XER} = init_api_call(Req), + {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname", Req, State) -> + %get information about a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/metrics", Req, State) -> + %get metrics for a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200 + {ReturnCode, {json, ReturnBody}, State}; + [<<"hydrator-pipeline">>, Namespace] -> + lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), + {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL), + {ReturnCode, {json, ReturnBody}, State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/healthcheck", Req, State) -> + %get healthcheck of an application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State}; + [<<"hydrator-pipeline">>, Namespace] -> + {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. + +%%%DELETE Methods +delete("/application/:appname", Req, State) -> + %Uninstall and delete a CDAP app + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. + +%%%PUT Methods +put("/application/:appname", Req, State) -> + %create a new registration; deploys and starts a cdap application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of + [Appname] -> + {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State}; + none -> %no matches, create the resource, return the application record + %Initial put requires the put body parameters + case try parse_put_body(leptus_req:body_raw(Req)) catch _:_ -> invalid end of + %could not parse the body + invalid -> {400, "Invalid PUT Body or unparseable URL", State}; + + %unsupported cdap application type + unsupported -> {404, "Unsupported CDAP Application Type", State}; + + {Type, AppType, Params} -> + %form shared info + %hateaos cuz they aintaos + {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))), + Metricsurl = <>/binary>>, + Healthcheckurl = <>/binary>>, + + try + case Type of + hp -> + {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params, + ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname), + + %TODO: This! + ServiceEndpoints = [], %unclear if this is possible with pipelines + + %write into mnesia, deploy + A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()}, + {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(A) end), + ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter), + metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager + ok; + pf -> + {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params, + %Form URLs that are part of the record + %NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here + ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname), + ServiceEndpoints = lists:map(fun(X) -> cdap_interface:form_service_json_from_service_tuple(Appname, Namespace, ?CDAPURL, X) end, ParsedServices), + + %write into mnesia. deploy + A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()}, + ASupplemental = #prog_flow_supp{appname = Appname, programs = Programs}, + {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(A) end), %warning, here be mnesia magic that knows what table you want to write to based on the record type + {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(ASupplemental) end), %warning: "" + ok = workflows:deploy_cdap_app(Req, XER, Appname, ?CONSURL, ?CDAPURL, ?HCInterval, ?AutoDeregisterAfter, AppConfig, JarURL, ArtifactName, ArtifactVersion, Namespace, AppPreferences, ParsedProgramPreferences, Programs, RequestUrl, Healthcheckurl), + metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Program-Flowlet Application Created: ~p with supplemental data: ~p", [lager:pr(A, ?MODULE), lager:pr(ASupplemental, ?MODULE)])}]), + ok + end, + appname_to_application_http(XER, Appname, State) + + catch + %catch a bad HTTP error code + error:{badmatch, {BadErrorCode, BadStatusMsg}} -> + err(error, [{xer, XER}, {msg, io_lib:format("Badmatch caught in Deploy. Rolling Back. ~p ~s", [BadErrorCode, BadStatusMsg])}]), + {_,_,_} = delete_app_helper(Appname, State, XER, Req), + {BadErrorCode, BadStatusMsg, State}; %pass the bad error/status back to user + Class:Reason -> + %generic failure catch-all, catastrophic + err(error, [{xer, XER}, {msg, io_lib:format("~nUnexpected Exception caught in Deploy. Error Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]), + {_,_,_} = delete_app_helper(Appname, State, XER, Req), + {500, "Please report this error", State} + end + end + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +put("/application/:appname/reconfigure", Req, State) -> + %if appname already is registerd, trigger a consul pull and reconfigure + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"namespace">>]) of + none -> {404, "Reconfigure recieved but the app is not registered", State}; + [Namespace] -> + D = jiffy:decode(leptus_req:body_raw(Req), [return_maps]), + case try maps:get(<<"config">>, D) catch _:_ -> invalid end of + invalid -> {400, "Invalid PUT Reconfigure Body: key 'config' is missing", State}; + Config -> + case try maps:get(<<"reconfiguration_type">>, D) catch _:_ -> invalid end of + invalid -> {400, "Invalid PUT Reconfigure Body: key 'reconfiguration_type' is missing", State}; + <<"program-flowlet-app-config">> -> + %reconfigure a program-flowlet style app's app config + try + ok = workflows:app_config_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config), + {200, "", State} + catch Class:Reason -> + err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]), + {500, "", State} + end; + <<"program-flowlet-app-preferences">> -> + %reconfigure a program-flowlet style app's app config + try + ok = workflows:app_preferences_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config), + {200, "", State} + catch Class:Reason -> + err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]), + {500, "", State} + end; + <<"program-flowlet-smart">> -> + %try to "figure out" whether the supplied JSON contains keys in appconfig, app preferences, or both + try + ok = workflows:smart_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config), + {200, "", State} + catch + %catch a bad HTTP error code; also catches the non-overlapping configuration case + error:{badmatch, {BadErrorCode, BadStatusMsg}} -> + err(error, [{xer, XER}, {msg, io_lib:format("~p ~s", [BadErrorCode, BadStatusMsg])}]), + {BadErrorCode, BadStatusMsg, State}; + Class:Reason -> + err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]), + {500, "", State} + end; + NI -> + %TODO! Implement other types of reconfig once CDAP APIs exis + {501, io_lib:format("This type (~s) of reconfiguration is not implemented", [NI]), State} + end + end + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. + +%%%POST methods +post("/application/delete", Req, State) -> + %This follows the AWS S3 Multi Key Delete: http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html + %Except I added an additional special value called "*" + {Bts, XER} = init_api_call(Req), + {RCode, RBody, RState} = case try + B = maps:get(<<"appnames">>, jiffy:decode(leptus_req:body_raw(Req), [return_maps])), + true = erlang:is_list(B), + B + catch _:_ -> + invalid + end + of + invalid -> {400, "Invalid PUT Body", State}; + IDs -> + case IDs of + [] -> {200, "EMPTY PUT BODY", State}; + _ -> + %<<"*">> -> + %this block deleted all apps, but decided this backdoor wasn't very RESTy + %% {atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, {application, '_', '_', '_', '_', '_', '_', '_', '_', '_'}, read) end), + % AppsToDelete = lists:map(fun(X) -> {application, Appname, _,_,_,_,_,_,_,_} = X, Appname end, Apps), + Returns = lists:map(fun(X) -> delete_app_helper(X, State, XER, Req) end, IDs), + RL = lists:map(fun({RC, _, _}) -> RC end, Returns), + {200, jiffy:encode(RL), State} + end + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. diff --git a/src/util.erl b/src/util.erl new file mode 100644 index 0000000..d96675b --- /dev/null +++ b/src/util.erl @@ -0,0 +1,192 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(util). +-include("application.hrl"). + +-export([concat/1, + get_platform_envs_and_config/0, + resolve_cbs/2, + initialize_database/0, + get_all_appnames_from_db/0, + get_my_version/0, + get_programs_for_pfapp_from_db/1, + gen_uuid/0, + iso/0, + iso_elapsed/2, + to_str/1, + ip_to_str/1, + update_with_new_config_map/2, + ejson_to_map/1 + ]). + +%http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation +%NOTE! Does not work or bomb when an element in the list is an atom. Must be a string or binary. Maybe add a check for this +to_string(Value) when is_binary(Value) -> binary_to_list(Value); +to_string(Value) -> Value. +concat(List) -> + lists:flatten(lists:map(fun to_string/1, List)). + +resolve_cbs(XER, ConsulURL) -> + %Ideally this function would dissapear if we get real DNS. This essentially is doing an SRV record lookup every time someone needs the bindng URL + %This allows the broker to handle the case where the CBS moves IP or Ports + %New as of 6/28/17: Uses the hardcoded short name for the CBS + {IP, Port} = consul_interface:consul_get_service_ip_port(XER, "config_binding_service", ConsulURL), + concat(["http://", IP, ":", integer_to_binary(Port)]). + +get_platform_envs_and_config() -> + %Get platform envs needed for broker operation, then fetch my config. + %If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap] + MyName = os:getenv("HOSTNAME"), + ConsulHost = os:getenv("CONSUL_HOST"), + case MyName == false orelse ConsulHost == false of + true -> []; + false -> + %build Consul URL + ConsulURL = concat(["http://", ConsulHost, ":8500"]), + + %Bind my own config map + %generate my own XER here + XER = gen_uuid(), + {200, BoundConfig} = consul_interface:consul_bind_config(XER, MyName, ConsulURL), + BoundConfigMap = jiffy:decode(jiffy:encode(BoundConfig), [return_maps]), %kind of an interesting way to turn an erlang proplist into a map + + %Here, we waterfall looking for "CDAP_CLUSTER_TO_MANAGE". + %First, we will check for environmnental variables for a cluster *NAME* + %If that is not found, then we will check out bound config for a fully bound URL + %If that is also not found, let it crash baby. + CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of + false -> + list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error + CDAPName -> + {IP, Port} = consul_interface:consul_get_service_ip_port(XER, CDAPName, ConsulURL), + list_to_binary(concat(["http://", IP, ":", integer_to_binary(Port)])) + end, + [MyName, ConsulURL, CDAPURL, BoundConfigMap] + end. + +initialize_database() -> + %Create the database (currently MNesia) if it does not exist, and the application table. + %Or, do nothing. + N = node(), + lager:info(io_lib:format("Initializing database. My node name is ~s", [N])), + + %set MNesia dir + application:set_env(mnesia, dir, "/var/mnesia/"), + + %stop if running, can't create schema if it is. Dont check status, OK if stopped + mnesia:stop(), + + %create the schema if it does not already exist. Dont check status, ok if exists + %erlang:display(mnesia:delete_schema([N])), + mnesia:create_schema([N]), + %start MNesia, assert it works + + ok = mnesia:start(), %start MNesia, bomb if alreay started, should not happen + lager:info("Mnesia started"), + + %try to create the table, or if it exists, do nothing + %erlang:display(mnesia:delete_table(application)), + case mnesia:create_table(application, [{attributes, record_info(fields, application)}, {disc_copies, [N]}]) of + {aborted,{already_exists,application}} -> + lager:info("Application table already exists"); + {atomic,ok} -> + lager:info(io_lib:format("Created application table on ~s", [N])) + end, + + %try to create the app supplementaty table, or if it exists, do nothing + %erlang:display(mnesia:delete_table(application)), + case mnesia:create_table(prog_flow_supp, [{attributes, record_info(fields, prog_flow_supp)}, {disc_copies, [N]}]) of + {aborted,{already_exists, prog_flow_supp}} -> + lager:info("prog_flow_supp table already exists"); + {atomic,ok} -> + lager:info(io_lib:format("Created prog_flow_supp table on ~s", [N])) + end, + + %wait up to 30s for the table to come up. Usually instantaneous. If it takes more crash abd burn + ok = mnesia:wait_for_tables([application, prog_flow_supp], 30000), + ok. + +get_all_appnames_from_db() -> + {atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, #application{_ = '_'}, read) end), + lists:map(fun(X) -> {application, Appname,_,_,_,_,_,_,_,_} = X, + Appname + end, Apps). + +-spec get_programs_for_pfapp_from_db(binary()) -> lprogram(). +get_programs_for_pfapp_from_db(Appname) -> + {atomic, [#prog_flow_supp{appname = Appname, programs=Programs}]} = mnesia:transaction(fun() -> mnesia:match_object(prog_flow_supp, #prog_flow_supp{appname = Appname, _ = '_'}, read) end), + Programs. + +get_my_version() -> + %stolen from the SO post I asked about: http://stackoverflow.com/questions/43147530/erlang-programmatically-get-application-version/43152182#43152182 + case lists:keyfind(cdapbroker, 1, application:loaded_applications()) of + {_, _, Ver} -> list_to_binary(Ver); + false -> <<"error">> + end. + +gen_uuid() -> + %generate an RFC compliant v1 uuid using lib + uuid:to_string(uuid:uuid1()). + +iso() -> + %generate 8601 ts + iso8601:format(erlang:timestamp()). + +iso_elapsed(Endtime, Starttime) -> + %%%...subtract two isos and return the number of seconds elapsed between Starttime and Endtime + Edt = iso8601:parse(Endtime), + Sdt = iso8601:parse(Starttime), + Egs = calendar:datetime_to_gregorian_seconds(Edt), + Sgs = calendar:datetime_to_gregorian_seconds(Sdt), + Egs - Sgs. + +to_str("") -> ""; +to_str(Term) -> lists:flatten(io_lib:format("~p", [Term])). + +-spec ip_to_str({inet:ip_address(), inet:port_number()}) -> binary(). +%nasty.. I miss pythons x <= Foo <= Y syntax.. or something mathematical like Foo in [X..Y].. erlang not good 4 math +ip_to_str({{A,B,C,D}, Port}) when A >= 0 andalso A =< 255 andalso B >= 0 andalso B =< 255 andalso C >= 0 andalso C =< 255 andalso D >= 0 andalso D =< 255 andalso port >= 0 andalso Port =<65535 -> + concat([to_str(A),".",to_str(B),".", to_str(C),".",to_str(D),":",to_str(Port)]); +ip_to_str({_,_}) -> invalid. + +update_with_new_config_map(NewConfig, OldConfig) -> + %helper for smart_reconfigure, broken out so we can unit test it. + % + %Takes in a new config, some keys in which may be shared with OldConfig, and returns a new map with the same keys as OldConfig, except values that had overlap were replaced by NewConfig + %if no keys in NewConfig overlap with OldConfig, returns the atom 'nooverlap' + % + %This is very similar to the maps:merge/2 builtin but that will inject keys of newconfig that were not in oldconfig. We need a "RIGHT JOIN" + NCKeys = maps:keys(NewConfig), + ConfigOverlaps = [X || X <- NCKeys, maps:is_key(X, OldConfig)], + case ConfigOverlaps of + [] -> nooverlap; + _ -> + %we have an entry that should be in app config + %build a new map with just the keys to update + Pred = fun(X,_) -> lists:member(X, ConfigOverlaps) end, + NewVals = maps:filter(Pred, NewConfig), + maps:merge(OldConfig, NewVals) + end. + +ejson_to_map(E) -> + %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map, + %usefu because ejsons do not appear to be order-independent-comparable, but maps are (e.g., two maps are equal if all their k+v are equal but agnostic to order) + jiffy:decode(jiffy:encode(E), [return_maps]). diff --git a/src/util_tests.erl b/src/util_tests.erl new file mode 100644 index 0000000..e37e492 --- /dev/null +++ b/src/util_tests.erl @@ -0,0 +1,53 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(util_tests). +-include_lib("eunit/include/eunit.hrl"). +-import(util, [ + iso_elapsed/2, + ip_to_str/1, + update_with_new_config_map/2, + ejson_to_map/1]). + +iso_elapsed_test() -> + ?assert(iso_elapsed(<<"2017-04-27T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 2), + ?assert(iso_elapsed(<<"2017-04-29T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 60*60*24*2+2). + +ip_to_str_test() -> + ?assert(ip_to_str({{6,6,6,6}, 666}) == "6.6.6.6:666"), + ?assert(ip_to_str({{196,196,196,196}, 1}) == "196.196.196.196:1"), + ?assert(ip_to_str({{6,6,6,6666}, 666}) == invalid), + ?assert(ip_to_str({{6,6,6,6}, 66666}) == invalid), + ?assert(ip_to_str({{6,6,-6,6}, 666}) == invalid), + ?assert(ip_to_str({{6,6,six,6}, 666}) == invalid). + +update_with_new_config_map_test() -> + ?assert(update_with_new_config_map(#{<<"foo">>=><<"smartbar">>, <<"preffoo">>=><<"smartprefbar">>}, #{<<"foo">>=><<"bar">>}) == #{<<"foo">>=><<"smartbar">>}), + ?assert(update_with_new_config_map(#{<<"fooD">>=><<"smartbar">>}, #{<<"foo">>=><<"bar">>}) == nooverlap), + ?assert(update_with_new_config_map(#{<<"foo">>=><<"smartbar">>,<<"foo2">>=><<"smartbar2">>}, #{<<"foo">>=><<"bar">>, <<"foo2">>=><<"bar2">>}) == #{<<"foo">>=><<"smartbar">>, <<"foo2">>=><<"smartbar2">>}). + +ejson_to_map_test() -> + EJ1 = {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]}, + EJ2 = {[{<<"foo2">>, <<"bar2">>}, {<<"foo">>, <<"bar">>}]}, + M1 = ejson_to_map(EJ1), + M2 = ejson_to_map(EJ2), + ?assert(EJ1 /= EJ2), %HERE LIES THE PROBLEM HUDSON + ?assert(M1 == M2). %GREAT SUCCESS! + diff --git a/src/workflows.erl b/src/workflows.erl new file mode 100644 index 0000000..a8c6abb --- /dev/null +++ b/src/workflows.erl @@ -0,0 +1,324 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(workflows). + +%Module holds functions that execute big workflows, like deploying a CDAP application. + +-include("application.hrl"). +-export([deploy_cdap_app/17, %super offensive arity.. should probably start using some structs to cut this + undeploy_cdap_app/6, + undeploy_hydrator_pipeline/6, + deploy_hydrator_pipeline/12, + all_200s_else_showerror/2, + app_config_reconfigure/7, + app_preferences_reconfigure/7, + smart_reconfigure/7 + ]). + +-import(util, [iso/0, to_str/1]). +-import(logging, [metrics/3]). + +-define(MET(Sev, Req, Bts, XER, TgtE, TgtS, TgtRSC, Msg), metrics(Sev, Req, [{bts, Bts}, {xer,XER}, {tgte, TgtE}, {tgts, TgtS}, {tgtrsc, TgtRSC}, {mod, to_str(?MODULE)}, {msg, Msg}])). +-define(CDAPE, "cdap cluster"). +-define(CNSE, "consul cluster"). + +%private +attempt( Req, XER, { Mod, Func, Args }, ServiceName, Action, LogResponse) -> + %Thanks Garry!! + %Helper function to + %1. log the start timestamp + %2. Do an action specificed by mod:func(args). Assumes XER always first arg + %3. Log a metrics info statement about the API cll + %4. assert the return code was a 200, let it crash otehrwise, caller catches + Start = iso(), + {RC, RB} = apply( Mod, Func, [XER | Args] ), + ?MET(info, Req, Start, XER, ServiceName, Action, RC, case LogResponse of true -> to_str(RB); false -> "" end), + {RC, RB}. + +%public +-spec all_200s_else_showerror(fun((any()) -> httpstat()), list()) -> httpstat(). +all_200s_else_showerror(FClosure, ListToMap) -> + %Takes a "partial" with the spec: f(X) -> {HTTP_Status_Code, HTTP_Response}, maps it onto ListToMap, and either + %returns {200, ""} or else the first error encountered after executing the entire list (does not short circuit!) + % + %I say "partial" because there are no real "partials" in Erlang but you can make them using Closure's out of funs (anonymous functions), so FClosure is a Closure just waiting for the last argument + %See: + %https://www.google.com/url?sa=t&rct=j&q=&esrc=s&source=web&cd=1&ved=0ahUKEwjtyeiC6LbSAhVH0FQKHffhAr0QFggcMAA&url=http%3A%2F%2Fstackoverflow.com%2Fquestions%2F13355544%2Ferlang-equivalents-of-haskell-where-partial-lambda&usg=AFQjCNHnEZjQHtQhKXN67DBKKoJpqRXztg&cad=rja + %http://stackoverflow.com/questions/16183971/currying-functions-erlang + L = lists:filter(fun({X, _}) -> X /= 200 end, lists:map(FClosure, ListToMap)), + case L of + [] -> {200, ""}; + _ -> lists:nth(1, L) + end. + +deploy_cdap_app(Req, XER, Appname, ConsulURL, CDAPURL, HCInterval, AutoDeregisterAfter, AppConfig, JarURL, ArtifactName, ArtifactVersion, Namespace, AppPreferences, ParsedProgramPreferences, Programs, RequestUrl, Healthcheckurl) -> + %push the UNBOUND config and preferences into Consul. + %I don't think we should push bound configs because triggering a "rebind" will suffer if the templating language is lost. + {200,_} = attempt(Req, XER, { consul_interface, consul_push_config, [ Appname, ConsulURL, AppConfig ] }, ?CNSE, "push config", true), + + %push the preferences + {200,_} = attempt(Req, XER, { consul_interface, consul_push_preferences, [ Appname, ConsulURL, AppPreferences ] }, ?CNSE, "push preferences", true), + + %get the bound config + {200,BoundConfig} = attempt(Req, XER, { consul_interface, consul_bind_config, [ Appname, ConsulURL ] }, "config binding service", "bind config", false), + + %fetch the JAR. + {200,JarBody} = attempt(Req, XER, { httpabs, get, [ JarURL ] }, "nexus", "file get", false), + + %create the Namespace + {200,_} = attempt( Req, XER, { cdap_interface, create_namespace, [ Namespace, CDAPURL ] }, ?CDAPE, "create namespace", true), + + %deploy the application + {200,_} = attempt( Req, XER, { cdap_interface, deploy_app, [ Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, BoundConfig ] }, ?CDAPE, "deploy application", true), + + %set app preferences + {200,_} = attempt( Req, XER, { cdap_interface, push_down_app_preferences, [ Appname, Namespace, CDAPURL, AppPreferences ] }, ?CDAPE, "set app preferences", true), + + %push down the program preferences + {200,_} = attempt( Req, XER, { cdap_interface, push_down_program_preferences, [ Appname, Namespace, CDAPURL, ParsedProgramPreferences ] }, ?CDAPE, "set program preferences", true), + + %start the CDAP application services + {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "start" ] }, ?CDAPE, "start program", true), + + %Parse my IP and port + {ok, {http, _, IPaS, Port, _, _}} = http_uri:parse(binary_to_list(RequestUrl)), + + %register with Consul; We will register the broker's URL as the address in Consul then the upstream service can do a GET on this Broker to get the resource object + {200,_} = attempt( Req, XER, { consul_interface, consul_register, [ Appname, ConsulURL, list_to_binary(IPaS), Port, Healthcheckurl, HCInterval, AutoDeregisterAfter ] }, ?CNSE, "service register", true), + + ok. %if got to here, all went well. + +-spec undeploy_cdap_app(any(), string(), binary(), string(), string(), binary()) -> ok. +undeploy_cdap_app(Req, XER, Appname, CDAPURL, ConsulURL, Namespace) -> + %stop the CDAP programs assuming they are valid + Programs = util:get_programs_for_pfapp_from_db(Appname), + Bts = iso(), %record begining timestamp + {RC, RB} = cdap_interface:exec_programs(XER, Appname, Namespace, CDAPURL, Programs, "stop"), + case RC of + 200 -> ?MET(info, Req, Bts, XER, ?CDAPE, "program stop", RC, "OK"); + 400 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but it's programs were not running, probably indicates the app crashed in some way: ~s", [Appname, RB])); + 404 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB])); + _ -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC, RB])) + end, + + %delete the application + Bts2 = iso(), + {RC2, RB2} = cdap_interface:delete_app(XER, Appname, Namespace, CDAPURL), + case RC2 of + 200 -> ?MET(info, Req, Bts2, XER, ?CDAPE, "app delete", RC2, "OK"); + 404 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "app delete", RC2, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB2])); + _ -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "app delete", RC2, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC2, RB2])) + end, + + %deregister with consul + Bts3 = iso(), + {RC3, RB3} = consul_interface:consul_deregister(XER, Appname, ConsulURL), + case RC3 of + 200 -> ?MET(info, Req, Bts3, XER, ?CNSE, "service deregister", RC3, "OK"); + _ -> ?MET(warning, Req, Bts3, XER, ?CNSE, "service deregister", RC3, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a service is not cleaned up properly! ~s", [Appname, RC3, RB3])) + end, + + %delete the config key stored earlier + Bts4 = iso(), + {RC4, RB4} = consul_interface:consul_delete_config(XER, Appname, ConsulURL), + case RC4 of + 200 -> ?MET(info, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, "OK"); + 404 -> ?MET(warning, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, io_lib:format("Delete called on ~s but it's consul key is gone, probably indicates a horrible manual deletion from Consul: ~s", [Appname, RB4])); + _ -> ?MET(warning, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a key is not cleaned up properly! ~s", [Appname, RC4, RB4])) + end, + + %delete the config key stored earlier + Bts5 = iso(), + {RC5, RB5} = consul_interface:consul_delete_preferences(XER, Appname, ConsulURL), + case RC5 of + 200 -> ?MET(info, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, "OK"); + 404 -> ?MET(warning, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, io_lib:format("Delete called on ~s but it's consul key is gone, probably indicates a horrible manual deletion from Consul: ~s", [Appname, RB5])); + _ -> ?MET(warning, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a key is not cleaned up properly! ~s", [Appname, RC5, RB5])) + end, + + ok. + +deploy_hydrator_pipeline(Req, XER, Appname, Namespace, CDAPURL, PipelineConfigJsonURL, Dependencies, ConsulURL, RequestUrl, Healthcheckurl, HCInterval, AutoDeregisterAfter) -> + %fetch the JSON + {200,PipelineJson} = attempt(Req, XER, { httpabs, get, [ PipelineConfigJsonURL ] }, "nexus", "file get", false), + + %TODO! Config + + %create the Namespace + {200,_} = attempt( Req, XER, { cdap_interface, create_namespace, [ Namespace, CDAPURL ] }, ?CDAPE, "create namespace", true), + + %deploy pipeline dependencies% + {200,_} = attempt( Req, XER, { cdap_interface, deploy_pipeline_dependencies, [ Namespace, CDAPURL, Dependencies ] }, ?CDAPE, "deploy dependencies", true), + + %deploy pipeline dependencies UI properties + %NOTE! There is a bit of redundancy with the above call. I debated merging the two. + %I decided against it because I want failures to load the deps seperated from failures to load the properties files, because they are different URLs. + %Splitting them like this allows me to return the error to the user on the exact step that failed + {200,_} = attempt( Req, XER, { cdap_interface, deploy_pipeline_dependencies_properties, [ Namespace, CDAPURL, Dependencies ] }, ?CDAPE, "deploy dependency properties", true), + + %deploy the pipeline + {200,"Deploy Complete"} = attempt( Req, XER, { cdap_interface, deploy_pipeline, [ Appname, Namespace, CDAPURL, PipelineJson ] }, ?CDAPE, "deploy pipeline", true), + + %start the pipeline + {200,_} = attempt( Req, XER, { cdap_interface, exec_pipeline, [ Appname, Namespace, CDAPURL, "resume" ] }, ?CDAPE, "start pipeline", true), + + %Parse my IP and port + {ok, {http, _, IPaS, Port, _, _}} = http_uri:parse(binary_to_list(RequestUrl)), + + %register with Consul; We will register the broker's URL as the address in Consul, then the upstream service can do a GET on this Broker to get the resource object + {200,_} = attempt( Req, XER, { consul_interface, consul_register, [ Appname, ConsulURL, list_to_binary(IPaS), Port, Healthcheckurl, HCInterval, AutoDeregisterAfter] }, ?CNSE, "service register", true), + + ok. + +undeploy_hydrator_pipeline(Req, XER, Appname, Namespace, CDAPURL, ConsulURL) -> + %UNDEPLOY NOTES: + % 1 Never fail on undeploy, log and continue. + % 2 Leave artifact dependencies on the cluster. We can revisit this if we need a "LEAVE NO TRACE" solution. TODO. + % 3 I noticed an asymetry in deploy/undeplopy here: there is no need to start workflows. Terry clarified this is correct: "Batch pipelines contain a schedule, but deploying the pipeline does not activate the schedule. Resuming the schedule makes it active so the pipeline will run at its next scheduled time. When undeploying, if you only suspend the schedule which prevents future runs from starting, then any currently active runs will continue until they finish (or not finish if they are hung). So we follow up with a stop workflow to kill any run that may be in progress so the following commands will not fail (delete pipeline or delete namespace). + %We avoid a race condition by suspending the schedule first. + + %suspend the pipeline + Bts = iso(), + {RC1, RB1} = cdap_interface:exec_pipeline(XER, Appname, Namespace, CDAPURL, "suspend"), + case RC1 of + 200 -> ?MET(info, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, "OK"); + 400 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but it's was not running, probably OK, probably it is on a schedule ~s", [Appname, RB1])); + 404 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB1])); + _ -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but CDAP unexpectedly return a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC1, RB1])) + end, + + %stop the workflow + Bts2 = iso(), + {RC2, RB2} = cdap_interface:exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, "stop"), + case RC2 of + 200 -> ?MET(info, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, "OK"); + 400 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but it's was not running, probably OK, probably it is on a schedule ~s", [Appname, RB2])); + 404 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB2])); + _ -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but CDAP unexpectedly return a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC2, RB2])) + end, + + %?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format()); + + %TODO! Delete config (Configs are currently not pushed for hydrator pipelines, so have to do that first) + + %delete the application + Bts3 = iso(), + {RC3, RB3} = cdap_interface:delete_app(XER, Appname, Namespace, CDAPURL), + case RC3 of + 200 -> ?MET(info, Req, Bts3, XER, ?CDAPE, "app delete", RC3, "OK"); + 404 -> ?MET(warning, Req, Bts3, XER, ?CDAPE, "app delete", RC3, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB3])); + _ -> ?MET(warning, Req, Bts3, XER, ?CDAPE, "app delete", RC3, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC3, RB3])) + end, + + %deregister with consul + Bts4 = iso(), + {RC4, RB4} = consul_interface:consul_deregister(XER, Appname, ConsulURL), + case RC4 of + 200 -> ?MET(info, Req, Bts4, XER, ?CNSE, "service deregister", RC4, "OK"); + _ -> ?MET(warning, Req, Bts4, XER, ?CNSE, "service deregister", RC3, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a service is not cleaned up properly! ~s", [Appname, RC4, RB4])) + end, + ok. + +app_config_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, AppConfig) -> + %Reconfigure CDAP App's App Config + + %push the UNBOUND config into Consul. I don't think we should push bound configs because triggering a "rebind" will suffer if the templating language is lost. + {200,_} = attempt( Req, XER, { consul_interface, consul_push_config, [ Appname, ConsulURL, AppConfig ] }, ?CNSE, "push config", true), + + %get the bound config + {200,BoundConfig} = attempt(Req, XER, { consul_interface, consul_bind_config, [ Appname, ConsulURL ] }, "config binding service", "bind config", false), + + %push it to CDAP + %TODO! What happens when we push to consul but connection to CDAP fails? Then CDAP and Consul are out of sync. + %Maybe create a "BACKUP" key in Consul for the old config and "rollback" if the below fails + %Transactions across distributed systems is hard =( + {200,_} = attempt( Req, XER, { cdap_interface, push_down_config, [ Appname, Namespace, CDAPURL, BoundConfig ] }, ?CDAPE, "reconfigure app config", true), + + ok. + +app_preferences_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, AppPreferences) -> + %Workflow: + % 1) push the new preferences to Cosnul + % 2) stop all the programs + % 3) push the programs to CDAP + % 4) start all the programs + % + % NOTE! Currently it is assumed that preferences do not need to be bound by the config_binding_service, + % as only app config contains service discovery items. + + Programs = util:get_programs_for_pfapp_from_db(Appname), + + %1 push the new prefs up to Consul + {200,_} = attempt(Req, XER, { consul_interface, consul_push_preferences, [ Appname, ConsulURL, AppPreferences ] }, ?CNSE, "push preferences", true), + + %2 stop the programs + {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "stop" ] }, ?CDAPE, "stop programs", true), + + %3 set app preferences + {200,_} = attempt( Req, XER, { cdap_interface, push_down_app_preferences, [ Appname, Namespace, CDAPURL, AppPreferences ] }, ?CDAPE, "set app preferences", true), + + %4 start er' up again + {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "start" ] }, ?CDAPE, "start program", true), + + ok. + +smart_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewConfig) -> + %Smart reconfigure takes in a JSON (NewConfig) and tries to be "smart"; it tries to figure out whether Config is a reconfiguration of + %app config, app preferences, or both. + % + %Specifically this workflow works as follows; + %1) pull down AppConfig in consul + %2) pull down Prefernces in consul + %3) see if any keynames in this function's input (NewConfig) are keynames in AppConfig + % 3a if so, reconfigure it + % 3b write the delta'd AppConfig back to consul + %4) see if any keynames in this fucntion's input (NewConfig) are keynames in Preferences + % 4a if so, reconfigure ppreferences + % 4b write the delta'd preferences back to consul + %5) Return a status + + + %see if we have app config overlaps + {200, ConsulAppConfig} = consul_interface:consul_get_configuration(XER, Appname, ConsulURL), + NewAppConfig = util:update_with_new_config_map(NewConfig, ConsulAppConfig), + WasNewAppConfig = case NewAppConfig of + nooverlap -> nooverlap; + _ -> + ok = app_config_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewAppConfig) + end, + + %see if we have preferences overlap + {200, ConsulPreferences} = consul_interface:consul_get_preferences(XER, Appname, ConsulURL), + NewAppPreferences = util:update_with_new_config_map(NewConfig, ConsulPreferences), + WasNewAppPreferences = case NewAppPreferences of + nooverlap -> nooverlap; + _ -> + ok = app_preferences_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewAppPreferences) + end, + + case WasNewAppConfig == nooverlap andalso WasNewAppPreferences == nooverlap of + true -> + {400, "non-overlapping configuration was sent"}; + false -> + ok + end. + + diff --git a/src/workflows_tests.erl b/src/workflows_tests.erl new file mode 100644 index 0000000..1b7b51c --- /dev/null +++ b/src/workflows_tests.erl @@ -0,0 +1,27 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. + +-module(workflows_tests). +-include_lib("eunit/include/eunit.hrl"). + +all_200s_else_showerror_test() -> + ?assert({200, ""} == workflows:all_200s_else_showerror(fun(_) -> {200, "all good"} end, [1,"A", foo])), + ?assert({500, "constant dissapointment"} == workflows:all_200s_else_showerror(fun(X) -> if X < 5 -> {200, "all good"}; true -> {500, "constant dissapointment"} end end, [0,10])). + diff --git a/swagger/swagger.html b/swagger/swagger.html new file mode 100644 index 0000000..3d51590 --- /dev/null +++ b/swagger/swagger.html @@ -0,0 +1,1899 @@ + + + + + + CDAP Broker API + + +
+

CDAP Broker API

+

Version: 4.0.3

+

+ + +
+ Schemes: + +
+ +

Summary

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
PathOperationDescription
+ / + + GET + + +
+ /application + + GET + + +
+ /application*/{appname} + + PUT + + +
+ /application/delete + + POST + + +
+ /application/{appname} + + DELETE + + +
+ GET + + +
+ PUT + + +
+ /application/{appname}/healthcheck + + GET + + +
+ /application/{appname}/metrics + + GET + + +
+ /application/{appname}/reconfigure + + PUT + + +
+ + + +

Paths

+ + + +
+
+
+

GET /

+
+
+
+

shows some information about this service

+ +
+ + +
+ +
+
+ 200 OK + +
+
+
+
+

successful response

+ +
+
+
+ +
+
+
+ info +
+
+ +
+
+
+
+
+ +
+
+
+

GET /application

+
+
+
+

get all applications registered with this broker

+ +
+ + +
+ +
+
+ 200 OK + +
+
+
+
+

successful response

+ +
+
+
+ +
+
+
+ +
+ appname + + + +
+ +
+
+
+ +
+
+
+
+
+ +
+
+
+

PUT /application*/{appname}

+
+
+
+

(This is a hacky way of supporting "oneOf" because Swagger does not support oneOf https://github.com/OAI/OpenAPI-Specification/issues/333. This is the same endpoint as PUT /application/appname, except the PUT body is different.)

+

Register a hydrator app for service and configuration discovery. This will light up a metrics and health endpoint for this app. appname is assumed to also be the key in consul.

+ +
+ +
+ +

application/json +

+
+
+

required put body

+

+
+
+
+ +
+
+
+
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application.

+
pathstring (text) + + + + +
+
+ +
+

application/json +

+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+ +
+
+ 400 Bad Request + +
+
+
+
+

put was performed but the appname was already registered with the broker, or Invalid PUT body

+ +
+
+
+ +
+
+ +
+
+
+
+
+ +
+
+
+

POST /application/delete

+
+
+
+

endpoint to delete multiple applications at once. Returns an array of status codes, where statuscode[i] = response returned from DELETE(application/i)

+ +
+ +
+ +
+
+

required post body

+

+
+
+
+ +
+
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

successful response

+ +
+
+
+ +
+
+
+ +
+ returncode + + + +
+ +
+
+
+ +
+
+
+
+
+ +
+
+
+

DELETE /application/{appname}

+
+
+
+

Remove an app for service and configuration discovery. This will remove the metrics and health endpoints for this app.

+ +
+ +
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application.

+
pathstring (text) + + + + +
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+
+ 404 Not Found + +
+
+
+
+

no app with name 'appname' registered with this broker.

+ +
+
+
+ +
+
+ +
+
+
+
+
+
+
+

GET /application/{appname}

+
+
+
+

Returns the representation of the application resource, including the links for healthcheck and metrics.

+ +
+ +
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application.

+
pathstring (text) + + + + +
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+ +
+
+ 404 Not Found + +
+
+
+
+

no app with name 'appname' registered with this broker.

+ +
+
+
+ +
+
+ +
+
+
+
+
+
+
+

PUT /application/{appname}

+
+
+
+

Register an app for service and configuration discovery. This will light up a metrics and health endpoint for this app. appname is assumed to also be the key in consul.

+ +
+ +
+ +

application/json +

+
+
+

required put body

+

+
+
+
+
+ appput +
+
+
+
+
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application.

+
pathstring (text) + + + + +
+
+ +
+

application/json +

+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+ +
+
+ 400 Bad Request + +
+
+
+
+

put was performed but the appname was already registered with the broker, or Invalid PUT body

+ +
+
+
+ +
+
+ +
+
+
+
+
+ +
+
+
+

GET /application/{appname}/healthcheck

+
+
+
+

Perform a healthcheck on the running app appname.

+ +
+ +
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application to get the healthcheck for.

+
pathstring (test) + + + + +
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

Successful response, healthcheck pass

+ +
+
+
+ +
+
+ +
+
+ 404 Not Found + +
+
+
+
+

no app with name 'appname' registered with this broker, or the healthcheck has failed (though I would like to disambiguiate from the first case, CDAP returns a 404 for this).

+ +
+
+
+ +
+
+ +
+
+
+
+
+ +
+
+
+

GET /application/{appname}/metrics

+
+
+
+

Get live (real-time) app specific metrics for the running app appname. Metrics are customized per each app by the component developer

+ +
+ +
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application to get metrics for.

+
pathstring (test) + + + + +
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+ +
+
+ 404 Not Found + +
+
+
+
+

no app with name 'appname' registered with this broker.

+ +
+
+
+ +
+
+ +
+
+
+
+
+ +
+
+
+

PUT /application/{appname}/reconfigure

+
+
+
+

Reconfigures the application.

+ +
+ +
+ +
+
+

required put body

+

+
+
+
+ +
+
+
+
+ + + + + + + + + + + + + + + + + + +
+ appname +

Name of the application.

+
pathstring (text) + + + + +
+
+ +
+ +
+
+ 200 OK + +
+
+
+
+

Successful response

+ +
+
+
+ +
+
+ +
+
+ 400 Bad Request + +
+
+
+
+

Bad request. Can happen with 1) {appname} is not registered with the broker, 2) the required PUT body is wrong, or 3) the smart interface was chosen and none of the config keys match anything in app_config or app_preferences

+ +
+
+
+ +
+
+ +
+
+
+
+
+ + + +

Schema definitions

+ + +
+
+

Application: + object + + + +

+
+
+ +
+
+
+ appname: + string + + +
+
+

application name

+ +
+ +
+
+
+ healthcheckurl: + string + + +
+
+

fully qualified url to perform healthcheck

+ +
+ +
+
+
+ metricsurl: + string + + +
+
+

fully qualified url to get metrics from

+ +
+ +
+
+
+ url: + string + + +
+
+

fully qualified url of the resource

+ +
+ +
+
+
+ connectionurl: + string + + +
+
+

input URL that you can POST data into (URL of the CDAP stream)

+ +
+ +
+
+
+ serviceendpoints: + object[] + + +
+
+

a list of HTTP services exposed by this CDAP application

+ +
+ +
+ service_method + + + +
+ +
+
+
+
+
+
+
+
+
+

appname: + string + + + +

+
+
+
+

an application name

+ +
+ +
+
+
+
+

appput: + object + + + +

+
+
+ +
+
+
+ cdap_application_type: + string , x ∈ { + program-flowlet + + } + + + +
+
+

denotes whether this is a program-flowlet style application or a hydrator pipeline. For program-flowlet style apps, this value must be "program-flowlet"

+ +
+ +
+
+
+ streamname: + string + + +
+
+

name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model.

+ +
+ +
+
+
+ namespace: + string + + +
+
+

the cdap namespace this is deployed into

+ +
+ +
+
+
+ jar_url: + string + + +
+
+

the URL that the JAR you're deploying resides

+ +
+ +
+
+
+ artifact_name: + string + + +
+
+

the name of the CDAP artifact to be added

+ +
+ +
+
+
+ artifact_ver: + object + + +
+
+

the version of the artifact. Must be in X.Y.Z form

+ +
+ +
+
+
+ app_config: + object + + +
+
+

the application config JSON

+ +
+ +
+
+
+ app_preferences: + object + + +
+
+

the application preferences JSON

+ +
+ +
+
+
+ programs: + object[] + + +
+
+ +
+ +
+ programs + + + +
+ +
+
+
+
+ program_preferences: + object[] + + +
+
+ +
+ +
+ programpref + + + +
+ +
+
+
+
+ services: + object[] + + +
+
+ +
+ +
+ service_endpoint + + + +
+ +
+
+
+
+
+
+
+
+
+

hydratorappput: + object + + + +

+
+
+ +
+
+
+ cdap_application_type: + string , x ∈ { + hydrator-pipeline + + } + + + + +
+
+

denotes whether this is a program-flowlet style application or a hydrator pipeline. For hydrator, this value must be "hydrator-pipeline"

+ +
+ +
+
+
+ namespace: + string + + + +
+
+

the cdap namespace this is deployed into

+ +
+ +
+
+
+ pipeline_config_json_url: + string + + + +
+
+

the URL of the config.json for this pipeline

+ +
+ +
+
+
+ streamname: + string + + + +
+
+

name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model.

+ +
+ +
+
+
+ dependencies: + object[] + + +
+
+

represents a list of dependencies to be loaded for this pipeline. Not required.

+ +
+ +
+ hydratordep + + + +
+ +
+
+
+
+
+
+
+
+
+

hydratordep: + object + + + +

+
+
+
+

represents a hydrator pipeline dependency. An equivelent to the following CURLs are formed with the below four params shown in CAPS "curl -v -w"\n" -X POST http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME -H "Artifact-Extends:ARTIFACT_EXTENDS_HEADER" -H “Artifact-Version:ARTIFACT_VERSION_HEADER” --data-binary @(DOWNLOADED FROM ARTIFACT_URL)","curl -v -w"\n" -X PUT http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME/versions/ARTIFACT_VERSION_HEADER/properties -d (DOWNLOADED FROM UI_PROPERTIES_URL)"

+ +
+ +
+
+
+ artifact_extends_header: + string + + + +
+
+

the value of the header that gets passed in for artifact-extends, e.g., "Artifact-Extends:system:cdap-data-pipeline[4.0.1,5.0.0)"

+ +
+ +
+
+
+ artifact_name: + string + + + +
+
+

the name of the artifact

+ +
+ +
+
+
+ artifact_version_header: + string + + + +
+
+

the value of the header that gets passed in for artifact-version, e.g., "Artifact-Version:1.0.0-SNAPSHOT"

+ +
+ +
+
+
+ artifact_url: + string + + + +
+
+

the URL of the artifact JAR

+ +
+ +
+
+
+ ui_properties_url: + string + + +
+
+

the URL of the properties.json if the custom artifact has UI properties. This is optional.

+ +
+ +
+
+
+
+
+
+
+
+

info: + object + + + +

+
+
+
+

some broker information

+ +
+ +
+
+
+ managed cdap url: + string + + +
+
+

the url of the CDAP cluster API this broker is managing

+ +
+ +
+
+
+ number of applications registered: + integer + + +
+
+ +
+ +
+
+
+ uptime (s): + integer + + +
+
+ +
+ +
+
+
+ cdap GUI port: + integer + + +
+
+

The GUI port of the CDAP cluster this broker is managing. Mostly to help users of this API check their application in cdap. Note, will return UNKNOWN_CDAP_VERSION if it cannot be determined.

+ +
+ +
+
+
+ cdap cluster version: + string + + +
+
+

the version of the CDAP cluster this broker is managing. Note, will return UKNOWN_CDAP_VERSION if it cannot be determined.

+ +
+ +
+
+
+ broker API version: + string + + +
+
+

the API version of this running broker

+ +
+ +
+
+
+
+
+
+
+
+

MetricsObject: + object + + + +

+
+
+
+

key,value object where the key is 'appmetrics' and the value is an app dependent json and specified by the component developer

+ +
+ +
+
+
+ appmetrics: + object + + +
+
+ +
+ +
+
+
+
+
+
+
+
+

multideleteput: + object + + + +

+
+
+ +
+
+
+ appnames: + object[] + + +
+
+ +
+ +
+ appname + + + +
+ +
+
+
+
+
+
+
+
+
+

programpref: + object + + + +

+
+
+
+

the list of programs in this CDAP app

+ +
+ +
+
+
+ program_type: + string + + +
+
+

must be one of flows, mapreduce, schedules, spark, workflows, workers, or services

+ +
+ +
+
+
+ program_id: + string + + +
+
+

the name of the program

+ +
+ +
+
+
+ program_pref: + object + + +
+
+

the preference JSON to set for this program

+ +
+ +
+
+
+
+
+
+
+
+

programs: + object + + + +

+
+
+
+

the list of programs in this CDAP app

+ +
+ +
+
+
+ program_type: + string + + +
+
+

must be one of flows, mapreduce, schedules, spark, workflows, workers, or services

+ +
+ +
+
+
+ program_id: + string + + +
+
+

the name of the program

+ +
+ +
+
+
+
+
+
+
+
+

reconfigput: + object + + + +

+
+
+ +
+
+
+ reconfiguration_type: + string , x ∈ { + program-flowlet-app-config + , + program-flowlet-app-preferences + , + program-flowlet-smart + + } + + + + +
+
+

the type of reconfiguration

+ +
+ +
+
+
+ config: + object + + + +
+
+

the config JSON

+ +
+ +
+
+
+
+
+
+
+
+

returncode: + integer + + + +

+
+
+
+

an httpreturncode

+ +
+ +
+
+
+
+

service_endpoint: + object + + + +

+
+
+
+

descirbes a service endpoint, including the service name, the method name, and the method type (GET, PUT, etc, most of the time will be GET)

+ +
+ +
+
+
+ service_name: + string + + +
+
+

the name of the service

+ +
+ +
+
+
+ service_endpoint: + string + + +
+
+

the name of the endpoint on the service

+ +
+ +
+
+
+ endpoint_method: + string + + +
+
+

GET, POST, PUT, etc

+ +
+ +
+
+
+
+
+
+
+
+

service_method: + object + + + +

+
+
+
+

a URL and HTTP method exposed via a CDAP service

+ +
+ +
+
+
+ url: + string + + +
+
+

the fully qualified URL in CDAP for this service

+ +
+ +
+
+
+ method: + string + + +
+
+

HTTP method you can perform on the URL, e.g., GET, PUT, etc

+ +
+ +
+
+
+
+
+
+
+ + diff --git a/swagger/swagger.json b/swagger/swagger.json new file mode 100644 index 0000000..4490a99 --- /dev/null +++ b/swagger/swagger.json @@ -0,0 +1,560 @@ +{ + "swagger": "2.0", + "info": { + "version": "4.0.3", + "title": "CDAP Broker API" + }, + "paths": { + "/": { + "get": { + "description": "shows some information about this service", + "responses": { + "200": { + "description": "successful response", + "schema": { + "$ref": "#/definitions/info" + } + } + } + } + }, + "/application": { + "get": { + "description": "get all applications registered with this broker", + "responses": { + "200": { + "description": "successful response", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/appname" + } + } + } + } + } + }, + "/application/delete": { + "post": { + "description": "endpoint to delete multiple applications at once. Returns an array of status codes, where statuscode[i] = response returned from DELETE(application/i)", + "parameters": [ + { + "name": "postbody", + "in": "body", + "description": "required post body", + "required": true, + "schema": { + "$ref": "#/definitions/multideleteput" + } + } + ], + "responses": { + "200": { + "description": "successful response", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/returncode" + } + } + } + } + } + }, + "/application/{appname}": { + "parameters": [ + { + "name": "appname", + "in": "path", + "description": "Name of the application.", + "required": true, + "type": "string", + "format": "text" + } + ], + "get": { + "description": "Returns the representation of the application resource, including the links for healthcheck and metrics.", + "responses": { + "200": { + "description": "Successful response", + "schema": { + "$ref": "#/definitions/Application" + } + }, + "404": { + "description": "no app with name 'appname' registered with this broker." + } + } + }, + "put": { + "description": "Register an app for service and configuration discovery. This will light up a metrics and health endpoint for this app. `appname` is assumed to also be the key in consul.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "putbody", + "in": "body", + "description": "required put body", + "required": true, + "schema": { + "$ref": "#/definitions/appput" + } + } + ], + "responses": { + "200": { + "description": "Successful response", + "schema": { + "$ref": "#/definitions/Application" + } + }, + "400": { + "description": "put was performed but the appname was already registered with the broker, or Invalid PUT body" + } + } + }, + "delete": { + "description": "Remove an app for service and configuration discovery. This will remove the metrics and health endpoints for this app.", + "responses": { + "200": { + "description": "Successful response" + }, + "404": { + "description": "no app with name 'appname' registered with this broker." + } + } + } + }, + "/application*/{appname}": { + "parameters": [ + { + "name": "appname", + "in": "path", + "description": "Name of the application.", + "required": true, + "type": "string", + "format": "text" + } + ], + "put": { + "description": "(This is a hacky way of supporting \"oneOf\" because Swagger does not support oneOf https://github.com/OAI/OpenAPI-Specification/issues/333. This is the same endpoint as PUT /application/appname, except the PUT body is different.)\n\nRegister a hydrator app for service and configuration discovery. This will light up a metrics and health endpoint for this app. `appname` is assumed to also be the key in consul.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "putbody", + "in": "body", + "description": "required put body", + "required": true, + "schema": { + "$ref": "#/definitions/hydratorappput" + } + } + ], + "responses": { + "200": { + "description": "Successful response", + "schema": { + "$ref": "#/definitions/Application" + } + }, + "400": { + "description": "put was performed but the appname was already registered with the broker, or Invalid PUT body" + } + } + } + }, + "/application/{appname}/metrics": { + "get": { + "description": "Get live (real-time) app specific metrics for the running app appname. Metrics are customized per each app by the component developer", + "parameters": [ + { + "name": "appname", + "in": "path", + "description": "Name of the application to get metrics for.", + "required": true, + "type": "string", + "format": "test" + } + ], + "responses": { + "200": { + "description": "Successful response", + "schema": { + "$ref": "#/definitions/MetricsObject" + } + }, + "404": { + "description": "no app with name 'appname' registered with this broker." + } + } + } + }, + "/application/{appname}/healthcheck": { + "get": { + "description": "Perform a healthcheck on the running app appname.", + "parameters": [ + { + "name": "appname", + "in": "path", + "description": "Name of the application to get the healthcheck for.", + "required": true, + "type": "string", + "format": "test" + } + ], + "responses": { + "200": { + "description": "Successful response, healthcheck pass" + }, + "404": { + "description": "no app with name 'appname' registered with this broker, or the healthcheck has failed (though I would like to disambiguiate from the first case, CDAP returns a 404 for this)." + } + } + } + }, + "/application/{appname}/reconfigure": { + "parameters": [ + { + "name": "appname", + "in": "path", + "description": "Name of the application.", + "required": true, + "type": "string", + "format": "text" + } + ], + "put": { + "description": "Reconfigures the application.", + "parameters": [ + { + "name": "putbody", + "in": "body", + "description": "required put body", + "required": true, + "schema": { + "$ref": "#/definitions/reconfigput" + } + } + ], + "responses": { + "200": { + "description": "Successful response" + }, + "400": { + "description": "Bad request. Can happen with 1) {appname} is not registered with the broker, 2) the required PUT body is wrong, or 3) the smart interface was chosen and none of the config keys match anything in app_config or app_preferences" + } + } + } + } + }, + "definitions": { + "MetricsObject": { + "type": "object", + "description": "key,value object where the key is 'appmetrics' and the value is an app dependent json and specified by the component developer", + "properties": { + "appmetrics": { + "type": "object" + } + } + }, + "Application": { + "type": "object", + "properties": { + "appname": { + "description": "application name", + "type": "string" + }, + "healthcheckurl": { + "description": "fully qualified url to perform healthcheck", + "type": "string" + }, + "metricsurl": { + "description": "fully qualified url to get metrics from", + "type": "string" + }, + "url": { + "description": "fully qualified url of the resource", + "type": "string" + }, + "connectionurl": { + "description": "input URL that you can POST data into (URL of the CDAP stream)", + "type": "string" + }, + "serviceendpoints": { + "description": "a list of HTTP services exposed by this CDAP application", + "type": "array", + "items": { + "$ref": "#/definitions/service_method" + } + } + } + }, + "reconfigput": { + "type": "object", + "properties": { + "reconfiguration_type": { + "description": "the type of reconfiguration", + "type": "string", + "enum": [ + "program-flowlet-app-config", + "program-flowlet-app-preferences", + "program-flowlet-smart" + ] + }, + "config": { + "description": "the config JSON", + "type": "object" + } + }, + "required": [ + "reconfiguration_type", + "config" + ] + }, + "multideleteput": { + "type": "object", + "properties": { + "appnames": { + "type": "array", + "items": { + "$ref": "#/definitions/appname" + } + } + } + }, + "appname": { + "description": "an application name", + "type": "string" + }, + "hydratorappput": { + "type": "object", + "properties": { + "cdap_application_type": { + "description": "denotes whether this is a program-flowlet style application or a hydrator pipeline. For hydrator, this value must be \"hydrator-pipeline\"", + "type": "string", + "enum": [ + "hydrator-pipeline" + ] + }, + "namespace": { + "description": "the cdap namespace this is deployed into", + "type": "string" + }, + "pipeline_config_json_url": { + "description": "the URL of the config.json for this pipeline", + "type": "string" + }, + "streamname": { + "description": "name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model.", + "type": "string" + }, + "dependencies": { + "description": "represents a list of dependencies to be loaded for this pipeline. Not required.", + "type": "array", + "items": { + "$ref": "#/definitions/hydratordep" + } + } + }, + "required": [ + "cdap_application_type", + "namespace", + "pipeline_config_json_url", + "streamname" + ] + }, + "appput": { + "type": "object", + "properties": { + "cdap_application_type": { + "description": "denotes whether this is a program-flowlet style application or a hydrator pipeline. For program-flowlet style apps, this value must be \"program-flowlet\"", + "type": "string", + "enum": [ + "program-flowlet" + ] + }, + "streamname": { + "description": "name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model.", + "type": "string" + }, + "namespace": { + "description": "the cdap namespace this is deployed into", + "type": "string" + }, + "jar_url": { + "description": "the URL that the JAR you're deploying resides", + "type": "string" + }, + "artifact_name": { + "description": "the name of the CDAP artifact to be added", + "type": "string" + }, + "artifact_ver": { + "description": "the version of the artifact. Must be in X.Y.Z form" + }, + "app_config": { + "description": "the application config JSON", + "type": "object" + }, + "app_preferences": { + "description": "the application preferences JSON", + "type": "object" + }, + "programs": { + "type": "array", + "items": { + "$ref": "#/definitions/programs" + } + }, + "program_preferences": { + "type": "array", + "items": { + "$ref": "#/definitions/programpref" + } + }, + "services": { + "type": "array", + "items": { + "$ref": "#/definitions/service_endpoint" + } + } + } + }, + "service_endpoint": { + "description": "descirbes a service endpoint, including the service name, the method name, and the method type (GET, PUT, etc, most of the time will be GET)", + "type": "object", + "properties": { + "service_name": { + "type": "string", + "description": "the name of the service" + }, + "service_endpoint": { + "type": "string", + "description": "the name of the endpoint on the service" + }, + "endpoint_method": { + "type": "string", + "description": "GET, POST, PUT, etc" + } + } + }, + "service_method": { + "description": "a URL and HTTP method exposed via a CDAP service", + "type": "object", + "properties": { + "url": { + "type": "string", + "description": "the fully qualified URL in CDAP for this service" + }, + "method": { + "type": "string", + "description": "HTTP method you can perform on the URL, e.g., GET, PUT, etc" + } + } + }, + "programs": { + "description": "the list of programs in this CDAP app", + "type": "object", + "properties": { + "program_type": { + "description": "must be one of flows, mapreduce, schedules, spark, workflows, workers, or services", + "type": "string" + }, + "program_id": { + "description": "the name of the program", + "type": "string" + } + } + }, + "returncode": { + "description": "an httpreturncode", + "type": "integer" + }, + "hydratordep": { + "description": "represents a hydrator pipeline dependency. An equivelent to the following CURLs are formed with the below four params shown in CAPS \"curl -v -w\"\\n\" -X POST http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME -H \"Artifact-Extends:ARTIFACT_EXTENDS_HEADER\" -H “Artifact-Version:ARTIFACT_VERSION_HEADER” --data-binary @(DOWNLOADED FROM ARTIFACT_URL)\",\"curl -v -w\"\\n\" -X PUT http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME/versions/ARTIFACT_VERSION_HEADER/properties -d (DOWNLOADED FROM UI_PROPERTIES_URL)\"", + "properties": { + "artifact_extends_header": { + "description": "the value of the header that gets passed in for artifact-extends, e.g., \"Artifact-Extends:system:cdap-data-pipeline[4.0.1,5.0.0)\"", + "type": "string" + }, + "artifact_name": { + "description": "the name of the artifact", + "type": "string" + }, + "artifact_version_header": { + "description": "the value of the header that gets passed in for artifact-version, e.g., \"Artifact-Version:1.0.0-SNAPSHOT\"", + "type": "string" + }, + "artifact_url": { + "description": "the URL of the artifact JAR", + "type": "string" + }, + "ui_properties_url": { + "description": "the URL of the properties.json if the custom artifact has UI properties. This is optional.", + "type": "string" + } + }, + "required": [ + "artifact_extends_header", + "artifact_name", + "artifact_version_header", + "artifact_url" + ] + }, + "programpref": { + "description": "the list of programs in this CDAP app", + "type": "object", + "properties": { + "program_type": { + "description": "must be one of flows, mapreduce, schedules, spark, workflows, workers, or services", + "type": "string" + }, + "program_id": { + "description": "the name of the program", + "type": "string" + }, + "program_pref": { + "description": "the preference JSON to set for this program", + "type": "object" + } + } + }, + "info": { + "description": "some broker information", + "type": "object", + "properties": { + "managed cdap url": { + "description": "the url of the CDAP cluster API this broker is managing", + "type": "string" + }, + "number of applications registered": { + "type": "integer" + }, + "uptime (s)": { + "type": "integer" + }, + "cdap GUI port": { + "type": "integer", + "description": "The GUI port of the CDAP cluster this broker is managing. Mostly to help users of this API check their application in cdap. Note, will return UNKNOWN_CDAP_VERSION if it cannot be determined." + }, + "cdap cluster version": { + "type": "string", + "description": "the version of the CDAP cluster this broker is managing. Note, will return UKNOWN_CDAP_VERSION if it cannot be determined." + }, + "broker API version": { + "type": "string", + "description": "the API version of this running broker" + } + } + } + } +} diff --git a/swagger/swagger.yaml b/swagger/swagger.yaml new file mode 100644 index 0000000..506908f --- /dev/null +++ b/swagger/swagger.yaml @@ -0,0 +1,418 @@ +# Example YAML to get you started quickly. +# Be aware that YAML has indentation based scoping. +# Code completion support is available so start typing for available options. +swagger: '2.0' + +# This is your document metadata +info: + version: "4.0.3" + title: CDAP Broker API + +paths: + /: + get: + description: shows some information about this service + responses: + 200: + description: successful response + schema: + $ref: '#/definitions/info' + + /application: + get: + description: get all applications registered with this broker + responses: + 200: + description: successful response + schema: + type: array + items: + $ref: '#/definitions/appname' + + /application/delete: + post: + description: endpoint to delete multiple applications at once. Returns an array of status codes, where statuscode[i] = response returned from DELETE(application/i) + parameters: + - name: postbody + in: body + description: required post body + required: true + schema: + $ref: '#/definitions/multideleteput' + responses: + 200: + description: successful response + schema: + type: array + items: + $ref: '#/definitions/returncode' + + /application/{appname}: + parameters: + - name: appname + in: path + description: Name of the application. + required: true + type: string + format: text + + get: + description: Returns the representation of the application resource, including the links for healthcheck and metrics. + responses: + 200: + description: Successful response + schema: + $ref: '#/definitions/Application' + 404: + description: no app with name 'appname' registered with this broker. + + put: + description: Register an app for service and configuration discovery. This will light up a metrics and health endpoint for this app. `appname` is assumed to also be the key in consul. + consumes: + - application/json + produces: + - application/json + parameters: + - name: putbody + in: body + description: required put body + required: true + schema: + $ref: '#/definitions/appput' + + responses: + 200: + description: Successful response + schema: + $ref: '#/definitions/Application' + 400: + description: put was performed but the appname was already registered with the broker, or Invalid PUT body + + + delete: + description: Remove an app for service and configuration discovery. This will remove the metrics and health endpoints for this app. + responses: + 200: + description: Successful response + 404: + description: no app with name 'appname' registered with this broker. + + /application*/{appname}: + parameters: + - name: appname + in: path + description: Name of the application. + required: true + type: string + format: text + + put: + description: (This is a hacky way of supporting "oneOf" because Swagger does not support oneOf https://github.com/OAI/OpenAPI-Specification/issues/333. This is the same endpoint as PUT /application/appname, except the PUT body is different.) + + + Register a hydrator app for service and configuration discovery. This will light up a metrics and health endpoint for this app. `appname` is assumed to also be the key in consul. + consumes: + - application/json + produces: + - application/json + parameters: + - name: putbody + in: body + description: required put body + required: true + schema: + $ref: '#/definitions/hydratorappput' + + responses: + 200: + description: Successful response + schema: + $ref: '#/definitions/Application' + 400: + description: put was performed but the appname was already registered with the broker, or Invalid PUT body + + + /application/{appname}/metrics: + get: + # This is array of GET operation parameters: + description: Get live (real-time) app specific metrics for the running app appname. Metrics are customized per each app by the component developer + parameters: + # An example parameter that is in query and is required + - name: appname + in: path + description: Name of the application to get metrics for. + required: true + type: string + format: test + + # Expected responses for this operation: + responses: + 200: + description: Successful response + schema: + $ref: '#/definitions/MetricsObject' + 404: + description: no app with name 'appname' registered with this broker. + + /application/{appname}/healthcheck: + get: + # This is array of GET operation parameters: + description: Perform a healthcheck on the running app appname. + parameters: + # An example parameter that is in query and is required + - name: appname + in: path + description: Name of the application to get the healthcheck for. + required: true + type: string + format: test + + # Expected responses for this operation: + responses: + # Response code + 200: + description: Successful response, healthcheck pass + 404: + description: no app with name 'appname' registered with this broker, or the healthcheck has failed (though I would like to disambiguiate from the first case, CDAP returns a 404 for this). + + /application/{appname}/reconfigure: + parameters: + - name: appname + in: path + description: Name of the application. + required: true + type: string + format: text + put: + description: Reconfigures the application. + parameters: + - name: putbody + in: body + description: required put body + required: true + schema: + $ref: '#/definitions/reconfigput' + responses: + 200: + description: Successful response + 400: + description: Bad request. Can happen with 1) {appname} is not registered with the broker, 2) the required PUT body is wrong, or 3) the smart interface was chosen and none of the config keys match anything in app_config or app_preferences + +definitions: + MetricsObject: + type: object + description: key,value object where the key is 'appmetrics' and the value is an app dependent json and specified by the component developer + properties: + appmetrics: + type: object + + Application: + type: object + properties: + appname: + description: application name + type: string + healthcheckurl: + description: fully qualified url to perform healthcheck + type: string + metricsurl: + description: fully qualified url to get metrics from + type: string + url: + description: fully qualified url of the resource + type: string + connectionurl: + description: input URL that you can POST data into (URL of the CDAP stream) + type: string + serviceendpoints: + description: a list of HTTP services exposed by this CDAP application + type: array + items: + $ref: '#/definitions/service_method' + + reconfigput: + type: object + properties: + reconfiguration_type: + description: the type of reconfiguration + type: string + enum: ["program-flowlet-app-config", "program-flowlet-app-preferences", "program-flowlet-smart"] + config: + description: the config JSON + type: object + required: ["reconfiguration_type", "config"] + + multideleteput: + type: object + properties: + appnames: + type: array + items: + $ref: '#/definitions/appname' + + appname: + description: an application name + type: string + + hydratorappput: + type: object + properties: + cdap_application_type: + description: denotes whether this is a program-flowlet style application or a hydrator pipeline. For hydrator, this value must be "hydrator-pipeline" + type: string + enum: ["hydrator-pipeline"] + namespace: + description: the cdap namespace this is deployed into + type: string + pipeline_config_json_url: + description: the URL of the config.json for this pipeline + type: string + streamname: + description: name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model. + type: string + dependencies: + description: represents a list of dependencies to be loaded for this pipeline. Not required. + type: array + items: + $ref: '#/definitions/hydratordep' + required: ["cdap_application_type", "namespace", "pipeline_config_json_url", "streamname"] + + appput: + type: object + properties: + cdap_application_type: + description: denotes whether this is a program-flowlet style application or a hydrator pipeline. For program-flowlet style apps, this value must be "program-flowlet" + type: string + enum: ["program-flowlet"] + streamname: + description: name of the CDAP stream to ingest data into this app. Should come from the developer and Tosca model. + type: string + namespace: + description: the cdap namespace this is deployed into + type: string + jar_url: + description: the URL that the JAR you're deploying resides + type: string + artifact_name: + description: the name of the CDAP artifact to be added + type: string + artifact_ver: + description: the version of the artifact. Must be in X.Y.Z form + app_config: + description: the application config JSON + type: object + app_preferences: + description: the application preferences JSON + type: object + programs: + type: array + items: + $ref: '#/definitions/programs' + program_preferences: + type: array + items: + $ref: '#/definitions/programpref' + services: + type: array + items: + $ref: '#/definitions/service_endpoint' + + service_endpoint: + description: descirbes a service endpoint, including the service name, the method name, and the method type (GET, PUT, etc, most of the time will be GET) + type: object + properties: + service_name: + type: string + description: the name of the service + service_endpoint: + type: string + description: the name of the endpoint on the service + endpoint_method: + type: string + description: GET, POST, PUT, etc + + service_method: + description: a URL and HTTP method exposed via a CDAP service + type: object + properties: + url: + type: string + description: the fully qualified URL in CDAP for this service + method: + type: string + description: HTTP method you can perform on the URL, e.g., GET, PUT, etc + + programs: + description: the list of programs in this CDAP app + type: object + properties: + program_type: + description: must be one of flows, mapreduce, schedules, spark, workflows, workers, or services + type: string + program_id: + description: the name of the program + type: string + + returncode: + description: an httpreturncode + type: integer + + hydratordep: + description: represents a hydrator pipeline dependency. An equivelent to the following CURLs are formed with the below four params shown in CAPS "curl -v -w"\n" -X POST http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME -H "Artifact-Extends:ARTIFACT_EXTENDS_HEADER" -H “Artifact-Version:ARTIFACT_VERSION_HEADER” --data-binary @(DOWNLOADED FROM ARTIFACT_URL)","curl -v -w"\n" -X PUT http://cdapurl:11015/v3/namespaces/setelsewhere/artifacts/ARTIFACT_NAME/versions/ARTIFACT_VERSION_HEADER/properties -d (DOWNLOADED FROM UI_PROPERTIES_URL)" + properties: + artifact_extends_header: + description: the value of the header that gets passed in for artifact-extends, e.g., "Artifact-Extends:system:cdap-data-pipeline[4.0.1,5.0.0)" + type: string + artifact_name: + description: the name of the artifact + type: string + artifact_version_header : + description: the value of the header that gets passed in for artifact-version, e.g., "Artifact-Version:1.0.0-SNAPSHOT" + type: string + artifact_url: + description: the URL of the artifact JAR + type: string + ui_properties_url: + description: the URL of the properties.json if the custom artifact has UI properties. This is optional. + type: string + required: ["artifact_extends_header", "artifact_name", "artifact_version_header", "artifact_url"] + + programpref: + description: the list of programs in this CDAP app + type: object + properties: + program_type: + description: must be one of flows, mapreduce, schedules, spark, workflows, workers, or services + type: string + program_id: + description: the name of the program + type: string + program_pref: + description: the preference JSON to set for this program + type: object + + info: + description: some broker information + type: object + properties: + managed cdap url: + description: the url of the CDAP cluster API this broker is managing + type: string + number of applications registered: + type: integer + uptime (s): + type: integer + cdap GUI port: + type: integer + description: The GUI port of the CDAP cluster this broker is managing. Mostly to help users of this API check their application in cdap. Note, will return UNKNOWN_CDAP_VERSION if it cannot be determined. + cdap cluster version: + type: string + description: the version of the CDAP cluster this broker is managing. Note, will return UKNOWN_CDAP_VERSION if it cannot be determined. + broker API version: + type: string + description: the API version of this running broker + + + + + diff --git a/test/apitest/apitest_SUITE.erl b/test/apitest/apitest_SUITE.erl new file mode 100644 index 0000000..971204c --- /dev/null +++ b/test/apitest/apitest_SUITE.erl @@ -0,0 +1,826 @@ +% ============LICENSE_START======================================================= +% org.onap.dcae +% ================================================================================ +% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +% ================================================================================ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% ============LICENSE_END========================================================= +% +% ECOMP is a trademark and service mark of AT&T Intellectual Property. +-module(apitest_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include("../../src/application.hrl"). +-export([all/0, groups/0, init_per_suite/1, end_per_suite/1]). +-export([server_health_test/1, app_deploy/1, hydrator_deploy/1, app_teardown/1, app_test/1, app_reconfigure/1, test_failures/1, app_botch_flows/1, app_botch_delete/1, app_botch_consul_delete/1, invalid_reconfigure/1, delete_all/1, + hydrator_app_teardown/1, hydrator_test/1, + hydrator_wdeps_deploy/1, + hydrator_wdeps_test/1, + hydrator_wdeps_teardown/1 + ]). + +%lazy shorthands (yay C style macros! miss these in python) +-define(SC(L), util:concat(L)). +-define(PLG(K, PL), proplists:get_value(K, PL)). +-define(XER, "testing-XER"). +-define(D(X), erlang:display(X)). + +all() -> [ + {group, progapi}, + {group, hydratorapi}, + {group, apibotchedflows}, + {group, apibotcheddeleted}, + {group, apibotchedconsuldeleted}, + {group, invalidreconfig}, + {group, apideleteall} + ]. +groups() -> [ + {progapi, %prog-flow test + [], + [ + server_health_test, + test_failures, + app_deploy, + app_test, + app_reconfigure, + app_test, + app_teardown + ]}, + {hydratorapi, %hydrator test + [], + [ + server_health_test, + test_failures, + hydrator_deploy, + hydrator_test, + hydrator_app_teardown, + hydrator_wdeps_deploy, + hydrator_wdeps_test, + hydrator_wdeps_teardown + ]}, + {apibotchedflows, %deploy, manually stop flows, then try to delete + [], + [ + server_health_test, + app_deploy, + app_botch_flows, + app_teardown + ]}, + {apibotcheddeleted, %deploy, manually stop flows, delete it manually from cdap, then try to delete + [], + [ + server_health_test, + app_deploy, + app_botch_delete, + app_teardown + ]}, + {apibotchedconsuldeleted, %deploy, manually stop flows, delete it manually from cdap, then try to delete + [], + [ + server_health_test, + app_deploy, + app_botch_consul_delete, + app_teardown + ]}, + {invalidreconfig, %call reconfigure on an app that DNE + [], + [ + server_health_test, + app_deploy, + invalid_reconfigure, + app_teardown + ]}, + {apideleteall, + [], + [ + server_health_test, + app_deploy, + delete_all + ]} + ]. + +%HELPER FUNCTIONS +setup_rels(Config, D) -> + %deploy/delete the testing keys into Consul. This would normally be done by the Cloudify plugin + % + %#NOTE: This is weird. The sequence of steps is: + % 1 Cloudify populates rels key + % 2 Cloudify sends broker the unbound config + % 3 Broker pushes unbound config to consul + % 4 Broker binds config + % 5 Broker pushes bound config to CDAP + % Between state 1 and 3 consul is in an inconsistent state where it has only the rels key but not the config key. Not so sure about this. They seem to be a pair. Maybe the rels key should be pushed to the source node to be dealt with. + % #Here, we are mocking step 1 + URL = ?SC([?PLG(consul_url, Config), "/v1/kv/", ?PLG(appname, Config), ":rel"]), + case D of + setup -> {200,"true"} = httpabs:put(?XER, URL, "application/json", jiffy:encode([<<"666_fake_testing_service">>])); + teardown -> {200,"true"} = httpabs:delete(?XER, URL) + end, + httpabs:get(?XER, URL). + +setup_fake_testing_service(Config, D) -> + %register a fake testing service to test that the CDAP app recieved it's bound configuration properly + Name = <<"666_fake_testing_service">>, + SrvURL = ?SC([?PLG(consul_url, Config), "/v1/catalog/service/", Name]), + case D of + setup -> + URL = ?SC([?PLG(consul_url, Config), "/v1/agent/service/register"]), + Body = {[{<<"name">>, Name}, + {<<"Address">>, <<"666.666.666.666">>}, + {<<"Port">>, 13} + ]}, + {200, []} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body)), + httpabs:get(?XER, SrvURL); + teardown -> + %total failure on Consul's part for this not to be a delete on the same endpoint + URL = ?SC([?PLG(consul_url, Config), "/v1/agent/service/deregister/", Name]), + {200, []} = httpabs:put(?XER, URL, "application/json", ""), + httpabs:get(?XER, SrvURL) + end. + +get_config_consul(C) -> + %get config from consul. returns the code too for tests testing for a 404 + {RC, RB} = consul_interface:consul_get_configuration(?XER, ?PLG(appname, C), ?PLG(consul_url, C)), + case RC of + 200 -> {RC, util:ejson_to_map(RB)}; + _ -> {RC, RB} + end. + +get_config_cdap(C) -> + {RC, RB} = cdap_interface:get_app_config(?XER, ?PLG(appname, C), ?PLG(namespace, C),?PLG(cdap_url, C)), + case RC of + 200 -> + %I think CDAP is DOUBLY encoding JSON!! + {RC, jiffy:decode(jiffy:decode(jiffy:encode(RB)), [return_maps])}; + _ -> {RC, RB} + end. + +get_preferences_cdap(C) -> + {RC, RB} = cdap_interface:get_app_preferences(?XER, ?PLG(appname, C), ?PLG(namespace, C),?PLG(cdap_url, C)), + case RC of + 200 -> {RC, util:ejson_to_map(RB)}; + _ -> {RC, RB} + end. + +get_preferences_consul(C) -> + %get preferences from consul. returns the code too for tests testing for a 404 + {RC, RB} = consul_interface:consul_get_preferences(?XER, ?PLG(appname, C), ?PLG(consul_url, C)), + case RC of + 200 -> {RC, util:ejson_to_map(RB)}; + _ -> {RC, RB} + end. + +valid_deploy_body(C) -> + {[ + {<<"cdap_application_type">>, <<"program-flowlet">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(streamname, C)}, + {<<"jar_url">>, ?PLG(jar_url, C)}, + {<<"artifact_name">>, ?PLG(art_name, C)}, + {<<"artifact_version">>, ?PLG(art_ver, C)}, + {<<"app_config">>, ?PLG(init_config, C)}, + {<<"app_preferences">>, ?PLG(init_preferences, C)}, + {<<"services">>, [{[{<<"service_name">>, <<"Greeting">>}, + {<<"service_endpoint">>, <<"greet">>}, + {<<"endpoint_method">>, <<"GET">>}]}]}, + {<<"programs">>, [ + {[{<<"program_type">>, <<"flows">>}, + {<<"program_id">>, <<"WhoFlow">>}]}, + {[{<<"program_type">>, <<"services">>}, + {<<"program_id">>, <<"Greeting">>}]}]}, + {<<"program_preferences">>, [ + {[{<<"program_type">>,<<"flows">>}, + {<<"program_id">>, <<"WhoFlow">>}, + {<<"program_pref">>, ?PLG(whoflowpref, C)}]} + ]} + ]}. + +%%%%%%%%%%%%%%% +%TEST FUNCTIONS +init_per_suite(_C) -> + %get platform ENVs + [MyName, ConsulURL, _, _] = util:get_platform_envs_and_config(), + + BrokerUrl = case os:getenv("BROKER_TEST_TYPE") of + false -> %no env variable means start the broker on localhost + %start a local broker + {ok,[syntax_tools,compiler,goldrush,lager,jiffy,mnesia,ranch,cowlib,cowboy,leptus,uuid,iso8601,cdapbroker]} = application:ensure_all_started(cdapbroker), + "http://localhost:7777"; + "DOCKER" -> + "http://localhost:7777"; + "REMOTE" -> + %Using MyName, fetch from Consul the broker info + {MyIP, MyPort} = consul_interface:consul_get_service_ip_port(MyName, ConsulURL), + ?SC(["http://", MyIP, ":", integer_to_binary(MyPort)]) + end, + + %get NEXUS_ROOT for testing purposes. + Nexus = os:getenv("NEXUS_RAW_ROOT"), + true = (Nexus /= false), %blow if this wasn't set, we need it. + + {200, RB} = httpabs:get(?XER, BrokerUrl), + CDAPUrl = maps:get(<<"managed cdap url">>, jiffy:decode(RB, [return_maps])), + + %set properties that are shared between program-flowlet and hydrator + Namespace = <<"testns">>, + CDAPUrlNS = ?SC([CDAPUrl, "/v3/namespaces/", Namespace]), + + %set up config for program-flowlet app + Appname = <<"hwtest">>, + Streamname = <<"who">>, + + %setup config for hydrator pipeline + HydratorAppname = <<"hydratortest">>, + HydratorAppURL = ?SC([CDAPUrlNS, "/apps/", HydratorAppname]), + HydratorStreamname = <<"s1">>, %horrible name but not made by me + + HydratorWDepsAppname = <<"hydratorwdepstest">>, + HydratorWDepsAppURL = ?SC([CDAPUrlNS, "/apps/", HydratorWDepsAppname]), + HydratorWDepsStreamname = <<"t1">>, %horrible name but not made by me + + %Set up this test suites configuration + [{broker_url, BrokerUrl}, + {cdap_url, CDAPUrl}, + {cdap_ns_url, CDAPUrlNS}, + {jar_url, ?SC([Nexus, "/jar_files/HelloWorld-3.4.3.jar"])}, + {consul_url, ConsulURL}, + {consul_app_url, ?SC([ConsulURL, "/v1/catalog/service/", Appname])}, + {app_url, ?SC([CDAPUrlNS, "/apps/", Appname])}, + {namespace, Namespace}, + {appname, Appname}, + {broker_app_url, ?SC([BrokerUrl, "/application/", Appname])}, + {stream_url, ?SC([CDAPUrlNS, "/streams/", Streamname])}, + {art_ver, <<"3.4.3">>}, + {art_name, <<"HelloWorld">>}, + {streamname, Streamname}, + {init_config, {[ + {<<"streams_produces">>, <<"\{\{fake_testing_service\}\}">>}, + {<<"services_calls">>, <<"\{\{fake_testing_service\}\}">>}, + {<<"donotresolveme">>, <<"donotabsolveme">>} + ]}}, + {whoflowpref, {[{<<"progfoo">>, <<"progbar">>}]}}, + {init_preferences, {[{<<"preffoo">>, <<"prefbar">>}]}}, + {reconfig, {[{<<"foo">>, <<"bar">>}]}}, + + %hydrator test properties + {hydrator_appname, HydratorAppname}, + {broker_hydrator_app_url, ?SC([BrokerUrl, "/application/", HydratorAppname])}, + {hydrator_app_url, HydratorAppURL}, + {hydrator_json_url, ?SC([Nexus, "/json_files/t1-4.1.2.json "])}, + {hydrator_pipeline_status_url, ?SC([HydratorAppURL, "/schedules/dataPipelineSchedule/status"])}, + {hydrator_stream_url, ?SC([CDAPUrlNS, "/streams/", HydratorStreamname])}, + {hydrator_streamname, HydratorStreamname}, + {consul_hydrator_app_url, ?SC([ConsulURL, "/v1/catalog/service/", HydratorAppname])}, + + %hydrator with deps test properties + {hydrator_wdeps_appname, HydratorWDepsAppname}, + {hydrator_wdeps_artname, <<"demoTCA">>}, + {hydrator_wdeps_artver, <<"1.0.0-SNAPSHOT">>}, + {hydrator_wdeps_app_url, HydratorWDepsAppURL}, + {broker_hydrator_wdeps_app_url, ?SC([BrokerUrl, "/application/", HydratorWDepsAppname])}, + {hydrator_wdeps_streamname, HydratorWDepsStreamname}, + {hydrator_wdeps_stream_url, ?SC([CDAPUrlNS, "/streams/", HydratorWDepsStreamname])}, + {hydrator_wdeps_json_url, ?SC([Nexus, "/json_files/t1-4.1.2.json"])}, + {hydrator_wdeps_properties_json_url, ?SC([Nexus, "/json_files/demoTCA-1.0.0-SNAPSHOT-properties.json"])}, + {hydrator_wdeps_jar_url, ?SC([Nexus, "/json_files/demoTCA-1.0.0-SNAPSHOT.jar"])}, + {hydrator_wdeps_test_data_url, ?SC([Nexus, "/txt_files/tcaDemoData100k.txt"])}, + {consul_hydrator_wdeps_app_url, ?SC([ConsulURL, "/v1/catalog/service/", HydratorWDepsAppname])}, + {hydrator_pipeline_wdeps_status_url, ?SC([HydratorWDepsAppURL, "/schedules/dataPipelineSchedule/status"])} + ] + . + +end_per_suite(_C) -> + _ = application:stop(cdapbroker). + +server_health_test(C) -> + {200, RB} = httpabs:get(?XER, ?PLG(broker_url,C)), + M = jiffy:decode(RB, [return_maps]), + true = maps:is_key(<<"managed cdap url">>, M), + true = maps:is_key(<<"number of applications registered">>, M), + true = maps:is_key(<<"uptime (s)">>, M), + true = maps:is_key(<<"cdap cluster version">>, M), + true = maps:is_key(<<"cdap GUI port">>, M), + true = maps:is_key(<<"broker API version">>, M) + . + +app_deploy(C) -> %C == Config + %Deploy the test application + + %Deploy the rel key + {200, _} = setup_rels(C, setup), + + %Register the fake testing service to test config binding. Make sure it's not empty + {200, F} = setup_fake_testing_service(C, setup), + true = F /= [], + + ExpectedBoundConfg = maps:from_list([ + {<<"services_calls">> , [<<"666.666.666.666:13">>]}, + {<<"streams_produces">>, [<<"666.666.666.666:13">>]}, + {<<"donotresolveme">> , <<"donotabsolveme">>} + ]), + + %Maps can be used safely with the == operator but appears proplists cannot be + Expected = maps:from_list([ + {<<"appname">>, ?PLG(appname, C)}, + {<<"apptype">>, <<"program-flowlet">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"healthcheckurl">>, list_to_binary(?SC([?PLG(broker_app_url, C), "/healthcheck"]))}, + {<<"metricsurl">>, list_to_binary(?SC([?PLG(broker_app_url, C), "/metrics"]))}, + {<<"url">>, list_to_binary(?PLG(broker_app_url, C))}, + {<<"connectionurl">>, list_to_binary(?PLG(stream_url, C))}, + {<<"serviceendpoints">>, [#{<<"url">> => list_to_binary(?SC([?PLG(app_url, C), "/services/Greeting/methods/greet"])), + <<"method">> => <<"GET">>}]}, + {<<"unbound_config">>, #{<<"streams_produces">> => <<"\{\{fake_testing_service\}\}">>, <<"services_calls">> => <<"\{\{fake_testing_service\}\}">>, <<"donotresolveme">> => <<"donotabsolveme">>}}, + {<<"bound_config">>, ExpectedBoundConfg} + ]), + + %assert the current appliccation list does not contain our test app + {200,RB0} = httpabs:get(?XER, ?SC([?PLG(broker_url, C), "/application"])), + true = lists:all(fun(X) -> X /= ?PLG(appname, C) end, jiffy:decode(RB0)), + + %deploy the app + Body = valid_deploy_body(C), + {200, RB} = httpabs:put(?XER, ?PLG(broker_app_url, C), "application/json", jiffy:encode(Body)), + + %The CDAP APIs return the config as a JSON dumped to a string, so we need to get that back into a real JSON to have key-order-independent equality testing + Fix = fun(X) -> + RBMap = jiffy:decode(X, [return_maps]), + maps:update(<<"bound_config">>, jiffy:decode(maps:get(<<"bound_config">>, RBMap), [return_maps]), RBMap) + end, + + %assert that the return and get matches what we put in + true = Fix(RB) == Expected, + + %assert hitting the get application endpoint works + {200, RB2} = httpabs:get(?XER, ?PLG(broker_app_url, C)), + true = Fix(RB2) == Expected, + + %assert the current application list now includes our new app + {200, RB3} = httpabs:get(?XER, ?SC([?PLG(broker_url, C), "/application"])), + true = lists:any(fun(X) -> X == ?PLG(appname, C) end, jiffy:decode(RB3)), + + %make sure it is in CDAP + {200, _} = httpabs:get(?XER, ?PLG(app_url, C)), + + %check metrics + {200, _} = httpabs:get(?XER, ?SC([?PLG(broker_app_url, C), "/metrics"])), + + %check healthcheck + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_app_url, C), "/healthcheck"])), + + %make sure that the service is registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_app_url, C)), + true = jiffy:decode(RBHC) /= [], + + %check that the UNbound config is correct + true = {200, util:ejson_to_map(?PLG(init_config, C))} == get_config_consul(C), + + %check that the preferences in Consul is correct + InitPrefMap = util:ejson_to_map(?PLG(init_preferences, C)), + true = {200, InitPrefMap} == get_preferences_consul(C), + + %make sure CDAP has right preferences + true = {200, InitPrefMap} == get_preferences_cdap(C), + + %make sure the config binding service and pulling config out of CDAP all match + %> get it strait from CBS + CBSUrl = util:resolve_cbs(?XER, ?PLG(consul_url, C)), + {200, RB4} = httpabs:get(?XER, ?SC([CBSUrl, "/service_component/", ?PLG(appname, C)])), + %get it from cdap + {200, CDAPConfig} = get_config_cdap(C), + %make sure everythng is as expected + true = ExpectedBoundConfg == jiffy:decode(RB4, [return_maps]), + true = ExpectedBoundConfg == CDAPConfig, + + %try to put the same app again and assert you get a 400 + {400,"State: Bad Request. Return Body: Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first"} = + httpabs:put(?XER, ?PLG(broker_app_url, C), "application/json", jiffy:encode(Body)). + +hydrator_deploy(C) -> + Body = {[ + {<<"cdap_application_type">>, <<"hydrator-pipeline">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(hydrator_streamname, C)}, + {<<"pipeline_config_json_url">>, ?PLG(hydrator_json_url, C)} + ]}, + Expected = maps:from_list([ + {<<"appname">>, ?PLG(hydrator_appname, C)}, + {<<"apptype">>, <<"hydrator-pipeline">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"healthcheckurl">>, list_to_binary(?SC([?PLG(broker_hydrator_app_url, C), "/healthcheck"]))}, + {<<"metricsurl">>, list_to_binary(?SC([?PLG(broker_hydrator_app_url, C), "/metrics"]))}, + {<<"url">>, list_to_binary(?PLG(broker_hydrator_app_url, C))}, + {<<"connectionurl">>, list_to_binary(?PLG(hydrator_stream_url, C))}, + {<<"serviceendpoints">>, []} + ]), + + %assert the current appliccation list does not contain our test app + {200,RB0} = httpabs:get(?XER, ?SC([?PLG(broker_url, C), "/application"])), + true = lists:all(fun(X) -> X /= ?PLG(hydrator_appname, C) end, jiffy:decode(RB0)), + + %try the deploy + {200, RB1} = httpabs:put(?XER, ?PLG(broker_hydrator_app_url, C), "application/json", jiffy:encode(Body)), + true = jiffy:decode(RB1, [return_maps]) == Expected, + + %make sure the Execution resume worked + {200, RB2} = httpabs:get(?XER, ?PLG(hydrator_pipeline_status_url, C)), + true = jiffy:decode(RB2) == {[{<<"status">>, <<"SCHEDULED">>}]}, + + %make sure it is in CDAP + {200, _} = httpabs:get(?XER, ?PLG(hydrator_app_url, C)), + + %assert the current application list now includes our new app + {200, RB3} = httpabs:get(?XER, ?SC([?PLG(broker_url, C), "/application"])), + true = lists:any(fun(X) -> X == ?PLG(hydrator_appname, C) end, jiffy:decode(RB3)), + + %check healthcheck + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_hydrator_app_url, C), "/healthcheck"])), + + %check metrics + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_hydrator_app_url, C), "/metrics"])), + + %make sure that the service is registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_hydrator_app_url, C)), + true = jiffy:decode(RBHC) /= [] + . + +hydrator_wdeps_deploy(C) -> + Body = {[ + {<<"cdap_application_type">>, <<"hydrator-pipeline">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(hydrator_wdeps_streamname, C)}, + {<<"pipeline_config_json_url">>, ?PLG(hydrator_wdeps_json_url, C)}, + {<<"dependencies">>, [ + {[ + {<<"artifact_extends_header">>, <<"system:cdap-data-pipeline[4.1.0,5.0.0)">>}, + {<<"artifact_name">>, ?PLG(hydrator_wdeps_artname, C)}, + {<<"artifact_version_header">>, ?PLG(hydrator_wdeps_artver, C)}, + {<<"artifact_url">>, ?PLG(hydrator_wdeps_jar_url, C)}, + {<<"ui_properties_url">>, ?PLG(hydrator_wdeps_properties_json_url, C)} + ]} + ]} + ]}, + Expected = maps:from_list([ + {<<"appname">>, ?PLG(hydrator_wdeps_appname, C)}, + {<<"apptype">>, <<"hydrator-pipeline">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"healthcheckurl">>, list_to_binary(?SC([?PLG(broker_hydrator_wdeps_app_url, C), "/healthcheck"]))}, + {<<"metricsurl">>, list_to_binary(?SC([?PLG(broker_hydrator_wdeps_app_url, C), "/metrics"]))}, + {<<"url">>, list_to_binary(?PLG(broker_hydrator_wdeps_app_url, C))}, + {<<"connectionurl">>, list_to_binary(?PLG(hydrator_wdeps_stream_url, C))}, + {<<"serviceendpoints">>, []} + ]), + %assert the current appliccation list does not contain our test app + {200,RB0} = httpabs:get(?XER,?SC([?PLG(broker_url, C), "/application"])), + true = lists:all(fun(X) -> X /= ?PLG(hydrator_wdeps_appname, C) end, jiffy:decode(RB0)), + + %try the deploy + {200, RB1} = httpabs:put(?XER, ?PLG(broker_hydrator_wdeps_app_url, C), "application/json", jiffy:encode(Body)), + true = jiffy:decode(RB1, [return_maps]) == Expected, + + %make sure properties are loaded, test artifact + {200, _} = httpabs:get(?XER,?SC([?PLG(cdap_ns_url, C), "/artifacts/", ?PLG(hydrator_wdeps_artname, C), "/versions/", ?PLG(hydrator_wdeps_artver, C), "/properties"])), + + %make sure the Execution resume worked + {200, RB2} = httpabs:get(?XER,?PLG(hydrator_pipeline_wdeps_status_url, C)), + true = jiffy:decode(RB2) == {[{<<"status">>, <<"SCHEDULED">>}]}, + + %make sure it is in CDAP + {200, _} = httpabs:get(?XER,?PLG(hydrator_wdeps_app_url, C)), + + %assert the current application list now includes our new app + {200, RB3} = httpabs:get(?XER,?SC([?PLG(broker_url, C), "/application"])), + true = lists:any(fun(X) -> X == ?PLG(hydrator_wdeps_appname, C) end, jiffy:decode(RB3)), + + %check healthcheck + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_hydrator_wdeps_app_url, C), "/healthcheck"])), + + %check metrics + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_hydrator_wdeps_app_url, C), "/metrics"])), + + %make sure that the service is registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_hydrator_wdeps_app_url, C)), + true = jiffy:decode(RBHC) /= [] + . + +hydrator_test(C) -> + %test te app by injecting some data into the stream and getting it out + %Sleeping since HTTP services may still be booting up: see https://issues.cask.co/browse/CDAP-812 + ok = timer:sleep(30000), %30s + %curl into stream + {200, _} = httpabs:post(?XER, ?PLG(hydrator_stream_url, C), "text/plain", "beer, vodka, gin"), + %query data out + PB = jiffy:encode({[{<<"query">>, <<"select v1, v2, v3 from dataset_pf1">>}]}), + {200, RB} = httpabs:post(?XER, ?SC([?PLG(cdap_ns_url, C), "/data/explore/queries"]), "text/plain", PB), + {[{<<"handle">>, Handle}]} = jiffy:decode(RB), + %results can take time, sleep again + ok = timer:sleep(30000), + Expected = {[ + {<<"status">>,<<"FINISHED">>}, + {<<"hasResults">>,true} + ]}, + {200, RB2} = httpabs:get(?XER, ?SC([?PLG(cdap_url, C), "/v3/data/explore/queries", "/", Handle, "/status"])), + true = Expected == jiffy:decode(RB2), + {200, _} = httpabs:post(?XER, ?SC([?PLG(cdap_url, C), "/v3/data/explore/queries", "/", Handle, "/next"]), "text/plain", "") + . + +app_test(C) -> + %Sleeping since HTTP services may still be booting up: see https://issues.cask.co/browse/CDAP-812 + ok = timer:sleep(30000), %30s + {200, _} = httpabs:post(?XER, ?PLG(stream_url, C), "text/plain", "'Prince of Darkness'"), + {200, "Hello 'Prince of Darkness'!"} = httpabs:get(?XER,?SC([?PLG(app_url, C), "/services/Greeting/methods/greet"])). + +app_reconfigure(C) -> + %Test app reconfiguration + %test new config right in Consul + true = {200, util:ejson_to_map(?PLG(init_config, C))} == get_config_consul(C), + + %do the reconfig + ReconfigMap = util:ejson_to_map({[{<<"foo REDUX EDITION">>, <<"bar">>}, {<<"LEAVE ME ALONE">>, <<"CONFIG EDITION">>}]}), + %test httpabs bad body (not encoded as JSON) + {400,"ERROR: The request Body is malformed"} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", {[{<<"reconfiguration_type">>, <<"program-flowlet-app-config">>}, {<<"config">>, ReconfigMap}]}), + %do it properly + {200, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-config">>},{<<"config">>, ReconfigMap}]})), + %test new config right in consul + true = {200, ReconfigMap} == get_config_consul(C), + %test new config right in cdap + true = {200, ReconfigMap} == get_config_cdap(C), + + %Test preferences reconfiguration + %check that the preferences in Consul is correct + InitMap = util:ejson_to_map(?PLG(init_preferences, C)), + true = {200, InitMap} == get_preferences_consul(C), + %check that the preferences in CDAP are correct + true = {200, InitMap} == get_preferences_cdap(C), + %reconfigure the preferences + PreferencesReconfigMap = util:ejson_to_map({[{<<"preffoo REDUX EDITION">>, <<"prefbar REMIXXX">>}, {<<"LEAVE ME ALONE">>, <<"PREFERENCES EDITION">>}]}), + {200, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-preferences">>},{<<"config">>, PreferencesReconfigMap}]})), + %make sure consul has right preferences + true = {200, PreferencesReconfigMap} == get_preferences_consul(C), + %make sure CDAP has right preferences + true = {200, PreferencesReconfigMap} == get_preferences_cdap(C), + + %test the smart reconfiguration call + %try to give it a smart where there are keys in just preferences + SmartReconfigPrefMap = util:ejson_to_map({[{<<"preffoo REDUX EDITION">>, <<"BAR'D AGAIN">>}]}), + {200, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-smart">>},{<<"config">>, SmartReconfigPrefMap}]})), + ExpectedNewPreferences = #{<<"LEAVE ME ALONE">>=><<"PREFERENCES EDITION">>,<<"preffoo REDUX EDITION">>=><<"BAR'D AGAIN">>}, + true = {200, ExpectedNewPreferences} == get_preferences_consul(C), + true = {200, ExpectedNewPreferences} == get_preferences_cdap(C), + + %try to give it a smart where there are keys in just config + SmartReconfigConfig = {[{<<"foo REDUX EDITION">>, <<"FOO'D AGAIN">>}]}, + {200, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-smart">>},{<<"config">>, SmartReconfigConfig}]})), + %make sure CDAP and Consul agree and are equal to what we expected + ExpectedNewConfig = #{<<"LEAVE ME ALONE">>=><<"CONFIG EDITION">>,<<"foo REDUX EDITION">>=><<"FOO'D AGAIN">>}, + true = {200, ExpectedNewConfig} == get_config_consul(C), + true = {200, ExpectedNewConfig} == get_config_cdap(C), + + %try to give it a smart where there are keys in both preferences and config + SmartReconfigBoth = {[{<<"foo REDUX EDITION">>, <<"FOO'D AGAIN AGAIN">>}, {<<"preffoo REDUX EDITION">>, <<"BAR'D AGAIN AGAIN">>}]}, + {200, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-smart">>},{<<"config">>, SmartReconfigBoth}]})), + ExpectedNewPreferencesBoth = #{<<"LEAVE ME ALONE">>=><<"PREFERENCES EDITION">>,<<"preffoo REDUX EDITION">>=><<"BAR'D AGAIN AGAIN">>}, + ExpectedNewConfigBoth = #{<<"LEAVE ME ALONE">>=><<"CONFIG EDITION">>,<<"foo REDUX EDITION">>=><<"FOO'D AGAIN AGAIN">>}, + true = {200, ExpectedNewPreferencesBoth} == get_preferences_consul(C), + true = {200, ExpectedNewPreferencesBoth} == get_preferences_cdap(C), + true = {200, ExpectedNewConfigBoth} == get_config_consul(C), + true = {200, ExpectedNewConfigBoth} == get_config_cdap(C), + + %try to give it a smart where there are no overlaps + SmartReconfigNone = {[{<<"EMPTY">>, <<"LIKE YOUR SOUL">>}]}, + {400, _} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-smart">>},{<<"config">>, SmartReconfigNone}]})), + true = {200, ExpectedNewPreferencesBoth} == get_preferences_consul(C), + true = {200, ExpectedNewPreferencesBoth} == get_preferences_cdap(C), + true = {200, ExpectedNewConfigBoth} == get_config_consul(C), + true = {200, ExpectedNewConfigBoth} == get_config_cdap(C) + . + +app_botch_flows(C) -> + %check healthcheck + {200, _} = httpabs:get(?XER,?SC([?PLG(broker_app_url, C), "/healthcheck"])), + + %purposely shut down a flow "manually" to test that undeploy works with a "partially deployed" app + {200, []} = cdap_interface:exec_programs(?XER, ?PLG(appname, C), ?PLG(namespace, C), ?PLG(cdap_url, C), + [#program{type = <<"flows">>, id = <<"WhoFlow">>}, #program{type = <<"services">>, id = <<"Greeting">>}], "stop"), + %make sure healthcheck now fails + {400, _} = httpabs:get(?XER,?SC([?PLG(broker_app_url, C), "/healthcheck"])) + . + +app_botch_delete(C) -> + %purposely shut down flows and then delete the app from the CDAP api to test undeploy works with a [gone] app + {200, []} = cdap_interface:exec_programs(?XER, ?PLG(appname, C), ?PLG(namespace, C), ?PLG(cdap_url, C), + [#program{type = <<"flows">>, id = <<"WhoFlow">>}, #program{type = <<"services">>, id = <<"Greeting">>}], "stop"), + {200, []} = cdap_interface:delete_app(?XER, ?PLG(appname, C), ?PLG(namespace, C), ?PLG(cdap_url, C)), + + %make sure healthcheck now fails + {400, _} = httpabs:get(?XER,?SC([?PLG(broker_app_url, C), "/healthcheck"])) + . + +app_botch_consul_delete(C) -> + %purposefully delete the config in consul to make sure delete doesnt blow up + {200, "true"} = consul_interface:consul_delete_config(?XER, ?PLG(appname, C),?PLG(consul_url, C)). + +app_teardown(C) -> + %Test app teardown and delete + %app is there for now in broker + {200,_ } = httpabs:get(?XER,?PLG(broker_app_url, C)), + + %teardown the test application + {200, []} = httpabs:delete(?XER, ?PLG(broker_app_url, C)), + + %make sure the broker deleted the config from Consul + {404, _} = get_config_consul(C), + + %make sure broker deleted the preferences + {404, _} = get_preferences_consul(C), + + %make sure the broker app url no longer exists + {404, _ } = httpabs:get(?XER,?PLG(broker_app_url, C)), + + %teardown the testing rels + {404, _} = setup_rels(C, teardown), + + %teardown the fake service and make sure it is gone + {200, Srv} = setup_fake_testing_service(C, teardown), + true = Srv == "[]", + + %cdap app gone + {404,"State: Not Found. Return Body: 'application:testns.hwtest.-SNAPSHOT' was not found."} = httpabs:get(?XER,?PLG(app_url, C)), + + %make sure that the service is not registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_app_url, C)), + true = jiffy:decode(RBHC) == []. + +hydrator_app_teardown(C) -> + %Test app teardown and delete + %app is there for now in cdap + {200, _} = httpabs:get(?XER,?PLG(hydrator_app_url, C)), + %app is in broker + {200,_ } = httpabs:get(?XER,?PLG(broker_hydrator_app_url, C)), + %teardown the test application + {200, []} = httpabs:delete(?XER, ?PLG(broker_hydrator_app_url, C)), + %make sure the broker deleted the config from Consul + ?D(<<"todo! put this back:">>), + %{404, _} = get_config_consul(C), + %make sure the broker app url no longer exists + {404, _ } = httpabs:get(?XER,?PLG(broker_hydrator_app_url, C)), + %make sure gone from CDAP + {404,"State: Not Found. Return Body: 'application:testns.hydratortest.-SNAPSHOT' was not found."} = httpabs:get(?XER,?PLG(hydrator_app_url, C)), + %make sure that the service is not registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_hydrator_app_url, C)), + true = jiffy:decode(RBHC) == [] + . + +hydrator_wdeps_test(C) -> + %test te app by injecting some data into the stream and getting it out + %Sleeping since HTTP services may still be booting up: see https://issues.cask.co/browse/CDAP-812 + ok = timer:sleep(30000), %30s + %curl into stream + %grab the test data + {200, TestData} = httpabs:get(?XER,?PLG(hydrator_wdeps_test_data_url, C)), + %push it in + {200, _} = httpabs:post(?XER, ?SC([?PLG(hydrator_wdeps_stream_url, C), "/batch"]), "text/plain", TestData), + %query data out + PB = jiffy:encode({[{<<"query">>, <<"select ts from dataset_t1file">>}]}), + {200, RB} = httpabs:post(?XER, ?SC([?PLG(cdap_ns_url, C), "/data/explore/queries"]), "text/plain", PB), + {[{<<"handle">>, Handle}]} = jiffy:decode(RB), + %results can take time, sleep again + ok = timer:sleep(30000), + Expected = {[ + {<<"status">>,<<"FINISHED">>}, + {<<"hasResults">>,true} + ]}, + {200, RB2} = httpabs:get(?XER,?SC([?PLG(cdap_url, C), "/v3/data/explore/queries", "/", Handle, "/status"])), + true = Expected == jiffy:decode(RB2), + {200, _} = httpabs:post(?XER, ?SC([?PLG(cdap_url, C), "/v3/data/explore/queries", "/", Handle, "/next"]), "text/plain", "") + . + +hydrator_wdeps_teardown(C) -> + %Test app teardown and delete + %app is there for now in cdap + {200, _} = httpabs:get(?XER,?PLG(hydrator_wdeps_app_url, C)), + %app is in broker + {200,_ } = httpabs:get(?XER,?PLG(broker_hydrator_wdeps_app_url, C)), + %teardown the test application + {200, []} = httpabs:delete(?XER, ?PLG(broker_hydrator_wdeps_app_url, C)), + %make sure the broker deleted the config from Consul + ?D(<<"todo! put this back:">>), + %{404, _} = get_config_consul(C), + %make sure the broker app url no longer exists + {404, _ } = httpabs:get(?XER,?PLG(broker_hydrator_wdeps_app_url, C)), + %make sure gone from CDAP + {404,"State: Not Found. Return Body: 'application:testns.hydratortest.-SNAPSHOT' was not found."} = httpabs:get(?XER,?PLG(hydrator_app_url, C)), + %make sure that the service is not registered. TODO! Could get more fancy by manually checking a healthcheck + {200, RBHC} = httpabs:get(?XER,?PLG(consul_hydrator_wdeps_app_url, C)), + true = jiffy:decode(RBHC) == [] + . + +test_failures(C) -> + %test things that should fail + %delete a non-existent app + {404, "State: Not Found. Return Body: Tried to delete an application that was not registered"} = + httpabs:delete(?XER, ?SC([?PLG(broker_app_url, C), "MYFRIENDOFMISERY"])), + + %malformed Broker put + URL = ?SC([?PLG(broker_app_url, C), "FAILURETEST"]), + Body = {[ + {<<"malformed">>, <<"i am">>} + ]}, + {400, "State: Bad Request. Return Body: Invalid PUT Body or unparseable URL"} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body)), + + %deploy a bad CDAP app with a bad program_id + Body2 = {[ + {<<"cdap_application_type">>, <<"program-flowlet">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(streamname, C)}, + {<<"jar_url">>, ?PLG(jar_url, C)}, + {<<"artifact_name">>, ?PLG(art_name, C)}, + {<<"artifact_version">>, ?PLG(art_ver, C)}, + {<<"app_config">>, ?PLG(init_config, C)}, + {<<"app_preferences">>, ?PLG(init_preferences, C)}, + {<<"services">>, [{[{<<"service_name">>, <<"Greeting">>}, + {<<"service_endpoint">>, <<"greet">>}, + {<<"endpoint_method">>, <<"GET">>}]}]}, + {<<"programs">>, [ + {[{<<"program_type">>, <<"flows">>}, + {<<"program_id">>, <<"DISSAPOINTMENT">>}]} + ]}, + {<<"program_preferences">>, []} + ]}, + %WORKS IN CDAP 3: + %{404,"State: Not Found. Return Body: State: Not Found. Return Body: 'program:testns.hwtestFAILURETEST.flow.DISSAPOINTMENT' was not found."} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body2)), + %WORKS IN CDAP 4 (looks like they are doing more intrispection on the jar name) + {404,_} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body2)), + %make sure the rollback happened + {200, "[]"} = httpabs:get(?XER,?SC([?PLG(broker_url, C), "/application"])), + + %try to deploy with a bad URL where bad means nonexistent (504) + Body3 = {[ + {<<"cdap_application_type">>, <<"program-flowlet">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(streamname, C)}, + {<<"jar_url">>, ?SC([?PLG(jar_url, C), "DOESNOTEXISTMOSTLIKELY"])}, + {<<"artifact_name">>, ?PLG(art_name, C)}, + {<<"artifact_version">>, ?PLG(art_ver, C)}, + {<<"app_config">>, ?PLG(init_config, C)}, + {<<"app_preferences">>, ?PLG(init_preferences, C)}, + {<<"services">>, [{[{<<"service_name">>, <<"Greeting">>}, {<<"service_endpoint">>, <<"greet">>}, {<<"endpoint_method">>, <<"GET">>}]}]}, + {<<"programs">>, [{[{<<"program_type">>, <<"flows">>},{<<"program_id">>, <<"WhoFlow">>}]},{[{<<"program_type">>, <<"services">>},{<<"program_id">>, <<"Greeting">>}]}]}, + {<<"program_preferences">>, [{[{<<"program_type">>,<<"flows">>}, {<<"program_id">>, <<"WhoFlow">>}, {<<"program_pref">>, ?PLG(whoflowpref, C)}]}]} + ]}, + {404, _} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body3)), + + %try to deploy with a bad URL where bad means malformed + Body4 = {[ + {<<"cdap_application_type">>, <<"program-flowlet">>}, + {<<"namespace">>, ?PLG(namespace, C)}, + {<<"streamname">>, ?PLG(streamname, C)}, + {<<"jar_url">>, <<"THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME">>}, + {<<"artifact_name">>, ?PLG(art_name, C)}, + {<<"artifact_version">>, ?PLG(art_ver, C)}, + {<<"app_config">>, ?PLG(init_config, C)}, + {<<"app_preferences">>, ?PLG(init_preferences, C)}, + {<<"services">>, [{[{<<"service_name">>, <<"Greeting">>}, {<<"service_endpoint">>, <<"greet">>}, {<<"endpoint_method">>, <<"GET">>}]}]}, + {<<"programs">>, [{[{<<"program_type">>, <<"flows">>},{<<"program_id">>, <<"WhoFlow">>}]},{[{<<"program_type">>, <<"services">>},{<<"program_id">>, <<"Greeting">>}]}]}, + {<<"program_preferences">>, [{[{<<"program_type">>,<<"flows">>}, {<<"program_id">>, <<"WhoFlow">>}, {<<"program_pref">>, ?PLG(whoflowpref, C)}]}]} + ]}, + {400,"State: Bad Request. Return Body: ERROR: The following URL is malformed: THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME"} = httpabs:put(?XER, URL, "application/json", jiffy:encode(Body4)) + . + +invalid_reconfigure(C) -> + %test reconfiguring an app that does not exist despite put body being correct + {404,"State: Not Found. Return Body: Reconfigure recieved but the app is not registered"} = httpabs:put(?XER, ?SC([?PLG(broker_app_url, C), "THE_VOID", "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-config>">>}, {<<"config">>, {[{<<"foo">>, <<"bar">>}]}}]})), + + %test reconfiguring with an invalid PUT body (missing "reconfiguration_type") + {400,"State: Bad Request. Return Body: Invalid PUT Reconfigure Body: key 'reconfiguration_type' is missing"} = httpabs:put(?XER, ?SC([?PLG(broker_app_url,C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"config">>, <<"bar">>}]})), + + %test reconfiguring with an invalid PUT body (missing app_config) + {400,"State: Bad Request. Return Body: Invalid PUT Reconfigure Body: key 'config' is missing"} = httpabs:put(?XER, ?SC([?PLG(broker_app_url,C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-config">>}, {<<"foo">>, <<"bar">>}]})), + + %test reconfiguring an invalid (unimplemented) type + {501, "State: Not Implemented. Return Body: This type (EMPTINESS) of reconfiguration is not implemented"} = httpabs:put(?XER, ?SC([?PLG(broker_app_url,C), "/reconfigure"]), "application/json", jiffy:encode({[{<<"config">>, <<"bar">>}, {<<"reconfiguration_type">>, <<"EMPTINESS">>}]})) + . + +delete_all(C) -> + %test invalid key + Body1 = jiffy:encode({[{<<"ids">>, [<<"hwtest">>]}]}), + {400,"State: Bad Request. Return Body: Invalid PUT Body"} = httpabs:post(?XER, ?SC([?PLG(broker_url, C), "/application/delete"]), "application/json", Body1), + %test invalid: not a list + Body2 = jiffy:encode({[{<<"appnames">>, <<"hwtest">>}]}), + {400,"State: Bad Request. Return Body: Invalid PUT Body"} = httpabs:post(?XER, ?SC([?PLG(broker_url, C), "/application/delete"]), "application/json", Body2), + %test undeploy a real app and also an app that is not deployed + Body3 = jiffy:encode({[{<<"appnames">>, [<<"hwtest">>, <<"dissapointment">>]}]}), + {200, "[200,404]"} = httpabs:post(?XER, ?SC([?PLG(broker_url, C), "/application/delete"]), "application/json", Body3), + %teardown the fake service and make sure it is gone + {200, Srv} = setup_fake_testing_service(C, teardown), + true = Srv == "[]". + -- cgit 1.2.3-korg