summaryrefslogtreecommitdiffstats
path: root/mod/distributorapi/distributor
diff options
context:
space:
mode:
authorMichael Hwang <mhwang@research.att.com>2019-11-11 15:28:57 -0500
committerMichael Hwang <mhwang@research.att.com>2019-12-16 11:02:15 -0500
commit77900bb3097491cd9fca964c111ea70724e53989 (patch)
treeb23fb51b9e3c465bfcecaede1f0450acfe22e880 /mod/distributorapi/distributor
parentc698e66797bad69b4c77b26b487bf8322989beb0 (diff)
Add distributor api projectdev-mod
Issue-ID: DCAEGEN2-1860 Signed-off-by: Michael Hwang <mhwang@research.att.com> Change-Id: I67aa9178b1b1830e330ca1259e8f6b30202945df
Diffstat (limited to 'mod/distributorapi/distributor')
-rw-r--r--mod/distributorapi/distributor/__init__.py15
-rw-r--r--mod/distributorapi/distributor/config.py38
-rw-r--r--mod/distributorapi/distributor/data_access.py89
-rw-r--r--mod/distributorapi/distributor/errors.py25
-rw-r--r--mod/distributorapi/distributor/http.py256
-rw-r--r--mod/distributorapi/distributor/onboarding_client.py35
-rw-r--r--mod/distributorapi/distributor/registry_client.py91
-rw-r--r--mod/distributorapi/distributor/runtime_client.py102
-rw-r--r--mod/distributorapi/distributor/transform.py137
-rw-r--r--mod/distributorapi/distributor/utils.py43
-rw-r--r--mod/distributorapi/distributor/version.py16
11 files changed, 847 insertions, 0 deletions
diff --git a/mod/distributorapi/distributor/__init__.py b/mod/distributorapi/distributor/__init__.py
new file mode 100644
index 0000000..e6f924f
--- /dev/null
+++ b/mod/distributorapi/distributor/__init__.py
@@ -0,0 +1,15 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
diff --git a/mod/distributorapi/distributor/config.py b/mod/distributorapi/distributor/config.py
new file mode 100644
index 0000000..8d2cede
--- /dev/null
+++ b/mod/distributorapi/distributor/config.py
@@ -0,0 +1,38 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Configuration for distributor api"""
+import os, tempfile, six, inspect
+from datetime import datetime
+from distributor import errors
+
+
+def _grab_env(name, default=None):
+ try:
+ if default:
+ return os.environ.get(name, default)
+ else:
+ return os.environ[name]
+ except KeyError:
+ raise errors.DistributorAPIConfigError("Required environment variable missing: {0}".format(name))
+
def init():
    """Load configuration from the environment into module globals.

    Sets ``nifi_registry_url`` and ``onboarding_api_url``; must be called
    before other modules read these globals.
    """
    global nifi_registry_url, onboarding_api_url

    nifi_registry_url = _grab_env(
        "NIFI_REGISTRY_URL",
        default="http://nifi-registry:18080/nifi-registry-api")
    onboarding_api_url = _grab_env(
        "ONBOARDING_API_URL",
        default="http://onboarding-api/onboarding")
diff --git a/mod/distributorapi/distributor/data_access.py b/mod/distributorapi/distributor/data_access.py
new file mode 100644
index 0000000..e1a45e3
--- /dev/null
+++ b/mod/distributorapi/distributor/data_access.py
@@ -0,0 +1,89 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Data layer"""
+
+from datetime import datetime
+import uuid
+
# TODO: Use real storage
_cache = []


def get_distribution_targets():
    """Return the full list of stored distribution targets."""
    return _cache


def get_distribution_target(ds_id):
    """Return the target whose "dt_id" equals *ds_id*, or {} when absent."""
    for entry in _cache:
        if entry["dt_id"] == ds_id:
            return entry
    return {}
