summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cdap_interface.erl121
-rw-r--r--src/cdap_interface_tests.erl160
-rw-r--r--src/cdapbroker.app.src2
-rw-r--r--src/httpabs.erl12
-rw-r--r--src/httpabs_tests.erl9
-rw-r--r--src/util.erl12
-rw-r--r--src/util_tests.erl60
7 files changed, 290 insertions, 86 deletions
diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl
index 7ce3b37..a1b0982 100644
--- a/src/cdap_interface.erl
+++ b/src/cdap_interface.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,12 +19,12 @@
% ECOMP is a trademark and service mark of AT&T Intellectual Property.
-module(cdap_interface).
--export([get_app_metrics/4,
- get_app_healthcheck/4,
- push_down_config/5,
- form_service_json_from_service_tuple/4,
- form_stream_url_from_streamname/3,
- exec_programs/6,
+-export([get_app_metrics/4,
+ get_app_healthcheck/4,
+ push_down_config/5,
+ form_service_json_from_service_tuple/4,
+ form_stream_url_from_streamname/3,
+ exec_programs/6,
push_down_program_preferences/5,
push_down_app_preferences/5,
deploy_app/8,
@@ -53,15 +53,15 @@
%%%INTERNAL
%%%
map_appname(Appname) ->
- %CDAP APIs do not allow app names with any special characters. Here we will map the
+ %CDAP APIs do not allow app names with any special characters. Here we will map the
%name to ensure it does not contain special characters using a regex whitelist
%see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string
re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]).
-get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) ->
+get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]),
- {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""),
- case ReturnCode of
+ {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""),
+ case ReturnCode of
200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch];
_ -> 504 %must bubble this up
end.
@@ -71,7 +71,7 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) ->
%helper function: checks for a partocular program from an app
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]),
{RC, _} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 ->
%Next make sure it's status is running
{RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])),
@@ -80,9 +80,9 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) ->
S = jiffy:decode(RB2, [return_maps]),
case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of
true -> 200; %return 200
- false -> ?BAD_HEALTH_CODE %return
- end;
- _ -> ?BAD_HEALTH_CODE
+ false -> ?BAD_HEALTH_CODE %return
+ end;
+ _ -> ?BAD_HEALTH_CODE
end;
_ -> ?BAD_HEALTH_CODE
end.
@@ -99,41 +99,41 @@ push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) ->
deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) ->
%deploys a single dependency
- %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
{200, JarBody} = httpabs:get(XER, ArtURL),
Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}],
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]),
httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody).
deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) ->
- %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
- case UIPropertiesURL of
+ %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple.
+ case UIPropertiesURL of
none -> {200, ""}; %nothing to do
- _ ->
+ _ ->
{200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL),
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]),
+ URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]),
httpabs:put(XER, URL, "application/json", PropertiesJSON)
end.
%%%
%%%EXTERNAL
%%%
-get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
+get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%Namespace should be a binary
MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb
- case MetricsList of
+ case MetricsList of
504 -> {504, []};
%TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something
- [] -> {200, []};
+ [] -> {200, []};
_ ->
URL = ?SC([CDAPURL, "/v3/metrics/query"]),
Body = jiffy:encode(
- {[{<<"appmetrics">>,
+ {[{<<"appmetrics">>,
{[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}},
{<<"metrics">>, MetricsList}]}
}]}),
{ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it!
- case ReturnCode of
+ case ReturnCode of
200 -> {200, jiffy:decode(RetBody)};
504 -> {504, []}
end
@@ -145,13 +145,13 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state
%However, leaving it here because maybe one day it will be useful
%%%get_app_programs(Appname, Namespace, CDAPURL) ->
-%%% %fetch the list of programs from a running CDAP app.
+%%% %fetch the list of programs from a running CDAP app.
%%% %Parse this into a list of [{ProgramType, ProgramID}] tuples.
-%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline.
+%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline.
%%% %get informaation about application
%%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
%%% {ReturnCode, RetBody} = httpabs:http_get(URL),
-%%% case ReturnCode of
+%%% case ReturnCode of
%%% 504 -> 504;
%%% 404 -> 404;
%%% 400 -> 400;
@@ -164,16 +164,16 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) ->
%%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow
%%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program
%%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows
-%%% program_type = case maps:get(<<"type">>, X) of
+%%% program_type = case maps:get(<<"type">>, X) of
%%% <<"Flow">> -> <<"flows">>; %cdap api fail man
%%% <<"Mapreduce">> -> <<"mapreduce">>;
%%% <<"Service">> -> <<"services">>;
%%% <<"Spark">> -> <<"spark">>;
%%% <<"Worker">> -> <<"workers">>;
%%% <<"Workflow">> -> <<"workflows">>
-%%% end,
+%%% end,
%%% program_id = maps:get(<<"id">>, X)
-%%% }
+%%% }
%%% end, Programs)
%%% end.
%%%
@@ -197,7 +197,7 @@ push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) ->
%http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]),
Body = jiffy:encode(
- {[
+ {[
{<<"config">>, ConfigJson}
]}),
httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody}
@@ -209,7 +209,7 @@ push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPre
form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) ->
%transforms {SN, SE, EM} into {url: foo, method: bar}
URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])),
- {[{<<"url">>, URL},
+ {[{<<"url">>, URL},
{<<"method">>, EM}
]}.
@@ -230,17 +230,17 @@ push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) ->
deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) ->
%Create Artifact
%Deploy App
-
+
%post the artifact, OK if already exists, no check
%explicitly set artifact version becausme some JARS do not have it
httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody),
%deploy the application
- PutBody = jiffy:encode(
+ PutBody = jiffy:encode(
{[
{<<"artifact">>, {[
- {<<"name">>, ArtifactName},
- {<<"version">>, ArtifactVersion},
+ {<<"name">>, ArtifactName},
+ {<<"version">>, ArtifactVersion},
{<<"scope">>, <<"user">>}
]}},
{<<"config">>, AppConfig}
@@ -257,41 +257,42 @@ deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) ->
%Deploy a hydrator pipeline, assumes namespace has already been set up
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
httpabs:put(XER, URL, "application/json", PipelineJson).
-
-exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) ->
+
+exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) ->
%Exec assumed to be: "resume" or "suspend"
%TODO! REVISIT WHETHER datapipelineschedule is a parameter
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]),
- httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
-exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) ->
- %Exec assumed to be: "stop" or
+exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) ->
+ %Exec assumed to be: "stop" or
%TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]),
- httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
+ httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body.
get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) ->
- URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]),
+ URLRoot = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
+ URL = ?SC([URLRoot, "/schedules/dataPipelineSchedule/status"]),
{RC, RB} = httpabs:get(XER, URL),
case RC /= 200 of
- true -> ?BAD_HEALTH_CODE; %failed to even hit the status.
- false ->
+ true -> ?BAD_HEALTH_CODE; %failed to even hit the status.
+ false ->
Status = jiffy:decode(RB, [return_maps]),
- case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of
+ case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of
false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good
- true ->
+ true ->
%Next, check the last <LIMIT> number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time.
- %This logic came from Terry.
+ %This logic came from Terry.
%His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation.
- %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen.
+ %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen.
%RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang
L = list_to_binary(integer_to_list(PipelineHealthLimit)),
- {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])),
- case RC2 /= 200 of
+ {RC2, RB2} = httpabs:get(XER, ?SC([URLRoot, "/workflows/DataPipelineWorkflow/runs?limit=", L])),
+ case RC2 /= 200 of
true -> RC2; %failed to even hit the status
- false ->
+ false ->
LRST = lists:map(fun(S) ->
- case maps:is_key(<<"status">>, S) of
+ case maps:is_key(<<"status">>, S) of
false -> critical;
true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end
end end, jiffy:decode(RB2, [return_maps])),
@@ -324,7 +325,7 @@ create_namespace(XER, Namespace, CDAPURL) ->
get_app_config(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]),
{RC, RB} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))};
_ -> {RC, RB}
end.
@@ -332,16 +333,16 @@ get_app_config(XER, Appname, Namespace, CDAPURL) ->
get_app_preferences(XER, Appname, Namespace, CDAPURL) ->
URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]),
{RC, RB} = httpabs:get(XER, URL),
- case RC of
+ case RC of
200 -> {200, jiffy:decode(RB, [return_maps])};
_ -> {RC, RB}
end.
get_cdap_cluster_version(XER, CDAPURL) ->
%CDAP decided to change their port numbers between release 3 and 4.
- %The broker works with both.
+ %The broker works with both.
%In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to.
- %The GUI information is a convinence function for component developers that hit the broker via the CLI tool.
+ %The GUI information is a convinence function for component developers that hit the broker via the CLI tool.
URL = ?SC([CDAPURL, "/v3/version"]),
{RC, RB} = httpabs:get(XER, URL),
case RC of
diff --git a/src/cdap_interface_tests.erl b/src/cdap_interface_tests.erl
index 37926a6..c1d1e6c 100644
--- a/src/cdap_interface_tests.erl
+++ b/src/cdap_interface_tests.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,152 @@
-module(cdap_interface_tests).
-include_lib("eunit/include/eunit.hrl").
+-import(cdap_interface, [
+ get_cdap_gui_port_from_version/1,
+ map_appname/1,
+ get_cdap_cluster_version/2,
+ form_stream_url_from_streamname/3,
+ form_service_json_from_service_tuple/4,
+ get_app_preferences/4,
+ get_app_config/4,
+ get_pipeline_healthcheck/5
+ ]).
+
+get_pipeline_healthcheck_test() ->
+ FakeReturn = jiffy:encode({[{<<"status">>, <<"SCHEDULED">>}]}),
+ try meck:new(httpabs, [passthrough]) catch _:_ -> ok end,
+ %notfound
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND/schedules/dataPipelineSchedule/status" -> {404, ""}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666", 666) == 400),
+
+ %bad status
+ FakeReturnBadStatus = jiffy:encode({[{<<"status">>, <<"nosoupforyou">>}]}),
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturnBadStatus}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400),
+
+ %good status but no status endpoint
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {404, ""}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 404),
+
+ %good status but a bad malformed inner
+ MalformedRuns = jiffy:encode([
+ {[{<<"nostatus">>, <<"foryou">>}]},
+ {[{<<"status">>, <<"RUNNING">>}]}
+ ]),
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, MalformedRuns}
+ end end),
+
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400),
+
+ %good status but a not good inner
+ BadRuns = jiffy:encode([
+ {[{<<"status">>, <<"FAILED">>}]},
+ {[{<<"status">>, <<"RUNNING">>}]}
+ ]),
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, BadRuns}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400),
+
+ %two running
+ TwoRunning = jiffy:encode([
+ {[{<<"status">>, <<"RUNNING">>}]},
+ {[{<<"status">>, <<"COMPLETED">>}]},
+ {[{<<"status">>, <<"RUNNING">>}]}
+ ]),
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, TwoRunning}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400),
+
+ %all good
+ %good status but a not good inner
+ GoodRuns = jiffy:encode([
+ {[{<<"status">>, <<"COMPLETED">>}]},
+ {[{<<"status">>, <<"RUNNING">>}]},
+ {[{<<"status">>, <<"COMPLETED">>}]}
+ ]),
+ meck:expect(httpabs, get, fun(_XER, URL) -> case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, GoodRuns}
+ end end),
+ ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 200),
+
+ meck:unload(httpabs).
+
+
+
+get_app_config_test() ->
+ FakeReturn = jiffy:encode({[{<<"configuration">>, {[{<<"welcome">>, <<"toeternity">>}]}}]}),
+ try meck:new(httpabs, [passthrough]) catch _:_ -> ok end,
+ meck:expect(httpabs, get, fun(_XER, URL) ->
+ case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND" -> {404, ""}
+ end
+ end),
+ ?assert(get_app_config("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666") ==
+ {200, #{<<"welcome">> => <<"toeternity">>}}),
+ ?assert(get_app_config("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666") == {404, ""}),
+ meck:unload(httpabs).
+
+get_app_preferences_test() ->
+ FakeReturn = jiffy:encode({[{<<"welcome">>, <<"toeternity">>}]}),
+ try meck:new(httpabs, [passthrough]) catch _:_ -> ok end,
+ meck:expect(httpabs, get, fun(_XER, URL) ->
+ case URL of
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/preferences" -> {200, FakeReturn};
+ "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND/preferences" -> {404, ""}
+ end
+ end),
+ ?assert(get_app_preferences("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666") ==
+ {200, #{<<"welcome">> => <<"toeternity">>}}),
+ ?assert(get_app_preferences("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666") == {404, ""}),
+ meck:unload(httpabs).
+
+form_service_json_from_service_tuple_test() ->
+ ?assert(form_service_json_from_service_tuple(<<"amazin@)#$%@#)$%gapp">>, "amazingns", "http://666.666.666.666:666", {"seeme", "feelme", "PUT"})
+ == {[{<<"url">> ,<<"http://666.666.666.666:666/v3/namespaces/amazingns/apps/amazingapp/services/seeme/methods/feelme">>},
+ {<<"method">>, "PUT"}]}).
+
+form_stream_url_from_streamname_test() ->
+ ?assert(form_stream_url_from_streamname("666.666.666.666:666", "thevoid", "souls") == <<"666.666.666.666:666/v3/namespaces/thevoid/streams/souls">>).
+
get_cdap_gui_port_from_version_test() ->
- ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.0")),
- ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.0")),
- ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.10")),
- ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.10")),
- ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.0">>)),
- ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.0">>)),
- ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.10">>)),
- ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.10">>)),
- ?assert(<<"UNKNOWN CDAP VERSION">> == cdap_interface:get_cdap_gui_port_from_version("5.0.0")).
+ ?assert(9999 == get_cdap_gui_port_from_version("3.0.0")),
+ ?assert(9999 == get_cdap_gui_port_from_version("3.10.0")),
+ ?assert(9999 == get_cdap_gui_port_from_version("3.0.10")),
+ ?assert(9999 == get_cdap_gui_port_from_version("3.10.10")),
+ ?assert(11011 == get_cdap_gui_port_from_version(<<"4.0.0">>)),
+ ?assert(11011 == get_cdap_gui_port_from_version(<<"4.10.0">>)),
+ ?assert(11011 == get_cdap_gui_port_from_version(<<"4.0.10">>)),
+ ?assert(11011 == get_cdap_gui_port_from_version(<<"4.10.10">>)),
+ ?assert(<<"UNKNOWN CDAP VERSION">> == get_cdap_gui_port_from_version("5.0.0")).
+
+map_appname_test() ->
+ ?assert(map_appname(<<"foo">>) == <<"foo">>),
+ ?assert(map_appname(<<"fo.o">>) == <<"foo">>),
+ ?assert(map_appname(<<"f_oo">>) == <<"foo">>),
+ ?assert(map_appname(<<"._*#$%#*%$$#@#foo">>) == <<"foo">>).
+
+get_cdap_cluster_version_test() ->
+ FakeReturn = jiffy:encode({[{<<"version">>, <<"4.0.3">>}]}),
+ try meck:new(httpabs, [passthrough]) catch _:_ -> ok end,
+ meck:expect(httpabs, get, fun(_XER, _URL) -> {200, FakeReturn} end),
+ ?assert(get_cdap_cluster_version("","") == <<"4.0.3">>),
+ meck:expect(httpabs, get, fun(_XER, _URL) -> {404, ""} end),
+ ?assert(get_cdap_cluster_version("","") == <<"UNKNOWN CDAP VERSION">>),
+ meck:unload(httpabs).
diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src
index eb39868..f5a1821 100644
--- a/src/cdapbroker.app.src
+++ b/src/cdapbroker.app.src
@@ -1,6 +1,6 @@
{application, cdapbroker,
[{description, "Interface between Consul and CDAP in DCAE"},
- {vsn, "4.0.7"},
+ {vsn, "4.0.8"},
{registered, []},
{mod, { cdapbroker_app, []}},
{applications,
diff --git a/src/httpabs.erl b/src/httpabs.erl
index 152621e..4a29f83 100644
--- a/src/httpabs.erl
+++ b/src/httpabs.erl
@@ -86,9 +86,8 @@ sanitize(URL) ->
-spec post(string(), string()|binary(), string(), any()) -> httpstat().
post(XER, URL, ContentType, Body) ->
%post that sends the XER, no headers signature
- Headers = [{"x-ecomp-requestid", XER}],
U = sanitize(URL),
- parse_response(httpc:request(post, {U, Headers, ContentType, Body}, [],[]), U).
+ parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER}], ContentType, Body}, [],[]), U).
-spec post(string(), string()|binary(), list(), string(), any()) -> httpstat().
post(XER, URL, Headers, ContentType, Body) ->
@@ -100,19 +99,16 @@ post(XER, URL, Headers, ContentType, Body) ->
get(XER, URL) ->
%http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day.
U = sanitize(URL),
- Headers = [{"x-ecomp-requestid", XER}],
- parse_response(httpc:request(get, {U, Headers}, [], []), U).
+ parse_response(httpc:request(get, {U, [{"x-ecomp-requestid", XER}]}, [], []), U).
-spec put(string(), string()|binary(), string(), any()) -> httpstat().
put(XER, URL, ContentType, Body) ->
%http put that always sends the XER
U = sanitize(URL),
- Headers = [{"x-ecomp-requestid", XER}],
- parse_response(httpc:request(put, {U, Headers, ContentType, Body}, [],[]), U).
+ parse_response(httpc:request(put, {U, [{"x-ecomp-requestid", XER}], ContentType, Body}, [],[]), U).
-spec delete(string(), string()|binary()) -> httpstat().
delete(XER, URL) ->
%http delete that always sends the XER
U = sanitize(URL),
- Headers = [{"x-ecomp-requestid", XER}],
- parse_response(httpc:request(delete, {U, Headers}, [],[]), U).
+ parse_response(httpc:request(delete, {U, [{"x-ecomp-requestid", XER}]}, [],[]), U).
diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl
index 27b7116..14dc7e6 100644
--- a/src/httpabs_tests.erl
+++ b/src/httpabs_tests.erl
@@ -41,5 +41,12 @@ parse_response_test() ->
BadBody = {[{<<"config">>, ReconfigMap}]},
?assert(httpabs:put("testxer", "www.foo.com", "application/json", BadBody) == {400,"ERROR: The request Body is malformed"}),
?assert(parse_response({error,{bad_body_generator, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}),
+ ?assert(parse_response({error,{bad_body, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}),
+ ?assert(parse_response({error,{this_was, "not expected"}}, "www.fubar.com") == {504,<<"ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: www.fubar.com">>}),
+
+ %try a 200
+ ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}),
+
+ % try a 404
+ ?assert(parse_response({ok,{{"HTTP/1.1",404,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],""}}, "https://google.com") == {404,"State: OK. Return Body: "}).
- ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}).
diff --git a/src/util.erl b/src/util.erl
index 520b071..76f8bbf 100644
--- a/src/util.erl
+++ b/src/util.erl
@@ -34,7 +34,8 @@
to_str/1,
ip_to_str/1,
update_with_new_config_map/2,
- ejson_to_map/1
+ ejson_to_map/1,
+ get_envs/0
]).
%http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation
@@ -48,11 +49,14 @@ resolve_cbs(XER, ConsulURL) ->
{IP, Port} = consul_interface:consul_get_service_ip_port(XER, "config_binding_service", ConsulURL),
concat(["http://", IP, ":", integer_to_binary(Port)]).
+get_envs() ->
+ %breaking this out for mock testing of get_platform_envs_and_config()
+ {os:getenv("HOSTNAME"), os:getenv("CONSUL_HOST"), os:getenv("CDAP_CLUSTER_TO_MANAGE")}.
+
get_platform_envs_and_config() ->
%Get platform envs needed for broker operation, then fetch my config.
%If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap]
- MyName = os:getenv("HOSTNAME"),
- ConsulHost = os:getenv("CONSUL_HOST"),
+ {MyName, ConsulHost, CDAPEnvName} = ?MODULE:get_envs(),
case MyName == false orelse ConsulHost == false of
true -> [];
false ->
@@ -69,7 +73,7 @@ get_platform_envs_and_config() ->
%First, we will check for environmnental variables for a cluster *NAME*
%If that is not found, then we will check out bound config for a fully bound URL
%If that is also not found, let it crash baby.
- CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of
+ CDAPURL = case CDAPEnvName of
false ->
list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error
CDAPName ->
diff --git a/src/util_tests.erl b/src/util_tests.erl
index 78ca851..1774fb5 100644
--- a/src/util_tests.erl
+++ b/src/util_tests.erl
@@ -25,7 +25,9 @@
ip_to_str/1,
update_with_new_config_map/2,
ejson_to_map/1,
- to_str/1
+ to_str/1,
+ get_platform_envs_and_config/0,
+ resolve_cbs/2
]).
to_str_test() ->
@@ -57,3 +59,59 @@ ejson_to_map_test() ->
?assert(EJ1 /= EJ2), %HERE LIES THE PROBLEM HUDSON
?assert(M1 == M2). %GREAT SUCCESS!
+get_platform_envs_and_config_test() ->
+ try meck:new(util, [passthrough]) catch _:_ -> ok end,
+ try meck:new(consul_interface, [passthrough]) catch _:_ -> ok end,
+
+ %test no envs
+ meck:expect(util, get_envs, fun() -> {"", false, <<"cdap_test">>} end),
+ ?assert(get_platform_envs_and_config() == []),
+
+ %test good case where the env variable is passed
+ %needed monkeypatching
+ FakeConfig = {[
+ {<<"autoderegisterafter">>, <<"10m">>},
+ {<<"bindingttw">>,5},
+ {<<"pipelinehealthlimit">>, 2},
+ {<<"hcinterval">>, <<"60s">>}
+ ]},
+ FakeBrokerName = "cdap_broker_test",
+ FakeConsulName = "myconsuldotcom",
+ meck:expect(util, get_envs, fun() -> {FakeBrokerName, FakeConsulName, <<"cdap_test">>} end),
+ meck:expect(consul_interface, consul_bind_config, fun(_XER, _MyName, _ConsulURL) -> {200, FakeConfig} end),
+ meck:expect(consul_interface, consul_get_service_ip_port, fun(_XER, Appname, _ConsulURL) ->
+ case Appname of
+ <<"cdap_test">> -> {"666.666.666.666", 666}
+ end
+ end),
+
+ ?assert(get_platform_envs_and_config() == [FakeBrokerName, "http://myconsuldotcom:8500", <<"http://666.666.666.666:666">>, jiffy:decode(jiffy:encode(FakeConfig), [return_maps])]),
+
+ %test bad case where env is not passed
+ meck:expect(util, get_envs, fun() -> {FakeBrokerName, FakeConsulName, false} end),
+ FakeConfigwCDAP = {[
+ {<<"autoderegisterafter">>, <<"10m">>},
+ {<<"bindingttw">>,5},
+ {<<"pipelinehealthlimit">>, 2},
+ {<<"hcinterval">>, <<"60s">>},
+ {<<"cdap_cluster_to_manage">>, [<<"666.666.666.666:666">>]}
+ ]},
+ meck:expect(consul_interface, consul_bind_config, fun(_XER, _MyName, _ConsulURL) -> {200, FakeConfigwCDAP} end),
+ ?assert(get_platform_envs_and_config() == [FakeBrokerName, "http://myconsuldotcom:8500", <<"http://666.666.666.666:666">>, jiffy:decode(jiffy:encode(FakeConfigwCDAP), [return_maps])]),
+
+ meck:unload(util),
+ meck:unload(consul_interface).
+
+resolve_cbs_test() ->
+ try meck:new(consul_interface, [passthrough]) catch _:_ -> ok end,
+ meck:expect(consul_interface, consul_get_service_ip_port, fun(_XER, Appname, _ConsulURL) ->
+ case Appname of
+ "config_binding_service" -> {"666.666.666.666", 10000}
+ end
+ end),
+ ?assert(resolve_cbs("", "") == "http://666.666.666.666:10000"),
+ meck:unload(consul_interface).
+
+
+
+