path: root/mod/distributorapi/distributor/
diff options
authorMichael Hwang <>2019-11-11 15:28:57 -0500
committerMichael Hwang <>2019-12-16 11:02:15 -0500
commit77900bb3097491cd9fca964c111ea70724e53989 (patch)
treeb23fb51b9e3c465bfcecaede1f0450acfe22e880 /mod/distributorapi/distributor/
parentc698e66797bad69b4c77b26b487bf8322989beb0 (diff)
Add distributor api projectdev-mod
Issue-ID: DCAEGEN2-1860 Signed-off-by: Michael Hwang <> Change-Id: I67aa9178b1b1830e330ca1259e8f6b30202945df
Diffstat (limited to 'mod/distributorapi/distributor/')
1 files changed, 256 insertions, 0 deletions
diff --git a/mod/distributorapi/distributor/ b/mod/distributorapi/distributor/
new file mode 100644
index 0000000..963a852
--- /dev/null
+++ b/mod/distributorapi/distributor/
@@ -0,0 +1,256 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Code for http interface"""
+import logging, json
+import uuid
+from flask import Flask
+from flask_cors import CORS
+import flask_restplus as frp
+from flask_restplus import Api, Resource, fields
+from distributor.version import __version__
+from distributor import data_access as da
+from distributor import config
+from distributor import registry_client as rc
+from distributor import onboarding_client as oc
+from distributor import runtime_client as runc
+from distributor import transform as tr
+_log = logging.getLogger("distributor.http")
+_app = Flask(__name__)
+# Try to bundle as many errors together
+_app.config['BUNDLE_ERRORS'] = True
+_api = Api(_app, version=__version__, title="Distributor HTTP API",
+ description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment",
+ contact="", default_mediatype="application/json"
+ , prefix="/distributor", doc="/distributor", default="distributor"
+ )
+# REVIEW: Do I need a namespace?
+ns = _api
+model_pg = _api.model("ProcessGroup", {
+ "id": fields.String(required=True, description="Id for this process group"
+ , attribute="processGroupId")
+ , "version": fields.Integer(required=True
+ , description="Version of the process group")
+ , "processed": fields.DateTime(required=True
+ , description="When this process group was processed by this API")
+ , "runtimeResponse": fields.String(required=True
+ , description="Full response from the runtime API")
+ })
+model_dt = _api.model("DistributionTarget", {
+ "selfUrl": fields.Url("resource_distribution_target", absolute=True)
+ , "id": fields.String(required=True, description="Id for this distribution target"
+ , attribute="dt_id")
+ , "name": fields.String(required=True, description="Name for this distribution target"
+ , attribute="name")
+ , "runtimeApiUrl": fields.String(required=True
+ , description="Url to the runtime API for this distribution target"
+ , attribute="runtimeApiUrl")
+ , "description": fields.String(required=False
+ , description="Description for this distribution target"
+ , attribute="description")
+ , "nextDistributionTargetId": fields.String(required=False
+ , description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order."
+ , attribute="nextDistributionTargetId")
+ , "created": fields.String(required=True
+ , description="When this distribution target was created in UTC"
+ , attribute="created")
+ , "modified": fields.String(required=True
+ , description="When this distribution target was last modified in UTC"
+ , attribute="modified")
+ , "processGroups": fields.List(fields.Nested(model_pg))
+ })
+model_dts = _api.model("DistributionTargets", {
+ "distributionTargets": fields.List(fields.Nested(model_dt))
+ })
+parser_dt_req = ns.parser()
+parser_dt_req.add_argument("name", required=True, trim=True,
+ location="json", help="Name for this new distribution target")
+parser_dt_req.add_argument("runtimeApiUrl", required=True, trim=True,
+ location="json", help="Url to the runtime API for this distribution target")
+parser_dt_req.add_argument("description", required=False, trim=True,
+ location="json", help="Description for this distribution target")
+parser_dt_req.add_argument("nextDistributionTargetId", required=False, trim=True,
+ location="json", help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.")
+@ns.route("/distribution-targets", endpoint="resource_distribution_targets")
+class DistributionTargets(Resource):
+ @ns.doc("get_distribution_targets", description="List distribution targets")
+ @ns.marshal_with(model_dts)
+ def get(self):
+ return { "distributionTargets": da.get_distribution_targets() }, 200
+ @ns.doc("post_distribution_targets", description="Create a new distribution target")
+ @ns.expect(parser_dt_req)
+ @ns.marshal_with(model_dt)
+ def post(self):
+ req = parser_dt_req.parse_args()
+ req = da.transform_request(req)
+ resp = da.add_distribution_target(req)
+ return resp, 200
+@ns.route("/distribution-targets/<string:dt_id>", endpoint="resource_distribution_target")
+class DistributionTarget(Resource):
+ @ns.doc("get_distribution_target", description="Get a distribution target instance")
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.marshal_with(model_dt)
+ def get(self, dt_id):
+ result = da.get_distribution_target(dt_id)
+ if result:
+ return result, 200
+ else:
+ frp.abort(code=404, message="Unknown distribution target")
+ @ns.doc("put_distribution_target", description="Update an existing distribution target")
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.expect(parser_dt_req)
+ @ns.marshal_with(model_dt)
+ def put(self, dt_id):
+ result = da.get_distribution_target(dt_id)
+ if not result:
+ frp.abort(code=404, message="Unknown distribution target")
+ req = parser_dt_req.parse_args()
+ updated_dt = da.merge_request(result, req)
+ if da.update_distribution_target(updated_dt):
+ return updated_dt, 200
+ else:
+ frp.abort(code=500, message="Problem with storing the update")
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.doc("delete_distribution_target", description="Delete an existing distribution target")
+ def delete(self, dt_id):
+ if da.delete_distribution_target(dt_id):
+ return
+ else:
+ frp.abort(code=404, message="Unknown distribution target")
+parser_post_process_group = ns.parser()
+parser_post_process_group.add_argument("processGroupId", required=True,
+ trim=True, location="json", help="Process group ID that exists in Nifi")
+@ns.route("/distribution-targets/<string:dt_id>/process-groups", endpoint="resource_target_process_groups")
+class DTargetProcessGroups(Resource):
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(501, 'Feature is not supported right now')
+ @ns.response(500, 'Internal Server Error')
+ @ns.expect(parser_post_process_group)
+ def post(self, dt_id):
+ # TODO: Need bucket ID but for now will simply scan through all buckets
+ # TODO: Current impl doesn't take into consideration the last state of
+ # the distribution target e.g. what was the last design processed
+ req = parser_post_process_group.parse_args()
+ # Check existence of distribution target
+ dtarget = da.get_distribution_target(dt_id)
+ if not dtarget:
+ frp.abort(code=404, message="Unknown distribution target")
+ runtime_url = dtarget["runtimeApiUrl"]
+ pg_id = req["processGroupId"]
+ # Find flow from Nifi registry
+ try:
+ flow = rc.find_flow(config.nifi_registry_url, pg_id)
+ except Exception as e:
+ # TODO: Switch to logging
+ print(e)
+ # Assuming it'll be 404
+ frp.abort(code=404, message="Process group not found in registry")
+ pg_name = flow["name"]
+ # Make sure graph is setup in runtime api
+ if runc.ensure_graph(runtime_url, pg_id, pg_name) == False:
+ frp.abort(code=501 , message="Runtime API: Graph could not be created")
+ # Graph diffing using Nifi registry
+ flow_diff = rc.get_flow_diff_latest(config.nifi_registry_url, flow["selfUrl"])
+ if flow_diff:
+ # TODO: Not really processing diff right now instead just processing
+ # latest. Later process the diffs instead and send out the changes.
+ flow_latest = rc.get_flow_version_latest(config.nifi_registry_url, flow["selfUrl"])
+ else:
+ flow_latest = rc.get_flow_version(config.nifi_registry_url, flow["selfUrl"], 1)
+ # Get component data from onboarding API
+ components = tr.extract_components_from_flow(flow_latest)
+ try:
+ components = oc.get_components_indexed(config.onboarding_api_url, components)
+ except Exception as e:
+ # TODO: Switch to logging
+ print(e)
+ # Assuming it'll be 404
+ frp.abort(code=404, message="Component not found in onboarding API")
+ #
+ # Put everything together, post to runtime API, save
+ #
+ actions = tr.make_fbp_from_flow(flow_latest, components)
+ resp = dict(req)
+ resp["version"] = flow_latest["snapshotMetadata"]["version"]
+ resp["runtimeResponse"] = json.dumps(runc.post_graph(runtime_url, pg_id, actions))
+ resp = da.add_process_group(dt_id, resp)
+ if resp:
+ return resp, 200
+ else:
+ frp.abort(code=500, message="Could not store process group")
+def start_http_server():
+ config.init()
+ def is_debug():
+ import os
+ if os.environ.get("DISTRIBUTOR_DEBUG", "1") == "1":
+ return True
+ else:
+ return False
+ if is_debug():
+ else:
+"", port=80, debug=False)