summaryrefslogtreecommitdiffstats
path: root/adapter/acumos/aoconversion/scanner.py
diff options
context:
space:
mode:
authorAndrew Gauld <agauld@att.com>2019-11-13 18:09:32 +0000
committerAndrew Gauld <agauld@att.com>2019-11-15 20:23:42 +0000
commit849da15d5b7ddc68e4c2b90b603fc8948d4b5e6d (patch)
treeb0d3d6e7dcd1551c007a7212dfdb369720707a22 /adapter/acumos/aoconversion/scanner.py
parent3f67c400813a60e4b8f9327e20eccc9033dc1b0b (diff)
Add acumos adapter project
Signed-off-by: Andrew Gauld <agauld@att.com> Issue-ID: DCAEGEN2-1860 Change-Id: Ib22fd2aa61fe7761bacf85e69540d11803c7acee Signed-off-by: Andrew Gauld <agauld@att.com>
Diffstat (limited to 'adapter/acumos/aoconversion/scanner.py')
-rw-r--r--adapter/acumos/aoconversion/scanner.py312
1 files changed, 312 insertions, 0 deletions
diff --git a/adapter/acumos/aoconversion/scanner.py b/adapter/acumos/aoconversion/scanner.py
new file mode 100644
index 0000000..22a5922
--- /dev/null
+++ b/adapter/acumos/aoconversion/scanner.py
@@ -0,0 +1,312 @@
+# ============LICENSE_START====================================================
+# org.onap.dcae
+# =============================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import json
+import os
+from pkg_resources import resource_string
+import shutil
+import traceback
+try:
+ from urllib.parse import unquote_plus
+ from socketserver import ThreadingMixIn
+ from http.server import BaseHTTPRequestHandler, HTTPServer
+except ImportError:
+ from urllib import unquote_plus
+ from SocketServer import ThreadingMixIn
+ from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+
+import requests
+
+from aoconversion import convert
+
+
+def _derefconfig(value):
+ if value.startswith('@'):
+ with open(value[1:], 'r') as f:
+ return f.readline().strip()
+ return value
+
+
+class Config(object):
+ """
+ Configuration parameters as attributes, make sure the required ones are there,
+ populate defaults.
+ """
+ def __init__(self, dcaeurl, dcaeuser, onboardingurl, onboardinguser, onboardingpass, certfile, dockerregistry, dockeruser, dockerpass, acumosurl=None, interval=900, dockerhost='unix:///var/run/docker.sock', tmpdir='/var/tmp/aoadapter', certverify=True, catalogs=None, port=None, **extras):
+ self.dcaeurl = dcaeurl
+ self.dcaeuser = dcaeuser
+
+ def x(fmt, *args, **kwargs):
+ return onboardingurl + fmt.format(*args, **kwargs)
+ self.oburl = x
+ self._onboardingpass = onboardingpass
+ self._onboardinguser = onboardinguser
+ self.acumosurl = acumosurl
+ self.certfile = certfile
+ self.certverify = certverify
+ self.dockerhost = dockerhost
+ self.dockerregistry = dockerregistry
+ self.dockeruser = dockeruser
+ self._dockerpass = dockerpass
+ self.interval = interval
+ self.tmpdir = tmpdir
+ if catalogs is not None and type(catalogs) is not list:
+ catalogs = [catalogs]
+ self.catalogs = catalogs
+ self.port = port
+
+ def obauth(self):
+ return (self._onboardinguser, _derefconfig(self._onboardingpass))
+
+ def dockerpass(self):
+ return _derefconfig(self._dockerpass)
+
+
+class _AcumosAccess(object):
+ def __init__(self, config, url):
+ self.cert = config.certfile
+ self.verify = config.certverify
+ self.url = url.strip().rstrip('/')
+
+ def artgetter(self, xrev, matcher):
+ for art in xrev['artifacts']:
+ if matcher(art):
+ def ret():
+ nurl = '{}/artifacts/{}/content'.format(self.url, art['artifactId'])
+ resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
+ if resp.status_code == 500:
+ resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
+ resp.raise_for_status()
+ return resp
+ return ret
+ return None
+
+ def jsonget(self, format, *args, **kwargs):
+ nurl = self.url + format.format(*args, **kwargs)
+ resp = requests.get(nurl, cert=self.cert, verify=self.verify)
+ if resp.status_code == 500:
+ resp = requests.get(nurl, cert=self.cert, verify=self.verify)
+ resp.raise_for_status()
+ return resp.json()['content']
+
+
+def _x_proto_matcher(art):
+ """ Is this artifact the x.proto file? """
+ return art['name'].endswith('.proto')
+
+
+def _x_zip_matcher(art):
+ """ Is this artifact the x.zip file? """
+ return art['name'].endswith('.zip')
+
+
+def _md_json_matcher(art):
+ """ Is this artifact the metadata.json file? """
+ return art['name'].endswith('.json')
+
+
+def _walk(config):
+ """
+ Walk the Federation E5 interface of an Acumos instance
+ """
+ url = config.acumosurl
+ callback = _makecallback(config)
+ catalogs = config.catalogs
+ aa = _AcumosAccess(config, url)
+ for catalog in aa.jsonget('/catalogs'):
+ if catalogs is not None and catalog['catalogId'] not in catalogs and catalog['name'] not in catalogs:
+ continue
+ for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
+ for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
+ onboard(aa, callback, solution, revision['revisionId'])
+
+
+def onboard(aa, callback, solution, revid):
+ xrev = aa.jsonget('/solutions/{}/revisions/{}', solution['solutionId'], revid)
+ callback(model_name=solution['name'], model_version=xrev['version'], model_last_updated=xrev['modified'], rating=solution['ratingAverageTenths'] / 10.0, proto_getter=aa.artgetter(xrev, _x_proto_matcher), zip_getter=aa.artgetter(xrev, _x_zip_matcher), metadata_getter=aa.artgetter(xrev, _md_json_matcher))
+
+
+def _pullfile(source, dest):
+ with open(dest, 'wb') as f:
+ for chunk in source().iter_content(65536):
+ f.write(chunk)
+
+
+_loadedformats = set()
+_loadedcomponents = set()
+
+
+def scan(config):
+ _walk(config)
+
+
+def _makecallback(config):
+ workdir = config.tmpdir
+ obauth = config.obauth()
+ oburl = config.oburl
+
+ def callback(model_name, model_version, model_last_updated, rating, proto_getter, zip_getter, metadata_getter):
+ model_name = model_name.lower()
+ model_version = model_version.lower()
+ compid = (model_name, model_version)
+ if compid in _loadedcomponents:
+ print('Skipping component {}: already analyzed'.format(compid))
+ return
+ if proto_getter is None or zip_getter is None or metadata_getter is None:
+ print('Skipping component {}: does not have required artifacts'.format(compid))
+ _loadedcomponents.add(compid)
+ return
+ modeldir = '{}/{}'.format(workdir, model_name)
+ shutil.rmtree(modeldir, True)
+ os.makedirs(modeldir)
+ try:
+ _pullfile(proto_getter, '{}/model.proto'.format(modeldir))
+ _pullfile(zip_getter, '{}/model.zip'.format(modeldir))
+ _pullfile(metadata_getter, '{}/metadata.json'.format(modeldir))
+ except Exception:
+ print('Skipping component {}: artifact access error'.format(compid))
+ _loadedcomponents.add(compid)
+ return
+ try:
+ docker_uri, data_formats, spec = convert.gen_dcae_artifacts_for_model(config, model_name, model_version)
+ shutil.rmtree(modeldir)
+ except Exception:
+ print('Error analyzing artifacts for {}'.format(compid))
+ traceback.print_exc()
+ return
+ for data_format in data_formats:
+ fmtid = (data_format['self']['name'], data_format['self']['version'])
+ if fmtid in _loadedformats:
+ print('Skipping data format {}: already analyzed'.format(fmtid))
+ continue
+ try:
+ resp = requests.post(oburl('/dataformats'), json={'owner': config.dcaeuser, 'spec': data_format}, auth=obauth)
+ if resp.status_code == 409:
+ print('Skipping data format {}: previously loaded'.format(fmtid))
+ _loadedformats.add(fmtid)
+ continue
+ resp.raise_for_status()
+ requests.patch(resp.json()['dataFormatUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
+ print('Loaded data format {}'.format(fmtid))
+ _loadedformats.add(fmtid)
+ except Exception:
+ print('Error loading data format {}'.format(fmtid))
+ traceback.print_exc()
+ raise
+ try:
+ resp = requests.post(oburl('/components'), json={'owner': config.dcaeuser, 'spec': spec}, auth=obauth)
+ if resp.status_code == 409:
+ print('Skipping component {}: previously loaded'.format(compid))
+ _loadedcomponents.add(compid)
+ return
+ resp.raise_for_status()
+ requests.patch(resp.json()['componentUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
+ print('Loaded component {}'.format(compid))
+ _loadedcomponents.add(compid)
+ except Exception:
+ print('Error loading component {}'.format(compid))
+ traceback.print_exc()
+ raise
+ return callback
+
+
+class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
+ protocol_version = "HTTP/1.1"
+
+
+class Apihandler(BaseHTTPRequestHandler):
+ def doqp(self):
+ self.qparams = {}
+ if not self.path or '?' not in self.path:
+ return
+ self.path, qp = self.path.split('?', 1)
+ for x in qp.split('&'):
+ k, v = x.split('=', 1)
+ self.qparams[unquote_plus(k)] = unquote_plus(v)
+
+ def replyjson(self, body, ctype='application/json'):
+ self.replyraw(json.dumps(body).encode('utf-8'), ctype)
+
+ def replyraw(self, data, ctype):
+ self.send_response(200)
+ self.send_header('Content-Type', ctype)
+ self.send_header('Content-Length', len(data))
+ self.end_headers()
+ self.wfile.write(data)
+
+ def do_GET(self):
+ self.doqp()
+ if self.path == '/' or self.path == '/index.html':
+ self.replyraw(self.server.index, 'text/html')
+ return
+ if 'acumos' not in self.qparams:
+ self.send_error(400)
+ return
+ aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
+ if self.path == '/listCatalogs.js':
+ self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')])
+ return
+ if self.path == '/listSolutions.js':
+ if 'catalogId' not in self.qparams:
+ self.send_error(400)
+ return
+ self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])])
+ return
+ if self.path == '/listRevisions.js':
+ if 'solutionId' not in self.qparams:
+ self.send_error(400)
+ return
+ self.replyjson([{'name': x['version'], 'id': x['revisionId']} for x in aa.jsonget('/solutions/{}/revisions', self.qparams['solutionId'])])
+ return
+ self.send_error(404)
+
+ def do_POST(self):
+ self.doqp()
+ if self.path == '/onboard.js':
+ if 'acumos' not in self.qparams:
+ self.send_error(400)
+ return
+ aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
+ callback = self.server.callback
+ if 'catalogId' not in self.qparams:
+ for catalog in aa.jsonget('/catalogs'):
+ for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
+ for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
+ onboard(aa, callback, solution, revision['revisionId'])
+ elif 'solutionId' not in self.qparams:
+ for solution in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId']):
+ for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
+ onboard(aa, callback, solution, revision['revisionId'])
+ elif 'revisionId' not in self.qparams:
+ solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
+ for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
+ onboard(aa, callback, solution, revision['revisionId'])
+ else:
+ solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
+ onboard(aa, callback, solution, self.qparams['revisionId'])
+ self.replyraw('OK', 'text/plain')
+ return
+ self.send_error(400)
+
+
+def serve(config):
+ server = ThreadedHTTPServer(('', config.port), Apihandler)
+ server.config = config
+ server.callback = _makecallback(config)
+ server.index = resource_string(__name__, 'index.html')
+ server.serve_forever()