summaryrefslogtreecommitdiffstats
path: root/mod/distributorapi/distributor/transform.py
diff options
context:
space:
mode:
Diffstat (limited to 'mod/distributorapi/distributor/transform.py')
-rw-r--r--mod/distributorapi/distributor/transform.py137
1 files changed, 137 insertions, 0 deletions
diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py
new file mode 100644
index 0000000..9654249
--- /dev/null
+++ b/mod/distributorapi/distributor/transform.py
@@ -0,0 +1,137 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Transform objects from one form to another"""
+
+import json
+from functools import partial
+
+
+def extract_components_from_flow(flow):
+ """Given a versionedFlowSnapshot object, extract out the processors
+ and create a list of tuples where each tuple is
+ (component name, component version)"""
+ extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
+ return [ extract(p) for p in flow["flowContents"]["processors"] ]
+
+
+def get_component(flow, components, processor_id):
+ def get_component(p):
+ bundle = p["bundle"]
+ return components.get((bundle["artifact"], bundle["version"]), None)
+
+ cs = [get_component(p) for p in flow["flowContents"]["processors"] \
+ if p["identifier"] == processor_id]
+ return cs[0] if cs else None
+
+
+def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
+ """Transform a versionedFlowSnapshot object into a runtime API (FBP) request
+
+ An example of an edge:
+
+ {
+ "command": "addedge",
+ "payload": {
+ "src" : {
+ "node": "comp1234",
+ "port": "DCAE-HELLO-WORLD-PUB-MR"
+ },
+ "tgt" : {
+ "node": "comp5678",
+ "port": "DCAE-HELLO-WORLD-SUB-MR"
+ },
+ "metadata":{
+ "name": "sample_topic_0",
+ "data_type": "json",
+ "dmaap_type": "MR"
+ }
+ },
+ "target_graph_id": "string"
+ }
+ """
+ _get_component = partial(get_component, flow, components)
+
+ def parse_connection(conn):
+ rels = conn["selectedRelationships"]
+
+ if conn["source"]["type"] == "PROCESSOR":
+ comp = _get_component(conn["source"]["id"])
+
+ if not comp:
+ # REVIEW: Raise error?
+ return None
+
+ # Example:
+ # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
+ rels_pubs = [r for r in rels if "publishes" in r]
+
+ if rels_pubs:
+ _, _, _, transport_type, config_key = rels_pubs[0].split(":")
+ src = { "node": comp["id"], "port": config_key }
+ else:
+ # REVIEW: This should be an error?
+ src = { "node": comp["id"], "port": None }
+ else:
+ src = {}
+
+ if conn["destination"]["type"] == "PROCESSOR":
+ comp = _get_component(conn["destination"]["id"])
+
+ if not comp:
+ # REVIEW: Raise error?
+ return None
+
+ # Example:
+ # subscribes:predictin:1.0.0:message_router:predict_subscriber
+ rels_subs = [r for r in rels if "subscribes" in r]
+
+ if rels_subs:
+ _, _, _, transport_type, config_key = rels_subs[0].split(":")
+ tgt = { "node": comp["id"], "port": config_key }
+ else:
+ # REVIEW: This should be an error?
+ tgt = { "node": comp["id"], "port": None }
+ else:
+ tgt = {}
+
+ return { "command": "addedge"
+ , "payload": {
+ "src": src
+ , "tgt": tgt
+ , "metadata": {
+ "name": conn["name"]
+ # TODO: Question these hardcoded attributes
+ , "data_type": "json"
+ , "dmaap_type": "MR"
+ }
+ }
+ }
+
+ def parse_processor(p):
+ c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
+ return { "command": "addnode"
+ # TODO: spec is required to be a json string but runtime api
+ # changing this soon hopefully
+ , "payload": { "component_spec": json.dumps(c["spec"])
+ , "component_id": c["id"]
+ , "name": c["name"]
+ , "processor": p }
+ }
+
+ ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
+ cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
+ return ps+cs
+