aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cdapbroker.app.src2
-rw-r--r--src/httpabs.erl28
-rw-r--r--src/httpabs_tests.erl18
-rw-r--r--src/resource_handler.erl231
-rw-r--r--src/resource_handler_tests.erl63
-rw-r--r--src/util.erl54
-rw-r--r--src/util_tests.erl14
7 files changed, 223 insertions, 187 deletions
diff --git a/src/cdapbroker.app.src b/src/cdapbroker.app.src
index b95d534..611d873 100644
--- a/src/cdapbroker.app.src
+++ b/src/cdapbroker.app.src
@@ -1,6 +1,6 @@
{application, cdapbroker,
[{description, "Interface between Consul and CDAP in DCAE"},
- {vsn, "4.0.5"},
+ {vsn, "4.0.6"},
{registered, []},
{mod, { cdapbroker_app, []}},
{applications,
diff --git a/src/httpabs.erl b/src/httpabs.erl
index bc9e068..152621e 100644
--- a/src/httpabs.erl
+++ b/src/httpabs.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,7 @@
-module(httpabs).
-export([get/2,
post/4, %I miss python's default arguments..
- post/5,
+ post/5,
put/4,
delete/2
]).
@@ -29,10 +29,10 @@
-define(SC(L), util:concat(L)).
%NOTE
-%Consider the Erlang statement:
+%Consider the Erlang statement:
%
%{ok, {{"HTTP/1.1",ReturnCode, State}, Head, Body}} = httpc:get(URL).
-%CDAP returns error messages in the “Body” field above.
+%CDAP returns error messages in the “Body” field above.
%
%However, Consul:
%1) Always (in all HTTP failures I’ve tested) returns Body == “500\n”
@@ -51,21 +51,21 @@
%%%
-spec parse_response({error|ok, any()}, string()) -> httpstat().
parse_response({Status, Response}, URL) ->
- case Status of
- error ->
+ case Status of
+ error ->
lager:error("httpc error: cannot hit: ~s", [URL]),
case Response of
no_scheme -> {400, io_lib:format("ERROR: The following URL is malformed: ~s", [URL])};
{bad_body, _} -> {400, "ERROR: The request Body is malformed"};
{bad_body_generator,_} -> {400, "ERROR: The request Body is malformed"};
_ ->
- lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])),
+ lager:error(io_lib:format("Unexpected ERROR hitting ~s", [URL])),
{504, list_to_binary(io_lib:format("ERROR: The following URL is unreachable or the request was unable to be parsed due to an unknown error: ~s", [URL]))} %Are there other reasons other than bad body and unreachable that crash request? (Sneak peak: the answer is probably)
end;
- ok ->
+ ok ->
{{_, ReturnCode, State}, _Head, Body} = Response,
- case ReturnCode of
- 200 ->
+ case ReturnCode of
+ 200 ->
{ReturnCode, Body};
_ ->
lager:error("Error While hitting ~s, Non-200 status code returned. HTTP Code ~p, State ~s, ResponseBody ~s:", [URL, ReturnCode, State, Body]),
@@ -77,14 +77,14 @@ parse_response({Status, Response}, URL) ->
sanitize(URL) ->
%allow URL to look like "www.foo.com" or <<"www.foo.com">>, trim it
- case is_binary(URL) of
+ case is_binary(URL) of
true -> string:strip(binary_to_list(URL));
false -> string:strip(URL)
end.
%anywhere you see any() is essentially lazy typing.. fix these someday when time is abundant
-spec post(string(), string()|binary(), string(), any()) -> httpstat().
-post(XER, URL, ContentType, Body) ->
+post(XER, URL, ContentType, Body) ->
%post that sends the XER, no headers signature
Headers = [{"x-ecomp-requestid", XER}],
U = sanitize(URL),
@@ -97,7 +97,7 @@ post(XER, URL, Headers, ContentType, Body) ->
parse_response(httpc:request(post, {U, [{"x-ecomp-requestid", XER} | Headers], ContentType, Body}, [],[]), U).
-spec get(string(), string()|binary()) -> httpstat().
-get(XER, URL) ->
+get(XER, URL) ->
%http get that always sends the XER.. even if the server doesn't want it; maybe this will blow up on me one day.
U = sanitize(URL),
Headers = [{"x-ecomp-requestid", XER}],
diff --git a/src/httpabs_tests.erl b/src/httpabs_tests.erl
index d8ad529..27b7116 100644
--- a/src/httpabs_tests.erl
+++ b/src/httpabs_tests.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,8 @@
-module(httpabs_tests).
-include_lib("eunit/include/eunit.hrl").
-import(httpabs, [
- sanitize/1
+ sanitize/1,
+ parse_response/2
]
).
@@ -30,4 +31,15 @@ sanitize_test() ->
?assert(sanitize(" www.foo.com ") == "www.foo.com"),
?assert(sanitize(<<"www.foo.com">>) == "www.foo.com").
+parse_response_test() ->
+ NoURL = "THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME",
+ ?assert(parse_response({error,no_scheme},"THIS IS NOT EVEN A URL WHAT ARE YOU DOING TO ME") == {400, io_lib:format("ERROR: The following URL is malformed: ~s", [NoURL])}),
+ ?assert(httpabs:put("testxer", NoURL, "application/json", jiffy:encode({[{<<"a">>, <<"b">>}]})) == {400, io_lib:format("ERROR: The following URL is malformed: ~s", [NoURL])}),
+
+ %test httpabs bad body (not encoded as JSON)
+ ReconfigMap = util:ejson_to_map({[{<<"foo">>, <<"bar">>}]}),
+ BadBody = {[{<<"config">>, ReconfigMap}]},
+ ?assert(httpabs:put("testxer", "www.foo.com", "application/json", BadBody) == {400,"ERROR: The request Body is malformed"}),
+ ?assert(parse_response({error,{bad_body_generator, BadBody}}, "www.foo.com") == {400,"ERROR: The request Body is malformed"}),
+ ?assert(parse_response({ok,{{"HTTP/1.1",200,"OK"},[{"cache-control","private, max-age=0"},{"date","Mon, 11 Sep 2017 15:05:11 GMT"},{"accept-ranges","none"},{"server","gws"},{"vary","Accept-Encoding"},{"content-length","46376"},{"content-type","text/html; charset=ISO-8859-1"},{"expires","-1"},{"p3p","..."},{"x-xss-protection","1; mode=block"},{"x-frame-options","SAMEORIGIN"},{"set-cookie","NID=111=nGQHl8ljJ3nXHmGmIZmGaTgoq3WAdbgWaxAUOQJm-0AaOkS64iiXtm-HojIFSpqowj7Nr-KpqS8o-oDOROq-4AaDs0J4M92V7yBOAJQYPkuK7wtVav0BhpOOYgCHysUN; expires=Tue, 13-Mar-2018 15:05:11 GMT; path=/; domain=.google.com; HttpOnly"},{"alt-svc","quic=\":443\"; ma=2592000; v=\"39,38,37,35\""}],"<!doctype html>..."}}, "https://google.com") == {200, "<!doctype html>..."}).
diff --git a/src/resource_handler.erl b/src/resource_handler.erl
index 8c1b343..798f457 100644
--- a/src/resource_handler.erl
+++ b/src/resource_handler.erl
@@ -7,6 +7,8 @@
-export([get/3, put/3, delete/3, post/3]).
-export([cross_domains/3]).
+-export([appname_to_field_vals/2]).
+
%%for keeping state
%%The application record is defined in application.hrl
%%In Mnesia, the first element is the type of record and the second element is the key
@@ -163,88 +165,6 @@ delete_app_helper(Appname, State, XER, Req) ->
end
end.
-%%%CALLBACKS %%%
-init(_Route, _Req, State) ->
- {ok, State}.
-terminate(_Reason, _Route, _Req, _State) ->
- ok.
-%%%FOR Cors support
-%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55
-cross_domains(_Route, _Req, State) ->
- {['_'], State}.
-
-%%%GET Methods
-get("/", Req, State) ->
- %The broker's "info" endpoint; returns some possibly useful information
- {Bts, XER} = init_api_call(Req),
- Apps = util:get_all_appnames_from_db(),
- {UT, _} = statistics(wall_clock),
- CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL),
- RB = {[
- {<<"cdap cluster version">>, CDAPVer},
- {<<"managed cdap url">>, ?CDAPURL},
- {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)},
- {<<"number of applications registered">>, length(Apps)},
- {<<"uptime (s)">>, UT/1000},
- {<<"broker API version">>, util:get_my_version()}
- ]},
- {RCode, RBody, RState} = {200, {json, RB}, State},
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application", Req, State) ->
- %get a list of all registered apps
- {Bts, XER} = init_api_call(Req),
- {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State},
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname", Req, State) ->
- %get information about a registered application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State),
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname/metrics", Req, State) ->
- %get metrics for a registered application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
- none -> {404, "", State};
- [<<"program-flowlet">>, Namespace] ->
- {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200
- {ReturnCode, {json, ReturnBody}, State};
- [<<"hydrator-pipeline">>, Namespace] ->
- lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
- {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL),
- {ReturnCode, {json, ReturnBody}, State}
- end,
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname/healthcheck", Req, State) ->
- %get healthcheck of an application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
- none -> {404, "", State};
- [<<"program-flowlet">>, Namespace] ->
- {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State};
- [<<"hydrator-pipeline">>, Namespace] ->
- {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State}
- end,
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState}.
-
-%%%DELETE Methods
-delete("/application/:appname", Req, State) ->
- %Uninstall and delete a CDAP app
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req),
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState}.
-
-%%%PUT Methods
parse_put_body(B) ->
%parse the PUT body to application
try
@@ -274,7 +194,7 @@ parse_put_body(B) ->
maps:get(<<"program_id">>, P),
maps:get(<<"program_pref">>, P)}
end, maps:get(<<"program_preferences">>, Body)),
- {pf, <<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}};
+ {<<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}};
<<"hydrator-pipeline">> ->
PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body),
@@ -295,7 +215,7 @@ parse_put_body(B) ->
} end, D);
false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in []
end,
- {hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}}
+ {<<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}}
end
end
catch _:_ -> invalid
@@ -315,12 +235,12 @@ parse_reconfiguration_put_body(Body) ->
catch _:_ -> invalid
end.
-handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) ->
+handle_reconfigure_put(Req, State, XER, Appname, ReqBody) ->
%handle the reconfiguration put. broker out from the http call, and takes the lookup func as an arg, to allow for better unit testing.
%this is still not a pure function due to the workflows call, still needs enhancement
- case AppnameToNS(Appname) of
+ case ?MODULE:appname_to_field_vals(Appname, [<<"namespace">>]) of
none -> {404, "Reconfigure recieved but the app is not registered", State};
- Namespace ->
+ [Namespace] ->
ParsedBody = parse_reconfiguration_put_body(ReqBody),
case ParsedBody of
invalid -> {400, "Invalid PUT Reconfigure Body", State};
@@ -351,37 +271,27 @@ handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) ->
end
end.
-put("/application/:appname", Req, State) ->
- %create a new registration; deploys and starts a cdap application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of
- [Appname] ->
- {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State};
- none -> %no matches, create the resource, return the application record
+handle_put(Req, State, XER, Appname, ReqBody, RequestUrl) ->
+ %use of ?MODULE here is due to the meck limitation described here: https://github.com/eproxus/meck
+ case ?MODULE:appname_to_field_vals(Appname, [<<"appname">>]) == none of
+ false -> {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State};
+ true ->
%Initial put requires the put body parameters
- case parse_put_body(leptus_req:body_raw(Req)) of
- %could not parse the body
- invalid -> {400, "Invalid PUT Body or unparseable URL", State};
-
- %unsupported cdap application type
- unsupported -> {404, "Unsupported CDAP Application Type", State};
-
- {Type, AppType, Params} ->
+ ParsedBody = parse_put_body(ReqBody),
+ case ParsedBody of
+ invalid -> {400, "Invalid PUT Body or unparseable URL", State}; %could not parse the body
+ unsupported -> {400, "Unsupported CDAP Application Type", State}; %unsupported cdap application type
+ {AppType, Params} ->
%form shared info
- %hateaos cuz they aintaos
- {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))),
Metricsurl = <<RequestUrl/binary, <<"/metrics">>/binary>>,
Healthcheckurl = <<RequestUrl/binary, <<"/healthcheck">>/binary>>,
try
- case Type of
- hp ->
+ case AppType of
+ <<"hydrator-pipeline">> ->
{Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params,
ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname),
-
- %TODO: This!
- ServiceEndpoints = [], %unclear if this is possible with pipelines
+ ServiceEndpoints = [], %TODO: unclear if this is possible with pipelines
%write into mnesia, deploy
A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()},
@@ -389,7 +299,7 @@ put("/application/:appname", Req, State) ->
ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter),
metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager
ok;
- pf ->
+ <<"program-flowlet">> ->
{Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params,
%Form URLs that are part of the record
%NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here
@@ -420,7 +330,95 @@ put("/application/:appname", Req, State) ->
{500, "Please report this error", State}
end
end
- end,
+ end.
+
+%%% HTTP API CALLBACKS %%%
+init(_Route, _Req, State) ->
+ {ok, State}.
+terminate(_Reason, _Route, _Req, _State) ->
+ ok.
+%%%FOR Cors support
+%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55
+cross_domains(_Route, _Req, State) ->
+ {['_'], State}.
+
+%%%GET Methods
+get("/", Req, State) ->
+ %The broker's "info" endpoint; returns some possibly useful information
+ {Bts, XER} = init_api_call(Req),
+ Apps = util:get_all_appnames_from_db(),
+ {UT, _} = statistics(wall_clock),
+ CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL),
+ RB = {[
+ {<<"cdap cluster version">>, CDAPVer},
+ {<<"managed cdap url">>, ?CDAPURL},
+ {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)},
+ {<<"number of applications registered">>, length(Apps)},
+ {<<"uptime (s)">>, UT/1000},
+ {<<"broker API version">>, util:get_my_version()}
+ ]},
+ {RCode, RBody, RState} = {200, {json, RB}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application", Req, State) ->
+ %get a list of all registered apps
+ {Bts, XER} = init_api_call(Req),
+ {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname", Req, State) ->
+ %get information about a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/metrics", Req, State) ->
+ %get metrics for a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200
+ {ReturnCode, {json, ReturnBody}, State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
+ {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL),
+ {ReturnCode, {json, ReturnBody}, State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/healthcheck", Req, State) ->
+ %get healthcheck of an application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+%%%DELETE Methods
+delete("/application/:appname", Req, State) ->
+ %Uninstall and delete a CDAP app
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+%%%PUT Methods
+put("/application/:appname", Req, State) ->
+ %create a new registration; deploys and starts a cdap application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ ReqBody = leptus_req:body_raw(Req),
+ {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))),
+ {RCode, RBody, RState} = handle_put(Req, State, XER, Appname, ReqBody, RequestUrl),
?AUDI(Req, Bts, XER, Rcode),
{RCode, RBody, RState};
put("/application/:appname/reconfigure", Req, State) ->
@@ -428,14 +426,7 @@ put("/application/:appname/reconfigure", Req, State) ->
{Bts, XER} = init_api_call(Req),
Appname = leptus_req:param(Req, appname),
ReqBody = leptus_req:body_raw(Req),
- AppnameToNS = fun(App) ->
- X = appname_to_field_vals(App, [<<"namespace">>]),
- case X of
- none -> X;
- [Namespace] -> Namespace
- end
- end,
- {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS),
+ {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody),
?AUDI(Req, Bts, XER, Rcode),
{RCode, RBody, RState}.
diff --git a/src/resource_handler_tests.erl b/src/resource_handler_tests.erl
index cab9558..75e5302 100644
--- a/src/resource_handler_tests.erl
+++ b/src/resource_handler_tests.erl
@@ -24,7 +24,8 @@
-import(resource_handler, [
parse_put_body/1,
parse_reconfiguration_put_body/1,
- handle_reconfigure_put/6
+ handle_reconfigure_put/5,
+ handle_put/6
]).
parse_put_body_test() ->
@@ -58,7 +59,7 @@ parse_put_body_test() ->
[{<<"Greeting">>,<<"greet">>,<<"GET">>}],
[#program{type = <<"flows">>, id = <<"WhoFlow">>}, #program{type = <<"services">>, id = <<"Greeting">>}],
[{<<"flows">>,<<"WhoFlow">>,#{<<"foopprog">>=><<"barpprog">>}}]},
- Expected = {pf, <<"program-flowlet">>, ExpectedL},
+ Expected = {<<"program-flowlet">>, ExpectedL},
?assert(parse_put_body(jiffy:encode(Valid)) == Expected),
ValidHydrator1 =
@@ -68,7 +69,7 @@ parse_put_body_test() ->
{<<"streamname">>, <<"sn">>},
{<<"pipeline_config_json_url">>, "www.foo.com"}
]},
- ExpectedHy1 = {hp,<<"hydrator-pipeline">>,{<<"ns">>,<<"sn">>,"www.foo.com",[]}},
+ ExpectedHy1 = {<<"hydrator-pipeline">>, {<<"ns">>,<<"sn">>,"www.foo.com",[]}},
?assert(parse_put_body(jiffy:encode(ValidHydrator1)) == ExpectedHy1),
ValidHydrator2 =
@@ -88,42 +89,65 @@ parse_put_body_test() ->
]}
]},
%{hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}}
- ExpectedHy2 = {hp,<<"hydrator-pipeline">>,{<<"ns">>,<<"sn">>,"www.foo.com",[{"system:cdap-data-pipeline[4.1.0,5.0.0)",<<"art carney">>,"1.0.0-SNAPSHOT",<<"www.foo.com/sup/baphomet.jar">>,<<"www.foo2.com/sup/baphomet.jar">>}]}},
+ ExpectedHy2 = {<<"hydrator-pipeline">>, {<<"ns">>,<<"sn">>,"www.foo.com",[{"system:cdap-data-pipeline[4.1.0,5.0.0)",<<"art carney">>,"1.0.0-SNAPSHOT",<<"www.foo.com/sup/baphomet.jar">>,<<"www.foo2.com/sup/baphomet.jar">>}]}},
?assert(parse_put_body(jiffy:encode(ValidHydrator2)) == ExpectedHy2),
- InvalidType = {[{<<"cdap_application_type">>, <<"NOT TODAY">>}]},
- erlang:display(parse_put_body(jiffy:encode(InvalidType))),
- ?assert(parse_put_body(jiffy:encode(InvalidType)) == unsupported),
+ %Test the unexpected cases
+ EmptyD = dict:new(),
+ try meck:new(resource_handler, [passthrough]) catch _:_ -> ok end,
+ meck:expect(resource_handler, appname_to_field_vals, fun(X, [<<"appname">>]) ->
+ case X of
+ <<"notexist">> -> none;
+ <<"exist">> -> [<<"exist">>]
+ end
+ end),
+
+
+ %check already exists
+ ?assert(handle_put("", EmptyD, "textxer", <<"exist">>, Valid, "www.validurl.com") == {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", EmptyD}),
+
+ InvalidType = jiffy:encode({[{<<"cdap_application_type">>, <<"NOT TODAY">>}]}),
+ ?assert(parse_put_body(InvalidType) == unsupported),
+ ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidType, "www.validurl.com") == {400,"Unsupported CDAP Application Type", EmptyD}),
InvalidMissing = {[
{<<"cdap_application_type">>, <<"program-flowlet">>},
{<<"namespace">>, <<"ns">>}
]},
- ?assert(parse_put_body(jiffy:encode(InvalidMissing)) == invalid).
+ ?assert(parse_put_body(jiffy:encode(InvalidMissing)) == invalid),
+ ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidMissing, "www.validurl.com") == {400, "Invalid PUT Body or unparseable URL", EmptyD}),
+
+ InvalidMissing2 = {[{<<"malformed">>, <<"i am">>}]},
+ ?assert(parse_put_body(jiffy:encode(InvalidMissing2)) == invalid),
+ ?assert(handle_put("", EmptyD, "textxer", <<"notexist">>, InvalidMissing2, "www.validurl.com") == {400, "Invalid PUT Body or unparseable URL", EmptyD}),
+
+ meck:unload(resource_handler).
+
reconfiguration_put_test() ->
%test reconfiguring with an invalid PUT body (missing "reconfiguration_type")
- AppnameToNS = fun(X) ->
- case X of
- <<"notexist">> -> none;
- <<"exist">> -> <<"ns">>
- end
- end,
EmptyD = dict:new(),
+ try meck:new(resource_handler, [passthrough]) catch _:_ -> ok end,
+ meck:expect(resource_handler, appname_to_field_vals, fun(X, _) ->
+ case X of
+ <<"notexist">> -> none;
+ <<"exist">> -> [<<"ns">>]
+ end
+ end),
I1 = jiffy:encode({[{<<"config">>, <<"bar">>}]}),
?assert(parse_reconfiguration_put_body(I1) == invalid),
- ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I1, AppnameToNS) == {400,"Invalid PUT Reconfigure Body",EmptyD}),
+ ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I1) == {400,"Invalid PUT Reconfigure Body",EmptyD}),
%test reconfiguring with an invalid PUT body (missing app_config)
I2 = jiffy:encode({[{<<"reconfiguration_type">>, <<"program-flowlet-app-config">>}, {<<"foo">>, <<"bar">>}]}),
?assert(parse_reconfiguration_put_body(I2) == invalid),
- ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I2, AppnameToNS) == {400,"Invalid PUT Reconfigure Body",EmptyD}),
+ ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I2) == {400,"Invalid PUT Reconfigure Body",EmptyD}),
%test reconfiguring an invalid (unimplemented) type
I3 = jiffy:encode({[{<<"config">>, <<"bar">>}, {<<"reconfiguration_type">>, <<"EMPTINESS">>}]}),
?assert(parse_reconfiguration_put_body(I3) == notimplemented),
- ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3, AppnameToNS) == {501,"This type of reconfiguration is not implemented",EmptyD}),
+ ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3) == {501,"This type of reconfiguration is not implemented",EmptyD}),
Valid = jiffy:encode({[{<<"config">>, {[{<<"foo">>, <<"bar">>}]}}, {<<"reconfiguration_type">>,<<"program-flowlet-app-config">>}]}),
?assert(parse_reconfiguration_put_body(Valid) == {<<"program-flowlet-app-config">>,#{<<"foo">>=><<"bar">>}}),
@@ -132,5 +156,8 @@ reconfiguration_put_test() ->
%test for valid but missing
%?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"exist">>, I3, AppnameToNS) == {501,"This type of reconfiguration is not implemented",EmptyD}),
- ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"notexist">>, Valid, AppnameToNS) == {404,"Reconfigure recieved but the app is not registered", EmptyD}).
+ ?assert(handle_reconfigure_put("", EmptyD, "testXER", <<"notexist">>, Valid) == {404,"Reconfigure recieved but the app is not registered", EmptyD}),
+
+ meck:unload(resource_handler).
+
diff --git a/src/util.erl b/src/util.erl
index d96675b..520b071 100644
--- a/src/util.erl
+++ b/src/util.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,8 +21,8 @@
-module(util).
-include("application.hrl").
--export([concat/1,
- get_platform_envs_and_config/0,
+-export([concat/1,
+ get_platform_envs_and_config/0,
resolve_cbs/2,
initialize_database/0,
get_all_appnames_from_db/0,
@@ -39,10 +39,7 @@
%http://stackoverflow.com/questions/39757020/erlang-drying-up-stringbinary-concatenation
%NOTE! Does not work or bomb when an element in the list is an atom. Must be a string or binary. Maybe add a check for this
-to_string(Value) when is_binary(Value) -> binary_to_list(Value);
-to_string(Value) -> Value.
-concat(List) ->
- lists:flatten(lists:map(fun to_string/1, List)).
+concat(List) -> lists:flatten(lists:map(fun to_str/1, List)).
resolve_cbs(XER, ConsulURL) ->
%Ideally this function would dissapear if we get real DNS. This essentially is doing an SRV record lookup every time someone needs the bindng URL
@@ -52,30 +49,30 @@ resolve_cbs(XER, ConsulURL) ->
concat(["http://", IP, ":", integer_to_binary(Port)]).
get_platform_envs_and_config() ->
- %Get platform envs needed for broker operation, then fetch my config.
+ %Get platform envs needed for broker operation, then fetch my config.
%If something critical fails, returns [], else [ConsulURL, CDAPUrl, BoundConfigMap]
MyName = os:getenv("HOSTNAME"),
ConsulHost = os:getenv("CONSUL_HOST"),
- case MyName == false orelse ConsulHost == false of
+ case MyName == false orelse ConsulHost == false of
true -> [];
- false ->
+ false ->
%build Consul URL
ConsulURL = concat(["http://", ConsulHost, ":8500"]),
-
+
%Bind my own config map
%generate my own XER here
XER = gen_uuid(),
{200, BoundConfig} = consul_interface:consul_bind_config(XER, MyName, ConsulURL),
BoundConfigMap = jiffy:decode(jiffy:encode(BoundConfig), [return_maps]), %kind of an interesting way to turn an erlang proplist into a map
-
+
%Here, we waterfall looking for "CDAP_CLUSTER_TO_MANAGE".
%First, we will check for environmnental variables for a cluster *NAME*
%If that is not found, then we will check out bound config for a fully bound URL
%If that is also not found, let it crash baby.
- CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of
- false ->
+ CDAPURL = case os:getenv("CDAP_CLUSTER_TO_MANAGE") of
+ false ->
list_to_binary(concat(["http://", lists:nth(1, maps:get(<<"cdap_cluster_to_manage">>, BoundConfigMap))])); %cbs returns ip:port. need http:// or will get "no adaptors found" error
- CDAPName ->
+ CDAPName ->
{IP, Port} = consul_interface:consul_get_service_ip_port(XER, CDAPName, ConsulURL),
list_to_binary(concat(["http://", IP, ":", integer_to_binary(Port)]))
end,
@@ -84,13 +81,13 @@ get_platform_envs_and_config() ->
initialize_database() ->
%Create the database (currently MNesia) if it does not exist, and the application table.
- %Or, do nothing.
+ %Or, do nothing.
N = node(),
lager:info(io_lib:format("Initializing database. My node name is ~s", [N])),
%set MNesia dir
application:set_env(mnesia, dir, "/var/mnesia/"),
-
+
%stop if running, can't create schema if it is. Dont check status, OK if stopped
mnesia:stop(),
@@ -98,16 +95,16 @@ initialize_database() ->
%erlang:display(mnesia:delete_schema([N])),
mnesia:create_schema([N]),
%start MNesia, assert it works
-
+
ok = mnesia:start(), %start MNesia, bomb if alreay started, should not happen
lager:info("Mnesia started"),
-
+
%try to create the table, or if it exists, do nothing
%erlang:display(mnesia:delete_table(application)),
case mnesia:create_table(application, [{attributes, record_info(fields, application)}, {disc_copies, [N]}]) of
{aborted,{already_exists,application}} ->
lager:info("Application table already exists");
- {atomic,ok} ->
+ {atomic,ok} ->
lager:info(io_lib:format("Created application table on ~s", [N]))
end,
@@ -116,14 +113,14 @@ initialize_database() ->
case mnesia:create_table(prog_flow_supp, [{attributes, record_info(fields, prog_flow_supp)}, {disc_copies, [N]}]) of
{aborted,{already_exists, prog_flow_supp}} ->
lager:info("prog_flow_supp table already exists");
- {atomic,ok} ->
+ {atomic,ok} ->
lager:info(io_lib:format("Created prog_flow_supp table on ~s", [N]))
end,
%wait up to 30s for the table to come up. Usually instantaneous. If it takes more crash abd burn
ok = mnesia:wait_for_tables([application, prog_flow_supp], 30000),
ok.
-
+
get_all_appnames_from_db() ->
{atomic, Apps} = mnesia:transaction(fun() -> mnesia:match_object(application, #application{_ = '_'}, read) end),
lists:map(fun(X) -> {application, Appname,_,_,_,_,_,_,_,_} = X,
@@ -159,7 +156,10 @@ iso_elapsed(Endtime, Starttime) ->
Egs - Sgs.
to_str("") -> "";
-to_str(Term) -> lists:flatten(io_lib:format("~p", [Term])).
+to_str(Term) when is_list(Term) -> Term;
+to_str(Term) when is_binary(Term) -> binary_to_list(Term);
+to_str(Term) ->
+ lists:flatten(io_lib:format("~p", [Term])).
-spec ip_to_str({inet:ip_address(), inet:port_number()}) -> binary().
%nasty.. I miss pythons x <= Foo <= Y syntax.. or something mathematical like Foo in [X..Y].. erlang not good 4 math
@@ -176,9 +176,9 @@ update_with_new_config_map(NewConfig, OldConfig) ->
%This is very similar to the maps:merge/2 builtin but that will inject keys of newconfig that were not in oldconfig. We need a "RIGHT JOIN"
NCKeys = maps:keys(NewConfig),
ConfigOverlaps = [X || X <- NCKeys, maps:is_key(X, OldConfig)],
- case ConfigOverlaps of
+ case ConfigOverlaps of
[] -> nooverlap;
- _ ->
+ _ ->
%we have an entry that should be in app config
%build a new map with just the keys to update
Pred = fun(X,_) -> lists:member(X, ConfigOverlaps) end,
@@ -187,6 +187,6 @@ update_with_new_config_map(NewConfig, OldConfig) ->
end.
ejson_to_map(E) ->
- %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map,
+ %takes the jiffy "ejson: format of {[{<<"foo">>, <<"bar">>}, {<<"foo2">>, <<"bar2">>}]} and turns it into a map,
%usefu because ejsons do not appear to be order-independent-comparable, but maps are (e.g., two maps are equal if all their k+v are equal but agnostic to order)
jiffy:decode(jiffy:encode(E), [return_maps]).
diff --git a/src/util_tests.erl b/src/util_tests.erl
index e37e492..78ca851 100644
--- a/src/util_tests.erl
+++ b/src/util_tests.erl
@@ -6,9 +6,9 @@
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
-%
+%
% http://www.apache.org/licenses/LICENSE-2.0
-%
+%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,10 +21,16 @@
-module(util_tests).
-include_lib("eunit/include/eunit.hrl").
-import(util, [
- iso_elapsed/2,
+ iso_elapsed/2,
ip_to_str/1,
update_with_new_config_map/2,
- ejson_to_map/1]).
+ ejson_to_map/1,
+ to_str/1
+ ]).
+
+to_str_test() ->
+ ?assert(to_str("") == ""),
+ ?assert(to_str(<<"asdf">>) == "asdf").
iso_elapsed_test() ->
?assert(iso_elapsed(<<"2017-04-27T18:38:10Z">>, <<"2017-04-27T18:38:08Z">>) == 2),