aboutsummaryrefslogtreecommitdiffstats
path: root/src/resource_handler.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/resource_handler.erl')
-rw-r--r--src/resource_handler.erl231
1 files changed, 111 insertions, 120 deletions
diff --git a/src/resource_handler.erl b/src/resource_handler.erl
index 8c1b343..798f457 100644
--- a/src/resource_handler.erl
+++ b/src/resource_handler.erl
@@ -7,6 +7,8 @@
-export([get/3, put/3, delete/3, post/3]).
-export([cross_domains/3]).
+-export([appname_to_field_vals/2]).
+
%%for keeping state
%%The application record is defined in application.hrl
%%In Mnesia, the first element is the type of record and the second element is the key
@@ -163,88 +165,6 @@ delete_app_helper(Appname, State, XER, Req) ->
end
end.
-%%%CALLBACKS %%%
-init(_Route, _Req, State) ->
- {ok, State}.
-terminate(_Reason, _Route, _Req, _State) ->
- ok.
-%%%FOR Cors support
-%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55
-cross_domains(_Route, _Req, State) ->
- {['_'], State}.
-
-%%%GET Methods
-get("/", Req, State) ->
- %The broker's "info" endpoint; returns some possibly useful information
- {Bts, XER} = init_api_call(Req),
- Apps = util:get_all_appnames_from_db(),
- {UT, _} = statistics(wall_clock),
- CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL),
- RB = {[
- {<<"cdap cluster version">>, CDAPVer},
- {<<"managed cdap url">>, ?CDAPURL},
- {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)},
- {<<"number of applications registered">>, length(Apps)},
- {<<"uptime (s)">>, UT/1000},
- {<<"broker API version">>, util:get_my_version()}
- ]},
- {RCode, RBody, RState} = {200, {json, RB}, State},
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application", Req, State) ->
- %get a list of all registered apps
- {Bts, XER} = init_api_call(Req),
- {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State},
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname", Req, State) ->
- %get information about a registered application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State),
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname/metrics", Req, State) ->
- %get metrics for a registered application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
- none -> {404, "", State};
- [<<"program-flowlet">>, Namespace] ->
- {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200
- {ReturnCode, {json, ReturnBody}, State};
- [<<"hydrator-pipeline">>, Namespace] ->
- lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
- {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL),
- {ReturnCode, {json, ReturnBody}, State}
- end,
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState};
-get("/application/:appname/healthcheck", Req, State) ->
- %get healthcheck of an application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
- none -> {404, "", State};
- [<<"program-flowlet">>, Namespace] ->
- {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State};
- [<<"hydrator-pipeline">>, Namespace] ->
- {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State}
- end,
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState}.
-
-%%%DELETE Methods
-delete("/application/:appname", Req, State) ->
- %Uninstall and delete a CDAP app
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req),
- ?AUDI(Req, Bts, XER, Rcode),
- {RCode, RBody, RState}.
-
-%%%PUT Methods
parse_put_body(B) ->
%parse the PUT body to application
try
@@ -274,7 +194,7 @@ parse_put_body(B) ->
maps:get(<<"program_id">>, P),
maps:get(<<"program_pref">>, P)}
end, maps:get(<<"program_preferences">>, Body)),
- {pf, <<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}};
+ {<<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}};
<<"hydrator-pipeline">> ->
PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body),
@@ -295,7 +215,7 @@ parse_put_body(B) ->
} end, D);
false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in []
end,
- {hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}}
+ {<<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}}
end
end
catch _:_ -> invalid
@@ -315,12 +235,12 @@ parse_reconfiguration_put_body(Body) ->
catch _:_ -> invalid
end.
-handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) ->
+handle_reconfigure_put(Req, State, XER, Appname, ReqBody) ->
%handle the reconfiguration put. broker out from the http call, and takes the lookup func as an arg, to allow for better unit testing.
%this is still not a pure function due to the workflows call, still needs enhancement
- case AppnameToNS(Appname) of
+ case ?MODULE:appname_to_field_vals(Appname, [<<"namespace">>]) of
none -> {404, "Reconfigure recieved but the app is not registered", State};
- Namespace ->
+ [Namespace] ->
ParsedBody = parse_reconfiguration_put_body(ReqBody),
case ParsedBody of
invalid -> {400, "Invalid PUT Reconfigure Body", State};
@@ -351,37 +271,27 @@ handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) ->
end
end.
-put("/application/:appname", Req, State) ->
- %create a new registration; deploys and starts a cdap application
- {Bts, XER} = init_api_call(Req),
- Appname = leptus_req:param(Req, appname),
- {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of
- [Appname] ->
- {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State};
- none -> %no matches, create the resource, return the application record
+handle_put(Req, State, XER, Appname, ReqBody, RequestUrl) ->
+ %use of ?MODULE here is due to the meck limitation described here: https://github.com/eproxus/meck
+ case ?MODULE:appname_to_field_vals(Appname, [<<"appname">>]) == none of
+ false -> {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State};
+ true ->
%Initial put requires the put body parameters
- case parse_put_body(leptus_req:body_raw(Req)) of
- %could not parse the body
- invalid -> {400, "Invalid PUT Body or unparseable URL", State};
-
- %unsupported cdap application type
- unsupported -> {404, "Unsupported CDAP Application Type", State};
-
- {Type, AppType, Params} ->
+ ParsedBody = parse_put_body(ReqBody),
+ case ParsedBody of
+ invalid -> {400, "Invalid PUT Body or unparseable URL", State}; %could not parse the body
+ unsupported -> {400, "Unsupported CDAP Application Type", State}; %unsupported cdap application type
+ {AppType, Params} ->
%form shared info
- %hateaos cuz they aintaos
- {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))),
Metricsurl = <<RequestUrl/binary, <<"/metrics">>/binary>>,
Healthcheckurl = <<RequestUrl/binary, <<"/healthcheck">>/binary>>,
try
- case Type of
- hp ->
+ case AppType of
+ <<"hydrator-pipeline">> ->
{Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params,
ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname),
-
- %TODO: This!
- ServiceEndpoints = [], %unclear if this is possible with pipelines
+ ServiceEndpoints = [], %TODO: unclear if this is possible with pipelines
%write into mnesia, deploy
A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()},
@@ -389,7 +299,7 @@ put("/application/:appname", Req, State) ->
ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter),
metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager
ok;
- pf ->
+ <<"program-flowlet">> ->
{Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params,
%Form URLs that are part of the record
%NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here
@@ -420,7 +330,95 @@ put("/application/:appname", Req, State) ->
{500, "Please report this error", State}
end
end
- end,
+ end.
+
+%%% HTTP API CALLBACKS %%%
+init(_Route, _Req, State) ->
+ {ok, State}.
+terminate(_Reason, _Route, _Req, _State) ->
+ ok.
+%%%FOR Cors support
+%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55
+cross_domains(_Route, _Req, State) ->
+ {['_'], State}.
+
+%%%GET Methods
+get("/", Req, State) ->
+ %The broker's "info" endpoint; returns some possibly useful information
+ {Bts, XER} = init_api_call(Req),
+ Apps = util:get_all_appnames_from_db(),
+ {UT, _} = statistics(wall_clock),
+ CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL),
+ RB = {[
+ {<<"cdap cluster version">>, CDAPVer},
+ {<<"managed cdap url">>, ?CDAPURL},
+ {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)},
+ {<<"number of applications registered">>, length(Apps)},
+ {<<"uptime (s)">>, UT/1000},
+ {<<"broker API version">>, util:get_my_version()}
+ ]},
+ {RCode, RBody, RState} = {200, {json, RB}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application", Req, State) ->
+ %get a list of all registered apps
+ {Bts, XER} = init_api_call(Req),
+ {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State},
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname", Req, State) ->
+ %get information about a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/metrics", Req, State) ->
+ %get metrics for a registered application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200
+ {ReturnCode, {json, ReturnBody}, State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"),
+ {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL),
+ {ReturnCode, {json, ReturnBody}, State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState};
+get("/application/:appname/healthcheck", Req, State) ->
+ %get healthcheck of an application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])),
+ {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of
+ none -> {404, "", State};
+ [<<"program-flowlet">>, Namespace] ->
+ {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State};
+ [<<"hydrator-pipeline">>, Namespace] ->
+ {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State}
+ end,
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+%%%DELETE Methods
+delete("/application/:appname", Req, State) ->
+ %Uninstall and delete a CDAP app
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req),
+ ?AUDI(Req, Bts, XER, Rcode),
+ {RCode, RBody, RState}.
+%%%PUT Methods
+put("/application/:appname", Req, State) ->
+ %create a new registration; deploys and starts a cdap application
+ {Bts, XER} = init_api_call(Req),
+ Appname = leptus_req:param(Req, appname),
+ ReqBody = leptus_req:body_raw(Req),
+ {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))),
+ {RCode, RBody, RState} = handle_put(Req, State, XER, Appname, ReqBody, RequestUrl),
?AUDI(Req, Bts, XER, Rcode),
{RCode, RBody, RState};
put("/application/:appname/reconfigure", Req, State) ->
@@ -428,14 +426,7 @@ put("/application/:appname/reconfigure", Req, State) ->
{Bts, XER} = init_api_call(Req),
Appname = leptus_req:param(Req, appname),
ReqBody = leptus_req:body_raw(Req),
- AppnameToNS = fun(App) ->
- X = appname_to_field_vals(App, [<<"namespace">>]),
- case X of
- none -> X;
- [Namespace] -> Namespace
- end
- end,
- {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS),
+ {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody),
?AUDI(Req, Bts, XER, Rcode),
{RCode, RBody, RState}.