diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cdap_interface.erl | 121 | ||||
-rw-r--r-- | src/cdap_interface_tests.erl | 160 | ||||
-rw-r--r-- | src/cdapbroker.app.src | 2 | ||||
-rw-r--r-- | src/httpabs.erl | 12 | ||||
-rw-r--r-- | src/httpabs_tests.erl | 9 | ||||
-rw-r--r-- | src/util.erl | 12 | ||||
-rw-r--r-- | src/util_tests.erl | 60 |
7 files changed, 290 insertions, 86 deletions
diff --git a/src/cdap_interface.erl b/src/cdap_interface.erl index 7ce3b37..a1b0982 100644 --- a/src/cdap_interface.erl +++ b/src/cdap_interface.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,12 +19,12 @@ % ECOMP is a trademark and service mark of AT&T Intellectual Property. -module(cdap_interface). --export([get_app_metrics/4, - get_app_healthcheck/4, - push_down_config/5, - form_service_json_from_service_tuple/4, - form_stream_url_from_streamname/3, - exec_programs/6, +-export([get_app_metrics/4, + get_app_healthcheck/4, + push_down_config/5, + form_service_json_from_service_tuple/4, + form_stream_url_from_streamname/3, + exec_programs/6, push_down_program_preferences/5, push_down_app_preferences/5, deploy_app/8, @@ -53,15 +53,15 @@ %%%INTERNAL %%% map_appname(Appname) -> - %CDAP APIs do not allow app names with any special characters. Here we will map the + %CDAP APIs do not allow app names with any special characters. Here we will map the %name to ensure it does not contain special characters using a regex whitelist %see http://stackoverflow.com/questions/3303420/regex-to-remove-all-special-characters-from-string re:replace(Appname, "[^0-9a-zA-Z]+", "", [{return, binary}, global]). -get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) -> +get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/metrics/search?target=metric&tag=namespace:", Namespace, "&tag=app:", map_appname(Appname)]), - {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""), - case ReturnCode of + {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", ""), + case ReturnCode of 200 -> [ X || X <- jiffy:decode(RetBody),binary:match(X, <<"user.">>) /= nomatch]; _ -> 504 %must bubble this up end. @@ -71,7 +71,7 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) -> %helper function: checks for a partocular program from an app URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/", P#program.type, "/", P#program.id]), {RC, _} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> %Next make sure it's status is running {RC2, RB2} = httpabs:get(XER, ?SC([URL, "/status"])), @@ -80,9 +80,9 @@ get_app_healthcheck_program(XER, Appname, Namespace, CDAPURL, P) -> S = jiffy:decode(RB2, [return_maps]), case maps:is_key(<<"status">>, S) andalso maps:get(<<"status">>, S) == <<"RUNNING">> of true -> 200; %return 200 - false -> ?BAD_HEALTH_CODE %return - end; - _ -> ?BAD_HEALTH_CODE + false -> ?BAD_HEALTH_CODE %return + end; + _ -> ?BAD_HEALTH_CODE end; _ -> ?BAD_HEALTH_CODE end. @@ -99,41 +99,41 @@ push_down_program_preference(XER, Appname, Namespace, CDAPURL, {PT, PI, PP}) -> deploy_pipeline_dependency(XER, Namespace, CDAPURL, {ArtExtendsHeader, ArtName, ArtVerHeader, ArtURL, _}) -> %deploys a single dependency - %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. {200, JarBody} = httpabs:get(XER, ArtURL), Headers = [{"Artifact-Extends", ArtExtendsHeader}, {"Artifact-Version", ArtVerHeader}], - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName]), httpabs:post(XER, URL, Headers, "application/octet-stream", JarBody). deploy_pipeline_dependency_property(XER, Namespace, CDAPURL, {_, ArtName, ArtVerHeader, _, UIPropertiesURL}) -> - %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. - case UIPropertiesURL of + %TODO! I should really be using Erlang records more and not relying on positional arguments so much. For example, I could add a record that represents a Dependency instead of using a positionally-based tuple. + case UIPropertiesURL of none -> {200, ""}; %nothing to do - _ -> + _ -> {200, PropertiesJSON} = httpabs:get(XER, UIPropertiesURL), - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]), + URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtName, "/versions/", ArtVerHeader, "/properties"]), httpabs:put(XER, URL, "application/json", PropertiesJSON) end. %%% %%%EXTERNAL %%% -get_app_metrics(XER, Appname, Namespace, CDAPURL) -> +get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %Namespace should be a binary MetricsList = get_metrics_list_for_app(XER, Appname, Namespace, CDAPURL), %assert 200 or bomb - case MetricsList of + case MetricsList of 504 -> {504, []}; %TODO! cdap seems to return a {200, []} for above call even if the app is not deployed. Is that OK? for now return empty list but maybe we should determine this and error with a 404 or something - [] -> {200, []}; + [] -> {200, []}; _ -> URL = ?SC([CDAPURL, "/v3/metrics/query"]), Body = jiffy:encode( - {[{<<"appmetrics">>, + {[{<<"appmetrics">>, {[{<<"tags">>, {[{<<"namespace">>, Namespace}, {<<"app">>, map_appname(Appname)}]}}, {<<"metrics">>, MetricsList}]} }]}), {ReturnCode, RetBody} = httpabs:post(XER, URL, "application/json", Body), %even when app does not exist this seems to return a 200 so assert it! - case ReturnCode of + case ReturnCode of 200 -> {200, jiffy:decode(RetBody)}; 504 -> {504, []} end @@ -145,13 +145,13 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %So now this information is sourced from the initial PUT request, where those programs are saved as supplemntal state %However, leaving it here because maybe one day it will be useful %%%get_app_programs(Appname, Namespace, CDAPURL) -> -%%% %fetch the list of programs from a running CDAP app. +%%% %fetch the list of programs from a running CDAP app. %%% %Parse this into a list of [{ProgramType, ProgramID}] tuples. -%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline. +%%% %Used as part of healthcheck, and also as part of undeploy so I don't have to keep Programs in MNesia pipeline. %%% %get informaation about application %%% URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), %%% {ReturnCode, RetBody} = httpabs:http_get(URL), -%%% case ReturnCode of +%%% case ReturnCode of %%% 504 -> 504; %%% 404 -> 404; %%% 400 -> 400; @@ -164,16 +164,16 @@ get_app_metrics(XER, Appname, Namespace, CDAPURL) -> %%% %All the uppercase ones are: Flow, MapReduce, Service, Spark, Worker, Workflow %%% %From http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#details-of-a-program %%% %All the lowercase ones are: flows, mapreduce, services, spark, workers, or workflows -%%% program_type = case maps:get(<<"type">>, X) of +%%% program_type = case maps:get(<<"type">>, X) of %%% <<"Flow">> -> <<"flows">>; %cdap api fail man %%% <<"Mapreduce">> -> <<"mapreduce">>; %%% <<"Service">> -> <<"services">>; %%% <<"Spark">> -> <<"spark">>; %%% <<"Worker">> -> <<"workers">>; %%% <<"Workflow">> -> <<"workflows">> -%%% end, +%%% end, %%% program_id = maps:get(<<"id">>, X) -%%% } +%%% } %%% end, Programs) %%% end. %%% @@ -197,7 +197,7 @@ push_down_config(XER, Appname, Namespace, CDAPURL, ConfigJson) -> %http://docs.cask.co/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#update-an-application URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/update"]), Body = jiffy:encode( - {[ + {[ {<<"config">>, ConfigJson} ]}), httpabs:post(XER, URL, "application/json", Body). %returns {ReturnCode, ReturnBody} @@ -209,7 +209,7 @@ push_down_program_preferences(XER, Appname, Namespace, CDAPURL, ParsedProgramPre form_service_json_from_service_tuple(Appname, Namespace, CDAPURL, {SN, SE, EM}) -> %transforms {SN, SE, EM} into {url: foo, method: bar} URL = list_to_binary(?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/services/", SN, "/methods/", SE])), - {[{<<"url">>, URL}, + {[{<<"url">>, URL}, {<<"method">>, EM} ]}. @@ -230,17 +230,17 @@ push_down_app_preferences(XER, Appname, Namespace, CDAPURL, AppPreferences) -> deploy_app(XER, Appname, Namespace, CDAPURL, JarBody, ArtifactName, ArtifactVersion, AppConfig) -> %Create Artifact %Deploy App - + %post the artifact, OK if already exists, no check %explicitly set artifact version becausme some JARS do not have it httpabs:post(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/artifacts/", ArtifactName]), [{"Artifact-Version", erlang:binary_to_list(ArtifactVersion)}], "application/octet-stream", JarBody), %deploy the application - PutBody = jiffy:encode( + PutBody = jiffy:encode( {[ {<<"artifact">>, {[ - {<<"name">>, ArtifactName}, - {<<"version">>, ArtifactVersion}, + {<<"name">>, ArtifactName}, + {<<"version">>, ArtifactVersion}, {<<"scope">>, <<"user">>} ]}}, {<<"config">>, AppConfig} @@ -257,41 +257,42 @@ deploy_pipeline(XER, Appname, Namespace, CDAPURL, PipelineJson) -> %Deploy a hydrator pipeline, assumes namespace has already been set up URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), httpabs:put(XER, URL, "application/json", PipelineJson). - -exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) -> + +exec_pipeline(XER, Appname, Namespace, CDAPURL, Exec) -> %Exec assumed to be: "resume" or "suspend" %TODO! REVISIT WHETHER datapipelineschedule is a parameter URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/", Exec]), - httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. -exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) -> - %Exec assumed to be: "stop" or +exec_pipeline_workflow(XER, Appname, Namespace, CDAPURL, Exec) -> + %Exec assumed to be: "stop" or %TODO! REVISIT WHETHER DataPipelineWorkflow is a parameter URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/", Exec]), - httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. + httpabs:post(XER, URL, "application/json", ""). %this CDAP API is a POST but there is no body. get_pipeline_healthcheck(XER, Appname, Namespace, CDAPURL, PipelineHealthLimit) -> - URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/schedules/dataPipelineSchedule/status"]), + URLRoot = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), + URL = ?SC([URLRoot, "/schedules/dataPipelineSchedule/status"]), {RC, RB} = httpabs:get(XER, URL), case RC /= 200 of - true -> ?BAD_HEALTH_CODE; %failed to even hit the status. - false -> + true -> ?BAD_HEALTH_CODE; %failed to even hit the status. + false -> Status = jiffy:decode(RB, [return_maps]), - case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of + case maps:is_key(<<"status">>, Status) andalso maps:get(<<"status">>, Status) == <<"SCHEDULED">> of false -> ?BAD_HEALTH_CODE; %status is malformed or the pipeline is not scheduled, both not good - true -> + true -> %Next, check the last <LIMIT> number of runs, and report a failure if they were not sucessful, or report of more than one of them is running at the same time. - %This logic came from Terry. + %This logic came from Terry. %His logic is essentially that, if your application is running, but is failing, that should be interpeted as unhealthy and needs investigation. - %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen. + %His logic was also that if you have two runs running at the same time, that requires investigation as that should not happen. %RE list_to_binary(integer_to_list( see http://stackoverflow.com/questions/4010713/integer-to-binary-erlang L = list_to_binary(integer_to_list(PipelineHealthLimit)), - {RC2, RB2} = httpabs:get(XER, ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/workflows/DataPipelineWorkflow/runs?limit=", L])), - case RC2 /= 200 of + {RC2, RB2} = httpabs:get(XER, ?SC([URLRoot, "/workflows/DataPipelineWorkflow/runs?limit=", L])), + case RC2 /= 200 of true -> RC2; %failed to even hit the status - false -> + false -> LRST = lists:map(fun(S) -> - case maps:is_key(<<"status">>, S) of + case maps:is_key(<<"status">>, S) of false -> critical; true -> case maps:get(<<"status">>, S) of <<"COMPLETED">> -> ok; <<"RUNNING">> -> running; <<"FAILED">> -> critical end end end, jiffy:decode(RB2, [return_maps])), @@ -324,7 +325,7 @@ create_namespace(XER, Namespace, CDAPURL) -> get_app_config(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname)]), {RC, RB} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> {200, maps:get(<<"configuration">>, jiffy:decode(RB, [return_maps]))}; _ -> {RC, RB} end. @@ -332,16 +333,16 @@ get_app_config(XER, Appname, Namespace, CDAPURL) -> get_app_preferences(XER, Appname, Namespace, CDAPURL) -> URL = ?SC([CDAPURL, "/v3/namespaces/", Namespace, "/apps/", map_appname(Appname), "/preferences"]), {RC, RB} = httpabs:get(XER, URL), - case RC of + case RC of 200 -> {200, jiffy:decode(RB, [return_maps])}; _ -> {RC, RB} end. get_cdap_cluster_version(XER, CDAPURL) -> %CDAP decided to change their port numbers between release 3 and 4. - %The broker works with both. + %The broker works with both. %In order to add the correct GUI information into the broker "info endpoint", I have to know what CDAP version we are connected to. - %The GUI information is a convinence function for component developers that hit the broker via the CLI tool. + %The GUI information is a convinence function for component developers that hit the broker via the CLI tool. URL = ?SC([CDAPURL, "/v3/version"]), {RC, RB} = httpabs:get(XER, URL), case RC of diff --git a/src/cdap_interface_tests.erl b/src/cdap_interface_tests.erl index 37926a6..c1d1e6c 100644 --- a/src/cdap_interface_tests.erl +++ b/src/cdap_interface_tests.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,14 +21,152 @@ -module(cdap_interface_tests). -include_lib("eunit/include/eunit.hrl"). +-import(cdap_interface, [ + get_cdap_gui_port_from_version/1, + map_appname/1, + get_cdap_cluster_version/2, + form_stream_url_from_streamname/3, + form_service_json_from_service_tuple/4, + get_app_preferences/4, + get_app_config/4, + get_pipeline_healthcheck/5 + ]). + +get_pipeline_healthcheck_test() -> + FakeReturn = jiffy:encode({[{<<"status">>, <<"SCHEDULED">>}]}), + try meck:new(httpabs, [passthrough]) catch _:_ -> ok end, + %notfound + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND/schedules/dataPipelineSchedule/status" -> {404, ""} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666", 666) == 400), + + %bad status + FakeReturnBadStatus = jiffy:encode({[{<<"status">>, <<"nosoupforyou">>}]}), + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturnBadStatus} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400), + + %good status but no status endpoint + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {404, ""} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 404), + + %good status but a bad malformed inner + MalformedRuns = jiffy:encode([ + {[{<<"nostatus">>, <<"foryou">>}]}, + {[{<<"status">>, <<"RUNNING">>}]} + ]), + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, MalformedRuns} + end end), + + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400), + + %good status but a not good inner + BadRuns = jiffy:encode([ + {[{<<"status">>, <<"FAILED">>}]}, + {[{<<"status">>, <<"RUNNING">>}]} + ]), + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, BadRuns} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400), + + %two running + TwoRunning = jiffy:encode([ + {[{<<"status">>, <<"RUNNING">>}]}, + {[{<<"status">>, <<"COMPLETED">>}]}, + {[{<<"status">>, <<"RUNNING">>}]} + ]), + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, TwoRunning} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 400), + + %all good + %good status but a not good inner + GoodRuns = jiffy:encode([ + {[{<<"status">>, <<"COMPLETED">>}]}, + {[{<<"status">>, <<"RUNNING">>}]}, + {[{<<"status">>, <<"COMPLETED">>}]} + ]), + meck:expect(httpabs, get, fun(_XER, URL) -> case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/schedules/dataPipelineSchedule/status" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/workflows/DataPipelineWorkflow/runs?limit=666" -> {200, GoodRuns} + end end), + ?assert(get_pipeline_healthcheck("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666", 666) == 200), + + meck:unload(httpabs). + + + +get_app_config_test() -> + FakeReturn = jiffy:encode({[{<<"configuration">>, {[{<<"welcome">>, <<"toeternity">>}]}}]}), + try meck:new(httpabs, [passthrough]) catch _:_ -> ok end, + meck:expect(httpabs, get, fun(_XER, URL) -> + case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND" -> {404, ""} + end + end), + ?assert(get_app_config("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666") == + {200, #{<<"welcome">> => <<"toeternity">>}}), + ?assert(get_app_config("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666") == {404, ""}), + meck:unload(httpabs). + +get_app_preferences_test() -> + FakeReturn = jiffy:encode({[{<<"welcome">>, <<"toeternity">>}]}), + try meck:new(httpabs, [passthrough]) catch _:_ -> ok end, + meck:expect(httpabs, get, fun(_XER, URL) -> + case URL of + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234app/preferences" -> {200, FakeReturn}; + "http://666.666.666.666:666/v3/namespaces/testns/apps/1234appNOTFOUND/preferences" -> {404, ""} + end + end), + ?assert(get_app_preferences("", "1234%%%%^@#$%@#$%#$^@$.app", "testns", "http://666.666.666.666:666") == + {200, #{<<"welcome">> => <<"toeternity">>}}), + ?assert(get_app_preferences("", "1234%%%%^@#$%@#$%#$^@$.appNOTFOUND", "testns", "http://666.666.666.666:666") == {404, ""}), + meck:unload(httpabs). + +form_service_json_from_service_tuple_test() -> + ?assert(form_service_json_from_service_tuple(<<"amazin@)#$%@#)$%gapp">>, "amazingns", "http://666.666.666.666:666", {"seeme", "feelme", "PUT"}) + == {[{<<"url">> ,<<"http://666.666.666.666:666/v3/namespaces/amazingns/apps/amazingapp/services/seeme/methods/feelme">>}, + {<<"method">>, "PUT"}]}). + +form_stream_url_from_streamname_test() -> + ?assert(form_stream_url_from_streamname("666.666.666.666:666", "thevoid", "souls") == <<"666.666.666.666:666/v3/namespaces/thevoid/streams/souls">>). + get_cdap_gui_port_from_version_test() -> - ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.0")), - ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.0")), - ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.0.10")), - ?assert(9999 == cdap_interface:get_cdap_gui_port_from_version("3.10.10")), - ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.0">>)), - ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.0">>)), - ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.0.10">>)), - ?assert(11011 == cdap_interface:get_cdap_gui_port_from_version(<<"4.10.10">>)), - ?assert(<<"UNKNOWN CDAP VERSION">> == cdap_interface:get_cdap_gui_port_from_version("5.0.0")). + ?assert(9999 == get_cdap_gui_port_from_version("3.0.0")), + ?assert(9999 == get_cdap_gui_port_from_version("3.10.0")), + ?assert(9999 == get_cdap_gui_port_from_version("3.0.10")), + ?assert(9999 == get_cdap_gui_port_from_version("3.10.10")), + ?assert(11011 == get_cdap_gui_port_from_version(<<"4.0.0">>)), + ?assert(11011 == get_cdap_gui_port_from_version(<<"4.10.0">>)), + ?assert(11011 == get_cdap_gui_port_from_version(<<"4.0.10">>)), + ?assert(11011 == get_cdap_gui_port_from_version(<<"4.10.10">>)), + ?assert(<<"UNKNOWN CDAP VERSION">> == get_cdap_gui_port_from_version("5.0.0")). + +map_appname_test() -> + ?assert(map_appname(<<"foo">>) == <<"foo">>), + ?assert(map_appname(<<"fo.o">>) == <<"foo">>), + ?assert(map_appname(<<"f_oo">>) == <<"foo">>), + ?assert(map_appname(<<"._*#$%#*%$$#@#foo">>) == <<"foo">>). + +get_cdap_cluster_version_test() -> + FakeReturn = jiffy:encode({[{<<"version">>, <<"4.0.3">>}]}), + try meck:new(httpabs, [passthrough]) catch _:_ -> ok end, + meck:expect(httpabs, get, fun(_XER, _URL) -> {200, FakeReturn} end), + ?assert(get_cdap_cluster_version("","") == <<"4.0.3">>), + meck:expect(httpabs, get, fun(_XER, _URL) -> {404, ""} end), + ?assert(get_cdap_cluster_version("","") == <<"UNKNOWN CDAP VERSION">>), + meck:unload(httpabs). diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src index eb39868..f5a1821 100644 --- a/src/cdapbroker.app.src +++ b/src/cdapbroker.app.src @@ -1,6 +1,6 @@ {application, cdapbroker, [{description, "Interface between Consul and CDAP in DCAE"}, - {vsn, "4.0.7"}, + {vsn, "4.0.8"}, {registered, []}, {mod, { cdapbroker_app, []}}, {applications, diff --git a/src/httpabs.erl b/src/httpabs.erl index 152621e..4a29f83 100644 --- a/src/httpabs.erl +++ b/src/httpabs.erl @@ -86,9 +86,8 @@ sanitize(URL) -> -spec post(string(), string()|binary(), string(), any()) -> httpstat(). post(XER, URL, ContentType, Body) -> %post that sends the XER, no headers signature - Headers = [{"x-ecomp-requestid", XER}], U = sanitize(URL), - parse_response(httpc:request(post, {U, Headers, ContentType, Body}, [],[]), U). + parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER}], ContentType, Body}, [],[]), U). -spec post(string(), string()|binary(), list(), string(), any()) -> httpstat(). post(XER, URL, Headers, ContentType, Body) -> @@ -100,19 +99,16 @@ post(XER, URL, Headers, ContentType, Body) -> get(XER, URL) -> %http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day. U = sanitize(URL), - Headers = [{"x-ecomp-requestid", XER}], - parse_response(httpc:request(get, {U, Headers}, [], []), U). + parse_response(httpc:request(get, {U, [{"x-ecomp-requestid", XER}]}, [], []), U). -spec put(string(), string()|binary(), string(), any()) -> httpstat(). put(XER, URL, ContentType, Body) -> %http put that always sends the XER U = sanitize(URL), - Headers = [{"x-ecomp-requestid", XER}], - parse_response(httpc:request(put, {U, Headers, ContentType, Body}, [],[]), U). + parse_response(httpc:request(put, {U, [{"x-ecomp-requestid", XER}], ContentType, Body}, [],[]), U). -spec delete(string(), string()|binary()) -> httpstat(). delete(XER, URL) -> %http delete that always sends the XER U = sanitize(URL), - Headers = [{"x-ecomp-requestid", XER}], - parse_response(httpc:request(delete, {U, Headers}, [],[]), U). + parse_response(httpc:request(delete, {U, [{"x-ecomp-requestid", XER}]}, [],[]), U). diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl index 27b7116..14dc7e6 100644 --- a/src/httpabs_tests.erl +++ b/src/httpabs_tests.erl @@ -41,5 +41,12 @@ parse_response_test() -> BadBody = {[{<<"config">>, ReconfigMap}]}, ?assert(httpabs:put("testxer", "www.foo.com", "application/json", BadBody) == {400,"ERROR: The request Body is malformed"}), ?assert(parse_response({error,{bad_body_generator, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}), + ?assert(parse_response({error,{bad_body, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}), + ?assert(parse_response({error,{this_was, "not expected"}}, "www.fubar.com") == {504,<<"ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: www.fubar.com">>}), + + %try a 200 + ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}), + + % try a 404 + ?assert(parse_response({ok,{{"HTTP/1.1",404,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],""}}, "https://google.com") == {404,"State: OK. Return Body: "}). - ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}). diff --git a/src/util.erl b/src/util.erl index 520b071..76f8bbf 100644 --- a/src/util.erl +++ b/src/util.erl @@ -34,7 +34,8 @@ to_str/1, ip_to_str/1, update_with_new_config_map/2, - ejson_to_map/1 + ejson_to_map/1, + get_envs/0 ]). %http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation @@ -48,11 +49,14 @@ resolve_cbs(XER, ConsulURL) -> {IP, Port} = consul_interface:consul_get_service_ip_port(XER, "config_binding_service", ConsulURL), concat(["http://", IP, ":", integer_to_binary(Port)]). +get_envs() -> + %breaking this out for mock testing of get_platform_envs_and_config() + {os:getenv("HOSTNAME"), os:getenv("CONSUL_HOST"), os:getenv("CDAP_CLUSTER_TO_MANAGE")}. + get_platform_envs_and_config() -> %Get platform envs needed for broker operation, then fetch my config. %If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap] - MyName = os:getenv("HOSTNAME"), - ConsulHost = os:getenv("CONSUL_HOST"), + {MyName, ConsulHost, CDAPEnvName} = ?MODULE:get_envs(), case MyName == false orelse ConsulHost == false of true -> []; false -> @@ -69,7 +73,7 @@ get_platform_envs_and_config() -> %First, we will check for environmnental variables for a cluster *NAME* %If that is not found, then we will check out bound config for a fully bound URL %If that is also not found, let it crash baby. - CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of + CDAPURL = case CDAPEnvName of false -> list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error CDAPName -> diff --git a/src/util_tests.erl b/src/util_tests.erl index 78ca851..1774fb5 100644 --- a/src/util_tests.erl +++ b/src/util_tests.erl @@ -25,7 +25,9 @@ ip_to_str/1, update_with_new_config_map/2, ejson_to_map/1, - to_str/1 + to_str/1, + get_platform_envs_and_config/0, + resolve_cbs/2 ]). to_str_test() -> @@ -57,3 +59,59 @@ ejson_to_map_test() -> ?assert(EJ1 /= EJ2), %HERE LIES THE PROBLEM HUDSON ?assert(M1 == M2). %GREAT SUCCESS! +get_platform_envs_and_config_test() -> + try meck:new(util, [passthrough]) catch _:_ -> ok end, + try meck:new(consul_interface, [passthrough]) catch _:_ -> ok end, + + %test no envs + meck:expect(util, get_envs, fun() -> {"", false, <<"cdap_test">>} end), + ?assert(get_platform_envs_and_config() == []), + + %test good case where the env variable is passed + %needed monkeypatching + FakeConfig = {[ + {<<"autoderegisterafter">>, <<"10m">>}, + {<<"bindingttw">>,5}, + {<<"pipelinehealthlimit">>, 2}, + {<<"hcinterval">>, <<"60s">>} + ]}, + FakeBrokerName = "cdap_broker_test", + FakeConsulName = "myconsuldotcom", + meck:expect(util, get_envs, fun() -> {FakeBrokerName, FakeConsulName, <<"cdap_test">>} end), + meck:expect(consul_interface, consul_bind_config, fun(_XER, _MyName, _ConsulURL) -> {200, FakeConfig} end), + meck:expect(consul_interface, consul_get_service_ip_port, fun(_XER, Appname, _ConsulURL) -> + case Appname of + <<"cdap_test">> -> {"666.666.666.666", 666} + end + end), + + ?assert(get_platform_envs_and_config() == [FakeBrokerName, "http://myconsuldotcom:8500", <<"http://666.666.666.666:666">>, jiffy:decode(jiffy:encode(FakeConfig), [return_maps])]), + + %test bad case where env is not passed + meck:expect(util, get_envs, fun() -> {FakeBrokerName, FakeConsulName, false} end), + FakeConfigwCDAP = {[ + {<<"autoderegisterafter">>, <<"10m">>}, + {<<"bindingttw">>,5}, + {<<"pipelinehealthlimit">>, 2}, + {<<"hcinterval">>, <<"60s">>}, + {<<"cdap_cluster_to_manage">>, [<<"666.666.666.666:666">>]} + ]}, + meck:expect(consul_interface, consul_bind_config, fun(_XER, _MyName, _ConsulURL) -> {200, FakeConfigwCDAP} end), + ?assert(get_platform_envs_and_config() == [FakeBrokerName, "http://myconsuldotcom:8500", <<"http://666.666.666.666:666">>, jiffy:decode(jiffy:encode(FakeConfigwCDAP), [return_maps])]), + + meck:unload(util), + meck:unload(consul_interface). + +resolve_cbs_test() -> + try meck:new(consul_interface, [passthrough]) catch _:_ -> ok end, + meck:expect(consul_interface, consul_get_service_ip_port, fun(_XER, Appname, _ConsulURL) -> + case Appname of + "config_binding_service" -> {"666.666.666.666", 10000} + end + end), + ?assert(resolve_cbs("", "") == "http://666.666.666.666:10000"), + meck:unload(consul_interface). + + + + |