diff options
Diffstat (limited to 'src/resource_handler.erl')
-rw-r--r-- | src/resource_handler.erl | 231 |
1 files changed, 111 insertions, 120 deletions
diff --git a/src/resource_handler.erl b/src/resource_handler.erl index 8c1b343..798f457 100644 --- a/src/resource_handler.erl +++ b/src/resource_handler.erl @@ -7,6 +7,8 @@ -export([get/3, put/3, delete/3, post/3]). -export([cross_domains/3]). +-export([appname_to_field_vals/2]). + %%for keeping state %%The application record is defined in application.hrl %%In Mnesia, the first element is the type of record and the second element is the key @@ -163,88 +165,6 @@ delete_app_helper(Appname, State, XER, Req) -> end end. -%%%CALLBACKS %%% -init(_Route, _Req, State) -> - {ok, State}. -terminate(_Reason, _Route, _Req, _State) -> - ok. -%%%FOR Cors support -%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55 -cross_domains(_Route, _Req, State) -> - {['_'], State}. - -%%%GET Methods -get("/", Req, State) -> - %The broker's "info" endpoint; returns some possibly useful information - {Bts, XER} = init_api_call(Req), - Apps = util:get_all_appnames_from_db(), - {UT, _} = statistics(wall_clock), - CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL), - RB = {[ - {<<"cdap cluster version">>, CDAPVer}, - {<<"managed cdap url">>, ?CDAPURL}, - {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)}, - {<<"number of applications registered">>, length(Apps)}, - {<<"uptime (s)">>, UT/1000}, - {<<"broker API version">>, util:get_my_version()} - ]}, - {RCode, RBody, RState} = {200, {json, RB}, State}, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application", Req, State) -> - %get a list of all registered apps - {Bts, XER} = init_api_call(Req), - {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State}, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname", Req, State) -> - %get information about a registered application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State), - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname/metrics", Req, State) -> - %get metrics for a registered application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of - none -> {404, "", State}; - [<<"program-flowlet">>, Namespace] -> - {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200 - {ReturnCode, {json, ReturnBody}, State}; - [<<"hydrator-pipeline">>, Namespace] -> - lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), - {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL), - {ReturnCode, {json, ReturnBody}, State} - end, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}; -get("/application/:appname/healthcheck", Req, State) -> - %get healthcheck of an application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of - none -> {404, "", State}; - [<<"program-flowlet">>, Namespace] -> - {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State}; - [<<"hydrator-pipeline">>, Namespace] -> - {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State} - end, - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}. - -%%%DELETE Methods -delete("/application/:appname", Req, State) -> - %Uninstall and delete a CDAP app - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req), - ?AUDI(Req, Bts, XER, Rcode), - {RCode, RBody, RState}. - -%%%PUT Methods parse_put_body(B) -> %parse the PUT body to application try @@ -274,7 +194,7 @@ parse_put_body(B) -> maps:get(<<"program_id">>, P), maps:get(<<"program_pref">>, P)} end, maps:get(<<"program_preferences">>, Body)), - {pf, <<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}}; + {<<"program-flowlet">>, {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences}}; <<"hydrator-pipeline">> -> PipelineConfigJsonURL = maps:get(<<"pipeline_config_json_url">>, Body), @@ -295,7 +215,7 @@ parse_put_body(B) -> } end, D); false -> [] %normalize optional user input into []; just prevents user from having to explicitly pass in [] end, - {hp, <<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}} + {<<"hydrator-pipeline">>, {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies}} end end catch _:_ -> invalid @@ -315,12 +235,12 @@ parse_reconfiguration_put_body(Body) -> catch _:_ -> invalid end. -handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) -> +handle_reconfigure_put(Req, State, XER, Appname, ReqBody) -> %handle the reconfiguration put. broker out from the http call, and takes the lookup func as an arg, to allow for better unit testing. %this is still not a pure function due to the workflows call, still needs enhancement - case AppnameToNS(Appname) of + case ?MODULE:appname_to_field_vals(Appname, [<<"namespace">>]) of none -> {404, "Reconfigure recieved but the app is not registered", State}; - Namespace -> + [Namespace] -> ParsedBody = parse_reconfiguration_put_body(ReqBody), case ParsedBody of invalid -> {400, "Invalid PUT Reconfigure Body", State}; @@ -351,37 +271,27 @@ handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS) -> end end. -put("/application/:appname", Req, State) -> - %create a new registration; deploys and starts a cdap application - {Bts, XER} = init_api_call(Req), - Appname = leptus_req:param(Req, appname), - {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"appname">>]) of - [Appname] -> - {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State}; - none -> %no matches, create the resource, return the application record +handle_put(Req, State, XER, Appname, ReqBody, RequestUrl) -> + %use of ?MODULE here is due to the meck limitation described here: https://github.com/eproxus/meck + case ?MODULE:appname_to_field_vals(Appname, [<<"appname">>]) == none of + false -> {400, "Put recieved on /application/:appname but appname is already registered. Call /application/:appname/reconfigure if trying to reconfigure or delete first", State}; + true -> %Initial put requires the put body parameters - case parse_put_body(leptus_req:body_raw(Req)) of - %could not parse the body - invalid -> {400, "Invalid PUT Body or unparseable URL", State}; - - %unsupported cdap application type - unsupported -> {404, "Unsupported CDAP Application Type", State}; - - {Type, AppType, Params} -> + ParsedBody = parse_put_body(ReqBody), + case ParsedBody of + invalid -> {400, "Invalid PUT Body or unparseable URL", State}; %could not parse the body + unsupported -> {400, "Unsupported CDAP Application Type", State}; %unsupported cdap application type + {AppType, Params} -> %form shared info - %hateaos cuz they aintaos - {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))), Metricsurl = <<RequestUrl/binary, <<"/metrics">>/binary>>, Healthcheckurl = <<RequestUrl/binary, <<"/healthcheck">>/binary>>, try - case Type of - hp -> + case AppType of + <<"hydrator-pipeline">> -> {Namespace, Streamname, PipelineConfigJsonURL, ParsedDependencies} = Params, ConnectionURL = cdap_interface:form_stream_url_from_streamname(?CDAPURL, Namespace, Streamname), - - %TODO: This! - ServiceEndpoints = [], %unclear if this is possible with pipelines + ServiceEndpoints = [], %TODO: unclear if this is possible with pipelines %write into mnesia, deploy A = #application{appname = Appname, apptype = AppType, namespace = Namespace, healthcheckurl = Healthcheckurl, metricsurl = Metricsurl, url = RequestUrl, connectionurl = ConnectionURL, serviceendpoints = ServiceEndpoints, creationtime=erlang:system_time()}, @@ -389,7 +299,7 @@ put("/application/:appname", Req, State) -> ok = workflows:deploy_hydrator_pipeline(Req, XER, Appname, Namespace, ?CDAPURL, PipelineConfigJsonURL, ParsedDependencies, ?CONSURL, RequestUrl, Healthcheckurl, ?HCInterval, ?AutoDeregisterAfter), metrics(info, Req, [{bts, iso()}, {xer, XER}, {mod, mod()}, {msg, io_lib:format("New Hydrator Application Created: ~p", [lager:pr(A, ?MODULE)])}]), %see Record Pretty Printing: https://github.com/basho/lager ok; - pf -> + <<"program-flowlet">> -> {Namespace, Streamname, JarURL, ArtifactName, ArtifactVersion, AppConfig, AppPreferences, ParsedServices, Programs, ParsedProgramPreferences} = Params, %Form URLs that are part of the record %NOTE: These are both String concatenation functions and neither make an HTTP call so not catching normal {Code, Status} return here @@ -420,7 +330,95 @@ put("/application/:appname", Req, State) -> {500, "Please report this error", State} end end - end, + end. + +%%% HTTP API CALLBACKS %%% +init(_Route, _Req, State) -> + {ok, State}. +terminate(_Reason, _Route, _Req, _State) -> + ok. +%%%FOR Cors support +%%%Note! only matches on host. Does not handle ports. See: https://github.com/s1n4/leptus/issues/55 +cross_domains(_Route, _Req, State) -> + {['_'], State}. + +%%%GET Methods +get("/", Req, State) -> + %The broker's "info" endpoint; returns some possibly useful information + {Bts, XER} = init_api_call(Req), + Apps = util:get_all_appnames_from_db(), + {UT, _} = statistics(wall_clock), + CDAPVer = cdap_interface:get_cdap_cluster_version(XER, ?CDAPURL), + RB = {[ + {<<"cdap cluster version">>, CDAPVer}, + {<<"managed cdap url">>, ?CDAPURL}, + {<<"cdap GUI port">>, cdap_interface:get_cdap_gui_port_from_version(CDAPVer)}, + {<<"number of applications registered">>, length(Apps)}, + {<<"uptime (s)">>, UT/1000}, + {<<"broker API version">>, util:get_my_version()} + ]}, + {RCode, RBody, RState} = {200, {json, RB}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application", Req, State) -> + %get a list of all registered apps + {Bts, XER} = init_api_call(Req), + {RCode, RBody, RState} = {200, {json, util:get_all_appnames_from_db()}, State}, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname", Req, State) -> + %get information about a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = appname_to_application_http(XER, Appname, State), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/metrics", Req, State) -> + %get metrics for a registered application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {ReturnCode, ReturnBody} = cdap_interface:get_app_metrics(XER, Appname, Namespace, ?CDAPURL), %warning, see note in README, this always reutrns 200 + {ReturnCode, {json, ReturnBody}, State}; + [<<"hydrator-pipeline">>, Namespace] -> + lager:warning("WARNING, metrics not actually implemented yet for pipelines!!"), + {ReturnCode, ReturnBody} = cdap_interface:get_pipeline_metrics(Appname, Namespace, ?CDAPURL), + {ReturnCode, {json, ReturnBody}, State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}; +get("/application/:appname/healthcheck", Req, State) -> + %get healthcheck of an application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + lager:info(io_lib:format("Get Healthcheck recieved for ~s", [Appname])), + {RCode, RBody, RState} = case appname_to_field_vals(Appname, [<<"apptype">>, <<"namespace">>]) of + none -> {404, "", State}; + [<<"program-flowlet">>, Namespace] -> + {cdap_interface:get_app_healthcheck(XER, Appname, Namespace, ?CDAPURL), "", State}; + [<<"hydrator-pipeline">>, Namespace] -> + {cdap_interface:get_pipeline_healthcheck(XER, Appname, Namespace, ?CDAPURL, ?PipelineHealthLimit), "", State} + end, + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. +%%%DELETE Methods +delete("/application/:appname", Req, State) -> + %Uninstall and delete a CDAP app + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + {RCode, RBody, RState} = delete_app_helper(Appname, State, XER, Req), + ?AUDI(Req, Bts, XER, Rcode), + {RCode, RBody, RState}. +%%%PUT Methods +put("/application/:appname", Req, State) -> + %create a new registration; deploys and starts a cdap application + {Bts, XER} = init_api_call(Req), + Appname = leptus_req:param(Req, appname), + ReqBody = leptus_req:body_raw(Req), + {RequestUrl,_} = cowboy_req:url((leptus_req:get_req(Req))), + {RCode, RBody, RState} = handle_put(Req, State, XER, Appname, ReqBody, RequestUrl), ?AUDI(Req, Bts, XER, Rcode), {RCode, RBody, RState}; put("/application/:appname/reconfigure", Req, State) -> @@ -428,14 +426,7 @@ put("/application/:appname/reconfigure", Req, State) -> {Bts, XER} = init_api_call(Req), Appname = leptus_req:param(Req, appname), ReqBody = leptus_req:body_raw(Req), - AppnameToNS = fun(App) -> - X = appname_to_field_vals(App, [<<"namespace">>]), - case X of - none -> X; - [Namespace] -> Namespace - end - end, - {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody, AppnameToNS), + {RCode, RBody, RState} = handle_reconfigure_put(Req, State, XER, Appname, ReqBody), ?AUDI(Req, Bts, XER, Rcode), {RCode, RBody, RState}. |