diff options
Diffstat (limited to 'src/cdap_interface.erl')
-rw-r--r-- | src/cdap_interface.erl | 121 |
1 files changed, 61 insertions, 60 deletions
diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl index 7ce3b37..a1b0982 100644 --- a/src/cdap_interface.erl +++ b/src/cdap_interface.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,12 +19,12 @@ % ECOMP is a trademark and service mark of AT&T Intellectual Property. -module(cdap_interface). --export([get_app_metrics/4, - get_app_healthcheck/4, - push_down_config/5, - form_service_json_from_service_tuple/4, - form_stream_url_from_streamname/3, - exec_programs/6, +-export([get_app_metrics/4, + get_app_healthcheck/4, + push_down_config/5, + form_service_json_from_service_tuple/4, + form_stream_url_from_streamname/3, + exec_programs/6, push_down_program_preferences/5, push_down_app_preferences/5, deploy_app/8, @@ -53,15 +53,15 @@ %%%INTERNAL %%% map_appname(Appname) -> - %CDAP APIs do not allow app names with any special characters. Here we will map the + %CDAP APIs do not allow app names with any special characters. Here we will map the %name to ensure it does not contain special characters using a regex whitelist %see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]). -get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) -> +get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]), - {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""), - case ReturnCode of + {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""), + case ReturnCode of 200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch]; _ -> 504 %must bubble this up end. @@ -71,7 +71,7 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) -> %helper function: checks for a partocular program from an app URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]), {RC, _} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> %Next make sure it's status is running {RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])), @@ -80,9 +80,9 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) -> S = jiffy:decode(RB2, [return_maps]), case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of true -> 200; %return 200 - false -> ?BAD_HEALTH_CODE %return - end; - _ -> ?BAD_HEALTH_CODE + false -> ?BAD_HEALTH_CODE %return + end; + _ -> ?BAD_HEALTH_CODE end; _ -> ?BAD_HEALTH_CODE end. @@ -99,41 +99,41 @@ push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) -> deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) -> %deploys a single dependency - %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. {200, JarBody} = httpabs:get(XER, ArtURL), Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}], - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]), httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody). deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) -> - %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. - case UIPropertiesURL of + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + case UIPropertiesURL of none -> {200, ""}; %nothing to do - _ -> + _ -> {200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL), - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]), httpabs:put(XER, URL, "application/json", PropertiesJSON) end. %%% %%%EXTERNAL %%% -get_app_metrics(XER, Appname, Namespace, CDAPURL) -> +get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %Namespace should be a binary MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb - case MetricsList of + case MetricsList of 504 -> {504, []}; %TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something - [] -> {200, []}; + [] -> {200, []}; _ -> URL = ?SC([CDAPURL, "/v3/metrics/query"]), Body = jiffy:encode( - {[{<<"appmetrics">>, + {[{<<"appmetrics">>, {[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}}, {<<"metrics">>, MetricsList}]} }]}), {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it! - case ReturnCode of + case ReturnCode of 200 -> {200, jiffy:decode(RetBody)}; 504 -> {504, []} end @@ -145,13 +145,13 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state %However, leaving it here because maybe one day it will be useful %%%get_app_programs(Appname, Namespace, CDAPURL) -> -%%% %fetch the list of programs from a running CDAP app. +%%% %fetch the list of programs from a running CDAP app. %%% %Parse this into a list of [{ProgramType, ProgramID}] tuples. -%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline. +%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline. %%% %get informaation about application %%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), %%% {ReturnCode, RetBody} = httpabs:http_get(URL), -%%% case ReturnCode of +%%% case ReturnCode of %%% 504 -> 504; %%% 404 -> 404; %%% 400 -> 400; @@ -164,16 +164,16 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow %%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program %%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows -%%% program_type = case maps:get(<<"type">>, X) of +%%% program_type = case maps:get(<<"type">>, X) of %%% <<"Flow">> -> <<"flows">>; %cdap api fail man %%% <<"Mapreduce">> -> <<"mapreduce">>; %%% <<"Service">> -> <<"services">>; %%% <<"Spark">> -> <<"spark">>; %%% <<"Worker">> -> <<"workers">>; %%% <<"Workflow">> -> <<"workflows">> -%%% end, +%%% end, %%% program_id = maps:get(<<"id">>, X) -%%% } +%%% } %%% end, Programs) %%% end. %%% @@ -197,7 +197,7 @@ push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) -> %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]), Body = jiffy:encode( - {[ + {[ {<<"config">>, ConfigJson} ]}), httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody} @@ -209,7 +209,7 @@ push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPre form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) -> %transforms {SN, SE, EM} into {url: foo, method: bar} URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])), - {[{<<"url">>, URL}, + {[{<<"url">>, URL}, {<<"method">>, EM} ]}. @@ -230,17 +230,17 @@ push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) -> deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) -> %Create Artifact %Deploy App - + %post the artifact, OK if already exists, no check %explicitly set artifact version becausme some JARS do not have it httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody), %deploy the application - PutBody = jiffy:encode( + PutBody = jiffy:encode( {[ {<<"artifact">>, {[ - {<<"name">>, ArtifactName}, - {<<"version">>, ArtifactVersion}, + {<<"name">>, ArtifactName}, + {<<"version">>, ArtifactVersion}, {<<"scope">>, <<"user">>} ]}}, {<<"config">>, AppConfig} @@ -257,41 +257,42 @@ deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) -> %Deploy a hydrator pipeline, assumes namespace has already been set up URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), httpabs:put(XER, URL, "application/json", PipelineJson). - -exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) -> + +exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) -> %Exec assumed to be: "resume" or "suspend" %TODO! REVISIT WHETHER datapipelineschedule is a parameter URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]), - httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. -exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) -> - %Exec assumed to be: "stop" or +exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) -> + %Exec assumed to be: "stop" or %TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]), - httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) -> - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]), + URLRoot = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + URL = ?SC([URLRoot, "/schedules/dataPipelineSchedule/status"]), {RC, RB} = httpabs:get(XER, URL), case RC /= 200 of - true -> ?BAD_HEALTH_CODE; %failed to even hit the status. - false -> + true -> ?BAD_HEALTH_CODE; %failed to even hit the status. + false -> Status = jiffy:decode(RB, [return_maps]), - case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of + case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good - true -> + true -> %Next, check the last <LIMIT> number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time. - %This logic came from Terry. + %This logic came from Terry. %His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation. - %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen. + %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen. %RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang L = list_to_binary(integer_to_list(PipelineHealthLimit)), - {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])), - case RC2 /= 200 of + {RC2, RB2} = httpabs:get(XER, ?SC([URLRoot, "/workflows/DataPipelineWorkflow/runs?limit=", L])), + case RC2 /= 200 of true -> RC2; %failed to even hit the status - false -> + false -> LRST = lists:map(fun(S) -> - case maps:is_key(<<"status">>, S) of + case maps:is_key(<<"status">>, S) of false -> critical; true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end end end, jiffy:decode(RB2, [return_maps])), @@ -324,7 +325,7 @@ create_namespace(XER, Namespace, CDAPURL) -> get_app_config(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), {RC, RB} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))}; _ -> {RC, RB} end. @@ -332,16 +333,16 @@ get_app_config(XER, Appname, Namespace, CDAPURL) -> get_app_preferences(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]), {RC, RB} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> {200, jiffy:decode(RB, [return_maps])}; _ -> {RC, RB} end. get_cdap_cluster_version(XER, CDAPURL) -> %CDAP decided to change their port numbers between release 3 and 4. - %The broker works with both. + %The broker works with both. %In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to. - %The GUI information is a convinence function for component developers that hit the broker via the CLI tool. + %The GUI information is a convinence function for component developers that hit the broker via the CLI tool. URL = ?SC([CDAPURL, "/v3/version"]), {RC, RB} = httpabs:get(XER, URL), case RC of |