+
def transform_request(req):
    """Decorate an incoming request dict with server-side fields.

    Mutates *req* in place: stamps identical "created"/"modified" UTC
    timestamps, assigns a fresh "dt_id" uuid and an empty "processGroups"
    list, then returns the same object.

    NOTE: This method is not safe
    """
    ts = datetime.utcnow().isoformat()
    for key, value in (("created", ts),
                       ("modified", ts),
                       ("dt_id", str(uuid.uuid4())),
                       ("processGroups", [])):
        req[key] = value
    return req
+
def add_distribution_target(dt):
    """Store a new distribution target and return it unchanged."""
    _cache.append(dt)
    return dt
+
+
def merge_request(dt, req):
    """Copy the updatable fields from *req* onto the stored target *dt*.

    Mutates and returns *dt*, refreshing its "modified" timestamp.
    Optional fields absent from the request are reset to None.
    """
    dt["name"] = req["name"]
    dt["runtimeApiUrl"] = req["runtimeApiUrl"]
    dt["description"] = req.get("description")
    dt["nextDistributionTargetId"] = req.get("nextDistributionTargetId")
    dt["modified"] = datetime.utcnow().isoformat()
    return dt
+
def update_distribution_target(updated_dt):
    """Replace the stored target that shares *updated_dt*'s "dt_id".

    Returns True when a matching entry was replaced, False otherwise.
    """
    target_id = updated_dt["dt_id"]
    # Index-based replacement so the success/failure result is explicit.
    for index, existing in enumerate(_cache):
        if existing["dt_id"] == target_id:
            _cache[index] = updated_dt
            return True
    return False
+
+
def delete_distribution_target(dt_id):
    """Remove the target with id *dt_id*; True when something was removed."""
    global _cache
    before = len(_cache)
    _cache = [entry for entry in _cache if entry["dt_id"] != dt_id]
    return len(_cache) < before
+
+
def add_process_group(ds_id, process_group):
    """Attach *process_group* to the target with id *ds_id*.

    Stamps the group with a "processed" UTC timestamp before appending.
    Returns the stored group, or None when no target matches.
    """
    for target in _cache:
        if target["dt_id"] != ds_id:
            continue
        process_group["processed"] = datetime.utcnow().isoformat()
        target["processGroups"].append(process_group)
        return process_group
    return None
+
+
diff --git a/mod/distributorapi/distributor/errors.py b/mod/distributorapi/distributor/errors.py
new file mode 100644
index 0000000..e28b5f5
--- /dev/null
+++ b/mod/distributorapi/distributor/errors.py
@@ -0,0 +1,25 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Errors"""
+
class DistributorAPIError(RuntimeError):
    """Base error for all distributor API failures."""


class DistributorAPIConfigError(DistributorAPIError):
    """Raised when required configuration (environment) is missing."""


class DistributorAPIResourceNotFound(DistributorAPIError):
    """Raised when an upstream resource lookup returns no match."""
diff --git a/mod/distributorapi/distributor/http.py b/mod/distributorapi/distributor/http.py
new file mode 100644
index 0000000..963a852
--- /dev/null
+++ b/mod/distributorapi/distributor/http.py
@@ -0,0 +1,256 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Code for http interface"""
+
+import logging, json
+import uuid
+from flask import Flask
+from flask_cors import CORS
+import flask_restplus as frp
+from flask_restplus import Api, Resource, fields
+from distributor.version import __version__
+from distributor import data_access as da
+from distributor import config
+from distributor import registry_client as rc
+from distributor import onboarding_client as oc
+from distributor import runtime_client as runc
+from distributor import transform as tr
+
+
# Module-level logger for the HTTP layer (handler config happens elsewhere).
_log = logging.getLogger("distributor.http")

