summaryrefslogtreecommitdiffstats
path: root/mod/distributorapi/distributor/registry_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'mod/distributorapi/distributor/registry_client.py')
-rw-r--r--mod/distributorapi/distributor/registry_client.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/mod/distributorapi/distributor/registry_client.py b/mod/distributorapi/distributor/registry_client.py
new file mode 100644
index 0000000..5d437e7
--- /dev/null
+++ b/mod/distributorapi/distributor/registry_client.py
@@ -0,0 +1,91 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Sophisticated Nifi registry client"""
+
+from distributor.utils import urljoin as _urljoin
+from distributor.utils import get_json as _get_json
+
+
+def _add_url_from_link(registry_url, obj):
+ result = {}
+
+ for k, v in obj.items():
+ if k == "link":
+ result["selfUrl"] =_urljoin(registry_url, v["href"])
+ result[k] = v
+ elif type(v) == dict:
+ result[k] = _add_url_from_link(registry_url, v)
+ else:
+ result[k] = v
+
+ return result
+
+
+def get_buckets(registry_url):
+ buckets = _get_json(_urljoin(registry_url, "buckets"))
+ return [_add_url_from_link(registry_url, b) for b in buckets]
+
+
+def get_flows(registry_url, bucket_url):
+ flows = _get_json(_urljoin(bucket_url, "flows"))
+ return [_add_url_from_link(registry_url, f) for f in flows]
+
+
+def find_flow(registry_url, flow_id):
+ buckets = get_buckets(registry_url)
+
+ def is_match(flow):
+ return flow["identifier"] == flow_id
+
+ for bucket in buckets:
+ result = [f for f in get_flows(registry_url, bucket["selfUrl"]) if is_match(f)]
+
+ if result:
+ return result.pop()
+
+ return None
+
+
+def get_flow_versions(flow_url):
+ """Returns list of versions from greatest to least for a given flow"""
+ versions_url = _urljoin(flow_url, "versions")
+ # List of versions will be greatest to least
+ return list(reversed(sorted(
+ [v["version"] for v in _get_json(versions_url)])))
+
+def get_flow_diff(registry_url, flow_url, version_one, version_two):
+ diff_url = _urljoin(flow_url, "diff", str(version_one), str(version_two))
+ return _get_json(diff_url)
+
+def get_flow_diff_latest(registry_url, flow_url):
+ versions = get_flow_versions(flow_url)
+
+ if len(versions) == 0:
+ # Should not happen, should this be an error?
+ return None
+ elif len(versions) == 1:
+ return None
+ else:
+ # Example in gitlab wiki shows that lower version is first
+ return _add_url_from_link(registry_url
+ , get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
+
+def get_flow_version(registry_url, flow_url, version):
+ version_url = _urljoin(flow_url, "versions", str(version))
+ return _add_url_from_link(registry_url, _get_json(version_url))
+
+def get_flow_version_latest(registry_url, flow_url):
+ return get_flow_version(registry_url, flow_url, "latest")