Diffstat (limited to 'mod/distributorapi/distributor/transform.py')
-rw-r--r--  mod/distributorapi/distributor/transform.py  68
1 file changed, 36 insertions(+), 32 deletions(-)
diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py
index 9654249..5ca1277 100644
--- a/mod/distributorapi/distributor/transform.py
+++ b/mod/distributorapi/distributor/transform.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ def extract_components_from_flow(flow):
and create a list of tuples where each tuple is
(component name, component version)"""
extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
- return [ extract(p) for p in flow["flowContents"]["processors"] ]
+ return [extract(p) for p in flow["flowContents"]["processors"]]
def get_component(flow, components, processor_id):
@@ -32,8 +32,7 @@ def get_component(flow, components, processor_id):
bundle = p["bundle"]
return components.get((bundle["artifact"], bundle["version"]), None)
- cs = [get_component(p) for p in flow["flowContents"]["processors"] \
- if p["identifier"] == processor_id]
+ cs = [get_component(p) for p in flow["flowContents"]["processors"] if p["identifier"] == processor_id]
return cs[0] if cs else None
@@ -80,10 +79,10 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components"
if rels_pubs:
_, _, _, transport_type, config_key = rels_pubs[0].split(":")
- src = { "node": comp["id"], "port": config_key }
+ src = {"node": comp["id"], "port": config_key}
else:
# REVIEW: This should be an error?
- src = { "node": comp["id"], "port": None }
+ src = {"node": comp["id"], "port": None}
else:
src = {}
@@ -100,38 +99,43 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components"
if rels_subs:
_, _, _, transport_type, config_key = rels_subs[0].split(":")
- tgt = { "node": comp["id"], "port": config_key }
+ tgt = {"node": comp["id"], "port": config_key}
else:
# REVIEW: This should be an error?
- tgt = { "node": comp["id"], "port": None }
+ tgt = {"node": comp["id"], "port": None}
else:
tgt = {}
- return { "command": "addedge"
- , "payload": {
- "src": src
- , "tgt": tgt
- , "metadata": {
- "name": conn["name"]
- # TODO: Question these hardcoded attributes
- , "data_type": "json"
- , "dmaap_type": "MR"
- }
- }
- }
+ return {
+ "command": "addedge",
+ "payload": {
+ "src": src,
+ "tgt": tgt,
+ "metadata": {
+ "name": conn["name"]
+ # TODO: Question these hardcoded attributes
+ ,
+ "data_type": "json",
+ "dmaap_type": "MR",
+ },
+ },
+ }
def parse_processor(p):
c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
- return { "command": "addnode"
- # TODO: spec is required to be a json string but runtime api
- # changing this soon hopefully
- , "payload": { "component_spec": json.dumps(c["spec"])
- , "component_id": c["id"]
- , "name": c["name"]
- , "processor": p }
- }
-
- ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
- cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
- return ps+cs
+ return {
+ "command": "addnode"
+ # TODO: spec is required to be a json string but runtime api
+ # changing this soon hopefully
+ ,
+ "payload": {
+ "component_spec": json.dumps(c["spec"]),
+ "component_id": c["id"],
+ "name": c["name"],
+ "processor": p,
+ },
+ }
+ ps = [parse_processor(p) for p in flow["flowContents"]["processors"]]
+ cs = [parse_connection(c) for c in flow["flowContents"]["connections"]]
+ return ps + cs
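
For context, below is a minimal usage sketch of the reformatted helpers; it is not part of the commit. The import path distributor.transform, the sample identifiers, and the "id"/"name"/"spec" keys on the catalog entry are assumptions; the flow and processor field names are taken from the code in the diff, and the parts of make_fbp_from_flow not visible here are assumed to accept an empty connections list.

# Usage sketch only -- not part of the commit. Assumes the package import
# path "distributor.transform"; field names below are inferred from the
# diff, and the catalog entry's "id"/"name"/"spec" keys are assumptions.
from distributor.transform import (
    extract_components_from_flow,
    get_component,
    make_fbp_from_flow,
)

# Minimal NiFi-style flow snapshot: one processor, no connections, so only
# "addnode" graph commands should come back from make_fbp_from_flow.
flow = {
    "flowContents": {
        "processors": [
            {
                "identifier": "proc-1",
                "bundle": {"artifact": "dcae-collector", "version": "1.0.0"},
            }
        ],
        "connections": [],
    }
}

# Catalog of onboarded components keyed by (artifact, version), matching the
# lookups done in get_component and parse_processor.
components = {
    ("dcae-collector", "1.0.0"): {
        "id": "comp-123",
        "name": "dcae-collector",
        "spec": {"self": {"name": "dcae-collector", "version": "1.0.0"}},
    }
}

print(extract_components_from_flow(flow))                  # [('dcae-collector', '1.0.0')]
print(get_component(flow, components, "proc-1")["id"])     # comp-123

for cmd in make_fbp_from_flow(flow, components):
    print(cmd["command"], cmd["payload"]["component_id"])  # addnode comp-123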