_app = Flask(__name__)
CORS(_app)
# Try to bundle as many errors together
# https://flask-restplus.readthedocs.io/en/stable/parsing.html#error-handling
_app.config['BUNDLE_ERRORS'] = True
# Swagger/OpenAPI metadata; both the API routes and the docs UI are mounted
# under /distributor.
_api = Api(_app, version=__version__, title="Distributor HTTP API",
        description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment",
        contact="", default_mediatype="application/json"
        , prefix="/distributor", doc="/distributor", default="distributor"
        )
# REVIEW: Do I need a namespace?
ns = _api

# Marshalling model for one process group that has been distributed.
model_pg = _api.model("ProcessGroup", {
    "id": fields.String(required=True, description="Id for this process group"
        , attribute="processGroupId")
    , "version": fields.Integer(required=True
        , description="Version of the process group")
    , "processed": fields.DateTime(required=True
        , description="When this process group was processed by this API")
    , "runtimeResponse": fields.String(required=True
        , description="Full response from the runtime API")
    })

# Marshalling model for a single distribution target; "selfUrl" is derived
# from the item endpoint, the other fields map stored dict keys via
# "attribute".
model_dt = _api.model("DistributionTarget", {
    "selfUrl": fields.Url("resource_distribution_target", absolute=True)
    , "id": fields.String(required=True, description="Id for this distribution target"
        , attribute="dt_id")
    , "name": fields.String(required=True, description="Name for this distribution target"
        , attribute="name")
    , "runtimeApiUrl": fields.String(required=True
        , description="Url to the runtime API for this distribution target"
        , attribute="runtimeApiUrl")
    , "description": fields.String(required=False
        , description="Description for this distribution target"
        , attribute="description")
    , "nextDistributionTargetId": fields.String(required=False
        , description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order."
        , attribute="nextDistributionTargetId")
    , "created": fields.String(required=True
        , description="When this distribution target was created in UTC"
        , attribute="created")
    , "modified": fields.String(required=True
        , description="When this distribution target was last modified in UTC"
        , attribute="modified")
    , "processGroups": fields.List(fields.Nested(model_pg))
    })

# Wrapper model for list responses.
model_dts = _api.model("DistributionTargets", {
    "distributionTargets": fields.List(fields.Nested(model_dt))
    })


# Shared JSON-body parser for create (POST) and update (PUT) of a
# distribution target.
parser_dt_req = ns.parser()
parser_dt_req.add_argument("name", required=True, trim=True,
        location="json", help="Name for this new distribution target")
parser_dt_req.add_argument("runtimeApiUrl", required=True, trim=True,
        location="json", help="Url to the runtime API for this distribution target")
parser_dt_req.add_argument("description", required=False, trim=True,
        location="json", help="Description for this distribution target")
