diff options
Diffstat (limited to 'mod/distributorapi/distributor')
-rw-r--r-- | mod/distributorapi/distributor/config.py | 9 | ||||
-rw-r--r-- | mod/distributorapi/distributor/data_access.py | 9 | ||||
-rw-r--r-- | mod/distributorapi/distributor/errors.py | 5 | ||||
-rw-r--r-- | mod/distributorapi/distributor/http.py | 155 | ||||
-rw-r--r-- | mod/distributorapi/distributor/onboarding_client.py | 6 | ||||
-rw-r--r-- | mod/distributorapi/distributor/registry_client.py | 14 | ||||
-rw-r--r-- | mod/distributorapi/distributor/runtime_client.py | 6 | ||||
-rw-r--r-- | mod/distributorapi/distributor/transform.py | 68 | ||||
-rw-r--r-- | mod/distributorapi/distributor/utils.py | 4 | ||||
-rw-r--r-- | mod/distributorapi/distributor/version.py | 4 |
10 files changed, 154 insertions, 126 deletions
diff --git a/mod/distributorapi/distributor/config.py b/mod/distributorapi/distributor/config.py index d83eee1..2195220 100644 --- a/mod/distributorapi/distributor/config.py +++ b/mod/distributorapi/distributor/config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,11 +28,10 @@ def _grab_env(name, default=None): except KeyError: raise errors.DistributorAPIConfigError("Required environment variable missing: {0}".format(name)) + def init(): global nifi_registry_url - nifi_registry_url = _grab_env("NIFI_REGISTRY_URL" - , default="http://nifi-registry:18080/nifi-registry-api") + nifi_registry_url = _grab_env("NIFI_REGISTRY_URL", default="http://nifi-registry:18080/nifi-registry-api") global onboarding_api_url - onboarding_api_url = _grab_env("ONBOARDING_API_URL" - , default="http://onboarding-api:8080/onboarding") + onboarding_api_url = _grab_env("ONBOARDING_API_URL", default="http://onboarding-api:8080/onboarding") diff --git a/mod/distributorapi/distributor/data_access.py b/mod/distributorapi/distributor/data_access.py index e1a45e3..5b544d8 100644 --- a/mod/distributorapi/distributor/data_access.py +++ b/mod/distributorapi/distributor/data_access.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,9 +29,10 @@ def get_distribution_targets(): def get_distribution_target(ds_id): global _cache - result = [ i for i in _cache if i["dt_id"] == ds_id ] + result = [i for i in _cache if i["dt_id"] == ds_id] return result[0] if result else {} + def transform_request(req): """Transform request to object to store @@ -44,6 +45,7 @@ def transform_request(req): req["processGroups"] = [] return req + def add_distribution_target(dt): global _cache _cache.append(dt) @@ -58,6 +60,7 @@ def merge_request(dt, req): dt["modified"] = datetime.utcnow().isoformat() return dt + def update_distribution_target(updated_dt): dt_id = updated_dt["dt_id"] global _cache @@ -85,5 +88,3 @@ def add_process_group(ds_id, process_group): dt["processGroups"].append(process_group) return process_group return None - - diff --git a/mod/distributorapi/distributor/errors.py b/mod/distributorapi/distributor/errors.py index e28b5f5..04779b0 100644 --- a/mod/distributorapi/distributor/errors.py +++ b/mod/distributorapi/distributor/errors.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +15,14 @@ # ============LICENSE_END========================================================= """Errors""" + class DistributorAPIError(RuntimeError): pass + class DistributorAPIConfigError(DistributorAPIError): pass + class DistributorAPIResourceNotFound(DistributorAPIError): pass diff --git a/mod/distributorapi/distributor/http.py b/mod/distributorapi/distributor/http.py index f1aa2fd..e367b19 100644 --- a/mod/distributorapi/distributor/http.py +++ b/mod/distributorapi/distributor/http.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,64 +36,82 @@ _app = Flask(__name__) CORS(_app) # Try to bundle as many errors together # https://flask-restplus.readthedocs.io/en/stable/parsing.html#error-handling -_app.config['BUNDLE_ERRORS'] = True -_api = Api(_app, version=__version__, title="Distributor HTTP API", - description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment", - contact="", default_mediatype="application/json" - , prefix="/distributor", doc="/distributor", default="distributor" - ) +_app.config["BUNDLE_ERRORS"] = True +_api = Api( + _app, + version=__version__, + title="Distributor HTTP API", + description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment", + contact="", + default_mediatype="application/json", + prefix="/distributor", + doc="/distributor", + default="distributor", +) # REVIEW: Do I need a namespace? ns = _api -model_pg = _api.model("ProcessGroup", { - "id": fields.String(required=True, description="Id for this process group" - , attribute="processGroupId") - , "version": fields.Integer(required=True - , description="Version of the process group") - , "processed": fields.DateTime(required=True - , description="When this process group was processed by this API") - , "runtimeResponse": fields.String(required=True - , description="Full response from the runtime API") - }) - -model_dt = _api.model("DistributionTarget", { - "selfUrl": fields.Url("resource_distribution_target", absolute=True) - , "id": fields.String(required=True, description="Id for this distribution target" - , attribute="dt_id") - , "name": fields.String(required=True, description="Name for this distribution target" - , attribute="name") - , "runtimeApiUrl": fields.String(required=True - , description="Url to the runtime API for this distribution target" - , attribute="runtimeApiUrl") - , "description": fields.String(required=False - , description="Description for this distribution target" - , attribute="description") - , "nextDistributionTargetId": fields.String(required=False - , description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order." - , attribute="nextDistributionTargetId") - , "created": fields.String(required=True - , description="When this distribution target was created in UTC" - , attribute="created") - , "modified": fields.String(required=True - , description="When this distribution target was last modified in UTC" - , attribute="modified") - , "processGroups": fields.List(fields.Nested(model_pg)) - }) - -model_dts = _api.model("DistributionTargets", { - "distributionTargets": fields.List(fields.Nested(model_dt)) - }) +model_pg = _api.model( + "ProcessGroup", + { + "id": fields.String(required=True, description="Id for this process group", attribute="processGroupId"), + "version": fields.Integer(required=True, description="Version of the process group"), + "processed": fields.DateTime(required=True, description="When this process group was processed by this API"), + "runtimeResponse": fields.String(required=True, description="Full response from the runtime API"), + }, +) + +model_dt = _api.model( + "DistributionTarget", + { + "selfUrl": fields.Url("resource_distribution_target", absolute=True), + "id": fields.String(required=True, description="Id for this distribution target", attribute="dt_id"), + "name": fields.String(required=True, description="Name for this distribution target", attribute="name"), + "runtimeApiUrl": fields.String( + required=True, description="Url to the runtime API for this distribution target", attribute="runtimeApiUrl" + ), + "description": fields.String( + required=False, description="Description for this distribution target", attribute="description" + ), + "nextDistributionTargetId": fields.String( + required=False, + description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.", + attribute="nextDistributionTargetId", + ), + "created": fields.String( + required=True, description="When this distribution target was created in UTC", attribute="created" + ), + "modified": fields.String( + required=True, description="When this distribution target was last modified in UTC", attribute="modified" + ), + "processGroups": fields.List(fields.Nested(model_pg)), + }, +) + +model_dts = _api.model("DistributionTargets", {"distributionTargets": fields.List(fields.Nested(model_dt))}) parser_dt_req = ns.parser() -parser_dt_req.add_argument("name", required=True, trim=True, - location="json", help="Name for this new distribution target") -parser_dt_req.add_argument("runtimeApiUrl", required=True, trim=True, - location="json", help="Url to the runtime API for this distribution target") -parser_dt_req.add_argument("description", required=False, trim=True, - location="json", help="Description for this distribution target") -parser_dt_req.add_argument("nextDistributionTargetId", required=False, trim=True, - location="json", help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.") +parser_dt_req.add_argument( + "name", required=True, trim=True, location="json", help="Name for this new distribution target" +) +parser_dt_req.add_argument( + "runtimeApiUrl", + required=True, + trim=True, + location="json", + help="Url to the runtime API for this distribution target", +) +parser_dt_req.add_argument( + "description", required=False, trim=True, location="json", help="Description for this distribution target" +) +parser_dt_req.add_argument( + "nextDistributionTargetId", + required=False, + trim=True, + location="json", + help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.", +) @ns.route("/distribution-targets", endpoint="resource_distribution_targets") @@ -101,7 +119,7 @@ class DistributionTargets(Resource): @ns.doc("get_distribution_targets", description="List distribution targets") @ns.marshal_with(model_dts) def get(self): - return { "distributionTargets": da.get_distribution_targets() }, 200 + return {"distributionTargets": da.get_distribution_targets()}, 200 @ns.doc("post_distribution_targets", description="Create a new distribution target") @ns.expect(parser_dt_req) @@ -112,11 +130,12 @@ class DistributionTargets(Resource): resp = da.add_distribution_target(req) return resp, 200 + @ns.route("/distribution-targets/<string:dt_id>", endpoint="resource_distribution_target") class DistributionTarget(Resource): @ns.doc("get_distribution_target", description="Get a distribution target instance") - @ns.response(404, 'Distribution target not found') - @ns.response(500, 'Internal Server Error') + @ns.response(404, "Distribution target not found") + @ns.response(500, "Internal Server Error") @ns.marshal_with(model_dt) def get(self, dt_id): result = da.get_distribution_target(dt_id) @@ -127,8 +146,8 @@ class DistributionTarget(Resource): frp.abort(code=404, message="Unknown distribution target") @ns.doc("put_distribution_target", description="Update an existing distribution target") - @ns.response(404, 'Distribution target not found') - @ns.response(500, 'Internal Server Error') + @ns.response(404, "Distribution target not found") + @ns.response(500, "Internal Server Error") @ns.expect(parser_dt_req) @ns.marshal_with(model_dt) def put(self, dt_id): @@ -145,8 +164,8 @@ class DistributionTarget(Resource): else: frp.abort(code=500, message="Problem with storing the update") - @ns.response(404, 'Distribution target not found') - @ns.response(500, 'Internal Server Error') + @ns.response(404, "Distribution target not found") + @ns.response(500, "Internal Server Error") @ns.doc("delete_distribution_target", description="Delete an existing distribution target") def delete(self, dt_id): if da.delete_distribution_target(dt_id): @@ -156,15 +175,16 @@ class DistributionTarget(Resource): parser_post_process_group = ns.parser() -parser_post_process_group.add_argument("processGroupId", required=True, - trim=True, location="json", help="Process group ID that exists in Nifi") +parser_post_process_group.add_argument( + "processGroupId", required=True, trim=True, location="json", help="Process group ID that exists in Nifi" +) + @ns.route("/distribution-targets/<string:dt_id>/process-groups", endpoint="resource_target_process_groups") class DTargetProcessGroups(Resource): - - @ns.response(404, 'Distribution target not found') - @ns.response(501, 'Feature is not supported right now') - @ns.response(500, 'Internal Server Error') + @ns.response(404, "Distribution target not found") + @ns.response(501, "Feature is not supported right now") + @ns.response(500, "Internal Server Error") @ns.expect(parser_post_process_group) def post(self, dt_id): # TODO: Need bucket ID but for now will simply scan through all buckets @@ -198,7 +218,7 @@ class DTargetProcessGroups(Resource): # Make sure graph is setup in runtime api if runc.ensure_graph(runtime_url, pg_id, pg_name) == False: - frp.abort(code=501 , message="Runtime API: Graph could not be created") + frp.abort(code=501, message="Runtime API: Graph could not be created") # Graph diffing using Nifi registry @@ -245,6 +265,7 @@ def start_http_server(): def is_debug(): import os + if os.environ.get("DISTRIBUTOR_DEBUG", "1") == "1": return True else: diff --git a/mod/distributorapi/distributor/onboarding_client.py b/mod/distributorapi/distributor/onboarding_client.py index d7b0780..9665561 100644 --- a/mod/distributorapi/distributor/onboarding_client.py +++ b/mod/distributorapi/distributor/onboarding_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,6 +30,4 @@ def get_component(onboarding_url, name, version): def get_components_indexed(onboarding_url, list_name_version): - return dict([ - ((c[0], c[1]), get_component(onboarding_url, c[0], c[1])) - for c in list_name_version]) + return dict([((c[0], c[1]), get_component(onboarding_url, c[0], c[1])) for c in list_name_version]) diff --git a/mod/distributorapi/distributor/registry_client.py b/mod/distributorapi/distributor/registry_client.py index 5d437e7..372099a 100644 --- a/mod/distributorapi/distributor/registry_client.py +++ b/mod/distributorapi/distributor/registry_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ def _add_url_from_link(registry_url, obj): for k, v in obj.items(): if k == "link": - result["selfUrl"] =_urljoin(registry_url, v["href"]) + result["selfUrl"] = _urljoin(registry_url, v["href"]) result[k] = v elif type(v) == dict: result[k] = _add_url_from_link(registry_url, v) @@ -63,13 +63,14 @@ def get_flow_versions(flow_url): """Returns list of versions from greatest to least for a given flow""" versions_url = _urljoin(flow_url, "versions") # List of versions will be greatest to least - return list(reversed(sorted( - [v["version"] for v in _get_json(versions_url)]))) + return list(reversed(sorted([v["version"] for v in _get_json(versions_url)]))) + def get_flow_diff(registry_url, flow_url, version_one, version_two): diff_url = _urljoin(flow_url, "diff", str(version_one), str(version_two)) return _get_json(diff_url) + def get_flow_diff_latest(registry_url, flow_url): versions = get_flow_versions(flow_url) @@ -80,12 +81,13 @@ def get_flow_diff_latest(registry_url, flow_url): return None else: # Example in gitlab wiki shows that lower version is first - return _add_url_from_link(registry_url - , get_flow_diff(registry_url, flow_url, versions[1], versions[0])) + return _add_url_from_link(registry_url, get_flow_diff(registry_url, flow_url, versions[1], versions[0])) + def get_flow_version(registry_url, flow_url, version): version_url = _urljoin(flow_url, "versions", str(version)) return _add_url_from_link(registry_url, _get_json(version_url)) + def get_flow_version_latest(registry_url, flow_url): return get_flow_version(registry_url, flow_url, "latest") diff --git a/mod/distributorapi/distributor/runtime_client.py b/mod/distributorapi/distributor/runtime_client.py index 7cd06ae..2fc2ccd 100644 --- a/mod/distributorapi/distributor/runtime_client.py +++ b/mod/distributorapi/distributor/runtime_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,8 +29,7 @@ def get_graph(runtime_url, graph_id): def create_graph(runtime_url, graph_id, graph_name): url = urljoin(runtime_url, "api/graph/main") - resp = reqs.post(url, json={"name": graph_name, "id": graph_id - , "description": "", "main": True}) + resp = reqs.post(url, json={"name": graph_name, "id": graph_id, "description": "", "main": True}) try: resp.raise_for_status() @@ -60,6 +59,7 @@ def post_graph(runtime_url, graph_id, actions): except Exception as e: with open("runtime-request-failed.json", "w+") as f: import json + json.dump(graph_request, f) raise errors.DistributorAPIError(e) diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py index 9654249..5ca1277 100644 --- a/mod/distributorapi/distributor/transform.py +++ b/mod/distributorapi/distributor/transform.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ def extract_components_from_flow(flow): and create a list of tuples where each tuple is (component name, component version)""" extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"]) - return [ extract(p) for p in flow["flowContents"]["processors"] ] + return [extract(p) for p in flow["flowContents"]["processors"]] def get_component(flow, components, processor_id): @@ -32,8 +32,7 @@ def get_component(flow, components, processor_id): bundle = p["bundle"] return components.get((bundle["artifact"], bundle["version"]), None) - cs = [get_component(p) for p in flow["flowContents"]["processors"] \ - if p["identifier"] == processor_id] + cs = [get_component(p) for p in flow["flowContents"]["processors"] if p["identifier"] == processor_id] return cs[0] if cs else None @@ -80,10 +79,10 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components" if rels_pubs: _, _, _, transport_type, config_key = rels_pubs[0].split(":") - src = { "node": comp["id"], "port": config_key } + src = {"node": comp["id"], "port": config_key} else: # REVIEW: This should be an error? - src = { "node": comp["id"], "port": None } + src = {"node": comp["id"], "port": None} else: src = {} @@ -100,38 +99,43 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components" if rels_subs: _, _, _, transport_type, config_key = rels_subs[0].split(":") - tgt = { "node": comp["id"], "port": config_key } + tgt = {"node": comp["id"], "port": config_key} else: # REVIEW: This should be an error? - tgt = { "node": comp["id"], "port": None } + tgt = {"node": comp["id"], "port": None} else: tgt = {} - return { "command": "addedge" - , "payload": { - "src": src - , "tgt": tgt - , "metadata": { - "name": conn["name"] - # TODO: Question these hardcoded attributes - , "data_type": "json" - , "dmaap_type": "MR" - } - } - } + return { + "command": "addedge", + "payload": { + "src": src, + "tgt": tgt, + "metadata": { + "name": conn["name"] + # TODO: Question these hardcoded attributes + , + "data_type": "json", + "dmaap_type": "MR", + }, + }, + } def parse_processor(p): c = components[(p["bundle"]["artifact"], p["bundle"]["version"])] - return { "command": "addnode" - # TODO: spec is required to be a json string but runtime api - # changing this soon hopefully - , "payload": { "component_spec": json.dumps(c["spec"]) - , "component_id": c["id"] - , "name": c["name"] - , "processor": p } - } - - ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ] - cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ] - return ps+cs + return { + "command": "addnode" + # TODO: spec is required to be a json string but runtime api + # changing this soon hopefully + , + "payload": { + "component_spec": json.dumps(c["spec"]), + "component_id": c["id"], + "name": c["name"], + "processor": p, + }, + } + ps = [parse_processor(p) for p in flow["flowContents"]["processors"]] + cs = [parse_connection(c) for c in flow["flowContents"]["connections"]] + return ps + cs diff --git a/mod/distributorapi/distributor/utils.py b/mod/distributorapi/distributor/utils.py index 7457d5a..c8449bf 100644 --- a/mod/distributorapi/distributor/utils.py +++ b/mod/distributorapi/distributor/utils.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ def urljoin(base, *trailing, **query_params): url = "/".join(full) if query_params: - qp = ["{0}={1}".format(quote(k), quote(str(v))) for k,v in query_params.items()] + qp = ["{0}={1}".format(quote(k), quote(str(v))) for k, v in query_params.items()] qp = "&".join(qp) return "?".join([url, qp]) else: diff --git a/mod/distributorapi/distributor/version.py b/mod/distributorapi/distributor/version.py index 9da6f0f..bef5bf5 100644 --- a/mod/distributorapi/distributor/version.py +++ b/mod/distributorapi/distributor/version.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -__version__ = "1.0.1" +__version__ = "1.1.1" |