summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Hwang <mhwang@research.att.com>2019-11-11 15:28:57 -0500
committerMichael Hwang <mhwang@research.att.com>2019-12-16 11:02:15 -0500
commit77900bb3097491cd9fca964c111ea70724e53989 (patch)
treeb23fb51b9e3c465bfcecaede1f0450acfe22e880
parentc698e66797bad69b4c77b26b487bf8322989beb0 (diff)
Add distributor api projectdev-mod
Issue-ID: DCAEGEN2-1860 Signed-off-by: Michael Hwang <mhwang@research.att.com> Change-Id: I67aa9178b1b1830e330ca1259e8f6b30202945df
-rw-r--r--.gitignore10
-rw-r--r--mod/distributorapi/Dockerfile23
-rw-r--r--mod/distributorapi/README.md10
-rw-r--r--mod/distributorapi/distributor/__init__.py15
-rw-r--r--mod/distributorapi/distributor/config.py38
-rw-r--r--mod/distributorapi/distributor/data_access.py89
-rw-r--r--mod/distributorapi/distributor/errors.py25
-rw-r--r--mod/distributorapi/distributor/http.py256
-rw-r--r--mod/distributorapi/distributor/onboarding_client.py35
-rw-r--r--mod/distributorapi/distributor/registry_client.py91
-rw-r--r--mod/distributorapi/distributor/runtime_client.py102
-rw-r--r--mod/distributorapi/distributor/transform.py137
-rw-r--r--mod/distributorapi/distributor/utils.py43
-rw-r--r--mod/distributorapi/distributor/version.py16
-rw-r--r--mod/distributorapi/pom.xml37
-rw-r--r--mod/distributorapi/setup.py42
-rw-r--r--mod/distributorapi/tests/components.json260
-rw-r--r--mod/distributorapi/tests/flow.json305
-rw-r--r--mod/distributorapi/tests/test_registry_client.py78
-rw-r--r--mod/distributorapi/tests/test_transform.py68
-rw-r--r--mod/distributorapi/tests/test_utils.py24
-rw-r--r--mod/distributorapi/tox.ini14
22 files changed, 1718 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index b842f5a..adb4b28 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,13 @@ target
.idea
*.iml
+
+# For python
+.coverage
+.tox/
+coverage.xml
+__pycache__
+*.egg-info/
+htmlcov/
+xunit-results.xml
+*.eggs/
diff --git a/mod/distributorapi/Dockerfile b/mod/distributorapi/Dockerfile
new file mode 100644
index 0000000..cc10c68
--- /dev/null
+++ b/mod/distributorapi/Dockerfile
@@ -0,0 +1,23 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+FROM python:3.7-alpine
+
+COPY . /code
+WORKDIR /code
+RUN pip install .
+EXPOSE 80
+ENV DISTRIBUTOR_DEBUG=0
+CMD start-distributor-api
diff --git a/mod/distributorapi/README.md b/mod/distributorapi/README.md
new file mode 100644
index 0000000..5446ccc
--- /dev/null
+++ b/mod/distributorapi/README.md
@@ -0,0 +1,10 @@
+# Distributor API
+
+HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment.
+
+Run docker container:
+
+```
+docker run -d -p 5000:80 --name distributor-api \
+ distributor-api:latest
+```
diff --git a/mod/distributorapi/distributor/__init__.py b/mod/distributorapi/distributor/__init__.py
new file mode 100644
index 0000000..e6f924f
--- /dev/null
+++ b/mod/distributorapi/distributor/__init__.py
@@ -0,0 +1,15 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
diff --git a/mod/distributorapi/distributor/config.py b/mod/distributorapi/distributor/config.py
new file mode 100644
index 0000000..8d2cede
--- /dev/null
+++ b/mod/distributorapi/distributor/config.py
@@ -0,0 +1,38 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Configuration for distributor api"""
+import os, tempfile, six, inspect
+from datetime import datetime
+from distributor import errors
+
+
+def _grab_env(name, default=None):
+ try:
+ if default:
+ return os.environ.get(name, default)
+ else:
+ return os.environ[name]
+ except KeyError:
+ raise errors.DistributorAPIConfigError("Required environment variable missing: {0}".format(name))
+
+def init():
+ global nifi_registry_url
+ nifi_registry_url = _grab_env("NIFI_REGISTRY_URL"
+ , default="http://nifi-registry:18080/nifi-registry-api")
+
+ global onboarding_api_url
+ onboarding_api_url = _grab_env("ONBOARDING_API_URL"
+ , default="http://onboarding-api/onboarding")
diff --git a/mod/distributorapi/distributor/data_access.py b/mod/distributorapi/distributor/data_access.py
new file mode 100644
index 0000000..e1a45e3
--- /dev/null
+++ b/mod/distributorapi/distributor/data_access.py
@@ -0,0 +1,89 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Data layer"""
+
+from datetime import datetime
+import uuid
+
+# TODO: Use real storage
+_cache = []
+
+
+def get_distribution_targets():
+ global _cache
+ return _cache
+
+
+def get_distribution_target(ds_id):
+ global _cache
+ result = [ i for i in _cache if i["dt_id"] == ds_id ]
+ return result[0] if result else {}
+
+def transform_request(req):
+ """Transform request to object to store
+
+ NOTE: This method is not safe
+ """
+ ts = datetime.utcnow().isoformat()
+ req["created"] = ts
+ req["modified"] = ts
+ req["dt_id"] = str(uuid.uuid4())
+ req["processGroups"] = []
+ return req
+
+def add_distribution_target(dt):
+ global _cache
+ _cache.append(dt)
+ return dt
+
+
+def merge_request(dt, req):
+ dt["name"] = req["name"]
+ dt["runtimeApiUrl"] = req["runtimeApiUrl"]
+ dt["description"] = req.get("description", None)
+ dt["nextDistributionTargetId"] = req.get("nextDistributionTargetId", None)
+ dt["modified"] = datetime.utcnow().isoformat()
+ return dt
+
+def update_distribution_target(updated_dt):
+ dt_id = updated_dt["dt_id"]
+ global _cache
+ # Did not use list comprehension blah blah because could not do the "return
+ # True" easily
+ for i, dt in enumerate(_cache):
+ if dt["dt_id"] == dt_id:
+ _cache[i] = updated_dt
+ return True
+ return False
+
+
+def delete_distribution_target(dt_id):
+ global _cache
+ num_prev = len(_cache)
+ _cache = list(filter(lambda e: e["dt_id"] != dt_id, _cache))
+ return len(_cache) < num_prev
+
+
+def add_process_group(ds_id, process_group):
+ global _cache
+ for dt in _cache:
+ if dt["dt_id"] == ds_id:
+ process_group["processed"] = datetime.utcnow().isoformat()
+ dt["processGroups"].append(process_group)
+ return process_group
+ return None
+
+
diff --git a/mod/distributorapi/distributor/errors.py b/mod/distributorapi/distributor/errors.py
new file mode 100644
index 0000000..e28b5f5
--- /dev/null
+++ b/mod/distributorapi/distributor/errors.py
@@ -0,0 +1,25 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Errors"""
+
+class DistributorAPIError(RuntimeError):
+ pass
+
+class DistributorAPIConfigError(DistributorAPIError):
+ pass
+
+class DistributorAPIResourceNotFound(DistributorAPIError):
+ pass
diff --git a/mod/distributorapi/distributor/http.py b/mod/distributorapi/distributor/http.py
new file mode 100644
index 0000000..963a852
--- /dev/null
+++ b/mod/distributorapi/distributor/http.py
@@ -0,0 +1,256 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Code for http interface"""
+
+import logging, json
+import uuid
+from flask import Flask
+from flask_cors import CORS
+import flask_restplus as frp
+from flask_restplus import Api, Resource, fields
+from distributor.version import __version__
+from distributor import data_access as da
+from distributor import config
+from distributor import registry_client as rc
+from distributor import onboarding_client as oc
+from distributor import runtime_client as runc
+from distributor import transform as tr
+
+
+_log = logging.getLogger("distributor.http")
+
+_app = Flask(__name__)
+CORS(_app)
+# Try to bundle as many errors together
+# https://flask-restplus.readthedocs.io/en/stable/parsing.html#error-handling
+_app.config['BUNDLE_ERRORS'] = True
+_api = Api(_app, version=__version__, title="Distributor HTTP API",
+ description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment",
+ contact="", default_mediatype="application/json"
+ , prefix="/distributor", doc="/distributor", default="distributor"
+ )
+# REVIEW: Do I need a namespace?
+ns = _api
+
+model_pg = _api.model("ProcessGroup", {
+ "id": fields.String(required=True, description="Id for this process group"
+ , attribute="processGroupId")
+ , "version": fields.Integer(required=True
+ , description="Version of the process group")
+ , "processed": fields.DateTime(required=True
+ , description="When this process group was processed by this API")
+ , "runtimeResponse": fields.String(required=True
+ , description="Full response from the runtime API")
+ })
+
+model_dt = _api.model("DistributionTarget", {
+ "selfUrl": fields.Url("resource_distribution_target", absolute=True)
+ , "id": fields.String(required=True, description="Id for this distribution target"
+ , attribute="dt_id")
+ , "name": fields.String(required=True, description="Name for this distribution target"
+ , attribute="name")
+ , "runtimeApiUrl": fields.String(required=True
+ , description="Url to the runtime API for this distribution target"
+ , attribute="runtimeApiUrl")
+ , "description": fields.String(required=False
+ , description="Description for this distribution target"
+ , attribute="description")
+ , "nextDistributionTargetId": fields.String(required=False
+ , description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order."
+ , attribute="nextDistributionTargetId")
+ , "created": fields.String(required=True
+ , description="When this distribution target was created in UTC"
+ , attribute="created")
+ , "modified": fields.String(required=True
+ , description="When this distribution target was last modified in UTC"
+ , attribute="modified")
+ , "processGroups": fields.List(fields.Nested(model_pg))
+ })
+
+model_dts = _api.model("DistributionTargets", {
+ "distributionTargets": fields.List(fields.Nested(model_dt))
+ })
+
+
+parser_dt_req = ns.parser()
+parser_dt_req.add_argument("name", required=True, trim=True,
+ location="json", help="Name for this new distribution target")
+parser_dt_req.add_argument("runtimeApiUrl", required=True, trim=True,
+ location="json", help="Url to the runtime API for this distribution target")
+parser_dt_req.add_argument("description", required=False, trim=True,
+ location="json", help="Description for this distribution target")
+parser_dt_req.add_argument("nextDistributionTargetId", required=False, trim=True,
+ location="json", help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.")
+
+
+@ns.route("/distribution-targets", endpoint="resource_distribution_targets")
+class DistributionTargets(Resource):
+ @ns.doc("get_distribution_targets", description="List distribution targets")
+ @ns.marshal_with(model_dts)
+ def get(self):
+ return { "distributionTargets": da.get_distribution_targets() }, 200
+
+ @ns.doc("post_distribution_targets", description="Create a new distribution target")
+ @ns.expect(parser_dt_req)
+ @ns.marshal_with(model_dt)
+ def post(self):
+ req = parser_dt_req.parse_args()
+ req = da.transform_request(req)
+ resp = da.add_distribution_target(req)
+ return resp, 200
+
+@ns.route("/distribution-targets/<string:dt_id>", endpoint="resource_distribution_target")
+class DistributionTarget(Resource):
+ @ns.doc("get_distribution_target", description="Get a distribution target instance")
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.marshal_with(model_dt)
+ def get(self, dt_id):
+ result = da.get_distribution_target(dt_id)
+
+ if result:
+ return result, 200
+ else:
+ frp.abort(code=404, message="Unknown distribution target")
+
+ @ns.doc("put_distribution_target", description="Update an existing distribution target")
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.expect(parser_dt_req)
+ @ns.marshal_with(model_dt)
+ def put(self, dt_id):
+ result = da.get_distribution_target(dt_id)
+
+ if not result:
+ frp.abort(code=404, message="Unknown distribution target")
+
+ req = parser_dt_req.parse_args()
+ updated_dt = da.merge_request(result, req)
+
+ if da.update_distribution_target(updated_dt):
+ return updated_dt, 200
+ else:
+ frp.abort(code=500, message="Problem with storing the update")
+
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(500, 'Internal Server Error')
+ @ns.doc("delete_distribution_target", description="Delete an existing distribution target")
+ def delete(self, dt_id):
+ if da.delete_distribution_target(dt_id):
+ return
+ else:
+ frp.abort(code=404, message="Unknown distribution target")
+
+
+parser_post_process_group = ns.parser()
+parser_post_process_group.add_argument("processGroupId", required=True,
+ trim=True, location="json", help="Process group ID that exists in Nifi")
+
+@ns.route("/distribution-targets/<string:dt_id>/process-groups", endpoint="resource_target_process_groups")
+class DTargetProcessGroups(Resource):
+
+ @ns.response(404, 'Distribution target not found')
+ @ns.response(501, 'Feature is not supported right now')
+ @ns.response(500, 'Internal Server Error')
+ @ns.expect(parser_post_process_group)
+ def post(self, dt_id):
+ # TODO: Need bucket ID but for now will simply scan through all buckets
+ # TODO: Current impl doesn't take into consideration the last state of
+ # the distribution target e.g. what was the last design processed
+
+ req = parser_post_process_group.parse_args()
+
+ # Check existence of distribution target
+
+ dtarget = da.get_distribution_target(dt_id)
+
+ if not dtarget:
+ frp.abort(code=404, message="Unknown distribution target")
+
+ runtime_url = dtarget["runtimeApiUrl"]
+ pg_id = req["processGroupId"]
+
+ # Find flow from Nifi registry
+
+ try:
+ flow = rc.find_flow(config.nifi_registry_url, pg_id)
+ except Exception as e:
+ # TODO: Switch to logging
+ print(e)
+ # Assuming it'll be 404
+ frp.abort(code=404, message="Process group not found in registry")
+
+ pg_name = flow["name"]
+
+ # Make sure graph is setup in runtime api
+
+ if runc.ensure_graph(runtime_url, pg_id, pg_name) == False:
+ frp.abort(code=501 , message="Runtime API: Graph could not be created")
+
+ # Graph diffing using Nifi registry
+
+ flow_diff = rc.get_flow_diff_latest(config.nifi_registry_url, flow["selfUrl"])
+
+ if flow_diff:
+ # TODO: Not really processing diff right now instead just processing
+ # latest. Later process the diffs instead and send out the changes.
+ flow_latest = rc.get_flow_version_latest(config.nifi_registry_url, flow["selfUrl"])
+ else:
+ flow_latest = rc.get_flow_version(config.nifi_registry_url, flow["selfUrl"], 1)
+
+ # Get component data from onboarding API
+
+ components = tr.extract_components_from_flow(flow_latest)
+
+ try:
+ components = oc.get_components_indexed(config.onboarding_api_url, components)
+ except Exception as e:
+ # TODO: Switch to logging
+ print(e)
+ # Assuming it'll be 404
+ frp.abort(code=404, message="Component not found in onboarding API")
+
+ #
+ # Put everything together, post to runtime API, save
+ #
+
+ actions = tr.make_fbp_from_flow(flow_latest, components)
+
+ resp = dict(req)
+ resp["version"] = flow_latest["snapshotMetadata"]["version"]
+ resp["runtimeResponse"] = json.dumps(runc.post_graph(runtime_url, pg_id, actions))
+ resp = da.add_process_group(dt_id, resp)
+
+ if resp:
+ return resp, 200
+ else:
+ frp.abort(code=500, message="Could not store process group")
+
+
+def start_http_server():
+ config.init()
+
+ def is_debug():
+ import os
+ if os.environ.get("DISTRIBUTOR_DEBUG", "1") == "1":
+ return True
+ else:
+ return False
+
+ if is_debug():
+ _app.run(debug=True)
+ else:
+ _app.run(host="0.0.0.0", port=80, debug=False)
diff --git a/mod/distributorapi/distributor/onboarding_client.py b/mod/distributorapi/distributor/onboarding_client.py
new file mode 100644
index 0000000..d7b0780
--- /dev/null
+++ b/mod/distributorapi/distributor/onboarding_client.py
@@ -0,0 +1,35 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Onboarding API client"""
+
+from distributor.utils import urljoin, get_json
+from distributor import errors
+
+
+def get_component(onboarding_url, name, version):
+ url = urljoin(onboarding_url, "components", **{"name": name, "version": version})
+ result = get_json(url)["components"]
+
+ if result:
+ return get_json(result[0]["componentUrl"])
+ else:
+ raise errors.DistributorAPIResourceNotFound("Onboarding API: Component not found")
+
+
+def get_components_indexed(onboarding_url, list_name_version):
+ return dict([
+ ((c[0], c[1]), get_component(onboarding_url, c[0], c[1]))
+ for c in list_name_version])
diff --git a/mod/distributorapi/distributor/registry_client.py b/mod/distributorapi/distributor/registry_client.py
new file mode 100644
index 0000000..5d437e7
--- /dev/null
+++ b/mod/distributorapi/distributor/registry_client.py
@@ -0,0 +1,91 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Sophisticated Nifi registry client"""
+
+from distributor.utils import urljoin as _urljoin
+from distributor.utils import get_json as _get_json
+
+
+def _add_url_from_link(registry_url, obj):
+ result = {}
+
+ for k, v in obj.items():
+ if k == "link":
+ result["selfUrl"] =_urljoin(registry_url, v["href"])
+ result[k] = v
+ elif type(v) == dict:
+ result[k] = _add_url_from_link(registry_url, v)
+ else:
+ result[k] = v
+
+ return result
+
+
+def get_buckets(registry_url):
+ buckets = _get_json(_urljoin(registry_url, "buckets"))
+ return [_add_url_from_link(registry_url, b) for b in buckets]
+
+
+def get_flows(registry_url, bucket_url):
+ flows = _get_json(_urljoin(bucket_url, "flows"))
+ return [_add_url_from_link(registry_url, f) for f in flows]
+
+
+def find_flow(registry_url, flow_id):
+ buckets = get_buckets(registry_url)
+
+ def is_match(flow):
+ return flow["identifier"] == flow_id
+
+ for bucket in buckets:
+ result = [f for f in get_flows(registry_url, bucket["selfUrl"]) if is_match(f)]
+
+ if result:
+ return result.pop()
+
+ return None
+
+
+def get_flow_versions(flow_url):
+ """Returns list of versions from greatest to least for a given flow"""
+ versions_url = _urljoin(flow_url, "versions")
+ # List of versions will be greatest to least
+ return list(reversed(sorted(
+ [v["version"] for v in _get_json(versions_url)])))
+
+def get_flow_diff(registry_url, flow_url, version_one, version_two):
+ diff_url = _urljoin(flow_url, "diff", str(version_one), str(version_two))
+ return _get_json(diff_url)
+
+def get_flow_diff_latest(registry_url, flow_url):
+ versions = get_flow_versions(flow_url)
+
+ if len(versions) == 0:
+ # Should not happen, should this be an error?
+ return None
+ elif len(versions) == 1:
+ return None
+ else:
+ # Example in gitlab wiki shows that lower version is first
+ return _add_url_from_link(registry_url
+ , get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
+
+def get_flow_version(registry_url, flow_url, version):
+ version_url = _urljoin(flow_url, "versions", str(version))
+ return _add_url_from_link(registry_url, _get_json(version_url))
+
+def get_flow_version_latest(registry_url, flow_url):
+ return get_flow_version(registry_url, flow_url, "latest")
diff --git a/mod/distributorapi/distributor/runtime_client.py b/mod/distributorapi/distributor/runtime_client.py
new file mode 100644
index 0000000..7cd06ae
--- /dev/null
+++ b/mod/distributorapi/distributor/runtime_client.py
@@ -0,0 +1,102 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Runtime API client"""
+
+import requests as reqs
+from distributor import errors
+from distributor.utils import urljoin, get_json
+
+
+def get_graph(runtime_url, graph_id):
+ # REVIEW: There's only support for one graph right now..
+ url = urljoin(runtime_url, "api/graph/main")
+ return get_json(url)
+
+
+def create_graph(runtime_url, graph_id, graph_name):
+ url = urljoin(runtime_url, "api/graph/main")
+
+ resp = reqs.post(url, json={"name": graph_name, "id": graph_id
+ , "description": "", "main": True})
+
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ raise errors.DistributorAPIError(e)
+
+
+def delete_graph(runtime_url):
+ url = urljoin(runtime_url, "api/graph/main")
+
+ try:
+ reqs.delete(url).raise_for_status()
+ except Exception as e:
+ raise errors.DistributorAPIError(e)
+
+
+def post_graph(runtime_url, graph_id, actions):
+ url = urljoin(runtime_url, "api/graph", graph_id, "distribute")
+ graph_request = {"actions": actions}
+
+ resp = reqs.post(url, json=graph_request)
+
+ try:
+ resp.raise_for_status()
+ # REVIEW: Should be blueprint
+ return resp.json()
+ except Exception as e:
+ with open("runtime-request-failed.json", "w+") as f:
+ import json
+ json.dump(graph_request, f)
+ raise errors.DistributorAPIError(e)
+
+
+def ensure_graph(runtime_url, pg_id, pg_name, max_attempts=6):
+ """Ensures the graph with the specified id will exist"""
+ # TODO: Remove this when runtime API more mature
+ # Added this attempted delete call here to make sure repeated calls to post
+ # flows works by making sure the runtime API main graph is always empty
+ try:
+ delete_graph(runtime_url)
+ except:
+ # Probably a 404, doesn't matter
+ pass
+
+ # The attempts are not *really* attempts because attempts equates to looping
+ # twice
+ for i in range(0, max_attempts):
+ resp = None
+
+ try:
+ resp = get_graph(runtime_url, pg_id)
+ except Exception as e:
+ # Assuming you are here because graph needs to be created
+ create_graph(runtime_url, pg_id, pg_name)
+
+ # TODO: Runtime api only supports 1 graph which is why this check is
+ # here. Make sure it will support many graphs and remove this
+
+ if resp == None:
+ # You are here because the graph was created for first time or
+ # the graph was deleted then created. Anyways next loop should
+ # check if it got created ok
+ continue
+ elif resp != None and resp["id"] != pg_id:
+ delete_graph(runtime_url)
+ elif resp != None and resp["id"] == pg_id:
+ return True
+
+ return False
diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py
new file mode 100644
index 0000000..9654249
--- /dev/null
+++ b/mod/distributorapi/distributor/transform.py
@@ -0,0 +1,137 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Transform objects from one form to another"""
+
+import json
+from functools import partial
+
+
+def extract_components_from_flow(flow):
+ """Given a versionedFlowSnapshot object, extract out the processors
+ and create a list of tuples where each tuple is
+ (component name, component version)"""
+ extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
+ return [ extract(p) for p in flow["flowContents"]["processors"] ]
+
+
+def get_component(flow, components, processor_id):
+ def get_component(p):
+ bundle = p["bundle"]
+ return components.get((bundle["artifact"], bundle["version"]), None)
+
+ cs = [get_component(p) for p in flow["flowContents"]["processors"] \
+ if p["identifier"] == processor_id]
+ return cs[0] if cs else None
+
+
+def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
+ """Transform a versionedFlowSnapshot object into a runtime API (FBP) request
+
+ An example of an edge:
+
+ {
+ "command": "addedge",
+ "payload": {
+ "src" : {
+ "node": "comp1234",
+ "port": "DCAE-HELLO-WORLD-PUB-MR"
+ },
+ "tgt" : {
+ "node": "comp5678",
+ "port": "DCAE-HELLO-WORLD-SUB-MR"
+ },
+ "metadata":{
+ "name": "sample_topic_0",
+ "data_type": "json",
+ "dmaap_type": "MR"
+ }
+ },
+ "target_graph_id": "string"
+ }
+ """
+ _get_component = partial(get_component, flow, components)
+
+ def parse_connection(conn):
+ rels = conn["selectedRelationships"]
+
+ if conn["source"]["type"] == "PROCESSOR":
+ comp = _get_component(conn["source"]["id"])
+
+ if not comp:
+ # REVIEW: Raise error?
+ return None
+
+ # Example:
+ # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
+ rels_pubs = [r for r in rels if "publishes" in r]
+
+ if rels_pubs:
+ _, _, _, transport_type, config_key = rels_pubs[0].split(":")
+ src = { "node": comp["id"], "port": config_key }
+ else:
+ # REVIEW: This should be an error?
+ src = { "node": comp["id"], "port": None }
+ else:
+ src = {}
+
+ if conn["destination"]["type"] == "PROCESSOR":
+ comp = _get_component(conn["destination"]["id"])
+
+ if not comp:
+ # REVIEW: Raise error?
+ return None
+
+ # Example:
+ # subscribes:predictin:1.0.0:message_router:predict_subscriber
+ rels_subs = [r for r in rels if "subscribes" in r]
+
+ if rels_subs:
+ _, _, _, transport_type, config_key = rels_subs[0].split(":")
+ tgt = { "node": comp["id"], "port": config_key }
+ else:
+ # REVIEW: This should be an error?
+ tgt = { "node": comp["id"], "port": None }
+ else:
+ tgt = {}
+
+ return { "command": "addedge"
+ , "payload": {
+ "src": src
+ , "tgt": tgt
+ , "metadata": {
+ "name": conn["name"]
+ # TODO: Question these hardcoded attributes
+ , "data_type": "json"
+ , "dmaap_type": "MR"
+ }
+ }
+ }
+
+ def parse_processor(p):
+ c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
+ return { "command": "addnode"
+ # TODO: spec is required to be a json string but runtime api
+ # changing this soon hopefully
+ , "payload": { "component_spec": json.dumps(c["spec"])
+ , "component_id": c["id"]
+ , "name": c["name"]
+ , "processor": p }
+ }
+
+ ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
+ cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
+ return ps+cs
+
diff --git a/mod/distributorapi/distributor/utils.py b/mod/distributorapi/distributor/utils.py
new file mode 100644
index 0000000..7457d5a
--- /dev/null
+++ b/mod/distributorapi/distributor/utils.py
@@ -0,0 +1,43 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Utility functions"""
+
+from urllib.parse import quote
+import requests as reqs
+from distributor import errors
+
+
+def urljoin(base, *trailing, **query_params):
+ base = base[0:-1] if base[-1] == "/" else base
+ full = [base] + list(trailing)
+ url = "/".join(full)
+
+ if query_params:
+ qp = ["{0}={1}".format(quote(k), quote(str(v))) for k,v in query_params.items()]
+ qp = "&".join(qp)
+ return "?".join([url, qp])
+ else:
+ return url
+
+
+def get_json(url):
+ resp = reqs.get(url)
+
+ try:
+ resp.raise_for_status()
+ return resp.json()
+ except Exception as e:
+ raise errors.DistributorAPIError(e)
diff --git a/mod/distributorapi/distributor/version.py b/mod/distributorapi/distributor/version.py
new file mode 100644
index 0000000..57c4da3
--- /dev/null
+++ b/mod/distributorapi/distributor/version.py
@@ -0,0 +1,16 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+__version__ = "1.0.0"
diff --git a/mod/distributorapi/pom.xml b/mod/distributorapi/pom.xml
new file mode 100644
index 0000000..6f080f6
--- /dev/null
+++ b/mod/distributorapi/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+============LICENSE_START=======================================================
+org.onap.dcae
+================================================================================
+Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.onap.dcaegen2.platform.mod</groupId>
+ <artifactId>dcaegen2-platform-mod-distributorapi</artifactId>
+ <!-- NOTE: Must keep this version synchronized with the version in distributor/version.py file -->
+ <version>1.0.0</version>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <sonar.sources>.</sonar.sources>
+ <sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath>
+ <sonar.python.coverage.reportPath>coverage.xml</sonar.python.coverage.reportPath>
+ <sonar.language>py</sonar.language>
+ <sonar.pluginname>python</sonar.pluginname>
+ <sonar.inclusions>**/*.py</sonar.inclusions>
+ <sonar.exclusions>**/tests/**,**/setup.py</sonar.exclusions>
+ </properties>
+</project>
diff --git a/mod/distributorapi/setup.py b/mod/distributorapi/setup.py
new file mode 100644
index 0000000..3d0acfd
--- /dev/null
+++ b/mod/distributorapi/setup.py
@@ -0,0 +1,42 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+import os
+from setuptools import setup, find_packages
+
+# extract __version__ from version file. importing distributor will lead to install failures
+setup_dir = os.path.dirname(__file__)
+with open(os.path.join(setup_dir, 'distributor', 'version.py')) as file:
+ globals_dict = dict()
+ exec(file.read(), globals_dict)
+ __version__ = globals_dict['__version__']
+
+setup(
+ name = "distributor-api",
+ version = __version__,
+ packages = find_packages(),
+ author = "Michael Hwang",
+ description = ("API that manages distribution targets"),
+ entry_points="""
+ [console_scripts]
+ start-distributor-api=distributor.http:start_http_server
+ """,
+ install_requires=[
+ "flask-restplus"
+ , "Flask-Cors"
+ , "requests"
+ ],
+ zip_safe = False
+ )
diff --git a/mod/distributorapi/tests/components.json b/mod/distributorapi/tests/components.json
new file mode 100644
index 0000000..f9a71f2
--- /dev/null
+++ b/mod/distributorapi/tests/components.json
@@ -0,0 +1,260 @@
+[
+{
+ "spec": {
+ "self": {
+ "version": "1.5.0",
+ "name": "dcae-ves-collector",
+ "description": "Collector for receiving VES events through restful interface",
+ "component_type": "docker"
+ },
+ "streams": {
+ "subscribes": [],
+ "publishes": [
+ {
+ "format": "VES_specification",
+ "version": "7.30.1",
+ "type": "message router",
+ "config_key": "ves-pnfRegistration"
+ },
+ {
+ "format": "VES_specification",
+ "version": "7.30.1",
+ "type": "message router",
+ "config_key": "ves-pnfRegistration-secondary"
+ },
+ {
+ "format": "VES_specification",
+ "version": "7.30.1",
+ "type": "message router",
+ "config_key": "ves-notification"
+ },
+ {
+ "format": "VES_specification",
+ "version": "7.30.1",
+ "type": "message router",
+ "config_key": "ves-notification-secondary"
+ }
+ ]
+ },
+ "services": {
+ "calls": [],
+ "provides": []
+ },
+ "parameters": [
+ {
+ "name": "collector.service.port",
+ "value": 8080,
+ "description": "standard http port collector will open for listening;",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.service.secure.port",
+ "value": 8443,
+ "description": "secure http port collector will open for listening ",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": true
+ },
+ {
+ "name": "collector.keystore.file.location",
+ "value": "/opt/app/dcae-certificate/keystore.jks",
+ "description": "fs location of keystore file in vm",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.keystore.passwordfile",
+ "value": "/opt/app/dcae-certificate/.password",
+ "description": "location of keystore password file in vm",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.truststore.file.location",
+ "value": "/opt/app/dcae-certificate/truststore.jks",
+ "description": "fs location of truststore file in vm",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.truststore.passwordfile",
+ "value": "/opt/app/dcae-certificate/.trustpassword",
+ "description": "location of truststore password file in vm",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.dmaap.streamid",
+ "value": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary",
+ "description": "domain-to-streamid mapping used by VESCollector to distributes events based on domain. Both primary and secondary config_key are included for resilency (multiple streamid can be included commma separated). The streamids MUST match to topic config_keys. For single site without resiliency deployment - configkeys with -secondary suffix can be removed",
+ "sourced_at_deployment": true,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "auth.method",
+ "value": "noAuth",
+ "description": "Property to manage application mode, possible configurations: noAuth - default option - no security (http) , certOnly - auth by certificate (https), basicAuth - auth by basic auth username and password (https),certBasicAuth - auth by certificate and basic auth username / password (https),",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "header.authlist",
+ "value": "sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy",
+ "description": "List of id and base 64 encoded password.For each onboarding VNF - unique userid and password should be assigned and communicated to VNF owner. Password value should be base64 encoded in config here",
+ "policy_editable": false,
+ "sourced_at_deployment": true,
+ "designer_editable": true
+ },
+ {
+ "name": "collector.schema.checkflag",
+ "value": 1,
+ "description": "Schema check validation flag. When enabled, collector will validate input VES events against VES Schema defined on collector.schema.file ",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "collector.schema.file",
+ "value": "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\",\"v7\":\"./etc/CommonEventFormat_30.json\"}",
+ "description": "VES schema file name per version used for validation",
+ "designer_editable": true,
+ "sourced_at_deployment": false,
+ "policy_editable": false
+ },
+ {
+ "name": "event.transform.flag",
+ "value": 1,
+ "description": "flag to enable tranformation rules defined under eventTransform.json; this is applicable when event tranformation rules preset should be activated for transforming <VES5.4 events to 5.4",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ },
+ {
+ "name": "tomcat.maxthreads",
+ "value": "200",
+ "description": "Tomcat control for concurrent request",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": false
+ }
+ ],
+ "auxilary": {
+ "healthcheck": {
+ "type": "https",
+ "interval": "15s",
+ "timeout": "1s",
+ "endpoint": "/healthcheck"
+ },
+ "volumes": [
+ {
+ "container": {
+ "bind": "/opt/app/dcae-certificate"
+ },
+ "host": {
+ "path": "/opt/app/dcae-certificate"
+ }
+ },
+ {
+ "container": {
+ "bind": "/opt/app/VESCollector/logs"
+ },
+ "host": {
+ "path": "/opt/logs/DCAE/VESCollector/logs"
+ }
+ },
+ {
+ "container": {
+ "bind": "/opt/app/VESCollector/etc"
+ },
+ "host": {
+ "path": "/opt/logs/DCAE/VESCollector/etc"
+ }
+ }
+ ],
+ "ports": [
+ "8080:8080",
+ "8443:8443"
+ ]
+ },
+ "artifacts": [
+ {
+ "type": "docker image",
+ "uri": "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:latest"
+ }
+ ]
+ },
+ "id": "75c9a179-b36b-4985-9445-d44c8768d6eb",
+ "name": "dcae-ves-collector",
+ "version": "1.5.0",
+ "owner": "spike",
+ "whenAdded": "2019-10-04T15:16:01.717861",
+ "modified": "2019-10-04T15:16:37.234517",
+ "status": "published",
+ "description": "Collector for receiving VES events through restful interface",
+ "componentType": "docker",
+ "componentUrl": "http://t3-2/onboarding/components/75c9a179-b36b-4985-9445-d44c8768d6eb"
+},
+{
+ "spec": {
+ "parameters": [],
+ "artifacts": [
+ {
+ "type": "docker image",
+ "uri": "anomaly-detector:3"
+ }
+ ],
+ "self": {
+ "version": "1.0.0",
+ "name": "anomaly-detector",
+ "component_type": "docker",
+ "description": "Automatically generated from Acumos model"
+ },
+ "streams": {
+ "publishes": [
+ {
+ "type": "message_router",
+ "version": "1.0.0",
+ "config_key": "predict_publisher",
+ "format": "OutputFormat"
+ }
+ ],
+ "subscribes": [
+ {
+ "type": "message_router",
+ "version": "1.0.0",
+ "config_key": "predict_subscriber",
+ "format": "PredictIn"
+ }
+ ]
+ },
+ "auxilary": {
+ "healthcheck": {
+ "endpoint": "/healthcheck",
+ "type": "http"
+ }
+ },
+ "services": {
+ "provides": [],
+ "calls": []
+ }
+ },
+ "id": "3fadb641-2079-4ca9-bb07-0df5952967fc",
+ "name": "anomaly-detector",
+ "version": "1.0.0",
+ "owner": "spike",
+ "whenAdded": "2019-10-04T18:00:56.433976",
+ "modified": "2019-10-04T18:00:56.433971",
+ "status": "unpublished",
+ "description": "Automatically generated from Acumos model",
+ "componentType": "docker",
+ "componentUrl": "http://t3-2/onboarding/components/3fadb641-2079-4ca9-bb07-0df5952967fc"
+}
+]
diff --git a/mod/distributorapi/tests/flow.json b/mod/distributorapi/tests/flow.json
new file mode 100644
index 0000000..63cf0f2
--- /dev/null
+++ b/mod/distributorapi/tests/flow.json
@@ -0,0 +1,305 @@
+{
+ "bucket": {
+ "allowBundleRedeploy": false,
+ "createdTimestamp": 1570196205153,
+ "identifier": "f645b971-f096-485c-9699-93a193d9c0fa",
+ "link": {
+ "href": "buckets/f645b971-f096-485c-9699-93a193d9c0fa",
+ "params": {
+ "rel": "self"
+ }
+ },
+ "name": "Demos",
+ "permissions": {
+ "canDelete": true,
+ "canRead": true,
+ "canWrite": true
+ }
+ },
+ "flow": {
+ "bucketIdentifier": "f645b971-f096-485c-9699-93a193d9c0fa",
+ "bucketName": "Demos",
+ "createdTimestamp": 1570589769648,
+ "description": "",
+ "identifier": "ddec92e2-97c3-4b41-80d7-fd38bbf5bb69",
+ "link": {
+ "href": "buckets/f645b971-f096-485c-9699-93a193d9c0fa/flows/ddec92e2-97c3-4b41-80d7-fd38bbf5bb69",
+ "params": {
+ "rel": "self"
+ }
+ },
+ "modifiedTimestamp": 1570673258622,
+ "name": "work-in-progress",
+ "type": "Flow",
+ "versionCount": 4
+ },
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "id": "a8134467-b4b4-348f-8a1c-8d732fe4fcad",
+ "name": "AnomalyDetector",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "identifier": "04ab8849-858d-36d5-b7cf-40da26051759",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "foo-conn",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary",
+ "subscribes:predictin:1.0.0:message_router:predict_subscriber"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "id": "3a0b5a44-f262-3836-93d5-dfe45fd90996",
+ "name": "DcaeVesCollector",
+ "type": "PROCESSOR"
+ },
+ "zIndex": 0
+ },
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "id": "3a0b5a44-f262-3836-93d5-dfe45fd90996",
+ "name": "DcaeVesCollector",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "identifier": "c7c62954-ae06-388c-a6d9-a24243b51bea",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "ves-data-conn",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "subscribes:ves_specification:7.30.1:message router:ves-notification"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "id": "58629a8a-e813-3bb3-ac2d-7ce2f4abf604",
+ "name": "ves-notification",
+ "type": "INPUT_PORT"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "funnels": [],
+ "identifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "inputPorts": [
+ {
+ "allowRemoteAccess": false,
+ "comments": "",
+ "componentType": "INPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "groupIdentifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "identifier": "58629a8a-e813-3bb3-ac2d-7ce2f4abf604",
+ "name": "ves-notification",
+ "position": {
+ "x": 1168.5,
+ "y": 544.5
+ },
+ "type": "INPUT_PORT"
+ }
+ ],
+ "labels": [],
+ "name": "test",
+ "outputPorts": [],
+ "position": {
+ "x": 826,
+ "y": 301
+ },
+ "processGroups": [],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "anomaly-detector",
+ "group": "org.onap.dcae",
+ "version": "1.0.0"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "identifier": "a8134467-b4b4-348f-8a1c-8d732fe4fcad",
+ "name": "AnomalyDetector",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 2410,
+ "y": 824
+ },
+ "properties": {},
+ "propertyDescriptors": {},
+ "runDurationMillis": 0,
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.onap.dcae.AnomalyDetector",
+ "yieldDuration": "1 sec"
+ },
+ {
+ "autoTerminatedRelationships": [],
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "dcae-ves-collector",
+ "group": "org.onap.dcae",
+ "version": "1.5.0"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "07fc9b5e-bbcd-3487-aded-eeced74fdfaa",
+ "identifier": "3a0b5a44-f262-3836-93d5-dfe45fd90996",
+ "name": "DcaeVesCollector",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 1500,
+ "y": 690
+ },
+ "properties": {
+ "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password",
+ "event.transform.flag": "1",
+ "collector.service.port": "8080",
+ "collector.schema.checkflag": "1",
+ "tomcat.maxthreads": "200",
+ "collector.truststore.passwordfile": "/opt/app/dcae-certificate/.trustpassword",
+ "header.authlist": "sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy",
+ "collector.service.secure.port": "8443",
+ "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
+ "collector.dmaap.streamid": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary",
+ "collector.schema.file": "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\",\"v7\":\"./etc/CommonEventFormat_30.json\"}",
+ "auth.method": "noAuth",
+ "collector.truststore.file.location": "/opt/app/dcae-certificate/truststore.jks"
+ },
+ "propertyDescriptors": {
+ "collector.keystore.passwordfile": {
+ "displayName": "collector.keystore.passwordfile",
+ "identifiesControllerService": false,
+ "name": "collector.keystore.passwordfile",
+ "sensitive": false
+ },
+ "event.transform.flag": {
+ "displayName": "event.transform.flag",
+ "identifiesControllerService": false,
+ "name": "event.transform.flag",
+ "sensitive": false
+ },
+ "collector.service.port": {
+ "displayName": "collector.service.port",
+ "identifiesControllerService": false,
+ "name": "collector.service.port",
+ "sensitive": false
+ },
+ "collector.schema.checkflag": {
+ "displayName": "collector.schema.checkflag",
+ "identifiesControllerService": false,
+ "name": "collector.schema.checkflag",
+ "sensitive": false
+ },
+ "tomcat.maxthreads": {
+ "displayName": "tomcat.maxthreads",
+ "identifiesControllerService": false,
+ "name": "tomcat.maxthreads",
+ "sensitive": false
+ },
+ "collector.truststore.passwordfile": {
+ "displayName": "collector.truststore.passwordfile",
+ "identifiesControllerService": false,
+ "name": "collector.truststore.passwordfile",
+ "sensitive": false
+ },
+ "header.authlist": {
+ "displayName": "header.authlist",
+ "identifiesControllerService": false,
+ "name": "header.authlist",
+ "sensitive": false
+ },
+ "collector.service.secure.port": {
+ "displayName": "collector.service.secure.port",
+ "identifiesControllerService": false,
+ "name": "collector.service.secure.port",
+ "sensitive": false
+ },
+ "collector.keystore.file.location": {
+ "displayName": "collector.keystore.file.location",
+ "identifiesControllerService": false,
+ "name": "collector.keystore.file.location",
+ "sensitive": false
+ },
+ "collector.dmaap.streamid": {
+ "displayName": "collector.dmaap.streamid",
+ "identifiesControllerService": false,
+ "name": "collector.dmaap.streamid",
+ "sensitive": false
+ },
+ "collector.schema.file": {
+ "displayName": "collector.schema.file",
+ "identifiesControllerService": false,
+ "name": "collector.schema.file",
+ "sensitive": false
+ },
+ "auth.method": {
+ "displayName": "auth.method",
+ "identifiesControllerService": false,
+ "name": "auth.method",
+ "sensitive": false
+ },
+ "collector.truststore.file.location": {
+ "displayName": "collector.truststore.file.location",
+ "identifiesControllerService": false,
+ "name": "collector.truststore.file.location",
+ "sensitive": false
+ }
+ },
+ "runDurationMillis": 0,
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.onap.dcae.DcaeVesCollector",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "bucketIdentifier": "f645b971-f096-485c-9699-93a193d9c0fa",
+ "comments": "",
+ "flowIdentifier": "ddec92e2-97c3-4b41-80d7-fd38bbf5bb69",
+ "link": {
+ "href": "buckets/f645b971-f096-485c-9699-93a193d9c0fa/flows/ddec92e2-97c3-4b41-80d7-fd38bbf5bb69/versions/4",
+ "params": {
+ "rel": "content"
+ }
+ },
+ "timestamp": 1570673258587,
+ "version": 4
+ }
+}
diff --git a/mod/distributorapi/tests/test_registry_client.py b/mod/distributorapi/tests/test_registry_client.py
new file mode 100644
index 0000000..3767242
--- /dev/null
+++ b/mod/distributorapi/tests/test_registry_client.py
@@ -0,0 +1,78 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+import distributor
+from distributor import registry_client as rc
+
+
+def test_add_url_from_link():
+ test = {"link": {"href": "bar"}, "name": "jane", "age": 33,
+ "innerTest": {"link": {"href": "baz"}, "name": "bob"}
+ }
+ result = rc._add_url_from_link("http://foo", test)
+
+ assert result["selfUrl"] == "http://foo/bar"
+ assert result["innerTest"]["selfUrl"] == "http://foo/baz"
+
+
+def test_get_buckets(monkeypatch):
+ def fake_get_json(url):
+ if url == "http://registry/buckets":
+ return []
+ return None
+
+ monkeypatch.setattr(distributor.registry_client, "_get_json", fake_get_json)
+
+ assert [] == rc.get_buckets("http://registry")
+ assert [] == rc.get_buckets("http://registry/")
+
+
+def test_find_flow(monkeypatch):
+ def fake_get_buckets(url):
+ return [{"selfUrl": "{0}/buckets/123".format(url)}]
+
+ monkeypatch.setattr(distributor.registry_client, "get_buckets", fake_get_buckets)
+
+ def fake_get_flows(registry_url, url):
+ if url == "http://registry/buckets/123":
+ return [{"identifier": "abc"}]
+ return None
+
+ monkeypatch.setattr(distributor.registry_client, "get_flows", fake_get_flows)
+
+ assert rc.find_flow("http://registry", "abc")["identifier"] == "abc"
+
+
+def test_flow_versions(monkeypatch):
+ def fake_get_json_many(url):
+ if url == "http://registry/buckets/123/flows/abc/versions":
+ return [{"version": 1}, {"version": 3}, {"version": 2}]
+ print(url)
+ return []
+
+ monkeypatch.setattr(distributor.registry_client, "_get_json",
+ fake_get_json_many)
+
+ assert [3, 2, 1] == rc.get_flow_versions("http://registry/buckets/123/flows/abc/")
+
+
+def test_get_flow_diff_latest(monkeypatch):
+ def fake_get_flow_versions(url):
+ return ["1"]
+
+ monkeypatch.setattr(distributor.registry_client, "get_flow_versions",
+ fake_get_flow_versions)
+
+ assert None == rc.get_flow_diff_latest("http://registry", "http://registry/buckets/123/flows/abc/")
diff --git a/mod/distributorapi/tests/test_transform.py b/mod/distributorapi/tests/test_transform.py
new file mode 100644
index 0000000..f275e63
--- /dev/null
+++ b/mod/distributorapi/tests/test_transform.py
@@ -0,0 +1,68 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+import os, json
+from distributor import transform as tr
+
+TEST_DIR = os.path.dirname(__file__)
+
+def _load_data(filename):
+ path = os.path.join(TEST_DIR, filename)
+ with open(path) as f:
+ return json.load(f)
+
+def _setup():
+ flow = _load_data("flow.json")
+ components = _load_data("components.json")
+ components = dict([((c["name"], c["version"]), c) for c in components])
+ return (flow, components)
+
+
+def test_get_component():
+ flow, components = _setup()
+ c = tr.get_component(flow, components, "a8134467-b4b4-348f-8a1c-8d732fe4fcad")
+ assert "3fadb641-2079-4ca9-bb07-0df5952967fc" == c["id"]
+
+
+def test_make_fbp_from_flow():
+ flow, components = _setup()
+
+ fbp = tr.make_fbp_from_flow(flow, components)
+ assert len(fbp) == 4
+
+ def check_node(n):
+ n["payload"]["component_id"]
+
+ expected = ["75c9a179-b36b-4985-9445-d44c8768d6eb", "3fadb641-2079-4ca9-bb07-0df5952967fc"]
+ actual = [n["payload"]["component_id"] for n in fbp if n["command"] == "addnode"]
+ assert list(sorted(expected)) == list(sorted(actual))
+
+ # Test processor to processor scenario
+ expected = {'metadata': {'data_type': 'json',
+ 'dmaap_type': 'MR',
+ 'name': 'foo-conn'},
+ 'src': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
+ 'port': 'ves-pnfRegistration-secondary'},
+ 'tgt': {'node': '3fadb641-2079-4ca9-bb07-0df5952967fc',
+ 'port': 'predict_subscriber'}}
+ actual = [e["payload"] for e in fbp if e["command"] == "addedge"]
+ assert actual[0] == expected or actual[1] == expected
+
+ # Test input port to processor scenario
+ expected = {'metadata': {'data_type': 'json', 'dmaap_type': 'MR',
+ 'name': 'ves-data-conn'}, 'src': {},
+ 'tgt': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
+ 'port': 'ves-notification'}}
+ assert actual[0] == expected or actual[1] == expected
diff --git a/mod/distributorapi/tests/test_utils.py b/mod/distributorapi/tests/test_utils.py
new file mode 100644
index 0000000..481e325
--- /dev/null
+++ b/mod/distributorapi/tests/test_utils.py
@@ -0,0 +1,24 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+from distributor import utils
+
+
+def test_urljoin():
+ assert "http://foo/bar/baz" == utils.urljoin("http://foo", "bar", "baz")
+ assert "http://foo/bar/baz" == utils.urljoin("http://foo/", "bar", "baz")
+ assert "http://foo/bar/baz?name=some-name&version=1.5.0" \
+ == utils.urljoin("http://foo", "bar", "baz", **{"name": "some-name",
+ "version": "1.5.0"})
diff --git a/mod/distributorapi/tox.ini b/mod/distributorapi/tox.ini
new file mode 100644
index 0000000..a0bebea
--- /dev/null
+++ b/mod/distributorapi/tox.ini
@@ -0,0 +1,14 @@
+[tox]
+envlist = py37
+
+[testenv]
+deps=
+ pytest
+ coverage
+ pytest-cov
+setenv =
+ PYTHONPATH={toxinidir}
+passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
+commands=
+ pytest tests --junitxml xunit-results.xml --cov distributor --cov-report xml \
+ --cov-report term --cov-report html