aboutsummaryrefslogtreecommitdiffstats
path: root/src/cdap_interface.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cdap_interface.erl')
-rw-r--r--src/cdap_interface.erl121
1 files changed, 61 insertions, 60 deletions
diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl
index 7ce3b37..a1b0982 100644
--- a/src/cdap_interface.erl
+++ b/src/cdap_interface.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,12 +19,12 @@
% ECOMP is a trademark and service mark of AT&T Intellectual Property.
-module(cdap_interface).
--export([get_app_metrics/4,
- get_app_healthcheck/4,
- push_down_config/5,
- form_service_json_from_service_tuple/4,
- form_stream_url_from_streamname/3,
- exec_programs/6,
+-export([get_app_metrics/4,
+ get_app_healthcheck/4,
+ push_down_config/5,
+ form_service_json_from_service_tuple/4,
+ form_stream_url_from_streamname/3,
+ exec_programs/6,
push_down_program_preferences/5,
push_down_app_preferences/5,
deploy_app/8,
@@ -53,15 +53,15 @@
%%%INTERNAL
%%%
map_appname(Appname) ->
- %CDAP APIs do not allow app names with any special characters. Here we will map the
+ %CDAP APIs do not allow app names with any special characters. Here we will map the
%name to ensure it does not contain special characters using a regex whitelist
%see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string
re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]).
-get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) ->
+get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]),
- {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""),
- case ReturnCode of
+ {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""),
+ case ReturnCode of
200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch];
_ -> 504 %must bubble this up
end.
@@ -71,7 +71,7 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) ->
%helper function: checks for a partocular program from an app
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]),
{RC, _} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 ->
%Next make sure it's status is running
{RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])),
@@ -80,9 +80,9 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) ->
S = jiffy:decode(RB2, [return_maps]),
case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of
true -> 200; %return 200
- false -> ?BAD_HEALTH_CODE %return
- end;
- _ -> ?BAD_HEALTH_CODE
+ false -> ?BAD_HEALTH_CODE %return
+ end;
+ _ -> ?BAD_HEALTH_CODE
end;
_ -> ?BAD_HEALTH_CODE
end.
@@ -99,41 +99,41 @@ push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) ->
deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) ->
%deploys a single dependency
- %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
{200, JarBody} = httpabs:get(XER, ArtURL),
Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}],
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]),
httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody).
deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) ->
- %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
- case UIPropertiesURL of
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ case UIPropertiesURL of
none -> {200, ""}; %nothing to do
- _ ->
+ _ ->
{200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL),
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]),
httpabs:put(XER, URL, "application/json", PropertiesJSON)
end.
%%%
%%%EXTERNAL
%%%
-get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
+get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%Namespace should be a binary
MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb
- case MetricsList of
+ case MetricsList of
504 -> {504, []};
%TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something
- [] -> {200, []};
+ [] -> {200, []};
_ ->
URL = ?SC([CDAPURL, "/v3/metrics/query"]),
Body = jiffy:encode(
- {[{<<"appmetrics">>,
+ {[{<<"appmetrics">>,
{[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}},
{<<"metrics">>, MetricsList}]}
}]}),
{ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it!
- case ReturnCode of
+ case ReturnCode of
200 -> {200, jiffy:decode(RetBody)};
504 -> {504, []}
end
@@ -145,13 +145,13 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state
%However, leaving it here because maybe one day it will be useful
%%%get_app_programs(Appname, Namespace, CDAPURL) ->
-%%% %fetch the list of programs from a running CDAP app.
+%%% %fetch the list of programs from a running CDAP app.
%%% %Parse this into a list of [{ProgramType, ProgramID}] tuples.
-%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline.
+%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline.
%%% %get informaation about application
%%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
%%% {ReturnCode, RetBody} = httpabs:http_get(URL),
-%%% case ReturnCode of
+%%% case ReturnCode of
%%% 504 -> 504;
%%% 404 -> 404;
%%% 400 -> 400;
@@ -164,16 +164,16 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow
%%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program
%%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows
-%%% program_type = case maps:get(<<"type">>, X) of
+%%% program_type = case maps:get(<<"type">>, X) of
%%% <<"Flow">> -> <<"flows">>; %cdap api fail man
%%% <<"Mapreduce">> -> <<"mapreduce">>;
%%% <<"Service">> -> <<"services">>;
%%% <<"Spark">> -> <<"spark">>;
%%% <<"Worker">> -> <<"workers">>;
%%% <<"Workflow">> -> <<"workflows">>
-%%% end,
+%%% end,
%%% program_id = maps:get(<<"id">>, X)
-%%% }
+%%% }
%%% end, Programs)
%%% end.
%%%
@@ -197,7 +197,7 @@ push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) ->
%http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]),
Body = jiffy:encode(
- {[
+ {[
{<<"config">>, ConfigJson}
]}),
httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody}
@@ -209,7 +209,7 @@ push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPre
form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) ->
%transforms {SN, SE, EM} into {url: foo, method: bar}
URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])),
- {[{<<"url">>, URL},
+ {[{<<"url">>, URL},
{<<"method">>, EM}
]}.
@@ -230,17 +230,17 @@ push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) ->
deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) ->
%Create Artifact
%Deploy App
-
+
%post the artifact, OK if already exists, no check
%explicitly set artifact version becausme some JARS do not have it
httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody),
%deploy the application
- PutBody = jiffy:encode(
+ PutBody = jiffy:encode(
{[
{<<"artifact">>, {[
- {<<"name">>, ArtifactName},
- {<<"version">>, ArtifactVersion},
+ {<<"name">>, ArtifactName},
+ {<<"version">>, ArtifactVersion},
{<<"scope">>, <<"user">>}
]}},
{<<"config">>, AppConfig}
@@ -257,41 +257,42 @@ deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) ->
%Deploy a hydrator pipeline, assumes namespace has already been set up
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
httpabs:put(XER, URL, "application/json", PipelineJson).
-
-exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) ->
+
+exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) ->
%Exec assumed to be: "resume" or "suspend"
%TODO! REVISIT WHETHER datapipelineschedule is a parameter
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]),
- httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
-exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) ->
- %Exec assumed to be: "stop" or
+exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) ->
+ %Exec assumed to be: "stop" or
%TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]),
- httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) ->
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]),
+ URLRoot = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ URL = ?SC([URLRoot, "/schedules/dataPipelineSchedule/status"]),
{RC, RB} = httpabs:get(XER, URL),
case RC /= 200 of
- true -> ?BAD_HEALTH_CODE; %failed to even hit the status.
- false ->
+ true -> ?BAD_HEALTH_CODE; %failed to even hit the status.
+ false ->
Status = jiffy:decode(RB, [return_maps]),
- case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of
+ case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of
false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good
- true ->
+ true ->
%Next, check the last <LIMIT> number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time.
- %This logic came from Terry.
+ %This logic came from Terry.
%His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation.
- %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen.
+ %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen.
%RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang
L = list_to_binary(integer_to_list(PipelineHealthLimit)),
- {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])),
- case RC2 /= 200 of
+ {RC2, RB2} = httpabs:get(XER, ?SC([URLRoot, "/workflows/DataPipelineWorkflow/runs?limit=", L])),
+ case RC2 /= 200 of
true -> RC2; %failed to even hit the status
- false ->
+ false ->
LRST = lists:map(fun(S) ->
- case maps:is_key(<<"status">>, S) of
+ case maps:is_key(<<"status">>, S) of
false -> critical;
true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end
end end, jiffy:decode(RB2, [return_maps])),
@@ -324,7 +325,7 @@ create_namespace(XER, Namespace, CDAPURL) ->
get_app_config(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
{RC, RB} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))};
_ -> {RC, RB}
end.
@@ -332,16 +333,16 @@ get_app_config(XER, Appname, Namespace, CDAPURL) ->
get_app_preferences(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]),
{RC, RB} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 -> {200, jiffy:decode(RB, [return_maps])};
_ -> {RC, RB}
end.
get_cdap_cluster_version(XER, CDAPURL) ->
%CDAP decided to change their port numbers between release 3 and 4.
- %The broker works with both.
+ %The broker works with both.
%In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to.
- %The GUI information is a convinence function for component developers that hit the broker via the CLI tool.
+ %The GUI information is a convinence function for component developers that hit the broker via the CLI tool.
URL = ?SC([CDAPURL, "/v3/version"]),
{RC, RB} = httpabs:get(XER, URL),
case RC of