parser_dt_req.add_argument("nextDistributionTargetId", required=False, trim=True,
        location="json", help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.")
+
+
@ns.route("/distribution-targets", endpoint="resource_distribution_targets")
class DistributionTargets(Resource):
    """Collection resource: list and create distribution targets."""

    @ns.doc("get_distribution_targets", description="List distribution targets")
    @ns.marshal_with(model_dts)
    def get(self):
        # Wrap the stored list for the DistributionTargets marshal model.
        return { "distributionTargets": da.get_distribution_targets() }, 200

    @ns.doc("post_distribution_targets", description="Create a new distribution target")
    @ns.expect(parser_dt_req)
    @ns.marshal_with(model_dt)
    def post(self):
        # Validate/trim the JSON body, stamp id + timestamps, then store.
        # NOTE(review): returns 200 rather than 201 for a created resource.
        req = parser_dt_req.parse_args()
        req = da.transform_request(req)
        resp = da.add_distribution_target(req)
        return resp, 200
+
@ns.route("/distribution-targets/<string:dt_id>", endpoint="resource_distribution_target")
class DistributionTarget(Resource):
    """Item resource: read, update and delete a single distribution target."""

    @ns.doc("get_distribution_target", description="Get a distribution target instance")
    @ns.response(404, 'Distribution target not found')
    @ns.response(500, 'Internal Server Error')
    @ns.marshal_with(model_dt)
    def get(self, dt_id):
        result = da.get_distribution_target(dt_id)

        if result:
            return result, 200
        else:
            frp.abort(code=404, message="Unknown distribution target")

    @ns.doc("put_distribution_target", description="Update an existing distribution target")
    @ns.response(404, 'Distribution target not found')
    @ns.response(500, 'Internal Server Error')
    @ns.expect(parser_dt_req)
    @ns.marshal_with(model_dt)
    def put(self, dt_id):
        # Replace the mutable fields of an existing target; 404 when the id
        # is unknown, 500 when the (in-memory) store rejects the update.
        result = da.get_distribution_target(dt_id)

        if not result:
            frp.abort(code=404, message="Unknown distribution target")

        req = parser_dt_req.parse_args()
        updated_dt = da.merge_request(result, req)

        if da.update_distribution_target(updated_dt):
            return updated_dt, 200
        else:
            frp.abort(code=500, message="Problem with storing the update")

    @ns.response(404, 'Distribution target not found')
    @ns.response(500, 'Internal Server Error')
    @ns.doc("delete_distribution_target", description="Delete an existing distribution target")
    def delete(self, dt_id):
        # Empty 200 body on success.
        if da.delete_distribution_target(dt_id):
            return
        else:
            frp.abort(code=404, message="Unknown distribution target")
+
+
# Parser for attaching a Nifi process group to a distribution target.
parser_post_process_group = ns.parser()
parser_post_process_group.add_argument("processGroupId", required=True,
        trim=True, location="json", help="Process group ID that exists in Nifi")

@ns.route("/distribution-targets/<string:dt_id>/process-groups", endpoint="resource_target_process_groups")
class DTargetProcessGroups(Resource):
    """Distribute a Nifi process group (flow) to a distribution target.

    Orchestration: Nifi registry lookup -> runtime API graph setup ->
    flow diff / snapshot fetch -> onboarding API component lookup ->
    FBP request posted to the runtime API -> result persisted on the
    target.
    """

    @ns.response(404, 'Distribution target not found')
    @ns.response(501, 'Feature is not supported right now')
    @ns.response(500, 'Internal Server Error')
    @ns.expect(parser_post_process_group)
    def post(self, dt_id):
        # TODO: Need bucket ID but for now will simply scan through all buckets
        # TODO: Current impl doesn't take into consideration the last state of
        # the distribution target e.g. what was the last design processed

        req = parser_post_process_group.parse_args()

        # Check existence of distribution target

        dtarget = da.get_distribution_target(dt_id)

        if not dtarget:
            frp.abort(code=404, message="Unknown distribution target")

        runtime_url = dtarget["runtimeApiUrl"]
        pg_id = req["processGroupId"]

        # Find flow from Nifi registry

        try:
            flow = rc.find_flow(config.nifi_registry_url, pg_id)
        except Exception as e:
            # TODO: Switch to logging
            print(e)
            # Assuming it'll be 404
            frp.abort(code=404, message="Process group not found in registry")

        # NOTE(review): rc.find_flow returns None (no exception) when the
        # flow is missing, so the lookup below would raise TypeError rather
        # than abort with 404 — confirm intended.
        pg_name = flow["name"]

        # Make sure graph is setup in runtime api

        if runc.ensure_graph(runtime_url, pg_id, pg_name) == False:
            frp.abort(code=501 , message="Runtime API: Graph could not be created")

        # Graph diffing using Nifi registry

        flow_diff = rc.get_flow_diff_latest(config.nifi_registry_url, flow["selfUrl"])

        if flow_diff:
            # TODO: Not really processing diff right now instead just processing
            # latest. Later process the diffs instead and send out the changes.
            flow_latest = rc.get_flow_version_latest(config.nifi_registry_url, flow["selfUrl"])
        else:
            # No diff means zero or one version; take version 1 explicitly.
            flow_latest = rc.get_flow_version(config.nifi_registry_url, flow["selfUrl"], 1)

        # Get component data from onboarding API

        components = tr.extract_components_from_flow(flow_latest)

        try:
            components = oc.get_components_indexed(config.onboarding_api_url, components)
        except Exception as e:
            # TODO: Switch to logging
            print(e)
            # Assuming it'll be 404
            frp.abort(code=404, message="Component not found in onboarding API")

        #
        # Put everything together, post to runtime API, save
        #

        actions = tr.make_fbp_from_flow(flow_latest, components)

        resp = dict(req)
        resp["version"] = flow_latest["snapshotMetadata"]["version"]
        resp["runtimeResponse"] = json.dumps(runc.post_graph(runtime_url, pg_id, actions))
        resp = da.add_process_group(dt_id, resp)

        if resp:
            return resp, 200
        else:
            frp.abort(code=500, message="Could not store process group")
+
+
def start_http_server():
    """Initialize configuration and run the Flask application.

    DISTRIBUTOR_DEBUG=1 (the default) runs the Flask debug server on its
    default host/port; any other value serves on 0.0.0.0:80.
    """
    config.init()

    import os
    debug_mode = os.environ.get("DISTRIBUTOR_DEBUG", "1") == "1"

    if debug_mode:
        _app.run(debug=True)
    else:
        _app.run(host="0.0.0.0", port=80, debug=False)
diff --git a/mod/distributorapi/distributor/onboarding_client.py b/mod/distributorapi/distributor/onboarding_client.py
new file mode 100644
index 0000000..d7b0780
--- /dev/null
+++ b/mod/distributorapi/distributor/onboarding_client.py
@@ -0,0 +1,35 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Onboarding API client"""
+
+from distributor.utils import urljoin, get_json
+from distributor import errors
+
+
def get_component(onboarding_url, name, version):
    """Look up one component by name/version via the onboarding API.

    Follows the match's "componentUrl" to fetch the full record.
    Raises DistributorAPIResourceNotFound when nothing matches.
    """
    query = {"name": name, "version": version}
    matches = get_json(urljoin(onboarding_url, "components", **query))["components"]

    if not matches:
        raise errors.DistributorAPIResourceNotFound("Onboarding API: Component not found")

    return get_json(matches[0]["componentUrl"])
+
+
def get_components_indexed(onboarding_url, list_name_version):
    """Fetch each (name, version) pair and index the results by that pair."""
    return {(name, version): get_component(onboarding_url, name, version)
            for name, version in list_name_version}
diff --git a/mod/distributorapi/distributor/registry_client.py b/mod/distributorapi/distributor/registry_client.py
new file mode 100644
index 0000000..5d437e7
--- /dev/null
+++ b/mod/distributorapi/distributor/registry_client.py
@@ -0,0 +1,91 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Sophisticated Nifi registry client"""
+
+from distributor.utils import urljoin as _urljoin
+from distributor.utils import get_json as _get_json
+
+
def _add_url_from_link(registry_url, obj):
    """Return a copy of *obj* with absolute "selfUrl" entries added.

    Wherever a nested dict carries a Nifi-registry "link" object, a
    "selfUrl" (registry_url joined with the link's href) is added next to
    it. Recurses into dict values only; dicts inside lists are not
    visited.
    """
    result = {}

    for key, value in obj.items():
        if key == "link":
            result["selfUrl"] = _urljoin(registry_url, value["href"])
            result[key] = value
        elif isinstance(value, dict):
            # isinstance instead of type(...) == dict: idiomatic type check
            # that also accepts dict subclasses.
            result[key] = _add_url_from_link(registry_url, value)
        else:
            result[key] = value

    return result
+
+
def get_buckets(registry_url):
    """List all registry buckets, each augmented with "selfUrl"."""
    raw = _get_json(_urljoin(registry_url, "buckets"))
    return [_add_url_from_link(registry_url, bucket) for bucket in raw]


def get_flows(registry_url, bucket_url):
    """List all flows in a bucket, each augmented with "selfUrl"."""
    raw = _get_json(_urljoin(bucket_url, "flows"))
    return [_add_url_from_link(registry_url, flow) for flow in raw]
+
+
def find_flow(registry_url, flow_id):
    """Scan every bucket for the flow whose "identifier" is *flow_id*.

    Returns the augmented flow object, or None when no bucket contains it.
    """
    for bucket in get_buckets(registry_url):
        matches = [flow
                for flow in get_flows(registry_url, bucket["selfUrl"])
                if flow["identifier"] == flow_id]

        if matches:
            return matches.pop()

    return None
+
+
def get_flow_versions(flow_url):
    """Returns list of versions from greatest to least for a given flow"""
    versions_url = _urljoin(flow_url, "versions")
    # Descending sort directly instead of reversed(sorted(...)).
    return sorted((v["version"] for v in _get_json(versions_url)), reverse=True)


def get_flow_diff(registry_url, flow_url, version_one, version_two):
    """Fetch the registry diff between two versions of a flow."""
    return _get_json(_urljoin(flow_url, "diff", str(version_one), str(version_two)))
+
def get_flow_diff_latest(registry_url, flow_url):
    """Diff the two most recent versions of a flow.

    Returns None when the flow has fewer than two versions — nothing to
    diff. (Zero versions should arguably be an error.)
    """
    versions = get_flow_versions(flow_url)

    if len(versions) < 2:
        return None

    # Example in gitlab wiki shows that lower version is first
    return _add_url_from_link(registry_url,
            get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
+
def get_flow_version(registry_url, flow_url, version):
    """Fetch one snapshot of a flow; *version* is a number or "latest"."""
    snapshot = _get_json(_urljoin(flow_url, "versions", str(version)))
    return _add_url_from_link(registry_url, snapshot)


def get_flow_version_latest(registry_url, flow_url):
    """Fetch the most recent snapshot of a flow."""
    return get_flow_version(registry_url, flow_url, "latest")
diff --git a/mod/distributorapi/distributor/runtime_client.py b/mod/distributorapi/distributor/runtime_client.py
new file mode 100644
index 0000000..7cd06ae
--- /dev/null
+++ b/mod/distributorapi/distributor/runtime_client.py
@@ -0,0 +1,102 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Runtime API client"""
+
+import requests as reqs
+from distributor import errors
+from distributor.utils import urljoin, get_json
+
+
def get_graph(runtime_url, graph_id):
    """Fetch the runtime API's main graph.

    REVIEW: There's only support for one graph right now, so *graph_id*
    is unused and the fixed "main" graph is always fetched.
    """
    return get_json(urljoin(runtime_url, "api/graph/main"))
+
+
def create_graph(runtime_url, graph_id, graph_name):
    """Create the runtime API's main graph with the given id and name.

    Raises DistributorAPIError when the POST fails.
    """
    url = urljoin(runtime_url, "api/graph/main")
    payload = {"name": graph_name, "id": graph_id,
               "description": "", "main": True}
    resp = reqs.post(url, json=payload)

    try:
        resp.raise_for_status()
    except Exception as e:
        raise errors.DistributorAPIError(e)
+
+
def delete_graph(runtime_url):
    """Delete the runtime API's main graph.

    Raises DistributorAPIError when the DELETE fails (e.g. 404).
    """
    try:
        reqs.delete(urljoin(runtime_url, "api/graph/main")).raise_for_status()
    except Exception as e:
        raise errors.DistributorAPIError(e)
+
+
def post_graph(runtime_url, graph_id, actions):
    """POST a list of FBP actions to the runtime API's distribute endpoint.

    Returns the parsed JSON response. On any failure the request payload
    is dumped to "runtime-request-failed.json" in the current working
    directory (debugging aid) and DistributorAPIError is raised.
    """
    url = urljoin(runtime_url, "api/graph", graph_id, "distribute")
    graph_request = {"actions": actions}

    resp = reqs.post(url, json=graph_request)

    try:
        resp.raise_for_status()
        # REVIEW: Should be blueprint
        return resp.json()
    except Exception as e:
        # Persist the failing payload so the request can be replayed offline.
        with open("runtime-request-failed.json", "w+") as f:
            import json
            json.dump(graph_request, f)
        raise errors.DistributorAPIError(e)
+
+
def ensure_graph(runtime_url, pg_id, pg_name, max_attempts=6):
    """Ensures the graph with the specified id will exist

    Polls (and creates when missing) until the runtime API's single main
    graph carries id *pg_id*. Returns True on success, False when
    *max_attempts* iterations were exhausted without confirming it.
    """
    # TODO: Remove this when runtime API more mature
    # Added this attempted delete call here to make sure repeated calls to post
    # flows works by making sure the runtime API main graph is always empty
    try:
        delete_graph(runtime_url)
    except Exception:
        # Probably a 404, doesn't matter. Narrowed from a bare except so
        # KeyboardInterrupt/SystemExit still propagate.
        pass

    # The attempts are not *really* attempts because attempts equates to looping
    # twice
    for _ in range(max_attempts):
        resp = None

        try:
            resp = get_graph(runtime_url, pg_id)
        except Exception:
            # Assuming you are here because graph needs to be created
            create_graph(runtime_url, pg_id, pg_name)

        # TODO: Runtime api only supports 1 graph which is why this check is
        # here. Make sure it will support many graphs and remove this
        if resp is None:
            # Graph was just (re)created; verify it on the next iteration.
            continue
        if resp["id"] == pg_id:
            return True
        # A different graph occupies the single slot; clear it and retry.
        delete_graph(runtime_url)

    return False
diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py
new file mode 100644
index 0000000..9654249
--- /dev/null
+++ b/mod/distributorapi/distributor/transform.py
@@ -0,0 +1,137 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Transform objects from one form to another"""
+
+import json
+from functools import partial
+
+
def extract_components_from_flow(flow):
    """Given a versionedFlowSnapshot object, extract out the processors
    and create a list of tuples where each tuple is
    (component name, component version)"""
    return [(p["bundle"]["artifact"], p["bundle"]["version"])
            for p in flow["flowContents"]["processors"]]
+
+
def get_component(flow, components, processor_id):
    """Find the component for the processor with id *processor_id*.

    Looks up the processor's bundle (artifact, version) key in
    *components*; returns the component dict, or None when the processor
    id is unknown or its component is not in the index.
    """
    def _lookup(processor):
        # Renamed from an inner "get_component" that shadowed this function.
        bundle = processor["bundle"]
        return components.get((bundle["artifact"], bundle["version"]), None)

    matches = [_lookup(p) for p in flow["flowContents"]["processors"]
               if p["identifier"] == processor_id]
    return matches[0] if matches else None
+
+
def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
    """Transform a versionedFlowSnapshot object into a runtime API (FBP) request

    Emits one "addnode" action per processor followed by one "addedge"
    action per connection.

    An example of an edge:

    {
      "command": "addedge",
      "payload": {
          "src" : {
              "node": "comp1234",
              "port": "DCAE-HELLO-WORLD-PUB-MR"
          },
          "tgt" : {
              "node": "comp5678",
              "port": "DCAE-HELLO-WORLD-SUB-MR"
          },
          "metadata":{
              "name": "sample_topic_0",
              "data_type": "json",
              "dmaap_type": "MR"
          }
      },
      "target_graph_id": "string"
    }
    """
    _get_component = partial(get_component, flow, components)

    def parse_connection(conn):
        # Build one "addedge" action from a Nifi connection. The connection's
        # selectedRelationships carry colon-delimited pub/sub descriptors.
        # NOTE(review): returns None when a referenced component is missing,
        # and the list comprehension below keeps that None in the result —
        # confirm downstream tolerates it.
        rels = conn["selectedRelationships"]

        if conn["source"]["type"] == "PROCESSOR":
            comp = _get_component(conn["source"]["id"])

            if not comp:
                # REVIEW: Raise error?
                return None

            # Example:
            # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
            rels_pubs = [r for r in rels if "publishes" in r]

            if rels_pubs:
                _, _, _, transport_type, config_key = rels_pubs[0].split(":")
                src = { "node": comp["id"], "port": config_key }
            else:
                # REVIEW: This should be an error?
                src = { "node": comp["id"], "port": None }
        else:
            src = {}

        if conn["destination"]["type"] == "PROCESSOR":
            comp = _get_component(conn["destination"]["id"])

            if not comp:
                # REVIEW: Raise error?
                return None

            # Example:
            # subscribes:predictin:1.0.0:message_router:predict_subscriber
            rels_subs = [r for r in rels if "subscribes" in r]

            if rels_subs:
                _, _, _, transport_type, config_key = rels_subs[0].split(":")
                tgt = { "node": comp["id"], "port": config_key }
            else:
                # REVIEW: This should be an error?
                tgt = { "node": comp["id"], "port": None }
        else:
            tgt = {}

        return { "command": "addedge"
                , "payload": {
                    "src": src
                    , "tgt": tgt
                    , "metadata": {
                        "name": conn["name"]
                        # TODO: Question these hardcoded attributes
                        , "data_type": "json"
                        , "dmaap_type": "MR"
                        }
                    }
                }

    def parse_processor(p):
        # Build one "addnode" action; raises KeyError when the processor's
        # bundle is absent from *components*.
        c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
        return { "command": "addnode"
                # TODO: spec is required to be a json string but runtime api
                # changing this soon hopefully
                , "payload": { "component_spec": json.dumps(c["spec"])
                    , "component_id": c["id"]
                    , "name": c["name"]
                    , "processor": p }
                }

    ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
    cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
    return ps+cs
+
diff --git a/mod/distributorapi/distributor/utils.py b/mod/distributorapi/distributor/utils.py
new file mode 100644
index 0000000..7457d5a
--- /dev/null
+++ b/mod/distributorapi/distributor/utils.py
@@ -0,0 +1,43 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Utility functions"""
+
+from urllib.parse import quote
+import requests as reqs
+from distributor import errors
+
+
def urljoin(base, *trailing, **query_params):
    """Join path segments onto *base*, optionally appending a query string.

    A single trailing "/" on *base* is dropped before joining. Query keys
    and values are percent-quoted (values are str()-ed first; "/" stays
    unescaped per urllib's default safe set).
    """
    # endswith() instead of indexing base[-1] so an empty base cannot raise
    # IndexError.
    base = base[:-1] if base.endswith("/") else base
    url = "/".join([base] + list(trailing))

    if query_params:
        qp = "&".join("{0}={1}".format(quote(k), quote(str(v)))
                      for k, v in query_params.items())
        return "?".join([url, qp])
    else:
        return url
+
+
def get_json(url):
    """GET *url* and return the parsed JSON body.

    Any HTTP error status or JSON decoding failure is wrapped in
    DistributorAPIError.
    """
    response = reqs.get(url)

    try:
        response.raise_for_status()
        return response.json()
    except Exception as err:
        raise errors.DistributorAPIError(err)
diff --git a/mod/distributorapi/distributor/version.py b/mod/distributorapi/distributor/version.py
new file mode 100644
index 0000000..57c4da3
--- /dev/null
+++ b/mod/distributorapi/distributor/version.py
@@ -0,0 +1,16 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
# Package version; surfaced as the API version string in distributor.http.
__version__ = "1.0.0"