diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cdapbroker.app.src | 2 | ||||
-rw-r--r-- | src/httpabs.erl | 28 | ||||
-rw-r--r-- | src/httpabs_tests.erl | 18 | ||||
-rw-r--r-- | src/resource_handler.erl | 231 | ||||
-rw-r--r-- | src/resource_handler_tests.erl | 63 | ||||
-rw-r--r-- | src/util.erl | 54 | ||||
-rw-r--r-- | src/util_tests.erl | 14 |
7 files changed, 223 insertions, 187 deletions
diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src index b95d534..611d873 100644 --- a/src/cdapbroker.app.src +++ b/src/cdapbroker.app.src @@ -1,6 +1,6 @@ {application, cdapbroker, [{description, "Interface between Consul and CDAP in DCAE"}, - {vsn, "4.0.5"}, + {vsn, "4.0.6"}, {registered, []}, {mod, { cdapbroker_app, []}}, {applications, diff --git a/src/httpabs.erl b/src/httpabs.erl index bc9e068..152621e 100644 --- a/src/httpabs.erl +++ b/src/httpabs.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,7 +21,7 @@ -module(httpabs). -export([get/2, post/4, %I miss python's default arguments.. - post/5, + post/5, put/4, delete/2 ]). @@ -29,10 +29,10 @@ -define(SC(L), util:concat(L)). %NOTE -%Consider the Erlang statement: +%Consider the Erlang statement: % %{ok, {{"HTTP/1.1",ReturnCode, State}, Head, Body}} = httpc:get(URL). -%CDAP returns error messages in the “Body” field above. +%CDAP returns error messages in the “Body” field above. % %However, Consul: %1) Always (in all HTTP failures I’ve tested) returns Body == “500\n” @@ -51,21 +51,21 @@ %%% -spec parse_response({error|ok, any()}, string()) -> httpstat(). parse_response({Status, Response}, URL) -> - case Status of - error -> + case Status of + error -> lager:error("httpc error: cannot hit: ~s", [URL]), case Response of no_scheme -> {400, io_lib:format("ERROR: The following URL is malformed: ~s", [URL])}; {bad_body, _} -> {400, "ERROR: The request Body is malformed"}; {bad_body_generator,_} -> {400, "ERROR: The request Body is malformed"}; _ -> - lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])), + lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])), {504, list_to_binary(io_lib:format("ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: ~s", [URL]))} %Are there other reasons other than bad body and unreachable that crash request? (Sneak peak: the answer is probably) end; - ok -> + ok -> {{_, ReturnCode, State}, _Head, Body} = Response, - case ReturnCode of - 200 -> + case ReturnCode of + 200 -> {ReturnCode, Body}; _ -> lager:error("Error While hitting ~s, Non-200 status code returned. HTTP Code ~p, State ~s, ResponseBody ~s:", [URL, ReturnCode, State, Body]), @@ -77,14 +77,14 @@ parse_response({Status, Response}, URL) -> sanitize(URL) -> %allow URL to look like "www.foo.com" or <<"www.foo.com">>, trim it - case is_binary(URL) of + case is_binary(URL) of true -> string:strip(binary_to_list(URL)); false -> string:strip(URL) end. %anywhere you see any() is essentially lazy typing.. fix these someday when time is abundant -spec post(string(), string()|binary(), string(), any()) -> httpstat(). -post(XER, URL, ContentType, Body) -> +post(XER, URL, ContentType, Body) -> %post that sends the XER, no headers signature Headers = [{"x-ecomp-requestid", XER}], U = sanitize(URL), @@ -97,7 +97,7 @@ post(XER, URL, Headers, ContentType, Body) -> parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER} | Headers], ContentType, Body}, [],[]), U). -spec get(string(), string()|binary()) -> httpstat(). -get(XER, URL) -> +get(XER, URL) -> %http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day. U = sanitize(URL), Headers = [{"x-ecomp-requestid", XER}], diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl index d8ad529..27b7116 100644 --- a/src/httpabs_tests.erl +++ b/src/httpabs_tests.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,7 +21,8 @@ -module(httpabs_tests). -include_lib("eunit/include/eunit.hrl"). -import(httpabs, [ - sanitize/1 + sanitize/1, + parse_response/2 ] ). @@ -30,4 +31,15 @@ sanitize_test() -> ?assert(sanitize(" www.foo.com ") == "www.foo.com"), ?assert(sanitize(<<"www.foo.com">>) == "www.foo.com"). +parse_response_test() -> + NoURL = "THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME", + ?assert(parse_response({error,no_scheme},"THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME") == {400, io_lib:format("ERROR: The following URL is malformed: ~s", [NoURL])}), + ?assert(httpabs:put("testxer", NoURL, "application/json", jiffy:encode({[{<<"a">>, <<"b">>}]})) == {400, io_lib:format("ERROR: The following URL is malformed: ~s", [NoURL])}), + + %test httpabs bad body (not encoded as JSON) + ReconfigMap = util:ejson_to_map({[{<<"foo">>, <<"bar">>}]}), + BadBody = {[{<<"config">>, ReconfigMap}]}, + ?assert(httpabs:put("testxer", "www.foo.com", "application/json", BadBody) == {400,"ERROR: The request Body is malformed"}), + ?assert(parse_response({error,{bad_body_generator, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}), + ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}). diff --git a/src/resource_handler.erl b/src/resource_handler.erl index 8c1b343..798f457 100644 --- a/src/resource_handler.erl +++ b/src/resource_handler.erl @@ -7,6 +7,8 @@ -export([get/3, put/3, delete/3, post/3]). -export([cross_domains/3]). +-export([appname_to_field_vals/2]). + %%for keeping state %%The application record is defined in application.hrl %%In Mnesia, the first element is the type of record and the second element is the key @@ -163,88 +165,6 @@ delete_app_helper(Appname, State, XER, Req) -> end end. -%%%CALLBACKS %%% -init(_Route, _Req, State) -> - {ok, State}. -terminate(_Reason, _Route, _Req, _State) -> - ok. -%%%FOR Cors support -%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55 -cross_domains(_Route, _Req, State) -> - {['_'], State}. - -%%%GET Methods -get("/", Req, State) -> - %The broker's "info" endpoint; returns some possibly useful information - {Bts, XER} = init_api_call(Req), - Apps = util:get_all_appnames_from_db(), - {UT, _} = statistics(wall_clock), - CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL), - RB = {[ - {<<"cdap cluster version">>, CDAPVer}, - {<<"managed cdap url">>, ?CDAPURL}, - {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)}, - {<<"number of applications registered">>, length(Apps)}, - {<<"uptime (s)">>, UT/1000}, - {<<"broker API version">>, util:get_my_version()} - ]}, - {RCode, RBody, RState} = {200, {json, RB}, State}, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application", Req, State) -> - %get a list of all registered apps - {Bts, XER} = init_api_call(Req), - {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State}, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname", Req, State) -> - %get information about a registered application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State), - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname/metrics", Req, State) -> - %get metrics for a registered application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of - none -> {404, "", State}; - [<<"program-flowlet">>, Namespace] -> - {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200 - {ReturnCode, {json, ReturnBody}, State}; - [<<"hydrator-pipeline">>, Namespace] -> - lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), - {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL), - {ReturnCode, {json, ReturnBody}, State} - end, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname/healthcheck", Req, State) -> - %get healthcheck of an application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of - none -> {404, "", State}; - [<<"program-flowlet">>, Namespace] -> - {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State}; - [<<"hydrator-pipeline">>, Namespace] -> - {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State} - end, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}. - -%%%DELETE Methods -delete("/application/:appname", Req, State) -> - %Uninstall and delete a CDAP app - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req), - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}. - -%%%PUT Methods parse_put_body(B) -> %parse the PUT body to application try @@ -274,7 +194,7 @@ parse_put_body(B) -> maps:get(<<"program_id">>, P), maps:get(<<"program_pref">>, P)} end, maps:get(<<"program_preferences">>, Body)), - {pf, <<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}}; + {<<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}}; <<"hydrator-pipeline">> -> PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body), @@ -295,7 +215,7 @@ parse_put_body(B) -> } end, D); false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in [] end, - {hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}} + {<<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}} end end catch _:_ -> invalid @@ -315,12 +235,12 @@ parse_reconfiguration_put_body(Body) -> catch _:_ -> invalid end. -handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) -> +handle_reconfigure_put(Req, State, XER, Appname, ReqBody) -> %handle the reconfiguration put. broker out from the http call, and takes the lookup func as an arg, to allow for better unit testing. %this is still not a pure function due to the workflows call, still needs enhancement - case AppnameToNS(Appname) of + case ?MODULE:appname_to_field_vals(Appname, [<<"namespace">>]) of none -> {404, "Reconfigure recieved but the app is not registered", State}; - Namespace -> + [Namespace] -> ParsedBody = parse_reconfiguration_put_body(ReqBody), case ParsedBody of invalid -> {400, "Invalid PUT Reconfigure Body", State}; @@ -351,37 +271,27 @@ handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) -> end end. -put("/application/:appname", Req, State) -> - %create a new registration; deploys and starts a cdap application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of - [Appname] -> - {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State}; - none -> %no matches, create the resource, return the application record +handle_put(Req, State, XER, Appname, ReqBody, RequestUrl) -> + %use of ?MODULE here is due to the meck limitation described here: https://github.com/eproxus/meck + case ?MODULE:appname_to_field_vals(Appname, [<<"appname">>]) == none of + false -> {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State}; + true -> %Initial put requires the put body parameters - case parse_put_body(leptus_req:body_raw(Req)) of - %could not parse the body - invalid -> {400, "Invalid PUT Body or unparseable URL", State}; - - %unsupported cdap application type - unsupported -> {404, "Unsupported CDAP Application Type", State}; - - {Type, AppType, Params} -> + ParsedBody = parse_put_body(ReqBody), + case ParsedBody of + invalid -> {400, "Invalid PUT Body or unparseable URL", State}; %could not parse the body + unsupported -> {400, "Unsupported CDAP Application Type", State}; %unsupported cdap application type + {AppType, Params} -> %form shared info - %hateaos cuz they aintaos - {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))), Metricsurl = <<RequestUrl/binary, <<"/metrics">>/binary>>, Healthcheckurl = <<RequestUrl/binary, <<"/healthcheck">>/binary>>, try - case Type of - hp -> + case AppType of + <<"hydrator-pipeline">> -> {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params, ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname), - - %TODO: This! - ServiceEndpoints = [], %unclear if this is possible with pipelines + ServiceEndpoints = [], %TODO: unclear if this is possible with pipelines %write into mnesia, deploy A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()}, @@ -389,7 +299,7 @@ put("/application/:appname", Req, State) -> ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter), metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager ok; - pf -> + <<"program-flowlet">> -> {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params, %Form URLs that are part of the record %NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here @@ -420,7 +330,95 @@ put("/application/:appname", Req, State) -> {500, "Please report this error", State} end end - end, + end. + +%%% HTTP API CALLBACKS %%% +init(_Route, _Req, State) -> + {ok, State}. +terminate(_Reason, _Route, _Req, _State) -> + ok. +%%%FOR Cors support +%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55 +cross_domains(_Route, _Req, State) -> + {['_'], State}. + +%%%GET Methods +get("/", Req, State) -> + %The broker's "info" endpoint; returns some possibly useful information + {Bts, XER} = init_api_call(Req), + Apps = util:get_all_appnames_from_db(), + {UT, _} = statistics(wall_clock), + CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL), + RB = {[ + {<<"cdap cluster version">>, CDAPVer}, + {<<"managed cdap url">>, ?CDAPURL}, + {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)}, + {<<"number of applications registered">>, length(Apps)}, + {<<"uptime (s)">>, UT/1000}, + {<<"broker API version">>, util:get_my_version()} + ]}, + {RCode, RBody, RState} = {200, {json, RB}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application", Req, State) -> + %get a list of all registered apps + {Bts, XER} = init_api_call(Req), + {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname", Req, State) -> + %get information about a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/metrics", Req, State) -> + %get metrics for a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200 + {ReturnCode, {json, ReturnBody}, State}; + [<<"hydrator-pipeline">>, Namespace] -> + lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), + {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL), + {ReturnCode, {json, ReturnBody}, State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/healthcheck", Req, State) -> + %get healthcheck of an application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State}; + [<<"hydrator-pipeline">>, Namespace] -> + {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. +%%%DELETE Methods +delete("/application/:appname", Req, State) -> + %Uninstall and delete a CDAP app + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. +%%%PUT Methods +put("/application/:appname", Req, State) -> + %create a new registration; deploys and starts a cdap application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + ReqBody = leptus_req:body_raw(Req), + {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))), + {RCode, RBody, RState} = handle_put(Req, State, XER, Appname, ReqBody, RequestUrl), ?AUDI(Req, Bts, XER, Rcode), {RCode, RBody, RState}; put("/application/:appname/reconfigure", Req, State) -> @@ -428,14 +426,7 @@ put("/application/:appname/reconfigure", Req, State) -> {Bts, XER} = init_api_call(Req), Appname = leptus_req:param(Req, appname), ReqBody = leptus_req:body_raw(Req), - AppnameToNS = fun(App) -> - X = appname_to_field_vals(App, [<<"namespace">>]), - case X of - none -> X; - [Namespace] -> Namespace - end - end, - {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS), + {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody), ?AUDI(Req, Bts, XER, Rcode), {RCode, RBody, RState}. diff --git a/src/resource_handler_tests.erl b/src/resource_handler_tests.erl index cab9558..75e5302 100644 --- a/src/resource_handler_tests.erl +++ b/src/resource_handler_tests.erl @@ -24,7 +24,8 @@ -import(resource_handler, [ parse_put_body/1, parse_reconfiguration_put_body/1, - handle_reconfigure_put/6 + handle_reconfigure_put/5, + handle_put/6 ]). parse_put_body_test() -> @@ -58,7 +59,7 @@ parse_put_body_test() -> [{<<"Greeting">>,<<"greet">>,<<"GET">>}], [#program{type = <<"flows">>, id = <<"WhoFlow">>}, #program{type = <<"services">>, id = <<"Greeting">>}], [{<<"flows">>,<<"WhoFlow">>,#{<<"foopprog">>=><<"barpprog">>}}]}, - Expected = {pf, <<"program-flowlet">>, ExpectedL}, + Expected = {<<"program-flowlet">>, ExpectedL}, ?assert(parse_put_body(jiffy:encode(Valid)) == Expected), ValidHydrator1 = @@ -68,7 +69,7 @@ parse_put_body_test() -> {<<"streamname">>, <<"sn">>}, {<<"pipeline_config_json_url">>, "www.foo.com"} ]}, - ExpectedHy1 = {hp,<<"hydrator-pipeline">>,{<<"ns">>,<<"sn">>,"www.foo.com",[]}}, + ExpectedHy1 = {<<"hydrator-pipeline">>, {<<"ns">>,<<"sn">>,"www.foo.com",[]}}, ?assert(parse_put_body(jiffy:encode(ValidHydrator1)) == ExpectedHy1), ValidHydrator2 = @@ -88,42 +89,65 @@ parse_put_body_test() -> ]} ]}, %{hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}} - ExpectedHy2 = {hp,<<"hydrator-pipeline">>,{<<"ns">>,<<"sn">>,"www.foo.com",[{"system:cdap-data-pipeline[4.1.0,5.0.0)",<<"art carney">>,"1.0.0-SNAPSHOT",<<"www.foo.com/sup/baphomet.jar">>,<<"www.foo2.com/sup/baphomet.jar">>}]}}, + ExpectedHy2 = {<<"hydrator-pipeline">>, {<<"ns">>,<<"sn">>,"www.foo.com",[{"system:cdap-data-pipeline[4.1.0,5.0.0)",<<"art carney">>,"1.0.0-SNAPSHOT",<<"www.foo.com/sup/baphomet.jar">>,<<"www.foo2.com/sup/baphomet.jar">>}]}}, ?assert(parse_put_body(jiffy:encode(ValidHydrator2)) == ExpectedHy2), - InvalidType = {[{<<"cdap_application_type">>, <<"NOT TODAY">>}]}, - erlang:display(parse_put_body(jiffy:encode(InvalidType))), - ?assert(parse_put_body(jiffy:encode(InvalidType)) == unsupported), + %Test the unexpected cases + EmptyD = dict:new(), + try meck:new(resource_handler, [passthrough]) catch _:_ -> ok end, + meck:expect(resource_handler, appname_to_field_vals, fun(X, [<<"appname">>]) -> + case X of + <<"notexist">> -> none; + <<"exist">> -> [<<"exist">>] + end + end), + + + %check already exists + ?assert(handle_put("", EmptyD, "textxer", <<"exist">>, Valid, "www.validurl.com") == {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", EmptyD}), + + InvalidType = jiffy:encode({[{<<"cdap_application_type">>, <<"NOT TODAY">>}]}), + ?assert(parse_put_body(InvalidType) == unsupported), + ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidType, "www.validurl.com") == {400,"Unsupported CDAP Application Type", EmptyD}), InvalidMissing = {[ {<<"cdap_application_type">>, <<"program-flowlet">>}, {<<"namespace">>, <<"ns">>} ]}, - ?assert(parse_put_body(jiffy:encode(InvalidMissing)) == invalid). + ?assert(parse_put_body(jiffy:encode(InvalidMissing)) == invalid), + ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidMissing, "www.validurl.com") == {400, "Invalid PUT Body or unparseable URL", EmptyD}), + + InvalidMissing2 = {[{<<"malformed">>, <<"i am">>}]}, + ?assert(parse_put_body(jiffy:encode(InvalidMissing2)) == invalid), + ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidMissing2, "www.validurl.com") == {400, "Invalid PUT Body or unparseable URL", EmptyD}), + + meck:unload(resource_handler). + reconfiguration_put_test() -> %test reconfiguring with an invalid PUT body (missing "reconfiguration_type") - AppnameToNS = fun(X) -> - case X of - <<"notexist">> -> none; - <<"exist">> -> <<"ns">> - end - end, EmptyD = dict:new(), + try meck:new(resource_handler, [passthrough]) catch _:_ -> ok end, + meck:expect(resource_handler, appname_to_field_vals, fun(X, _) -> + case X of + <<"notexist">> -> none; + <<"exist">> -> [<<"ns">>] + end + end), I1 = jiffy:encode({[{<<"config">>, <<"bar">>}]}), ?assert(parse_reconfiguration_put_body(I1) == invalid), - ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I1, AppnameToNS) == {400,"Invalid PUT Reconfigure Body",EmptyD}), + ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I1) == {400,"Invalid PUT Reconfigure Body",EmptyD}), %test reconfiguring with an invalid PUT body (missing app_config) I2 = jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-config">>}, {<<"foo">>, <<"bar">>}]}), ?assert(parse_reconfiguration_put_body(I2) == invalid), - ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I2, AppnameToNS) == {400,"Invalid PUT Reconfigure Body",EmptyD}), + ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I2) == {400,"Invalid PUT Reconfigure Body",EmptyD}), %test reconfiguring an invalid (unimplemented) type I3 = jiffy:encode({[{<<"config">>, <<"bar">>}, {<<"reconfiguration_type">>, <<"EMPTINESS">>}]}), ?assert(parse_reconfiguration_put_body(I3) == notimplemented), - ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3, AppnameToNS) == {501,"This type of reconfiguration is not implemented",EmptyD}), + ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3) == {501,"This type of reconfiguration is not implemented",EmptyD}), Valid = jiffy:encode({[{<<"config">>, {[{<<"foo">>, <<"bar">>}]}}, {<<"reconfiguration_type">>,<<"program-flowlet-app-config">>}]}), ?assert(parse_reconfiguration_put_body(Valid) == {<<"program-flowlet-app-config">>,#{<<"foo">>=><<"bar">>}}), @@ -132,5 +156,8 @@ reconfiguration_put_test() -> %test for valid but missing %?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3, AppnameToNS) == {501,"This type of reconfiguration is not implemented",EmptyD}), - ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"notexist">>, Valid, AppnameToNS) == {404,"Reconfigure recieved but the app is not registered", EmptyD}). + ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"notexist">>, Valid) == {404,"Reconfigure recieved but the app is not registered", EmptyD}), + + meck:unload(resource_handler). + diff --git a/src/util.erl b/src/util.erl index d96675b..520b071 100644 --- a/src/util.erl +++ b/src/util.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,8 +21,8 @@ -module(util). -include("application.hrl"). --export([concat/1, - get_platform_envs_and_config/0, +-export([concat/1, + get_platform_envs_and_config/0, resolve_cbs/2, initialize_database/0, get_all_appnames_from_db/0, @@ -39,10 +39,7 @@ %http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation %NOTE! Does not work or bomb when an element in the list is an atom. Must be a string or binary. Maybe add a check for this -to_string(Value) when is_binary(Value) -> binary_to_list(Value); -to_string(Value) -> Value. -concat(List) -> - lists:flatten(lists:map(fun to_string/1, List)). +concat(List) -> lists:flatten(lists:map(fun to_str/1, List)). resolve_cbs(XER, ConsulURL) -> %Ideally this function would dissapear if we get real DNS. This essentially is doing an SRV record lookup every time someone needs the bindng URL @@ -52,30 +49,30 @@ resolve_cbs(XER, ConsulURL) -> concat(["http://", IP, ":", integer_to_binary(Port)]). get_platform_envs_and_config() -> - %Get platform envs needed for broker operation, then fetch my config. + %Get platform envs needed for broker operation, then fetch my config. %If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap] MyName = os:getenv("HOSTNAME"), ConsulHost = os:getenv("CONSUL_HOST"), - case MyName == false orelse ConsulHost == false of + case MyName == false orelse ConsulHost == false of true -> []; - false -> + false -> %build Consul URL ConsulURL = concat(["http://", ConsulHost, ":8500"]), - + %Bind my own config map %generate my own XER here XER = gen_uuid(), {200, BoundConfig} = consul_interface:consul_bind_config(XER, MyName, ConsulURL), BoundConfigMap = jiffy:decode(jiffy:encode(BoundConfig), [return_maps]), %kind of an interesting way to turn an erlang proplist into a map - + %Here, we waterfall looking for "CDAP_CLUSTER_TO_MANAGE". %First, we will check for environmnental variables for a cluster *NAME* %If that is not found, then we will check out bound config for a fully bound URL %If that is also not found, let it crash baby. - CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of - false -> + CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of + false -> list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error - CDAPName -> + CDAPName -> {IP, Port} = consul_interface:consul_get_service_ip_port(XER, CDAPName, ConsulURL), list_to_binary(concat(["http://", IP, ":", integer_to_binary(Port)])) end, @@ -84,13 +81,13 @@ get_platform_envs_and_config() -> initialize_database() -> %Create the database (currently MNesia) if it does not exist, and the application table. - %Or, do nothing. + %Or, do nothing. N = node(), lager:info(io_lib:format("Initializing database. My node name is ~s", [N])), %set MNesia dir application:set_env(mnesia, dir, "/var/mnesia/"), - + %stop if running, can't create schema if it is. Dont check status, OK if stopped mnesia:stop(), @@ -98,16 +95,16 @@ initialize_database() -> %erlang:display(mnesia:delete_schema([N])), mnesia:create_schema([N]), %start MNesia, assert it works - + ok = mnesia:start(), %start MNesia, bomb if alreay started, should not happen lager:info("Mnesia started"), - + %try to create the table, or if it exists, do nothing %erlang:display(mnesia:delete_table(application)), case mnesia:create_table(application, [{attributes, record_info(fields, application)}, {disc_copies, [N]}]) of {aborted,{already_exists,application}} -> lager:info("Application table already exists"); - {atomic,ok} -> + {atomic,ok} -> lager:info(io_lib:format("Created application table on ~s", [N])) end, @@ -116,14 +113,14 @@ initialize_database() -> case mnesia:create_table(prog_flow_supp, [{attributes, record_info(fields, prog_flow_supp)}, {disc_copies, [N]}]) of {aborted,{already_exists, prog_flow_supp}} -> lager:info("prog_flow_supp table already exists"); - {atomic,ok} -> + {atomic,ok} -> lager:info(io_lib:format("Created prog_flow_supp table on ~s", [N])) end, %wait up to 30s for the table to come up. Usually instantaneous. If it takes more crash abd burn ok = mnesia:wait_for_tables([application, prog_flow_supp], 30000), ok. - + get_all_appnames_from_db() -> {atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, #application{_ = '_'}, read) end), lists:map(fun(X) -> {application, Appname,_,_,_,_,_,_,_,_} = X, @@ -159,7 +156,10 @@ iso_elapsed(Endtime, Starttime) -> Egs - Sgs. to_str("") -> ""; -to_str(Term) -> lists:flatten(io_lib:format("~p", [Term])). +to_str(Term) when is_list(Term) -> Term; +to_str(Term) when is_binary(Term) -> binary_to_list(Term); +to_str(Term) -> + lists:flatten(io_lib:format("~p", [Term])). -spec ip_to_str({inet:ip_address(), inet:port_number()}) -> binary(). %nasty.. I miss pythons x <= Foo <= Y syntax.. or something mathematical like Foo in [X..Y].. erlang not good 4 math @@ -176,9 +176,9 @@ update_with_new_config_map(NewConfig, OldConfig) -> %This is very similar to the maps:merge/2 builtin but that will inject keys of newconfig that were not in oldconfig. We need a "RIGHT JOIN" NCKeys = maps:keys(NewConfig), ConfigOverlaps = [X || X <- NCKeys, maps:is_key(X, OldConfig)], - case ConfigOverlaps of + case ConfigOverlaps of [] -> nooverlap; - _ -> + _ -> %we have an entry that should be in app config %build a new map with just the keys to update Pred = fun(X,_) -> lists:member(X, ConfigOverlaps) end, @@ -187,6 +187,6 @@ update_with_new_config_map(NewConfig, OldConfig) -> end. ejson_to_map(E) -> - %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map, + %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map, %usefu because ejsons do not appear to be order-independent-comparable, but maps are (e.g., two maps are equal if all their k+v are equal but agnostic to order) jiffy:decode(jiffy:encode(E), [return_maps]). diff --git a/src/util_tests.erl b/src/util_tests.erl index e37e492..78ca851 100644 --- a/src/util_tests.erl +++ b/src/util_tests.erl @@ -6,9 +6,9 @@ % Licensed under the Apache License, Version 2.0 (the "License"); % you may not use this file except in compliance with the License. % You may obtain a copy of the License at -% +% % http://www.apache.org/licenses/LICENSE-2.0 -% +% % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, % WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,10 +21,16 @@ -module(util_tests). -include_lib("eunit/include/eunit.hrl"). -import(util, [ - iso_elapsed/2, + iso_elapsed/2, ip_to_str/1, update_with_new_config_map/2, - ejson_to_map/1]). + ejson_to_map/1, + to_str/1 + ]). + +to_str_test() -> + ?assert(to_str("") == ""), + ?assert(to_str(<<"asdf">>) == "asdf"). iso_elapsed_test() -> ?assert(iso_elapsed(<<"2017-04-27T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 2), |