summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTommy Carpenter <tommy@research.att.com>2017-08-22 18:07:40 -0400
committerTommy Carpenter <tommy@research.att.com>2017-08-22 18:08:22 -0400
commit647addf5d6c78b2b8c941cc9cd8c57a3eb9f30b4 (patch)
tree4de88ed0c8b175b271b5d7da6076ebf3da40e466 /src
parentc7b6dc90e4cde0ac8524539fc02ab2943c88048a (diff)
[DCAEGEN2-42] Initial commit of broker
Change-Id: I1c553c82d5b39a4c134c44e2320ac0e44785e0ef Signed-off-by: Tommy Carpenter <tommy@research.att.com>
Diffstat (limited to 'src')
-rw-r--r--src/application.hrl28
-rw-r--r--src/cdap_interface.erl363
-rw-r--r--src/cdap_interface_tests.erl34
-rw-r--r--src/cdapbroker.app.src23
-rw-r--r--src/cdapbroker_app.erl94
-rw-r--r--src/cdapbroker_sup.erl55
-rw-r--r--src/consul_interface.erl139
-rw-r--r--src/httpabs.erl118
-rw-r--r--src/httpabs_tests.erl33
-rw-r--r--src/logging.erl157
-rw-r--r--src/resource_handler.erl465
-rw-r--r--src/util.erl192
-rw-r--r--src/util_tests.erl53
-rw-r--r--src/workflows.erl324
-rw-r--r--src/workflows_tests.erl27
15 files changed, 2105 insertions, 0 deletions
diff --git a/src/application.hrl b/src/application.hrl
new file mode 100644
index 0000000..fc7c80e
--- /dev/null
+++ b/src/application.hrl
@@ -0,0 +1,28 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-record(program, {type :: binary(), id :: binary()}).
+-type lprogram() :: [#program{}].
+-type status_code() :: 100..101 | 200..206 | 300..307 | 400..417 | 500..505. %stolen from leptus
+-type httpstat() :: {status_code(), string()}.
+-record(application, {appname, apptype, namespace, healthcheckurl, metricsurl, url, connectionurl, serviceendpoints, creationtime}).
+-record(prog_flow_supp, {appname, programs::lprogram()}).
+
+
diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl
new file mode 100644
index 0000000..7ce3b37
--- /dev/null
+++ b/src/cdap_interface.erl
@@ -0,0 +1,363 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(cdap_interface).
+-export([get_app_metrics/4,
+ get_app_healthcheck/4,
+ push_down_config/5,
+ form_service_json_from_service_tuple/4,
+ form_stream_url_from_streamname/3,
+ exec_programs/6,
+ push_down_program_preferences/5,
+ push_down_app_preferences/5,
+ deploy_app/8,
+ deploy_pipeline/5,
+ delete_app/4,
+ %get_app_programs/3,
+ exec_pipeline/5,
+ exec_pipeline_workflow/5,
+ get_pipeline_healthcheck/5,
+ get_pipeline_metrics/3,
+ deploy_pipeline_dependencies/4,
+ deploy_pipeline_dependencies_properties/4,
+ create_namespace/3,
+ get_app_config/4,
+ get_app_preferences/4,
+ get_cdap_cluster_version/2,
+ get_cdap_gui_port_from_version/1
+ ]).
+-include("application.hrl").
+-define(SC(L), util:concat(L)).
+-define(BAD_HEALTH_CODE, 400). %%not sure if this is the best status code for "unhealthy". I don't like 500 because I am able to complete the user's request (healthcheck)
+
+%helpful: https://robhirschfeld.com/2012/08/15/erlang-http-client-restful-api-post-example-code/
+
+%%%
+%%%INTERNAL
+%%%
+map_appname(Appname) ->
+ %CDAP APIs do not allow app names with any special characters. Here we will map the
+ %name to ensure it does not contain special characters using a regex whitelist
+ %see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string
+ re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]).
+
+get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) ->
+ URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]),
+ {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""),
+ case ReturnCode of
+ 200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch];
+ _ -> 504 %must bubble this up
+ end.
+
+-spec get_app_healthcheck_program(string(), binary(), binary(), string(), #program{}) -> integer().
+get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) ->
+ %helper function: checks for a partocular program from an app
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]),
+ {RC, _} = httpabs:get(XER, URL),
+ case RC of
+ 200 ->
+ %Next make sure it's status is running
+ {RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])),
+ case RC2 of
+ 200 ->
+ S = jiffy:decode(RB2, [return_maps]),
+ case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of
+ true -> 200; %return 200
+ false -> ?BAD_HEALTH_CODE %return
+ end;
+ _ -> ?BAD_HEALTH_CODE
+ end;
+ _ -> ?BAD_HEALTH_CODE
+ end.
+
+-spec exec_program(string(), binary(), binary(), string(), #program{}, string()) -> httpstat().
+exec_program(XER, Appname, Namespace, CDAPURL, P, Exec) ->
+ %Exec should be 'start' or 'stop'
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id, "/", Exec]),
+ httpabs:post(XER, URL, "application/json", "").
+
+push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) ->
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", PT, "/", PI, "/", "preferences"]),
+ httpabs:put(XER, URL, "application/json", jiffy:encode(PP)).
+
+deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) ->
+ %deploys a single dependency
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ {200, JarBody} = httpabs:get(XER, ArtURL),
+ Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}],
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]),
+ httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody).
+
+deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) ->
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ case UIPropertiesURL of
+ none -> {200, ""}; %nothing to do
+ _ ->
+ {200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]),
+ httpabs:put(XER, URL, "application/json", PropertiesJSON)
+ end.
+
+%%%
+%%%EXTERNAL
+%%%
+get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
+ %Namespace should be a binary
+ MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb
+ case MetricsList of
+ 504 -> {504, []};
+ %TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something
+ [] -> {200, []};
+ _ ->
+ URL = ?SC([CDAPURL, "/v3/metrics/query"]),
+ Body = jiffy:encode(
+ {[{<<"appmetrics">>,
+ {[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}},
+ {<<"metrics">>, MetricsList}]}
+ }]}),
+ {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it!
+ case ReturnCode of
+ 200 -> {200, jiffy:decode(RetBody)};
+ 504 -> {504, []}
+ end
+ end.
+
+%THIS FUNCTION WAS ONCE NEEDED
+%It gets the list of all programs in a running CDAP app.
+%However this is no longer used due to a feature request where people want to start/healthcheck only *some* programs in their application
+%So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state
+%However, leaving it here because maybe one day it will be useful
+%%%get_app_programs(Appname, Namespace, CDAPURL) ->
+%%% %fetch the list of programs from a running CDAP app.
+%%% %Parse this into a list of [{ProgramType, ProgramID}] tuples.
+%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline.
+%%% %get informaation about application
+%%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+%%% {ReturnCode, RetBody} = httpabs:http_get(URL),
+%%% case ReturnCode of
+%%% 504 -> 504;
+%%% 404 -> 404;
+%%% 400 -> 400;
+%%% 200 ->
+%%% Drb = jiffy:decode(RetBody, [return_maps]),
+%%% Programs = maps:get(<<"programs">>, Drb),
+%%% lists:map(fun(X) -> #program{
+%%% %stupid CDAP APIs require a different get call then what is returned as the Program Type. For example you need to get "flows" to get a program of type Flow. im filing a bug with them. see program-typeOne of flows, mapreduce, services, spark, workers, or workflows here: http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-deployed-application. I filed an issue: https://issues.cask.co/browse/CDAP-7191?filter=-2
+%%% %From http://docs.cask.co/cdap/current/en/developers-manual/building-blocks/program-lifecycle.html
+%%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow
+%%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program
+%%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows
+%%% program_type = case maps:get(<<"type">>, X) of
+%%% <<"Flow">> -> <<"flows">>; %cdap api fail man
+%%% <<"Mapreduce">> -> <<"mapreduce">>;
+%%% <<"Service">> -> <<"services">>;
+%%% <<"Spark">> -> <<"spark">>;
+%%% <<"Worker">> -> <<"workers">>;
+%%% <<"Workflow">> -> <<"workflows">>
+%%% end,
+%%% program_id = maps:get(<<"id">>, X)
+%%% }
+%%% end, Programs)
+%%% end.
+%%%
+-spec get_app_healthcheck(string(), binary(), binary(), string()) -> integer().
+get_app_healthcheck(XER, Appname, Namespace, CDAPURL) ->
+ %cdap does not provide a simple "heathcheck" api for apps like it does metrics
+ %what I am using is:
+ % making sure the application is there
+ % making sure all programs (flows, services, etc) are there
+ %for now. From: http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-deployed-application
+
+ Programs = util:get_programs_for_pfapp_from_db(Appname),
+ %check each program
+ M = lists:map(fun(X) -> get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, X) end, Programs),
+ case lists:foldl(fun(X, Y) -> X == 200 andalso Y end, true, M) of %check all 200s
+ true -> 200;
+ false -> ?BAD_HEALTH_CODE
+ end.
+
+push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) ->
+ %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]),
+ Body = jiffy:encode(
+ {[
+ {<<"config">>, ConfigJson}
+ ]}),
+ httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody}
+
+push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPreferences) ->
+ FClosure = fun(X) -> push_down_program_preference(XER, Appname, Namespace, CDAPURL, X) end,
+ workflows:all_200s_else_showerror(FClosure, ParsedProgramPreferences).
+
+form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) ->
+ %transforms {SN, SE, EM} into {url: foo, method: bar}
+ URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])),
+ {[{<<"url">>, URL},
+ {<<"method">>, EM}
+ ]}.
+
+form_stream_url_from_streamname(CDAPURL, Namespace, Streamname) ->
+ list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/streams/", Streamname])).
+
+-spec exec_programs(string(), binary(), binary(), string(), lprogram(), string()) -> httpstat().
+exec_programs(XER, Appname, Namespace, CDAPURL, Programs, Exec) ->
+ FClosure = fun(X) -> exec_program(XER, Appname, Namespace, CDAPURL, X, Exec) end,
+ workflows:all_200s_else_showerror(FClosure, Programs).
+
+push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) ->
+ %use app level preferences API if specified
+ %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/preferences.html
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]),
+ httpabs:put(XER, URL, "application/json", jiffy:encode(AppPreferences)).
+
+deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) ->
+ %Create Artifact
+ %Deploy App
+
+ %post the artifact, OK if already exists, no check
+ %explicitly set artifact version becausme some JARS do not have it
+ httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody),
+
+ %deploy the application
+ PutBody = jiffy:encode(
+ {[
+ {<<"artifact">>, {[
+ {<<"name">>, ArtifactName},
+ {<<"version">>, ArtifactVersion},
+ {<<"scope">>, <<"user">>}
+ ]}},
+ {<<"config">>, AppConfig}
+ ]}),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ httpabs:put(XER, URL, "application/json", PutBody).
+
+delete_app(XER, Appname, Namespace, CDAPURL) ->
+ %delete an application; works for prog-flow and hydrator
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ httpabs:delete(XER, URL).
+
+deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) ->
+ %Deploy a hydrator pipeline, assumes namespace has already been set up
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ httpabs:put(XER, URL, "application/json", PipelineJson).
+
+exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) ->
+ %Exec assumed to be: "resume" or "suspend"
+ %TODO! REVISIT WHETHER datapipelineschedule is a parameter
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]),
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+
+exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) ->
+ %Exec assumed to be: "stop" or
+ %TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]),
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+
+get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) ->
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]),
+ {RC, RB} = httpabs:get(XER, URL),
+ case RC /= 200 of
+ true -> ?BAD_HEALTH_CODE; %failed to even hit the status.
+ false ->
+ Status = jiffy:decode(RB, [return_maps]),
+ case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of
+ false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good
+ true ->
+ %Next, check the last <LIMIT> number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time.
+ %This logic came from Terry.
+ %His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation.
+ %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen.
+ %RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang
+ L = list_to_binary(integer_to_list(PipelineHealthLimit)),
+ {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])),
+ case RC2 /= 200 of
+ true -> RC2; %failed to even hit the status
+ false ->
+ LRST = lists:map(fun(S) ->
+ case maps:is_key(<<"status">>, S) of
+ false -> critical;
+ true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end
+ end end, jiffy:decode(RB2, [return_maps])),
+ %now process the transformed list
+ %check if any had failed or if the status JSONs were malformed, or check if more than 2 running at once
+ case lists:any(fun(X) -> X == critical end, LRST) orelse length(lists:filter(fun(X) -> X == running end, LRST)) > 1 of
+ true -> ?BAD_HEALTH_CODE;
+ false -> 200 %ALL TESTS PASS
+ end
+ end
+ end
+ end.
+
+get_pipeline_metrics(_Appname, _Namespace, _CDAPURL) ->
+ lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
+ {200, []}.
+
+deploy_pipeline_dependencies(XER, Namespace, CDAPURL, ParsedDependencies) ->
+ FClosure = fun(X) -> deploy_pipeline_dependency(XER, Namespace, CDAPURL, X) end,
+ workflows:all_200s_else_showerror(FClosure, ParsedDependencies).
+
+deploy_pipeline_dependencies_properties(XER, Namespace, CDAPURL, ParsedDependencies) ->
+ FClosure = fun(X) -> deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, X) end,
+ workflows:all_200s_else_showerror(FClosure, ParsedDependencies).
+
+create_namespace(_XER, <<"default">>, _) -> {200, ""}; %no-op, already exists
+create_namespace(XER, Namespace, CDAPURL) ->
+ httpabs:put(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace]), "", "").
+
+get_app_config(XER, Appname, Namespace, CDAPURL) ->
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ {RC, RB} = httpabs:get(XER, URL),
+ case RC of
+ 200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))};
+ _ -> {RC, RB}
+ end.
+
+get_app_preferences(XER, Appname, Namespace, CDAPURL) ->
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]),
+ {RC, RB} = httpabs:get(XER, URL),
+ case RC of
+ 200 -> {200, jiffy:decode(RB, [return_maps])};
+ _ -> {RC, RB}
+ end.
+
+get_cdap_cluster_version(XER, CDAPURL) ->
+ %CDAP decided to change their port numbers between release 3 and 4.
+ %The broker works with both.
+ %In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to.
+ %The GUI information is a convinence function for component developers that hit the broker via the CLI tool.
+ URL = ?SC([CDAPURL, "/v3/version"]),
+ {RC, RB} = httpabs:get(XER, URL),
+ case RC of
+ 200 -> maps:get(<<"version">>, jiffy:decode(RB, [return_maps]));
+ _ -> <<"UNKNOWN CDAP VERSION">>
+ end.
+
+-spec get_cdap_gui_port_from_version(binary() | string()) -> 9999 | 11011 | binary().
+get_cdap_gui_port_from_version(Version) ->
+ %given the cdap clsuter version, return the GUI port
+ case re:run(Version, "3\.\[0-9]+\.[0-9]+") of
+ nomatch ->
+ case re:run(Version, "4\.\[0-9]+\.[0-9]+") of
+ nomatch -> <<"UNKNOWN CDAP VERSION">>;
+ _ -> 11011
+ end;
+ _ -> 9999
+ end.
+
diff --git a/src/cdap_interface_tests.erl b/src/cdap_interface_tests.erl
new file mode 100644
index 0000000..37926a6
--- /dev/null
+++ b/src/cdap_interface_tests.erl
@@ -0,0 +1,34 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(cdap_interface_tests).
+-include_lib("eunit/include/eunit.hrl").
+
+get_cdap_gui_port_from_version_test() ->
+ ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.0")),
+ ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.0")),
+ ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.10")),
+ ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.10")),
+ ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.0">>)),
+ ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.0">>)),
+ ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.10">>)),
+ ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.10">>)),
+ ?assert(<<"UNKNOWN CDAP VERSION">> == cdap_interface:get_cdap_gui_port_from_version("5.0.0")).
+
diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src
new file mode 100644
index 0000000..1d04330
--- /dev/null
+++ b/src/cdapbroker.app.src
@@ -0,0 +1,23 @@
+{application, cdapbroker,
+ [{description, "Interface between Consul and CDAP in DCAE"},
+ {vsn, "4.0.3"},
+ {registered, []},
+ {mod, { cdapbroker_app, []}},
+ {applications,
+ [kernel,
+ stdlib,
+ lager,
+ inets,
+ ssl,
+ jiffy,
+ mnesia,
+ leptus,
+ uuid,
+ iso8601
+ ]},
+ {env,[]},
+ {modules, []},
+ {maintainers, []},
+ {licenses, []},
+ {links, []}
+ ]}.
diff --git a/src/cdapbroker_app.erl b/src/cdapbroker_app.erl
new file mode 100644
index 0000000..13256b0
--- /dev/null
+++ b/src/cdapbroker_app.erl
@@ -0,0 +1,94 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+%%%-------------------------------------------------------------------
+%% @doc cdapbroker public API
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(cdapbroker_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%for application state
+-include("application.hrl").
+-define(SC(L), util:concat(L)).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+start(_StartType, _StartArgs) ->
+ %starting inets to make request calls
+ inets:start(),
+ %from the HTTPC page:
+ %"If the scheme https is used, the SSL application must be started.
+ ssl:start(),
+
+ %EELF hello
+ audit:info("Audit log initlized"),
+ metrics:info("Metrics log initlized"),
+ error:info("Error log initlized"),
+
+ %fail fast; check all failure conditions first
+ PE = util:get_platform_envs_and_config(),
+ case PE of
+ [] -> %crash and burn. Need to exit else supervisor will restart this endlessly.
+ exit("fatal error, either HOSTNAME or CONSUL_HOST or CONFIG_BINDING_SERVICE is not set as an env variable");
+ [_, ConsulURL, CDAPUrl, BoundConfigMap] ->
+ try
+ S = dict:new(),
+ S1 = dict:store("consulurl", ConsulURL, S),
+ S2 = dict:store("configmap", BoundConfigMap, S1),
+ State = dict:store("cdapurl", CDAPUrl, S2),
+
+ %initialize database
+ ok = util:initialize_database(),
+
+ %print out currently registered apps at startup
+ lager:info("Currently installed apps: ~p~n", [util:get_all_appnames_from_db()]),
+
+ %start the REST server
+ leptus:start_listener(http, [{'_', [{resource_handler, State}]}], [{port, 7777}, {ip, {0,0,0,0}}]),
+
+ %start the supervisor
+ lager:info("Starting supervisor"),
+ cdapbroker_sup:start_link()
+ catch Class:Reason ->
+ lager:error("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})]),
+ exit(Reason)
+ end
+ end.
+
+%%--------------------------------------------------------------------
+stop(_State) ->
+ %need to make sure there are no pending transcations in RAM not written to disk yet on shutdown.
+ %Two hard problems in CS, this is one of then...
+ lager:info("Stop recieved."),
+ case mnesia:sync_log() of
+ ok -> ok;
+ {error, Reason} ->
+ lager:error(io_lib:format("While stopping, MNESIA could not by syncd due to: ~s. This means on bootup the database may be in a bad state!!", [Reason])),
+ notok
+ end.
+
diff --git a/src/cdapbroker_sup.erl b/src/cdapbroker_sup.erl
new file mode 100644
index 0000000..b0894e8
--- /dev/null
+++ b/src/cdapbroker_sup.erl
@@ -0,0 +1,55 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+%%%-------------------------------------------------------------------
+%% @doc cdapbroker top level supervisor.
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(cdapbroker_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+
+%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
+init([]) ->
+ {ok, { {one_for_all, 0, 1}, []} }.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
diff --git a/src/consul_interface.erl b/src/consul_interface.erl
new file mode 100644
index 0000000..e52abd8
--- /dev/null
+++ b/src/consul_interface.erl
@@ -0,0 +1,139 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(consul_interface).
+-export([consul_register/8,
+ consul_deregister/3,
+ consul_get_configuration/3,
+ consul_get_preferences/3,
+ consul_get_service_ip_port/3,
+ consul_push_config/4,
+ consul_push_preferences/4,
+ consul_bind_config/3,
+ consul_delete_config/3,
+ consul_delete_preferences/3
+ ]).
+
+-define(SC(L), util:concat(L)).
+-define(PrefKey(Appname), ?SC([Appname, ":preferences"])).
+
+%the Erlang library linked on the Consul webpage does not seem to be equivelent to python-consul. It does something else: https://github.com/undeadlabs/discovery
+%and this one seems to be just elixer: https://github.com/undeadlabs/consul-ex
+%so, for now, I'm just doing some HTTP calls directly against the REST API.
+
+consul_get_service(XER, Appname, ConsulURL) ->
+ %returns a list of maps
+ URL = ?SC([ConsulURL, "/v1/catalog/service/", Appname]),
+ {200, ReturnBody} = httpabs:get(XER, URL),
+ jiffy:decode(ReturnBody, [return_maps]).
+
+consul_write_kv(XER, Key, Value, ConsulURL) ->
+ %generic helper function to write a value into Consul
+ URL = ?SC([ConsulURL,"/v1/kv/", Key]),
+ httpabs:put(XER, URL, "application/json", Value).
+
+consul_read_kv(XER, Key, ConsulURL) ->
+ %does a get on the KV store, see https://www.consul.io/docs/agent/http/kv.html
+ %Appname MUST be a binary not a string!
+ URL = ?SC([ConsulURL, "/v1/kv/", Key]),
+ {RC, RB} = httpabs:get(XER, URL),
+ case RC of
+ 200 ->
+ [Drb] = jiffy:decode(RB, [return_maps]),
+ {200, base64:decode(maps:get(<<"Value">>, Drb))}; %NOTE! Does not do a JSON decode here in case you want to read non-JSON from Consul. Leaves the decode to the caller.
+ _ ->
+ {RC, RB}
+ end.
+
+consul_delete_kv(XER, Key, ConsulURL) ->
+ %the config has not been jiffy'd prior to this function
+ URL = ?SC([ConsulURL,"/v1/kv/", Key]),
+ httpabs:delete(XER, URL).
+
+%%%%%%%%%%%%%%%%%
+%PUBLIC FUNCTIONS
+%%%%%%%%%%%%%%%%%
+
+consul_register(XER, Appname, ConsulURL, SDIP, SDPort, HealthURL, HCInterval, AutoDeregisterAfter) ->
+ %uses the agent api to register this app as a service and it's healthcheck URL, which is this broker as a proxy.
+ %/v1/agent/service/register from https://www.consul.io/docs/agent/http/agent.html#agent_service_register
+ URL = ?SC([ConsulURL, "/v1/agent/service/register"]),
+ Body = jiffy:encode(
+ {[{<<"Name">>, Appname},
+ {<<"Address">>, SDIP},
+ {<<"Port">>, SDPort},
+ {<<"Check">> ,
+ {[{<<"HTTP">>, HealthURL}, {<<"Interval">>, HCInterval}, {<<"DeregisterCriticalServiceAfter">>, AutoDeregisterAfter}]}
+ }
+ ]}),
+ httpabs:put(XER, URL, "application/json", Body).
+
+consul_deregister(XER, Appname, ConsulURL) ->
+ %/v1/agent/service/deregister/<serviceId> from https://www.consul.io/docs/agent/http/agent.html#agent_service_register
+ URL = ?SC([ConsulURL, "/v1/agent/service/deregister/", Appname]),
+ httpabs:put(XER, URL, "application/json", ""). %kinda weird that this isnt a DELETE
+
+consul_get_configuration(XER, Appname, ConsulURL) ->
+ %fetch configuration from consul, assumes it is a json
+ %returns it as a map. can be encoded again.
+ {RC, RB} = consul_read_kv(XER, Appname, ConsulURL),
+ case RC of
+ 200 -> {200, jiffy:decode(RB, [return_maps])}; %configuration is a JSON
+ _ -> {RC, RB}
+ end.
+
+consul_get_preferences(XER, Appname, ConsulURL) ->
+ %This function is currently only used in testing and is not used in resource_handler, but could be useful later
+ %returns it as a map. can be encoded again.
+ {RC, RB} = consul_read_kv(XER, ?PrefKey(Appname), ConsulURL),
+ case RC of
+ 200 -> {200, jiffy:decode(RB, [return_maps])}; %configuration is a JSON
+ _ -> {RC, RB}
+ end.
+
+consul_get_service_ip_port(XER, Appname, ConsulURL) ->
+ %use when you are expecting consul_get_service to return a list of exactly one service and all you want is ip:port
+ M = lists:nth(1, consul_get_service(XER, Appname, ConsulURL)),
+ {maps:get(<<"ServiceAddress">>, M), maps:get(<<"ServicePort">>, M)}.
+
+consul_push_config(XER, Appname, ConsulURL, Config) ->
+ %pushes Config into Consul under the key "Appname".
+ %TODO: Possibly this should be under "Appname:config" to be consistent with preferences but this came first and that's an invasive change.
+ %the config has not been jiffy'd prior to this function
+ consul_write_kv(XER, Appname, jiffy:encode(Config), ConsulURL).
+
+consul_push_preferences(XER, Appname, ConsulURL, Preferences) ->
+ %pushes the preferences into Consul under the key "Appname:preferences"
+ %the config has not been jiffy'd prior to this function
+ consul_write_kv(XER, ?PrefKey(Appname), jiffy:encode(Preferences), ConsulURL).
+
+consul_delete_config(XER, Appname, ConsulURL) ->
+ consul_delete_kv(XER, Appname, ConsulURL).
+
+consul_delete_preferences(XER, Appname, ConsulURL) ->
+ consul_delete_kv(XER, ?PrefKey(Appname), ConsulURL).
+
+consul_bind_config(XER, Appname, ConsulURL) ->
+ URL = ?SC([util:resolve_cbs(XER, ConsulURL), "/service_component/", Appname]),
+ {ReturnCode, ReturnBody} = httpabs:get(XER, URL),
+ case ReturnCode of
+ 200 -> {200, jiffy:decode(ReturnBody)};
+ _ -> {ReturnCode, ReturnBody} %do not try to decode if not correct
+ end.
diff --git a/src/httpabs.erl b/src/httpabs.erl
new file mode 100644
index 0000000..bc9e068
--- /dev/null
+++ b/src/httpabs.erl
@@ -0,0 +1,118 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(httpabs).
+-export([get/2,
+ post/4, %I miss python's default arguments..
+ post/5,
+ put/4,
+ delete/2
+ ]).
+-include("application.hrl").
+-define(SC(L), util:concat(L)).
+
+%NOTE
+%Consider the Erlang statement:
+%
+%{ok, {{"HTTP/1.1",ReturnCode, State}, Head, Body}} = httpc:get(URL).
+%CDAP returns error messages in the “Body” field above.
+%
+%However, Consul:
+%1) Always (in all HTTP failures I’ve tested) returns Body == “500\n”
+%2) Returns the error message in the State field
+%
+%Example:
+%
+%{{"HTTP/1.0",404,"Client Error: Not Found for url: http://consul.[...].com:8500/v1/kv/hwtestYOUHAVEFAILEDME:rel"},[{"date","Mon, 14 Nov 2016 14:41:03 GMT"},{"server","Werkzeug/0.11.11 Python/3.5.1"},{"content-length","4"},{"content-type","application/json"}],"500\n"}
+%
+%This means that error handling in HTTP is not consistent across CDAP and Consul.
+%
+%Thus below, on a failure, I return the concatenation of State and Body
+
+%%%
+%%%HELPER
+%%%
+-spec parse_response({error|ok, any()}, string()) -> httpstat().
+parse_response({Status, Response}, URL) ->
+ case Status of
+ error ->
+ lager:error("httpc error: cannot hit: ~s", [URL]),
+ case Response of
+ no_scheme -> {400, io_lib:format("ERROR: The following URL is malformed: ~s", [URL])};
+ {bad_body, _} -> {400, "ERROR: The request Body is malformed"};
+ {bad_body_generator,_} -> {400, "ERROR: The request Body is malformed"};
+ _ ->
+ lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])),
+ {504, list_to_binary(io_lib:format("ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: ~s", [URL]))} %Are there other reasons other than bad body and unreachable that crash request? (Sneak peak: the answer is probably)
+ end;
+ ok ->
+ {{_, ReturnCode, State}, _Head, Body} = Response,
+ case ReturnCode of
+ 200 ->
+ {ReturnCode, Body};
+ _ ->
+ lager:error("Error While hitting ~s, Non-200 status code returned. HTTP Code ~p, State ~s, ResponseBody ~s:", [URL, ReturnCode, State, Body]),
+ %see Note at the top of this file
+ RetBody = ?SC(["State: ", State, ". Return Body: ", Body]),
+ {ReturnCode, RetBody}
+ end
+ end.
+
+sanitize(URL) ->
+ %allow URL to look like "www.foo.com" or <<"www.foo.com">>, trim it
+ case is_binary(URL) of
+ true -> string:strip(binary_to_list(URL));
+ false -> string:strip(URL)
+ end.
+
+%anywhere you see any() is essentially lazy typing.. fix these someday when time is abundant
+-spec post(string(), string()|binary(), string(), any()) -> httpstat().
+post(XER, URL, ContentType, Body) ->
+ %post that sends the XER, no headers signature
+ Headers = [{"x-ecomp-requestid", XER}],
+ U = sanitize(URL),
+ parse_response(httpc:request(post, {U, Headers, ContentType, Body}, [],[]), U).
+
+-spec post(string(), string()|binary(), list(), string(), any()) -> httpstat().
+post(XER, URL, Headers, ContentType, Body) ->
+ %post that sends XER, appends the header onto the list of desired headers
+ U = sanitize(URL),
+ parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER} | Headers], ContentType, Body}, [],[]), U).
+
+-spec get(string(), string()|binary()) -> httpstat().
+get(XER, URL) ->
+ %http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day.
+ U = sanitize(URL),
+ Headers = [{"x-ecomp-requestid", XER}],
+ parse_response(httpc:request(get, {U, Headers}, [], []), U).
+
+-spec put(string(), string()|binary(), string(), any()) -> httpstat().
+put(XER, URL, ContentType, Body) ->
+ %http put that always sends the XER
+ U = sanitize(URL),
+ Headers = [{"x-ecomp-requestid", XER}],
+ parse_response(httpc:request(put, {U, Headers, ContentType, Body}, [],[]), U).
+
+-spec delete(string(), string()|binary()) -> httpstat().
+delete(XER, URL) ->
+ %http delete that always sends the XER
+ U = sanitize(URL),
+ Headers = [{"x-ecomp-requestid", XER}],
+ parse_response(httpc:request(delete, {U, Headers}, [],[]), U).
diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl
new file mode 100644
index 0000000..d8ad529
--- /dev/null
+++ b/src/httpabs_tests.erl
@@ -0,0 +1,33 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(httpabs_tests).
+-include_lib("eunit/include/eunit.hrl").
+-import(httpabs, [
+ sanitize/1
+ ]
+ ).
+
+sanitize_test() ->
+ ?assert(sanitize(<<" www.foo.com ">>) == "www.foo.com"),
+ ?assert(sanitize(" www.foo.com ") == "www.foo.com"),
+ ?assert(sanitize(<<"www.foo.com">>) == "www.foo.com").
+
+
diff --git a/src/logging.erl b/src/logging.erl
new file mode 100644
index 0000000..942637d
--- /dev/null
+++ b/src/logging.erl
@@ -0,0 +1,157 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(logging).
+-export([
+ audit/3,
+ metrics/3,
+ err/2
+ ]).
+
+-import(util, [iso/0, iso_elapsed/2, to_str/1, ip_to_str/1]).
+%..lazy macros
+-define(SC(L), util:concat(L)).
+-define(PV(Name, PL), proplists:get_value(Name, PL, "")).
+
+%This module is intended to support the logging format standard for ONAP/ECOMP components. SOmetimes that is reffered to as "EELF".
+
+%levels are none | debug | info | notice | warning | error | critical | alert | emergency.
+
+%%%
+%%%Helper functions
+%%%
+pid() -> pid_to_list(self()).
+%they wanted milleseconds but they are getting seconds rounded to ms because I don't have an erlang BIF that gives me this
+elapsed(Endtime, Starttime) -> to_str(iso_elapsed(Endtime, Starttime)*1000).
+
+start_end_elapsed(ArgPropl) ->
+ %returns start time, end time,... and elapsed time
+ EndT = iso(),
+ StartT = ?PV(bts, ArgPropl),
+ ElapT = elapsed(EndT, StartT),
+ {StartT, EndT, ElapT}.
+
+%things this logging class can compute based on the Req so need not be in every logging function
+server_add(Req) ->
+ {MyUrl, _} = cowboy_req:host_url((leptus_req:get_req(Req))),
+ %discard HTTP portion using Erlang binary matching
+ <<_:7/binary, URL/binary>> = MyUrl,
+ URL.
+
+ip(Req) -> ip_to_str(leptus_req:peer(Req)).
+
+path(Req) ->
+ %get us the method and API path that was hit from the request
+ {Path, {_,_,_,_,_,Method,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_}} = cowboy_req:path(leptus_req:get_req(Req)),
+ ?SC([Method, " ", Path]).
+
+%%%
+%%%External
+%%%
+
+audit(Sev, Req, ArgPropl) ->
+ F = case Sev of
+ info -> fun(M)->audit:info(M) end; %cant use the shorthand "fun -> audit:info/2" because of parse_transform
+ warning -> fun(M)->audit:warning(M) end
+ end,
+ %The audit field list:
+ %
+ %1 BeginTimestamp Implemented (bts)
+ %2 EndTimestamp Auto Injected when this is called
+ %3 RequestID Implemented (xer)
+ %4 serviceInstanceID
+ %5 threadId Auto Injected... however this is a VM language and so this is the Erlang process PID, not the "OS level thread id". Unclear what they want here.
+ %6 physical/virtual server name 
+ %7 serviceName Implemented (from Req)
+ %8 PartnerName
+ %9 StatusCode
+ %10 ResponseCode Implemented (rcode)
+ %11 Response Description Will not implement. This says "human readable description of the *code*. They don't want the response here. I won't do that because this is a generic function that gets called no matter what the code is, so I can't do that. But since this is supposed to be human readable, they can look up the code in the swagger spec.
+ %12 instanceUUID
+ %13 Category log level Implemented (Sev)
+ %14 Severity
+ %15 Server IP address Implemented (from Req)
+ %16 ElapsedTime Auto Injected but THIS IS SO DUMB TO BE IN THE STANDARD WASTING DISK SPACE THIS IS DIRECTLY COMPUTABLE FROM 1,2 WTF
+ %17 Server
+ %18 ClientIPaddress Implemented (from Req)
+ %19 class name Implemented (mod), though docs say OOP, I am using the Erlang module here
+ %20 Unused Implemented..
+ %21 ProcessKey
+ %22 CustomField1
+ %23 CustomField2
+ %24 CustomField3
+ %25 CustomField4
+ %26 detailMessage Implemented (msg)
+
+ {StartT, EndT, ElapT} = start_end_elapsed(ArgPropl),
+ %compute message
+ Message = ?SC([StartT, "|", EndT, "|", ?PV(xer, ArgPropl), "||", pid(), "||", path(Req), "|||", to_str(?PV(rcode, ArgPropl)), "|see swagger spec||", to_str(Sev), "||", server_add(Req), "|", ElapT, "||", ip(Req), "|", ?PV(mod, ArgPropl), "|||||||", ?PV(msg, ArgPropl)]),
+ F(Message).
+
+metrics(Sev, Req, ArgPropl) ->
+ F = case Sev of
+ debug -> fun(M)->metrics:debug(M) end;
+ info -> fun(M)->metrics:info(M) end;
+ warning -> fun(M)->metrics:warning(M) end
+ end,
+ %The metrics field list:
+ %SAME AS METRICS 1-8
+ %_____
+ %9 TargetEntity Implemented (tgte)
+ %10 TargetServiceName Implemented (tgts)
+ %11 Status Code Implemented (tgtrsc)
+ %_____
+ %SAME AS METRICS 9 ->
+ %total 29 fields
+
+ {StartT, EndT, ElapT} = start_end_elapsed(ArgPropl),
+ %compute message
+ Message = ?SC([StartT, "|", EndT, "|", ?PV(xer, ArgPropl), "||", pid(), "||", path(Req), "||", ?PV(tgte, ArgPropl), "|", ?PV(tgts, ArgPropl), "|", to_str(?PV(tgtrsc, ArgPropl)), "|", to_str(?PV(rcode, ArgPropl)), "|see swagger spec||", to_str(Sev), "||", server_add(Req), "|", ElapT, "||", ip(Req), "|", ?PV(mod, ArgPropl), "||||||||", ?PV(msg, ArgPropl)]),
+ F(Message).
+
+err(Sev, ArgPropl) ->
+ F = case Sev of
+ warning -> fun(M)->error:warning(M) end;
+ error -> fun(M)->error:error(M) end;
+ critical -> fun(M)->error:critical(M) end;
+ alert -> fun(M)->error:alert(M) end;
+ emergency -> fun(M)->error:emergency(M) end
+ end,
+ SevInLog = case Sev of
+ warning -> "WARN";
+ error -> "ERROR";
+ _ -> "FATAL"
+ end,
+ %Error field list:
+ %1 Timestamp Auto Injected when this is called
+ %2 RequestID Implemented
+ %3 ThreadId Auto Injected... however this is a VM language and so this is the Erlang thread PID. Not the "OS level pid". Unclear what they want here.
+ %4 ServiceName Implemented
+ %5 PartnerName
+ %6 TargetEntity
+ %7 TargetServiceName
+ %8 ErrorCategory Implemented
+ %9 ErrorCode
+ %10 ErrorDescription This is what I'm using detailMessage for, these seem to be the same to me.
+ %11 detailMessage Implemented
+
+ Message = ?SC([iso(), "|", ?PV(xer, ArgPropl), "|", pid(), "|", ?PV(servn, ArgPropl), "||||", SevInLog, "|||", ?PV(msg, ArgPropl)]),
+ F(Message).
+
diff --git a/src/resource_handler.erl b/src/resource_handler.erl
new file mode 100644
index 0000000..a9703fc
--- /dev/null
+++ b/src/resource_handler.erl
@@ -0,0 +1,465 @@
+-module(resource_handler).
+-compile({parse_transform, leptus_pt}).
+
+%% leptus callbacks
+-export([init/3]).
+-export([terminate/4]).
+-export([get/3, put/3, delete/3, post/3]).
+-export([cross_domains/3]).
+
+%%for keeping state
+%%The application record is defined in application.hrl
+%%In Mnesia, the first element is the type of record and the second element is the key
+%%http://erlang.org/doc/apps/mnesia/Mnesia_chap2.html
+-include("application.hrl").
+
+-define(CONSURL, dict:fetch("consulurl", State)).
+-define(CDAPURL, dict:fetch("cdapurl", State)).
+%below come from config map
+-define(HCInterval, maps:get(<<"hcinterval">>, dict:fetch("configmap", State))).
+-define(AutoDeregisterAfter, maps:get(<<"autoderegisterafter">>, dict:fetch("configmap", State))).
+-define(PipelineHealthLimit, maps:get(<<"pipelinehealthlimit">>, dict:fetch("configmap", State))).
+-define(PUBLICFIELDS, [<<"appname">>, <<"apptype">>, <<"namespace">>, <<"healthcheckurl">>, <<"metricsurl">>, <<"url">>, <<"connectionurl">>, <<"serviceendpoints">>]).
+
+%super lazy macros/imports...
+-import(logging, [audit/3, metrics/3, err/2]).
+-import(util, [iso/0, to_str/1]).
+%lazy concat
+-define(SC(L), util:concat(L)).
+%lazy shorthand to write info audit records. man I miss defines in python. c ftw.
+-define(AUDI(Req, Bts, XER, Rcode), audit(info, Req, [{bts, Bts}, {xer,XER}, {rcode, RCode}, {mod, mod()}])).
+
+
+%%%
+%%Helper functions
+%%%
+mod() -> to_str(?MODULE).
+
+get_request_id(Req) ->
+ %ECOMP request tracing
+ %see if we got a X-ECOMP-REQUESTID, or generate a new one if not
+ HXER = leptus_req:header(Req, <<"x-ecomp-requestid">>),
+ case HXER of
+ undefined ->
+ XER = util:gen_uuid(),
+ %LOL, use the client ip here to shame them into their request id
+ audit(warning, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, "Request is missing requestID. Assigned this one."}]), %eelf documentation says to log this message if requestid was missing
+ XER;
+ _ ->
+ binary_to_list(HXER) %httpc expects strings as headers, so this needs to be str for subsequent passing
+ end.
+
+%shared-code function initlization that creates a begining timestamp and gets or generates a request ID
+init_api_call(Req) ->
+ Bts = iso(), %record begining timestamp
+ XER = get_request_id(Req), %get or generate XER
+ {Bts, XER}.
+
+lookup_application(Appname) ->
+ %do a lookup in mnesia of an appname
+ Ret = mnesia:transaction(fun() -> mnesia:match_object(application, {application, Appname, '_', '_', '_', '_', '_', '_', '_', '_'}, read) end),
+ case Ret of
+ {atomic, []} -> none; %no matches
+ {atomic, [Rec]} -> Rec
+ %fail hard if there was more than one result
+ end.
+
+appname_to_application_map(Appname) ->
+ %return a Map of an Mnesia record
+ Rec = lookup_application(Appname),
+ case Rec of
+ none -> none;
+ {application, Appname, AppType, Namespace, Healthcheckurl, Metricsurl, Url, Connectionurl, ServiceEndpoints, CreationTime} ->
+ #{<<"appname">> => Appname,
+ <<"apptype">> => AppType,
+ <<"namespace">> => Namespace,
+ <<"healthcheckurl">> => Healthcheckurl,
+ <<"metricsurl">> => Metricsurl,
+ <<"url">> => Url,
+ <<"connectionurl">> => Connectionurl,
+ <<"serviceendpoints">> => ServiceEndpoints,
+ <<"creationtime">> => CreationTime
+ }
+ end.
+
+appname_to_field_vals(Appname, FieldList) ->
+ %Return just a list of values of an application with fields FieldList
+ M = appname_to_application_map(Appname),
+ case M of
+ none -> none;
+ _ -> [maps:get(F, M) || F <- FieldList]
+ end.
+
+appname_to_application_http(XER, Appname, State) ->
+ %Return an HTTP response of an application record. If this is a program flowlet style app, additionally return it's bound and unbound config
+ A = appname_to_application_map(Appname),
+ case A of
+ none -> {404, "", State};
+ _ ->
+ Body = maps:with(?PUBLICFIELDS, A),
+ case maps:get(<<"apptype">>, Body) of
+ %if program-flowlet style app, append the bound and unbound config into the return JSON
+ <<"program-flowlet">> ->
+ UB = case consul_interface:consul_get_configuration(XER, Appname, ?CONSURL) of
+ {200, Unbound} -> Unbound;
+ {_, _} -> <<"WARNING: COULD NOT FETCH CONFIG FROM CONSUL">>
+ end,
+ B = case cdap_interface:get_app_config(XER, Appname, maps:get(<<"namespace">>, Body), ?CDAPURL) of
+ {200, Bound} -> Bound;
+ {_, _} -> <<"WARNING: COULD NOT FETCH CONFIG FROM CDAP">>
+ end,
+ CM = #{<<"unbound_config">> => UB,
+ <<"bound_config">> => B},
+ {200, {json, maps:merge(Body, CM)}, State};
+ %TODO! can we do something for hydrator apps?
+ <<"hydrator-pipeline">> ->
+ {200, {json, Body}, State}
+ end
+ end.
+
+-spec parse_progflow_put_body_map(map()) ->
+ {binary(), binary(), string(), binary(), binary(), map(), map(), any(), lprogram(), any()}. %TODO! Spec parsedservices and parsedprogrampreferences so we don't have any() here...
+parse_progflow_put_body_map(Body) ->
+ Namespace = maps:get(<<"namespace">>, Body),
+ Streamname = maps:get(<<"streamname">>, Body),
+ JarURL = maps:get(<<"jar_url">>, Body),
+ ArtifactName = maps:get(<<"artifact_name">>, Body),
+ ArtifactVersion = maps:get(<<"artifact_version">>, Body),
+ AppConfig = maps:get(<<"app_config">>, Body),
+ AppPreferences = maps:get(<<"app_preferences">>, Body),
+ ParsedServices = lists:map(fun(S) -> {maps:get(<<"service_name">>, S),
+ maps:get(<<"service_endpoint">>, S),
+ maps:get(<<"endpoint_method">>, S)}
+ end, maps:get(<<"services">>, Body)),
+ Programs = lists:map(fun(P) -> #program{type=maps:get(<<"program_type">>, P),
+ id= maps:get(<<"program_id">>, P)}
+ end, maps:get(<<"programs">>, Body)),
+ ParsedProgramPreferences = lists:map(fun(P) -> {maps:get(<<"program_type">>, P),
+ maps:get(<<"program_id">>, P),
+ maps:get(<<"program_pref">>, P)}
+ end, maps:get(<<"program_preferences">>, Body)),
+ {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}.
+
+parse_hydrator_pipeline_put_body_map(Body) ->
+ Namespace = maps:get(<<"namespace">>, Body),
+ Streamname = maps:get(<<"streamname">>, Body),
+ PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body),
+
+ %Dependencies is optional. This function will normalize it's return with [] if the dependencies key was not passed in.
+ ParsedDependencies = case maps:is_key(<<"dependencies">>, Body) of
+ true ->
+ D = maps:get(<<"dependencies">>, Body),
+ %crash and let caller deal with it if not a list or if required keys are missing. Else parse it into
+ % {artifact-extends-header, artifact_name, artifact-version-header, artifact_url}
+ %tuples
+ %
+ %regarding the binart_to_lists: these all come in as binaries but they need to be "strings" (which are just lists of integers in erlang)
+ %for headers requiring strings, see http://stackoverflow.com/questions/28292576/setting-headers-in-a-httpc-post-request-in-erlang
+ %
+ lists:map(fun(X) -> {binary_to_list(maps:get(<<"artifact_extends_header">>, X)),
+ maps:get(<<"artifact_name">>, X),
+ binary_to_list(maps:get(<<"artifact_version_header">>, X)),
+ maps:get(<<"artifact_url">>, X),
+ %even if dependencies is specified, ui_properties is optional. This will normalize it's return with 'none' if not passed in
+ case maps:is_key(<<"ui_properties_url">>, X) of true -> maps:get(<<"ui_properties_url">>, X); false -> none end
+ } end, D);
+ false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in []
+ end,
+
+ {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}.
+
+parse_put_body(B) ->
+ Body = jiffy:decode(B, [return_maps]),
+ Type = maps:get(<<"cdap_application_type">>, Body),
+ case Type of
+ <<"program-flowlet">> ->
+ {pf, <<"program-flowlet">>, parse_progflow_put_body_map(Body)};
+ <<"hydrator-pipeline">> ->
+ {hp, <<"hydrator-pipeline">>, parse_hydrator_pipeline_put_body_map(Body)};
+ _ ->
+ unsupported
+ end.
+
+delete_app_helper(Appname, State, XER, Req) ->
+ %Helper because it is used by both delete and rollback on failed deploy
+ %
+ %%Internal Crisis Alert:
+ %
+ %I pondered this for some time. There are three points of state for this: the cdap cluster, consul, and the broker's internal database
+ %The question is, if something in the delete fails, do we:
+ %1) Tell the user to try again later
+ %2) Clean up as much as we can, log the error, and keep going
+ %
+ %I have decided for now on taking number 2). This is the "Cloudify" way of doing things where you don't raise a NonRecoerable in a Delete operation.
+ %This has the benefit that this delete operation can be used as the *rollback*, so if anything fails in the deploy, this delete function is called to clean up any dirty state.
+ %
+ %Number 1 is not so straitforward, because "putting back things the way they were" is difficult. For example, the deletion from CDAP succeeds, but Consul can't be reached.
+ %What happens? Do I *redeploy* the CDAP app to try to make their state as it was before the botched delete was called?
+ %
+ %My conclusion is that transactions across distributed systems is hard. It's much easier if it is all local (e.g., Transactions in a single Postgres DB)
+ %
+ %SO, as a result of this decision, the broker does *NOT* assert the status code of any delete operations to be 200.
+ %The only way this function does not return a 200 is if I can't even delete from my own database.
+ %
+ metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("Delete recieved for ~s", [Appname])}]),
+ case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "Tried to delete an application that was not registered", State};
+ [AppType, Namespace] ->
+ try
+ case AppType of
+ <<"program-flowlet">> ->
+ ok = workflows:undeploy_cdap_app(Req, XER, Appname, ?CDAPURL, ?CONSURL, Namespace),
+ %delete from the program-flowlet supplementary table
+ {atomic, ok} = mnesia:transaction(fun() -> mnesia:delete(prog_flow_supp, Appname, write) end);
+ <<"hydrator-pipeline">> -> ok = workflows:undeploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, ?CONSURL)
+ end,
+ %delete from application table (shared between both types of apps)
+ {atomic, ok} = mnesia:transaction(fun() -> mnesia:delete(application, Appname, write) end),
+ {200, "", State} %Return
+ catch
+ %this is really bad, means I can't even delete from my own database. For now, log and pray.
+ %generic failure catch-all, catastrophic
+ Class:Reason ->
+ err(emergency, [{xer, XER}, {msg, io_lib:format("Catastrophic failure, can't delete ~s from my database. ~s:~s", [Appname, Class, Reason])}]),
+ err(error, [{xer, XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]),
+ {500, "Please report this error", State}
+ end
+ end.
+
+%%%CALLBACKS %%%
+init(_Route, _Req, State) ->
+ {ok, State}.
+terminate(_Reason, _Route, _Req, _State) ->
+ ok.
+%%%FOR Cors support
+%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55
+cross_domains(_Route, _Req, State) ->
+ {['_'], State}.
+
+%%%GET Methods
+get("/", Req, State) ->
+ %The broker's "info" endpoint; returns some possibly useful information
+ {Bts, XER} = init_api_call(Req),
+ Apps = util:get_all_appnames_from_db(),
+ {UT, _} = statistics(wall_clock),
+ CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL),
+ RB = {[
+ {<<"cdap cluster version">>, CDAPVer},
+ {<<"managed cdap url">>, ?CDAPURL},
+ {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)},
+ {<<"number of applications registered">>, length(Apps)},
+ {<<"uptime (s)">>, UT/1000},
+ {<<"broker API version">>, util:get_my_version()}
+ ]},
+ {RCode, RBody, RState} = {200, {json, RB}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application", Req, State) ->
+ %get a list of all registered apps
+ {Bts, XER} = init_api_call(Req),
+ {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname", Req, State) ->
+ %get information about a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/metrics", Req, State) ->
+ %get metrics for a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200
+ {ReturnCode, {json, ReturnBody}, State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
+ {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL),
+ {ReturnCode, {json, ReturnBody}, State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/healthcheck", Req, State) ->
+ %get healthcheck of an application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+
+%%%DELETE Methods
+delete("/application/:appname", Req, State) ->
+ %Uninstall and delete a CDAP app
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+
+%%%PUT Methods
+put("/application/:appname", Req, State) ->
+ %create a new registration; deploys and starts a cdap application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of
+ [Appname] ->
+ {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State};
+ none -> %no matches, create the resource, return the application record
+ %Initial put requires the put body parameters
+ case try parse_put_body(leptus_req:body_raw(Req)) catch _:_ -> invalid end of
+ %could not parse the body
+ invalid -> {400, "Invalid PUT Body or unparseable URL", State};
+
+ %unsupported cdap application type
+ unsupported -> {404, "Unsupported CDAP Application Type", State};
+
+ {Type, AppType, Params} ->
+ %form shared info
+ %hateaos cuz they aintaos
+ {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))),
+ Metricsurl = <<RequestUrl/binary, <<"/metrics">>/binary>>,
+ Healthcheckurl = <<RequestUrl/binary, <<"/healthcheck">>/binary>>,
+
+ try
+ case Type of
+ hp ->
+ {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params,
+ ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname),
+
+ %TODO: This!
+ ServiceEndpoints = [], %unclear if this is possible with pipelines
+
+ %write into mnesia, deploy
+ A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()},
+ {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(A) end),
+ ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter),
+ metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager
+ ok;
+ pf ->
+ {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params,
+ %Form URLs that are part of the record
+ %NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here
+ ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname),
+ ServiceEndpoints = lists:map(fun(X) -> cdap_interface:form_service_json_from_service_tuple(Appname, Namespace, ?CDAPURL, X) end, ParsedServices),
+
+ %write into mnesia. deploy
+ A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()},
+ ASupplemental = #prog_flow_supp{appname = Appname, programs = Programs},
+ {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(A) end), %warning, here be mnesia magic that knows what table you want to write to based on the record type
+ {atomic,ok} = mnesia:transaction(fun() -> mnesia:write(ASupplemental) end), %warning: ""
+ ok = workflows:deploy_cdap_app(Req, XER, Appname, ?CONSURL, ?CDAPURL, ?HCInterval, ?AutoDeregisterAfter, AppConfig, JarURL, ArtifactName, ArtifactVersion, Namespace, AppPreferences, ParsedProgramPreferences, Programs, RequestUrl, Healthcheckurl),
+ metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Program-Flowlet Application Created: ~p with supplemental data: ~p", [lager:pr(A, ?MODULE), lager:pr(ASupplemental, ?MODULE)])}]),
+ ok
+ end,
+ appname_to_application_http(XER, Appname, State)
+
+ catch
+ %catch a bad HTTP error code
+ error:{badmatch, {BadErrorCode, BadStatusMsg}} ->
+ err(error, [{xer, XER}, {msg, io_lib:format("Badmatch caught in Deploy. Rolling Back. ~p ~s", [BadErrorCode, BadStatusMsg])}]),
+ {_,_,_} = delete_app_helper(Appname, State, XER, Req),
+ {BadErrorCode, BadStatusMsg, State}; %pass the bad error/status back to user
+ Class:Reason ->
+ %generic failure catch-all, catastrophic
+ err(error, [{xer, XER}, {msg, io_lib:format("~nUnexpected Exception caught in Deploy. Error Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]),
+ {_,_,_} = delete_app_helper(Appname, State, XER, Req),
+ {500, "Please report this error", State}
+ end
+ end
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+put("/application/:appname/reconfigure", Req, State) ->
+ %if appname already is registerd, trigger a consul pull and reconfigure
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"namespace">>]) of
+ none -> {404, "Reconfigure recieved but the app is not registered", State};
+ [Namespace] ->
+ D = jiffy:decode(leptus_req:body_raw(Req), [return_maps]),
+ case try maps:get(<<"config">>, D) catch _:_ -> invalid end of
+ invalid -> {400, "Invalid PUT Reconfigure Body: key 'config' is missing", State};
+ Config ->
+ case try maps:get(<<"reconfiguration_type">>, D) catch _:_ -> invalid end of
+ invalid -> {400, "Invalid PUT Reconfigure Body: key 'reconfiguration_type' is missing", State};
+ <<"program-flowlet-app-config">> ->
+ %reconfigure a program-flowlet style app's app config
+ try
+ ok = workflows:app_config_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config),
+ {200, "", State}
+ catch Class:Reason ->
+ err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]),
+ {500, "", State}
+ end;
+ <<"program-flowlet-app-preferences">> ->
+ %reconfigure a program-flowlet style app's app config
+ try
+ ok = workflows:app_preferences_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config),
+ {200, "", State}
+ catch Class:Reason ->
+ err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]),
+ {500, "", State}
+ end;
+ <<"program-flowlet-smart">> ->
+ %try to "figure out" whether the supplied JSON contains keys in appconfig, app preferences, or both
+ try
+ ok = workflows:smart_reconfigure(Req, XER, Appname, Namespace, ?CONSURL, ?CDAPURL, Config),
+ {200, "", State}
+ catch
+ %catch a bad HTTP error code; also catches the non-overlapping configuration case
+ error:{badmatch, {BadErrorCode, BadStatusMsg}} ->
+ err(error, [{xer, XER}, {msg, io_lib:format("~p ~s", [BadErrorCode, BadStatusMsg])}]),
+ {BadErrorCode, BadStatusMsg, State};
+ Class:Reason ->
+ err(error, [{xer,XER}, {msg, io_lib:format("~nError Stacktrace:~s", [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])}]),
+ {500, "", State}
+ end;
+ NI ->
+ %TODO! Implement other types of reconfig once CDAP APIs exis
+ {501, io_lib:format("This type (~s) of reconfiguration is not implemented", [NI]), State}
+ end
+ end
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+
+%%%POST methods
+post("/application/delete", Req, State) ->
+ %This follows the AWS S3 Multi Key Delete: http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
+ %Except I added an additional special value called "*"
+ {Bts, XER} = init_api_call(Req),
+ {RCode, RBody, RState} = case try
+ B = maps:get(<<"appnames">>, jiffy:decode(leptus_req:body_raw(Req), [return_maps])),
+ true = erlang:is_list(B),
+ B
+ catch _:_ ->
+ invalid
+ end
+ of
+ invalid -> {400, "Invalid PUT Body", State};
+ IDs ->
+ case IDs of
+ [] -> {200, "EMPTY PUT BODY", State};
+ _ ->
+ %<<"*">> ->
+ %this block deleted all apps, but decided this backdoor wasn't very RESTy
+ %% {atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, {application, '_', '_', '_', '_', '_', '_', '_', '_', '_'}, read) end),
+ % AppsToDelete = lists:map(fun(X) -> {application, Appname, _,_,_,_,_,_,_,_} = X, Appname end, Apps),
+ Returns = lists:map(fun(X) -> delete_app_helper(X, State, XER, Req) end, IDs),
+ RL = lists:map(fun({RC, _, _}) -> RC end, Returns),
+ {200, jiffy:encode(RL), State}
+ end
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
diff --git a/src/util.erl b/src/util.erl
new file mode 100644
index 0000000..d96675b
--- /dev/null
+++ b/src/util.erl
@@ -0,0 +1,192 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(util).
+-include("application.hrl").
+
+-export([concat/1,
+ get_platform_envs_and_config/0,
+ resolve_cbs/2,
+ initialize_database/0,
+ get_all_appnames_from_db/0,
+ get_my_version/0,
+ get_programs_for_pfapp_from_db/1,
+ gen_uuid/0,
+ iso/0,
+ iso_elapsed/2,
+ to_str/1,
+ ip_to_str/1,
+ update_with_new_config_map/2,
+ ejson_to_map/1
+ ]).
+
+%http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation
+%NOTE! Does not work or bomb when an element in the list is an atom. Must be a string or binary. Maybe add a check for this
+to_string(Value) when is_binary(Value) -> binary_to_list(Value);
+to_string(Value) -> Value.
+concat(List) ->
+ lists:flatten(lists:map(fun to_string/1, List)).
+
+resolve_cbs(XER, ConsulURL) ->
+ %Ideally this function would dissapear if we get real DNS. This essentially is doing an SRV record lookup every time someone needs the bindng URL
+ %This allows the broker to handle the case where the CBS moves IP or Ports
+ %New as of 6/28/17: Uses the hardcoded short name for the CBS
+ {IP, Port} = consul_interface:consul_get_service_ip_port(XER, "config_binding_service", ConsulURL),
+ concat(["http://", IP, ":", integer_to_binary(Port)]).
+
+get_platform_envs_and_config() ->
+ %Get platform envs needed for broker operation, then fetch my config.
+ %If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap]
+ MyName = os:getenv("HOSTNAME"),
+ ConsulHost = os:getenv("CONSUL_HOST"),
+ case MyName == false orelse ConsulHost == false of
+ true -> [];
+ false ->
+ %build Consul URL
+ ConsulURL = concat(["http://", ConsulHost, ":8500"]),
+
+ %Bind my own config map
+ %generate my own XER here
+ XER = gen_uuid(),
+ {200, BoundConfig} = consul_interface:consul_bind_config(XER, MyName, ConsulURL),
+ BoundConfigMap = jiffy:decode(jiffy:encode(BoundConfig), [return_maps]), %kind of an interesting way to turn an erlang proplist into a map
+
+ %Here, we waterfall looking for "CDAP_CLUSTER_TO_MANAGE".
+ %First, we will check for environmnental variables for a cluster *NAME*
+ %If that is not found, then we will check out bound config for a fully bound URL
+ %If that is also not found, let it crash baby.
+ CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of
+ false ->
+ list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error
+ CDAPName ->
+ {IP, Port} = consul_interface:consul_get_service_ip_port(XER, CDAPName, ConsulURL),
+ list_to_binary(concat(["http://", IP, ":", integer_to_binary(Port)]))
+ end,
+ [MyName, ConsulURL, CDAPURL, BoundConfigMap]
+ end.
+
+initialize_database() ->
+ %Create the database (currently MNesia) if it does not exist, and the application table.
+ %Or, do nothing.
+ N = node(),
+ lager:info(io_lib:format("Initializing database. My node name is ~s", [N])),
+
+ %set MNesia dir
+ application:set_env(mnesia, dir, "/var/mnesia/"),
+
+ %stop if running, can't create schema if it is. Dont check status, OK if stopped
+ mnesia:stop(),
+
+ %create the schema if it does not already exist. Dont check status, ok if exists
+ %erlang:display(mnesia:delete_schema([N])),
+ mnesia:create_schema([N]),
+ %start MNesia, assert it works
+
+ ok = mnesia:start(), %start MNesia, bomb if alreay started, should not happen
+ lager:info("Mnesia started"),
+
+ %try to create the table, or if it exists, do nothing
+ %erlang:display(mnesia:delete_table(application)),
+ case mnesia:create_table(application, [{attributes, record_info(fields, application)}, {disc_copies, [N]}]) of
+ {aborted,{already_exists,application}} ->
+ lager:info("Application table already exists");
+ {atomic,ok} ->
+ lager:info(io_lib:format("Created application table on ~s", [N]))
+ end,
+
+ %try to create the app supplementaty table, or if it exists, do nothing
+ %erlang:display(mnesia:delete_table(application)),
+ case mnesia:create_table(prog_flow_supp, [{attributes, record_info(fields, prog_flow_supp)}, {disc_copies, [N]}]) of
+ {aborted,{already_exists, prog_flow_supp}} ->
+ lager:info("prog_flow_supp table already exists");
+ {atomic,ok} ->
+ lager:info(io_lib:format("Created prog_flow_supp table on ~s", [N]))
+ end,
+
+ %wait up to 30s for the table to come up. Usually instantaneous. If it takes more crash abd burn
+ ok = mnesia:wait_for_tables([application, prog_flow_supp], 30000),
+ ok.
+
+get_all_appnames_from_db() ->
+ {atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, #application{_ = '_'}, read) end),
+ lists:map(fun(X) -> {application, Appname,_,_,_,_,_,_,_,_} = X,
+ Appname
+ end, Apps).
+
+-spec get_programs_for_pfapp_from_db(binary()) -> lprogram().
+get_programs_for_pfapp_from_db(Appname) ->
+ {atomic, [#prog_flow_supp{appname = Appname, programs=Programs}]} = mnesia:transaction(fun() -> mnesia:match_object(prog_flow_supp, #prog_flow_supp{appname = Appname, _ = '_'}, read) end),
+ Programs.
+
+get_my_version() ->
+ %stolen from the SO post I asked about: http://stackoverflow.com/questions/43147530/erlang-programmatically-get-application-version/43152182#43152182
+ case lists:keyfind(cdapbroker, 1, application:loaded_applications()) of
+ {_, _, Ver} -> list_to_binary(Ver);
+ false -> <<"error">>
+ end.
+
+gen_uuid() ->
+ %generate an RFC compliant v1 uuid using lib
+ uuid:to_string(uuid:uuid1()).
+
+iso() ->
+ %generate 8601 ts
+ iso8601:format(erlang:timestamp()).
+
+iso_elapsed(Endtime, Starttime) ->
+ %%%...subtract two isos and return the number of seconds elapsed between Starttime and Endtime
+ Edt = iso8601:parse(Endtime),
+ Sdt = iso8601:parse(Starttime),
+ Egs = calendar:datetime_to_gregorian_seconds(Edt),
+ Sgs = calendar:datetime_to_gregorian_seconds(Sdt),
+ Egs - Sgs.
+
+to_str("") -> "";
+to_str(Term) -> lists:flatten(io_lib:format("~p", [Term])).
+
+-spec ip_to_str({inet:ip_address(), inet:port_number()}) -> binary().
+%nasty.. I miss pythons x <= Foo <= Y syntax.. or something mathematical like Foo in [X..Y].. erlang not good 4 math
+ip_to_str({{A,B,C,D}, Port}) when A >= 0 andalso A =< 255 andalso B >= 0 andalso B =< 255 andalso C >= 0 andalso C =< 255 andalso D >= 0 andalso D =< 255 andalso port >= 0 andalso Port =<65535 ->
+ concat([to_str(A),".",to_str(B),".", to_str(C),".",to_str(D),":",to_str(Port)]);
+ip_to_str({_,_}) -> invalid.
+
+update_with_new_config_map(NewConfig, OldConfig) ->
+ %helper for smart_reconfigure, broken out so we can unit test it.
+ %
+ %Takes in a new config, some keys in which may be shared with OldConfig, and returns a new map with the same keys as OldConfig, except values that had overlap were replaced by NewConfig
+ %if no keys in NewConfig overlap with OldConfig, returns the atom 'nooverlap'
+ %
+ %This is very similar to the maps:merge/2 builtin but that will inject keys of newconfig that were not in oldconfig. We need a "RIGHT JOIN"
+ NCKeys = maps:keys(NewConfig),
+ ConfigOverlaps = [X || X <- NCKeys, maps:is_key(X, OldConfig)],
+ case ConfigOverlaps of
+ [] -> nooverlap;
+ _ ->
+ %we have an entry that should be in app config
+ %build a new map with just the keys to update
+ Pred = fun(X,_) -> lists:member(X, ConfigOverlaps) end,
+ NewVals = maps:filter(Pred, NewConfig),
+ maps:merge(OldConfig, NewVals)
+ end.
+
+ejson_to_map(E) ->
+ %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map,
+ %usefu because ejsons do not appear to be order-independent-comparable, but maps are (e.g., two maps are equal if all their k+v are equal but agnostic to order)
+ jiffy:decode(jiffy:encode(E), [return_maps]).
diff --git a/src/util_tests.erl b/src/util_tests.erl
new file mode 100644
index 0000000..e37e492
--- /dev/null
+++ b/src/util_tests.erl
@@ -0,0 +1,53 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(util_tests).
+-include_lib("eunit/include/eunit.hrl").
+-import(util, [
+ iso_elapsed/2,
+ ip_to_str/1,
+ update_with_new_config_map/2,
+ ejson_to_map/1]).
+
+iso_elapsed_test() ->
+ ?assert(iso_elapsed(<<"2017-04-27T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 2),
+ ?assert(iso_elapsed(<<"2017-04-29T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 60*60*24*2+2).
+
+ip_to_str_test() ->
+ ?assert(ip_to_str({{6,6,6,6}, 666}) == "6.6.6.6:666"),
+ ?assert(ip_to_str({{196,196,196,196}, 1}) == "196.196.196.196:1"),
+ ?assert(ip_to_str({{6,6,6,6666}, 666}) == invalid),
+ ?assert(ip_to_str({{6,6,6,6}, 66666}) == invalid),
+ ?assert(ip_to_str({{6,6,-6,6}, 666}) == invalid),
+ ?assert(ip_to_str({{6,6,six,6}, 666}) == invalid).
+
+update_with_new_config_map_test() ->
+ ?assert(update_with_new_config_map(#{<<"foo">>=><<"smartbar">>, <<"preffoo">>=><<"smartprefbar">>}, #{<<"foo">>=><<"bar">>}) == #{<<"foo">>=><<"smartbar">>}),
+ ?assert(update_with_new_config_map(#{<<"fooD">>=><<"smartbar">>}, #{<<"foo">>=><<"bar">>}) == nooverlap),
+ ?assert(update_with_new_config_map(#{<<"foo">>=><<"smartbar">>,<<"foo2">>=><<"smartbar2">>}, #{<<"foo">>=><<"bar">>, <<"foo2">>=><<"bar2">>}) == #{<<"foo">>=><<"smartbar">>, <<"foo2">>=><<"smartbar2">>}).
+
+ejson_to_map_test() ->
+ EJ1 = {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]},
+ EJ2 = {[{<<"foo2">>, <<"bar2">>}, {<<"foo">>, <<"bar">>}]},
+ M1 = ejson_to_map(EJ1),
+ M2 = ejson_to_map(EJ2),
+ ?assert(EJ1 /= EJ2), %HERE LIES THE PROBLEM HUDSON
+ ?assert(M1 == M2). %GREAT SUCCESS!
+
diff --git a/src/workflows.erl b/src/workflows.erl
new file mode 100644
index 0000000..a8c6abb
--- /dev/null
+++ b/src/workflows.erl
@@ -0,0 +1,324 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(workflows).
+
+%Module holds functions that execute big workflows, like deploying a CDAP application.
+
+-include("application.hrl").
+-export([deploy_cdap_app/17, %super offensive arity.. should probably start using some structs to cut this
+ undeploy_cdap_app/6,
+ undeploy_hydrator_pipeline/6,
+ deploy_hydrator_pipeline/12,
+ all_200s_else_showerror/2,
+ app_config_reconfigure/7,
+ app_preferences_reconfigure/7,
+ smart_reconfigure/7
+ ]).
+
+-import(util, [iso/0, to_str/1]).
+-import(logging, [metrics/3]).
+
+-define(MET(Sev, Req, Bts, XER, TgtE, TgtS, TgtRSC, Msg), metrics(Sev, Req, [{bts, Bts}, {xer,XER}, {tgte, TgtE}, {tgts, TgtS}, {tgtrsc, TgtRSC}, {mod, to_str(?MODULE)}, {msg, Msg}])).
+-define(CDAPE, "cdap cluster").
+-define(CNSE, "consul cluster").
+
+%private
+attempt( Req, XER, { Mod, Func, Args }, ServiceName, Action, LogResponse) ->
+ %Thanks Garry!!
+ %Helper function to
+ %1. log the start timestamp
+ %2. Do an action specificed by mod:func(args). Assumes XER always first arg
+ %3. Log a metrics info statement about the API cll
+ %4. assert the return code was a 200, let it crash otehrwise, caller catches
+ Start = iso(),
+ {RC, RB} = apply( Mod, Func, [XER | Args] ),
+ ?MET(info, Req, Start, XER, ServiceName, Action, RC, case LogResponse of true -> to_str(RB); false -> "" end),
+ {RC, RB}.
+
+%public
+-spec all_200s_else_showerror(fun((any()) -> httpstat()), list()) -> httpstat().
+all_200s_else_showerror(FClosure, ListToMap) ->
+ %Takes a "partial" with the spec: f(X) -> {HTTP_Status_Code, HTTP_Response}, maps it onto ListToMap, and either
+ %returns {200, ""} or else the first error encountered after executing the entire list (does not short circuit!)
+ %
+ %I say "partial" because there are no real "partials" in Erlang but you can make them using Closure's out of funs (anonymous functions), so FClosure is a Closure just waiting for the last argument
+ %See:
+ %https://www.google.com/url?sa=t&rct=j&q=&esrc=s&source=web&cd=1&ved=0ahUKEwjtyeiC6LbSAhVH0FQKHffhAr0QFggcMAA&url=http%3A%2F%2Fstackoverflow.com%2Fquestions%2F13355544%2Ferlang-equivalents-of-haskell-where-partial-lambda&usg=AFQjCNHnEZjQHtQhKXN67DBKKoJpqRXztg&cad=rja
+ %http://stackoverflow.com/questions/16183971/currying-functions-erlang
+ L = lists:filter(fun({X, _}) -> X /= 200 end, lists:map(FClosure, ListToMap)),
+ case L of
+ [] -> {200, ""};
+ _ -> lists:nth(1, L)
+ end.
+
+deploy_cdap_app(Req, XER, Appname, ConsulURL, CDAPURL, HCInterval, AutoDeregisterAfter, AppConfig, JarURL, ArtifactName, ArtifactVersion, Namespace, AppPreferences, ParsedProgramPreferences, Programs, RequestUrl, Healthcheckurl) ->
+ %push the UNBOUND config and preferences into Consul.
+ %I don't think we should push bound configs because triggering a "rebind" will suffer if the templating language is lost.
+ {200,_} = attempt(Req, XER, { consul_interface, consul_push_config, [ Appname, ConsulURL, AppConfig ] }, ?CNSE, "push config", true),
+
+ %push the preferences
+ {200,_} = attempt(Req, XER, { consul_interface, consul_push_preferences, [ Appname, ConsulURL, AppPreferences ] }, ?CNSE, "push preferences", true),
+
+ %get the bound config
+ {200,BoundConfig} = attempt(Req, XER, { consul_interface, consul_bind_config, [ Appname, ConsulURL ] }, "config binding service", "bind config", false),
+
+ %fetch the JAR.
+ {200,JarBody} = attempt(Req, XER, { httpabs, get, [ JarURL ] }, "nexus", "file get", false),
+
+ %create the Namespace
+ {200,_} = attempt( Req, XER, { cdap_interface, create_namespace, [ Namespace, CDAPURL ] }, ?CDAPE, "create namespace", true),
+
+ %deploy the application
+ {200,_} = attempt( Req, XER, { cdap_interface, deploy_app, [ Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, BoundConfig ] }, ?CDAPE, "deploy application", true),
+
+ %set app preferences
+ {200,_} = attempt( Req, XER, { cdap_interface, push_down_app_preferences, [ Appname, Namespace, CDAPURL, AppPreferences ] }, ?CDAPE, "set app preferences", true),
+
+ %push down the program preferences
+ {200,_} = attempt( Req, XER, { cdap_interface, push_down_program_preferences, [ Appname, Namespace, CDAPURL, ParsedProgramPreferences ] }, ?CDAPE, "set program preferences", true),
+
+ %start the CDAP application services
+ {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "start" ] }, ?CDAPE, "start program", true),
+
+ %Parse my IP and port
+ {ok, {http, _, IPaS, Port, _, _}} = http_uri:parse(binary_to_list(RequestUrl)),
+
+ %register with Consul; We will register the broker's URL as the address in Consul then the upstream service can do a GET on this Broker to get the resource object
+ {200,_} = attempt( Req, XER, { consul_interface, consul_register, [ Appname, ConsulURL, list_to_binary(IPaS), Port, Healthcheckurl, HCInterval, AutoDeregisterAfter ] }, ?CNSE, "service register", true),
+
+ ok. %if got to here, all went well.
+
+-spec undeploy_cdap_app(any(), string(), binary(), string(), string(), binary()) -> ok.
+undeploy_cdap_app(Req, XER, Appname, CDAPURL, ConsulURL, Namespace) ->
+ %stop the CDAP programs assuming they are valid
+ Programs = util:get_programs_for_pfapp_from_db(Appname),
+ Bts = iso(), %record begining timestamp
+ {RC, RB} = cdap_interface:exec_programs(XER, Appname, Namespace, CDAPURL, Programs, "stop"),
+ case RC of
+ 200 -> ?MET(info, Req, Bts, XER, ?CDAPE, "program stop", RC, "OK");
+ 400 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but it's programs were not running, probably indicates the app crashed in some way: ~s", [Appname, RB]));
+ 404 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB]));
+ _ -> ?MET(warning, Req, Bts, XER, ?CDAPE, "program stop", RC, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC, RB]))
+ end,
+
+ %delete the application
+ Bts2 = iso(),
+ {RC2, RB2} = cdap_interface:delete_app(XER, Appname, Namespace, CDAPURL),
+ case RC2 of
+ 200 -> ?MET(info, Req, Bts2, XER, ?CDAPE, "app delete", RC2, "OK");
+ 404 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "app delete", RC2, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB2]));
+ _ -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "app delete", RC2, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC2, RB2]))
+ end,
+
+ %deregister with consul
+ Bts3 = iso(),
+ {RC3, RB3} = consul_interface:consul_deregister(XER, Appname, ConsulURL),
+ case RC3 of
+ 200 -> ?MET(info, Req, Bts3, XER, ?CNSE, "service deregister", RC3, "OK");
+ _ -> ?MET(warning, Req, Bts3, XER, ?CNSE, "service deregister", RC3, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a service is not cleaned up properly! ~s", [Appname, RC3, RB3]))
+ end,
+
+ %delete the config key stored earlier
+ Bts4 = iso(),
+ {RC4, RB4} = consul_interface:consul_delete_config(XER, Appname, ConsulURL),
+ case RC4 of
+ 200 -> ?MET(info, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, "OK");
+ 404 -> ?MET(warning, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, io_lib:format("Delete called on ~s but it's consul key is gone, probably indicates a horrible manual deletion from Consul: ~s", [Appname, RB4]));
+ _ -> ?MET(warning, Req, Bts4, XER, ?CNSE, "key (config) delete", RC4, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a key is not cleaned up properly! ~s", [Appname, RC4, RB4]))
+ end,
+
+ %delete the config key stored earlier
+ Bts5 = iso(),
+ {RC5, RB5} = consul_interface:consul_delete_preferences(XER, Appname, ConsulURL),
+ case RC5 of
+ 200 -> ?MET(info, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, "OK");
+ 404 -> ?MET(warning, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, io_lib:format("Delete called on ~s but it's consul key is gone, probably indicates a horrible manual deletion from Consul: ~s", [Appname, RB5]));
+ _ -> ?MET(warning, Req, Bts5, XER, ?CNSE, "key (preferences) delete", RC5, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a key is not cleaned up properly! ~s", [Appname, RC5, RB5]))
+ end,
+
+ ok.
+
+deploy_hydrator_pipeline(Req, XER, Appname, Namespace, CDAPURL, PipelineConfigJsonURL, Dependencies, ConsulURL, RequestUrl, Healthcheckurl, HCInterval, AutoDeregisterAfter) ->
+ %fetch the JSON
+ {200,PipelineJson} = attempt(Req, XER, { httpabs, get, [ PipelineConfigJsonURL ] }, "nexus", "file get", false),
+
+ %TODO! Config
+
+ %create the Namespace
+ {200,_} = attempt( Req, XER, { cdap_interface, create_namespace, [ Namespace, CDAPURL ] }, ?CDAPE, "create namespace", true),
+
+ %deploy pipeline dependencies%
+ {200,_} = attempt( Req, XER, { cdap_interface, deploy_pipeline_dependencies, [ Namespace, CDAPURL, Dependencies ] }, ?CDAPE, "deploy dependencies", true),
+
+ %deploy pipeline dependencies UI properties
+ %NOTE! There is a bit of redundancy with the above call. I debated merging the two.
+ %I decided against it because I want failures to load the deps seperated from failures to load the properties files, because they are different URLs.
+ %Splitting them like this allows me to return the error to the user on the exact step that failed
+ {200,_} = attempt( Req, XER, { cdap_interface, deploy_pipeline_dependencies_properties, [ Namespace, CDAPURL, Dependencies ] }, ?CDAPE, "deploy dependency properties", true),
+
+ %deploy the pipeline
+ {200,"Deploy Complete"} = attempt( Req, XER, { cdap_interface, deploy_pipeline, [ Appname, Namespace, CDAPURL, PipelineJson ] }, ?CDAPE, "deploy pipeline", true),
+
+ %start the pipeline
+ {200,_} = attempt( Req, XER, { cdap_interface, exec_pipeline, [ Appname, Namespace, CDAPURL, "resume" ] }, ?CDAPE, "start pipeline", true),
+
+ %Parse my IP and port
+ {ok, {http, _, IPaS, Port, _, _}} = http_uri:parse(binary_to_list(RequestUrl)),
+
+ %register with Consul; We will register the broker's URL as the address in Consul, then the upstream service can do a GET on this Broker to get the resource object
+ {200,_} = attempt( Req, XER, { consul_interface, consul_register, [ Appname, ConsulURL, list_to_binary(IPaS), Port, Healthcheckurl, HCInterval, AutoDeregisterAfter] }, ?CNSE, "service register", true),
+
+ ok.
+
+undeploy_hydrator_pipeline(Req, XER, Appname, Namespace, CDAPURL, ConsulURL) ->
+ %UNDEPLOY NOTES:
+ % 1 Never fail on undeploy, log and continue.
+ % 2 Leave artifact dependencies on the cluster. We can revisit this if we need a "LEAVE NO TRACE" solution. TODO.
+ % 3 I noticed an asymetry in deploy/undeplopy here: there is no need to start workflows. Terry clarified this is correct: "Batch pipelines contain a schedule, but deploying the pipeline does not activate the schedule. Resuming the schedule makes it active so the pipeline will run at its next scheduled time. When undeploying, if you only suspend the schedule which prevents future runs from starting, then any currently active runs will continue until they finish (or not finish if they are hung). So we follow up with a stop workflow to kill any run that may be in progress so the following commands will not fail (delete pipeline or delete namespace).
+ %We avoid a race condition by suspending the schedule first.
+
+ %suspend the pipeline
+ Bts = iso(),
+ {RC1, RB1} = cdap_interface:exec_pipeline(XER, Appname, Namespace, CDAPURL, "suspend"),
+ case RC1 of
+ 200 -> ?MET(info, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, "OK");
+ 400 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but it's was not running, probably OK, probably it is on a schedule ~s", [Appname, RB1]));
+ 404 -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB1]));
+ _ -> ?MET(warning, Req, Bts, XER, ?CDAPE, "pipeline suspend", RC1, io_lib:format("Pipeline suspend called on ~s but CDAP unexpectedly return a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC1, RB1]))
+ end,
+
+ %stop the workflow
+ Bts2 = iso(),
+ {RC2, RB2} = cdap_interface:exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, "stop"),
+ case RC2 of
+ 200 -> ?MET(info, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, "OK");
+ 400 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but it's was not running, probably OK, probably it is on a schedule ~s", [Appname, RB2]));
+ 404 -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB2]));
+ _ -> ?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format("Workflow stop called on ~s but CDAP unexpectedly return a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC2, RB2]))
+ end,
+
+ %?MET(warning, Req, Bts2, XER, ?CDAPE, "workflow stop", RC2, io_lib:format());
+
+ %TODO! Delete config (Configs are currently not pushed for hydrator pipelines, so have to do that first)
+
+ %delete the application
+ Bts3 = iso(),
+ {RC3, RB3} = cdap_interface:delete_app(XER, Appname, Namespace, CDAPURL),
+ case RC3 of
+ 200 -> ?MET(info, Req, Bts3, XER, ?CDAPE, "app delete", RC3, "OK");
+ 404 -> ?MET(warning, Req, Bts3, XER, ?CDAPE, "app delete", RC3, io_lib:format("Delete called on ~s but it's gone, probably indicates a horrible manual deletion from CDAP: ~s", [Appname, RB3]));
+ _ -> ?MET(warning, Req, Bts3, XER, ?CDAPE, "app delete", RC3, io_lib:format("Delete called on ~s but CDAP returned a ~p. This likely means things will NOT be cleaned up properly!! ~s", [Appname, RC3, RB3]))
+ end,
+
+ %deregister with consul
+ Bts4 = iso(),
+ {RC4, RB4} = consul_interface:consul_deregister(XER, Appname, ConsulURL),
+ case RC4 of
+ 200 -> ?MET(info, Req, Bts4, XER, ?CNSE, "service deregister", RC4, "OK");
+ _ -> ?MET(warning, Req, Bts4, XER, ?CNSE, "service deregister", RC3, io_lib:format("Delete called on ~s but Consul returned a ~p. This likely means a service is not cleaned up properly! ~s", [Appname, RC4, RB4]))
+ end,
+ ok.
+
+app_config_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, AppConfig) ->
+ %Reconfigure CDAP App's App Config
+
+ %push the UNBOUND config into Consul. I don't think we should push bound configs because triggering a "rebind" will suffer if the templating language is lost.
+ {200,_} = attempt( Req, XER, { consul_interface, consul_push_config, [ Appname, ConsulURL, AppConfig ] }, ?CNSE, "push config", true),
+
+ %get the bound config
+ {200,BoundConfig} = attempt(Req, XER, { consul_interface, consul_bind_config, [ Appname, ConsulURL ] }, "config binding service", "bind config", false),
+
+ %push it to CDAP
+ %TODO! What happens when we push to consul but connection to CDAP fails? Then CDAP and Consul are out of sync.
+ %Maybe create a "BACKUP" key in Consul for the old config and "rollback" if the below fails
+ %Transactions across distributed systems is hard =(
+ {200,_} = attempt( Req, XER, { cdap_interface, push_down_config, [ Appname, Namespace, CDAPURL, BoundConfig ] }, ?CDAPE, "reconfigure app config", true),
+
+ ok.
+
+app_preferences_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, AppPreferences) ->
+ %Workflow:
+ % 1) push the new preferences to Cosnul
+ % 2) stop all the programs
+ % 3) push the programs to CDAP
+ % 4) start all the programs
+ %
+ % NOTE! Currently it is assumed that preferences do not need to be bound by the config_binding_service,
+ % as only app config contains service discovery items.
+
+ Programs = util:get_programs_for_pfapp_from_db(Appname),
+
+ %1 push the new prefs up to Consul
+ {200,_} = attempt(Req, XER, { consul_interface, consul_push_preferences, [ Appname, ConsulURL, AppPreferences ] }, ?CNSE, "push preferences", true),
+
+ %2 stop the programs
+ {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "stop" ] }, ?CDAPE, "stop programs", true),
+
+ %3 set app preferences
+ {200,_} = attempt( Req, XER, { cdap_interface, push_down_app_preferences, [ Appname, Namespace, CDAPURL, AppPreferences ] }, ?CDAPE, "set app preferences", true),
+
+ %4 start er' up again
+ {200,_} = attempt( Req, XER, { cdap_interface, exec_programs, [ Appname, Namespace, CDAPURL, Programs, "start" ] }, ?CDAPE, "start program", true),
+
+ ok.
+
+smart_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewConfig) ->
+ %Smart reconfigure takes in a JSON (NewConfig) and tries to be "smart"; it tries to figure out whether Config is a reconfiguration of
+ %app config, app preferences, or both.
+ %
+ %Specifically this workflow works as follows;
+ %1) pull down AppConfig in consul
+ %2) pull down Prefernces in consul
+ %3) see if any keynames in this function's input (NewConfig) are keynames in AppConfig
+ % 3a if so, reconfigure it
+ % 3b write the delta'd AppConfig back to consul
+ %4) see if any keynames in this fucntion's input (NewConfig) are keynames in Preferences
+ % 4a if so, reconfigure ppreferences
+ % 4b write the delta'd preferences back to consul
+ %5) Return a status
+
+
+ %see if we have app config overlaps
+ {200, ConsulAppConfig} = consul_interface:consul_get_configuration(XER, Appname, ConsulURL),
+ NewAppConfig = util:update_with_new_config_map(NewConfig, ConsulAppConfig),
+ WasNewAppConfig = case NewAppConfig of
+ nooverlap -> nooverlap;
+ _ ->
+ ok = app_config_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewAppConfig)
+ end,
+
+ %see if we have preferences overlap
+ {200, ConsulPreferences} = consul_interface:consul_get_preferences(XER, Appname, ConsulURL),
+ NewAppPreferences = util:update_with_new_config_map(NewConfig, ConsulPreferences),
+ WasNewAppPreferences = case NewAppPreferences of
+ nooverlap -> nooverlap;
+ _ ->
+ ok = app_preferences_reconfigure(Req, XER, Appname, Namespace, ConsulURL, CDAPURL, NewAppPreferences)
+ end,
+
+ case WasNewAppConfig == nooverlap andalso WasNewAppPreferences == nooverlap of
+ true ->
+ {400, "non-overlapping configuration was sent"};
+ false ->
+ ok
+ end.
+
+
diff --git a/src/workflows_tests.erl b/src/workflows_tests.erl
new file mode 100644
index 0000000..1b7b51c
--- /dev/null
+++ b/src/workflows_tests.erl
@@ -0,0 +1,27 @@
+% ============LICENSE_START=======================================================
+% org.onap.dcae
+% ================================================================================
+% Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+% ================================================================================
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+% ============LICENSE_END=========================================================
+%
+% ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+-module(workflows_tests).
+-include_lib("eunit/include/eunit.hrl").
+
+all_200s_else_showerror_test() ->
+ ?assert({200, ""} == workflows:all_200s_else_showerror(fun(_) -> {200, "all good"} end, [1,"A", foo])),
+ ?assert({500, "constant dissapointment"} == workflows:all_200s_else_showerror(fun(X) -> if X < 5 -> {200, "all good"}; true -> {500, "constant dissapointment"} end end, [0,10])).
+