summaryrefslogtreecommitdiffstats
path: root/mod/onboardingapi/dcae_cli/catalog
diff options
context:
space:
mode:
authorMichael Hwang <mhwang@research.att.com>2019-11-12 16:04:20 -0500
committerMichael Hwang <mhwang@research.att.com>2019-12-13 16:46:11 -0500
commitc698e66797bad69b4c77b26b487bf8322989beb0 (patch)
treee40a8449728768107e4ab4c1ac506af13230a580 /mod/onboardingapi/dcae_cli/catalog
parent9cb529e42f5625f2fa802e21919b10f814a89ca7 (diff)
Copy dcae-cli->onboardingapi, copy component specs
Issue-ID: DCAEGEN2-1860 Change-Id: I4805398c76479fad51cbdb74470ccc8f706ce9dc Signed-off-by: Michael Hwang <mhwang@research.att.com>
Diffstat (limited to 'mod/onboardingapi/dcae_cli/catalog')
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/__init__.py36
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/exc.py45
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/__init__.py21
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/catalog.py834
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/schema.py191
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/tables.py149
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/tests/test_mock_catalog.py786
-rw-r--r--mod/onboardingapi/dcae_cli/catalog/mock/tests/test_schema.py421
8 files changed, 2483 insertions, 0 deletions
diff --git a/mod/onboardingapi/dcae_cli/catalog/__init__.py b/mod/onboardingapi/dcae_cli/catalog/__init__.py
new file mode 100644
index 0000000..d9b09f5
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/__init__.py
@@ -0,0 +1,36 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+'''
+Provides catalog utilities.
+'''
+from dcae_cli.catalog.mock.catalog import MockCatalog
+
+from dcae_cli.util.exc import DcaeException
+
+
def get_catalog(**config):
    '''Returns a catalog object'''
    # Only the 'local' (mock, DB-backed) catalog type is implemented.
    catalog_type = config.get('ctype', 'local')
    if catalog_type != 'local':
        raise DcaeException("Unsupported catalog type: {:}".format(catalog_type))
    return MockCatalog()
diff --git a/mod/onboardingapi/dcae_cli/catalog/exc.py b/mod/onboardingapi/dcae_cli/catalog/exc.py
new file mode 100644
index 0000000..5d65a41
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/exc.py
@@ -0,0 +1,45 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+'''
+Provides catalog classes
+'''
+from dcae_cli.util.exc import DcaeException
+
+
class CatalogError(DcaeException):
    '''Base Catalog exception; all catalog-specific errors derive from this'''
+
+
class DuplicateEntry(CatalogError):
    '''Raised when adding an entry whose name:version already exists in the catalog'''
    pass
+
+
class MissingEntry(CatalogError):
    '''Raised when a requested component or data format is not found in the catalog'''
    pass
+
+
class FrozenEntry(CatalogError):
    '''Raised when attempting to modify an entry that has already been published'''
    pass
+
+
class ForbiddenRequest(CatalogError):
    '''Raised when a user attempts an operation on an entry they do not own'''
    pass
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/__init__.py b/mod/onboardingapi/dcae_cli/catalog/mock/__init__.py
new file mode 100644
index 0000000..ceddbb9
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/__init__.py
@@ -0,0 +1,21 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/catalog.py b/mod/onboardingapi/dcae_cli/catalog/mock/catalog.py
new file mode 100644
index 0000000..dcebdca
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/catalog.py
@@ -0,0 +1,834 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+"""
+Provides the mock catalog
+"""
+import os
+import json
+import contextlib
+import itertools
+from functools import partial
+from datetime import datetime
+
+import six
+
+from sqlalchemy import create_engine as create_engine_, event, and_, or_
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm.exc import NoResultFound
+from sqlalchemy_utils import database_exists, create_database, drop_database
+
+from dcae_cli import _version as cli_version
+from dcae_cli.catalog.mock.tables import Component, Format, FormatPair, Base
+from dcae_cli.catalog.mock.schema import validate_component, validate_format, apply_defaults_docker_config
+from dcae_cli.util import reraise_with_msg, get_app_dir
+from dcae_cli.util.config import get_config, get_path_component_spec, \
+ get_path_data_format
+from dcae_cli.util.logger import get_logger
+from dcae_cli.util.docker_util import image_exists
+from dcae_cli.catalog.exc import CatalogError, DuplicateEntry, MissingEntry, FrozenEntry, ForbiddenRequest
+from dcae_cli.util.cdap_util import normalize_cdap_params
+
+
+logger = get_logger('Catalog')
+
+
+#INTERNAL HELPERS
def _get_component(session, name, version):
    '''Returns a single component ORM'''
    base_query = session.query(Component).filter(Component.name == name)
    if version:
        query = base_query.filter(Component.version == version)
    else:
        # No version requested: take the highest version on record.
        query = base_query.order_by(Component.version.desc()).limit(1)
    try:
        return query.one()
    except NoResultFound:
        label = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Component '{}' was not found in the catalog".format(label))
+
def _get_component_by_id(session, component_id):
    '''Returns a component's ORM attributes as a dict, looked up by primary id'''
    query = session.query(Component).filter(Component.id == component_id)
    try:
        return query.one().__dict__
    except NoResultFound:
        raise MissingEntry("Component '{0}' was not found in the catalog" \
                .format(component_id))
+
+
+def _get_docker_image_from_spec(spec):
+ images = [ art["uri"] for art in spec["artifacts"] if art["type"] == "docker image" ]
+ return images[0]
+
def _get_docker_image(session, name, version):
    '''Returns the docker image name of a given component'''
    component = _get_component(session, name, version)
    spec = component.get_spec_as_dict()
    return _get_docker_image_from_spec(spec)
+
def _add_docker_component(session, user, spec, update, enforce_image=True):
    '''Adds/updates a docker component to the catalog

    Raises CatalogError when enforce_image is set and the image named in the
    spec's artifacts is not available locally.
    '''
    image = _get_docker_image_from_spec(spec)

    if enforce_image and not image_exists(image):
        raise CatalogError("Specified image '{}' does not exist locally.".format(image))

    # build_generic_component registers the component with the session as a
    # side effect; the returned ORM was previously bound to an unused local.
    build_generic_component(session, user, spec, update)
    session.commit()
+
+def _get_cdap_jar_from_spec(spec):
+ jars = [ art["uri"] for art in spec["artifacts"] if art["type"] == "jar" ]
+ return jars[0]
+
def _add_cdap_component(session, user, spec, update):
    '''Adds/updates a cdap component to the catalog'''
    # build_generic_component registers the component with the session as a
    # side effect; the returned ORM was previously bound to an unused local.
    build_generic_component(session, user, spec, update)
    session.commit()
+
+
+#PUBLIC FUNCTIONS
@contextlib.contextmanager
def SessionTransaction(engine):
    '''Provides a transactional scope around a series of operations

    Commits on clean exit, rolls back on any exception. IntegrityErrors that
    represent unique-constraint violations are translated to DuplicateEntry.
    '''
    Session = sessionmaker(engine)
    # Create the session *before* entering the try block: in the original
    # code, if Session() raised, `session` was unbound and the finally
    # clause's session.close() raised NameError, masking the real error.
    session = Session()
    try:
        yield session
        session.commit()
    except IntegrityError as e:
        session.rollback()
        _raise_if_duplicate(str(engine.url), e)
        raise
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
+
+
# Retained for backward compatibility with any code referencing this instance;
# _raise_if_duplicate now raises fresh instances (a shared exception object
# accumulates traceback state across raises).
_dup_e = DuplicateEntry('name:version already exists. To update: [In CLI: Use the --update flag] [For HTTP request: Use PUT]')

def _raise_if_duplicate(url, e):
    '''Raises DuplicateEntry if the IntegrityError is a unique-constraint violation

    `url` selects the dialect-specific inspection of `e.orig` (the underlying
    DBAPI exception). Falls through silently for other integrity errors.
    '''
    if 'sqlite' in url:
        # sqlite reports constraint violations only via the message text.
        if 'UNIQUE' in e.orig.args[0].upper():
            raise DuplicateEntry(_dup_e.args[0])
    elif 'postgres' in url:
        # e.orig is of type psycopg2.IntegrityError that has
        # pgcode which uses the following:
        #
        # https://www.postgresql.org/docs/current/static/errcodes-appendix.html#ERRCODES-TABLE
        #
        # 23505 means "unique_violation"
        if e.orig.pgcode == "23505":
            raise DuplicateEntry(_dup_e.args[0])
+
def create_engine(base, db_name=None, purge_existing=False, db_url=None):
    '''Returns an initialized database engine

    URL precedence: explicit db_url > db_name (sqlite file in the app dir,
    kept for backwards compatibility with existing tests) > configured db.
    '''
    if db_url is not None:
        url = db_url
    elif db_name is not None:
        url = 'sqlite:///' + os.path.join(get_app_dir(), db_name)
    else:
        url = get_config()['db_url']

    if not database_exists(url):
        create_database(url)
    elif purge_existing:
        # Recreate from scratch when asked to purge an existing database.
        drop_database(url)
        create_database(url)

    engine = create_engine_(url)
    _configure_engine(engine)
    base.metadata.create_all(engine)
    return engine
+
+
def _configure_engine(engine):
    '''Performs additional db-specific configurations'''
    if 'sqlite' in str(engine.url):
        # sqlite only enforces foreign keys when the pragma is enabled on
        # each new connection.
        def _enable_foreign_keys(conn, record):
            conn.execute('pragma foreign_keys=ON')
        event.listen(engine, 'connect', _enable_foreign_keys)
+
+
def get_format(session, name, version):
    '''Returns a single data format ORM'''
    base_query = session.query(Format).filter(Format.name == name)
    if version:
        query = base_query.filter(Format.version == version)
    else:
        # No version requested: take the highest version on record.
        query = base_query.order_by(Format.version.desc()).limit(1)
    try:
        return query.one()
    except NoResultFound:
        label = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Data format '{}' was not found in the catalog.".format(label))
+
+
def _get_format_by_id(session, dataformat_id):
    '''Returns a data format's ORM attributes as a dict, looked up by primary id'''
    query = session.query(Format).filter(Format.id == dataformat_id)
    try:
        return query.one().__dict__
    except NoResultFound:
        raise MissingEntry("Dataformat '{0}' was not found in the catalog" \
                .format(dataformat_id))
+
+
+def _create_format_tuple(entry):
+ '''Create tuple to identify format'''
+ return (entry['format'], entry['version'])
+
+
def _get_format_pair(session, req_name, req_version, resp_name, resp_version, create=True):
    '''Returns a single data format pair ORM, optionally creating it if absent'''
    req = get_format(session, req_name, req_version)
    resp = get_format(session, resp_name, resp_version)

    query = session.query(FormatPair).filter(and_(FormatPair.req == req, FormatPair.resp == resp))
    try:
        return query.one()
    except NoResultFound:
        if not create:
            raise MissingEntry("Data format pair with request '{}:{}' and response '{}:{}' was not found in the catalog.".format(req.name, req.version, resp.name, resp.version))
        # Not found and creation requested: register a new pair on the session.
        new_pair = FormatPair(req=req, resp=resp)
        session.add(new_pair)
        return new_pair
+
+def _create_format_pair_tuple(entry):
+ '''Create tuple to identify format pair'''
+ req_name, req_version = entry['request']['format'], entry['request']['version']
+ resp_name, resp_version = entry['response']['format'], entry['response']['version']
+ return (req_name, req_version, resp_name, resp_version)
+
+def _get_unique_format_things(create_tuple, get_func, entries):
+ '''Get unique format things (formats, format pairs, ..)
+
+ Args
+ ----
+ create_tuple: Function that has the signature dict->tuple
+ get_func: Function that has the signature *tuple->orm
+ entries: list of dicts that have data format details that come from
+ streams.publishes, streams.subscribes, services.calls, services.provides
+
+ Return
+ ------
+ List of unique orms
+ '''
+ src = set(create_tuple(entry) for entry in entries)
+ return [get_func(*yo) for yo in src]
+
+
def verify_component(session, name, version):
    '''Returns the orm name and version of a given component'''
    component = _get_component(session, name, version)
    return component.name, component.version
+
+
def get_component_type(session, name, version):
    '''Returns the component_type of a given component'''
    component = _get_component(session, name, version)
    return component.component_type
+
+
def get_component_spec(session, name, version):
    '''Returns the spec dict of a given component'''
    component = _get_component(session, name, version)
    return json.loads(component.spec)
+
+
def get_component_id(session, name, version):
    '''Returns the id of a given component'''
    component = _get_component(session, name, version)
    return component.id
+
+
def get_format_spec(session, name, version):
    '''Returns the spec dict of a given data format'''
    data_format = get_format(session, name, version)
    return json.loads(data_format.spec)
+
+
def get_dataformat_id(session, name, version):
    '''Returns the id of a given data format'''
    data_format = get_format(session, name, version)
    return data_format.id
+
+
def build_generic_component(session, user, spec, update):
    '''Builds, adds, and returns a generic component ORM. Does not commit changes.

    Parameters
    ----------
    session: SQLAlchemy session
    user: str, recorded as the component owner
    spec: dict, full component specification; 'self' supplies the ORM fields
    update: bool, True to update an existing (unpublished) component in place,
        False to create a new one

    Raises
    ------
    FrozenEntry: when updating a component that has already been published
    MissingEntry: when a referenced data format is not in the catalog
    '''
    attrs = spec['self'].copy()
    attrs['spec'] = json.dumps(spec)

    # TODO: This should really come from the spec too
    attrs['owner'] = user

    # grab existing or create a new component
    name, version = attrs['name'], attrs['version']
    if update:
        comp = _get_component(session, name, version)
        if comp.is_published():
            raise FrozenEntry("Component '{}:{}' has been published and cannot be updated".format(name, version))
    else:
        comp = Component()
        session.add(comp)

    # REVIEW: Inject these parameters as function arguments instead of this
    # hidden approach?
    # WATCH: This has to be done here before the code below because there is a
    # commit somewhere below and since these fields are not nullable, you'll get a
    # violation.
    comp.cli_version = cli_version.__version__
    comp.schema_path = get_path_component_spec()

    # build the ORM
    for attr, val in six.iteritems(attrs):
        setattr(comp, attr, val)

    # update relationships: resolve each referenced data format / format pair
    # to its ORM, reporting which section of the spec failed on a miss
    get_format_local = partial(get_format, session)
    get_unique_formats = partial(_get_unique_format_things, _create_format_tuple,
            get_format_local)

    try:
        comp.publishes = get_unique_formats(spec['streams']['publishes'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "publishes"')

    try:
        comp.subscribes = get_unique_formats(spec['streams']['subscribes'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "subscribes"')

    get_format_pairs = partial(_get_format_pair, session)
    get_unique_format_pairs = partial(_get_unique_format_things,
            _create_format_pair_tuple, get_format_pairs)

    try:
        comp.provides = get_unique_format_pairs(spec['services']['provides'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "provides"')

    try:
        comp.calls = get_unique_format_pairs(spec['services']['calls'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "calls"')

    return comp
+
+
def add_format(session, spec, user, update):
    '''Helper function which adds a data format to the catalog'''
    fields = spec['self'].copy()
    fields['spec'] = json.dumps(spec)
    name, version = fields['name'], fields['version']

    # TODO: This should really come from the spec too
    fields['owner'] = user

    if update:
        data_format = get_format(session, name, version)
        if data_format.is_published():
            raise FrozenEntry("Data format {}:{} has been published and cannot be updated".format(name, version))
    else:
        data_format = Format()
        session.add(data_format)

    # copy the collected fields onto the ORM
    for field_name, field_value in fields.items():
        setattr(data_format, field_name, field_value)

    # REVIEW: Inject these parameters as function arguments instead of this
    # hidden approach?
    data_format.cli_version = cli_version.__version__
    data_format.schema_path = get_path_data_format()

    session.commit()
+
+
def _filter_neighbors(session, neighbors=None):
    '''Returns a Component query filtered by available neighbors'''
    query = session.query(Component)
    if neighbors is not None:
        per_neighbor = (and_(Component.name == n, Component.version == v)
                for n, v in neighbors)
        query = query.filter(or_(per_neighbor))
    return query
+
+
def get_subscribers(session, orm, neighbors=None):
    '''Returns a list of component ORMs which subscribe to the specified format'''
    candidates = _filter_neighbors(session, neighbors)
    return candidates.filter(Component.subscribes.contains(orm)).all()
+
+
def get_providers(session, orm, neighbors=None):
    '''Returns a list of component ORMs which provide the specified format pair'''
    candidates = _filter_neighbors(session, neighbors)
    return candidates.filter(Component.provides.contains(orm)).all()
+
+
+def _match_pub(entries, orms):
+ '''Aligns the publishes orms with spec entries to get the config key'''
+ lookup = {(orm.name, orm.version): orm for orm in orms}
+ for entry in entries:
+ if "http" not in entry["type"]:
+ continue
+
+ key = (entry['format'], entry['version'])
+ yield entry['config_key'], lookup[key]
+
+
+def _match_call(entries, orms):
+ '''Aligns the calls orms with spec entries to get the config key'''
+ lookup = {(orm.req.name, orm.req.version, orm.resp.name, orm.resp.version): orm for orm in orms}
+ for entry in entries:
+ key = (entry['request']['format'], entry['request']['version'], entry['response']['format'], entry['response']['version'])
+ yield entry['config_key'], lookup[key]
+
def get_discovery(get_params_func, session, name, version, neighbors=None):
    '''Returns the parameters and interface map for a given component and considering its neighbors'''
    comp = _get_component(session, name, version)
    spec = json.loads(comp.spec)

    def _peers(orm, finder):
        # All matching components, excluding the component itself.
        return [(c.name, c.version)
                for c in finder(session, orm, neighbors) if c is not comp]

    interfaces = {}
    for key, orm in _match_pub(spec['streams']['publishes'], comp.publishes):
        interfaces[key] = _peers(orm, get_subscribers)
    for key, orm in _match_call(spec['services']['calls'], comp.calls):
        interfaces[key] = _peers(orm, get_providers)

    return get_params_func(spec), interfaces

_get_discovery_for_cdap = partial(get_discovery, normalize_cdap_params)
_get_discovery_for_docker = partial(get_discovery,
        lambda spec: {param['name']: param['value'] for param in spec['parameters']})
+
+
+def _get_discovery_for_dmaap(get_component_spec_func, name, version):
+ """Get all config keys that are for dmaap streams
+
+ Returns:
+ --------
+ Tuple of message router config keys list, data router config keys list
+ """
+ spec = get_component_spec_func(name, version)
+
+ all_streams = spec["streams"].get("publishes", []) \
+ + spec["streams"].get("subscribes", [])
+
+ def is_for_message_router(stream):
+ return stream["type"] == "message router" \
+ or stream["type"] == "message_router"
+
+ mr_keys = [ stream["config_key"] for stream in filter(is_for_message_router, all_streams) ]
+
+ def is_for_data_router(stream):
+ return stream["type"] == "data router" \
+ or stream["type"] == "data_router"
+
+ dr_keys = [ stream["config_key"] for stream in filter(is_for_data_router, all_streams) ]
+ return mr_keys, dr_keys
+
+
+def _filter_latest(orms):
+ '''Filters and yields only (name, version, *) orm tuples with the highest version'''
+ get_first_key_func = lambda x: x[0]
+ # itertools.groupby requires the input to be sorted
+ sorted_orms = sorted(orms, key=get_first_key_func)
+ for _, g in itertools.groupby(sorted_orms, get_first_key_func):
+ yield max(g, key=lambda x: x[1])
+
+
def list_components(session, user, only_published, subscribes=None, publishes=None,
        provides=None, calls=None, latest=True):
    """Get list of components

    Returns:
    --------
    List of component orms as dicts
    """
    predicates = []
    for fmt_name, fmt_ver in (subscribes or []):
        predicates.append(Component.subscribes.contains(get_format(session, fmt_name, fmt_ver)))
    for fmt_name, fmt_ver in (publishes or []):
        predicates.append(Component.publishes.contains(get_format(session, fmt_name, fmt_ver)))
    for (reqn, reqv), (respn, respv) in (provides or []):
        predicates.append(Component.provides.contains(
            _get_format_pair(session, reqn, reqv, respn, respv, create=False)))
    for (reqn, reqv), (respn, respv) in (calls or []):
        predicates.append(Component.calls.contains(
            _get_format_pair(session, reqn, reqv, respn, respv, create=False)))

    query = session.query(Component)
    if predicates:
        query = query.filter(or_(*predicates))
    if user:
        query = query.filter(Component.owner==user)
    if only_published:
        query = query.filter(Component.when_published!=None)

    rows = ((orm.name, orm.version, orm.component_type, orm) for orm in query)
    if latest:
        rows = _filter_latest(rows)

    return [row[3].__dict__ for row in rows]
+
+
def _list_formats(session, user, only_published, latest=True):
    """Get list of data formats

    Returns
    -------
    List of data format orms as dicts
    """
    query = session.query(Format).order_by(Format.modified.desc())
    if user:
        query = query.filter(Format.owner==user)
    if only_published:
        query = query.filter(Format.when_published!=None)

    rows = [(orm.name, orm.version, orm) for orm in query]
    if latest:
        rows = _filter_latest(rows)
    return [row[2].__dict__ for row in rows]
+
+
def build_config_keys_map(spec):
    """Build config keys map

    Return
    ------
    Dict where each item:

    <config_key>: { "group": <grouping>, "type": <http|message_router|data_router> }

    where grouping includes "streams_publishes", "streams_subscribes", "services_calls"
    """
    mapping = {}
    # subscribing as http doesn't have config key, so guard for its presence
    for s in spec["streams"]["subscribes"]:
        if "config_key" in s:
            mapping[s["config_key"]] = {"group": "streams_subscribes", "type": s["type"]}
    for s in spec["streams"]["publishes"]:
        mapping[s["config_key"]] = {"group": "streams_publishes", "type": s["type"]}
    for s in spec["services"]["calls"]:
        mapping[s["config_key"]] = {"group": "services_calls"}
    return mapping
+
+
def get_data_router_subscriber_route(spec, config_key):
    """Get route by config key for data router subscriber

    Utility method that parses the component spec
    """
    matches = (s["route"] for s in spec["streams"].get("subscribes", [])
            if s["type"] in ("data_router", "data router")
            and s["config_key"] == config_key)
    for route in matches:
        return route

    raise MissingEntry("No data router subscriber for {0}".format(config_key))
+
+
class MockCatalog(object):
    '''Catalog implementation backed by a SQLAlchemy-managed database'''

    def __init__(self, purge_existing=False, enforce_image=False, db_name=None, engine=None, db_url=None):
        # An explicitly supplied engine wins; otherwise build one from
        # db_url/db_name (see create_engine for precedence rules).
        self.engine = create_engine(Base, db_name=db_name, purge_existing=purge_existing, db_url=db_url) if engine is None else engine
        self.enforce_image = enforce_image

    def add_component(self, user, spec, update=False):
        '''Validates component specification and adds component to the mock catalog'''
        validate_component(spec)

        component_type = spec["self"]["component_type"]

        with SessionTransaction(self.engine) as session:
            if component_type == "cdap":
                _add_cdap_component(session, user, spec, update)
            elif component_type == "docker":
                _add_docker_component(session, user, spec, update, enforce_image=self.enforce_image)
            else:
                raise CatalogError("Unknown component type: {0}".format(component_type))

    def get_docker_image(self, name, version):
        '''Returns the docker image name associated with this component'''
        with SessionTransaction(self.engine) as session:
            return _get_docker_image(session, name, version)

    def get_docker(self, name, version):
        '''Returns a (image, docker config, spec) tuple for this docker component'''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            # NOTE: Defaults are being applied for docker config here at read
            # time. Not completely sure that this is the correct approach. The
            # benefit is that defaults can be changed without altering the stored
            # specs. It's a nice layering.
            # ("auxilary" matches the spelling used by the spec schema.)
            docker_config = apply_defaults_docker_config(spec["auxilary"])
            return _get_docker_image_from_spec(spec), docker_config, spec

    def get_docker_config(self, name, version):
        '''Returns the docker config (with defaults applied) for this component'''
        _, docker_config, _ = self.get_docker(name, version)
        return docker_config

    def get_cdap(self, name, version):
        '''Returns a tuple representing this cdap component

        Returns
        -------
        tuple(jar, config, spec)
        jar: string
            URL where the CDAP jar is located.
        config: dict
            A dictionary loaded from the CDAP JSON configuration file.
        spec: dict
            The dcae-cli component specification file.
        '''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            cdap_config = spec["auxilary"]
            return _get_cdap_jar_from_spec(spec), cdap_config, spec

    def get_component_type(self, name, version):
        '''Returns the component type associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_type(session, name, version)

    def get_component_spec(self, name, version):
        '''Returns the spec dict associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_spec(session, name, version)

    def get_component_id(self, name, version):
        '''Returns the id associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_id(session, name, version)

    def get_component_by_id(self, component_id):
        '''Returns the component associated with this id'''
        with SessionTransaction(self.engine) as session:
            return _get_component_by_id(session, component_id)

    def get_format_spec(self, name, version):
        '''Returns the spec dict associated with this data format'''
        with SessionTransaction(self.engine) as session:
            return get_format_spec(session, name, version)

    def get_dataformat_id(self, name, version):
        '''Returns the id associated with this data format'''
        with SessionTransaction(self.engine) as session:
            return get_dataformat_id(session, name, version)

    def get_dataformat_by_id(self, dataformat_id):
        '''Returns the dataformat associated with this id'''
        with SessionTransaction(self.engine) as session:
            return _get_format_by_id(session, dataformat_id)

    def add_format(self, spec, user, update=False):
        '''Validates data format specification and adds data format to the mock catalog'''
        validate_format(spec)
        with SessionTransaction(self.engine) as session:
            add_format(session, spec, user, update)

    def get_discovery_for_cdap(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_cdap(session, name, version, neighbors)

    def get_discovery_for_docker(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_docker(session, name, version, neighbors)

    def get_discovery_for_dmaap(self, name, version):
        '''Returns (message router config keys, data router config keys) for this component'''
        with SessionTransaction(self.engine) as session:
            get_component_spec_func = partial(get_component_spec, session)
            return _get_discovery_for_dmaap(get_component_spec_func, name, version)

    def get_discovery_from_spec(self, user, target_spec, neighbors=None):
        '''Get pieces to generate configuration for the given target spec

        This function is used to obtain the pieces needed to generate
        the application configuration json: parameters map, interfaces map, dmaap
        map. Where the input is a provided specification that hasn't been added to
        the catalog - prospective specs - which includes a component that doesn't
        exist or a new version of an existing spec.

        Returns
        -------
        Tuple of three elements:

        - Dict of parameter name to parameter value
        - Dict of "config_key" to list of (component.name, component.version)
          known as "interface_map"
        - Tuple of lists of "config_key" the first for message router the second
          for data router known as "dmaap_map"
        '''
        validate_component(target_spec)

        with SessionTransaction(self.engine) as session:
            # The following approach was taken in order to:
            # 1. Re-use existing functionality e.g. implement fast
            # 2. In order to make ORM-specific queries, I need the entire ORM
            # in SQLAlchemy meaning I cannot do arbitrary DataFormatPair queries
            # without Component.
            name = target_spec["self"]["name"]
            version = target_spec["self"]["version"]

            try:
                # Build a component with update to True first because you may
                # want to run this for an existing component
                build_generic_component(session, user, target_spec, True)
            except MissingEntry:
                # Since it doesn't exist already, build a new component
                build_generic_component(session, user, target_spec, False)

            # This is needed so that subsequent queries will "see" the component
            session.flush()

            ctype = target_spec["self"]["component_type"]

            if ctype == "cdap":
                params, interface_map = _get_discovery_for_cdap(session, name,
                        version, neighbors)
            elif ctype == "docker":
                params, interface_map = _get_discovery_for_docker(session, name,
                        version, neighbors)
            else:
                # Fail fast rather than hitting an UnboundLocalError on
                # params/interface_map below for unrecognized component types.
                raise CatalogError("Unknown component type: {0}".format(ctype))

            # Don't want to commit these changes so rollback.
            session.rollback()

            # Use the target spec as the source to compile the config keys from
            dmaap_config_keys = _get_discovery_for_dmaap(
                    lambda name, version: target_spec, name, version)

            return params, interface_map, dmaap_config_keys

    def verify_component(self, name, version):
        '''Returns the component's name and version if it exists and raises an exception otherwise'''
        with SessionTransaction(self.engine) as session:
            return verify_component(session, name, version)

    def list_components(self, subscribes=None, publishes=None, provides=None,
            calls=None, latest=True, user=None, only_published=False):
        '''Returns a list of component names which match the specified filter sequences'''
        with SessionTransaction(self.engine) as session:
            return list_components(session, user, only_published, subscribes,
                    publishes, provides, calls, latest)

    def list_formats(self, latest=True, user=None, only_published=False):
        """Get list of data formats

        Returns
        -------
        List of data formats as dicts
        """
        with SessionTransaction(self.engine) as session:
            return _list_formats(session, user, only_published, latest)

    def get_format(self, name, version):
        """Get data format

        Throws MissingEntry exception if no matches found.

        Returns
        -------
        Dict representation of data format
        """
        with SessionTransaction(self.engine) as session:
            return get_format(session, name, version).__dict__

    def _publish(self, get_func, user, name, version):
        """Publish a component or data format owned by the given user

        Args:
        -----
        get_func: Function that takes a session, name, version and outputs a data
            object either Component or Format

        Returns:
        --------
        True upon success else False
        """
        # TODO: To make function composeable, it should take in the data format
        # object
        with SessionTransaction(self.engine) as session:
            obj = get_func(session, name, version)

            if obj:
                if obj.owner != user:
                    errorMsg = "Not authorized to modify {0}:{1}".format(name, version)
                    logger.error(errorMsg)
                    raise ForbiddenRequest(errorMsg)
                elif obj.when_published:
                    errorMsg = "{0}:{1} has already been published".format(name, version)
                    logger.error(errorMsg)
                    raise CatalogError(errorMsg)
                else:
                    obj.when_published = datetime.utcnow()
                    session.commit()
            else:
                errorMsg = "{0}:{1} not found".format(name, version)
                logger.error(errorMsg)
                raise MissingEntry(errorMsg)

        return True

    def publish_format(self, user, name, version):
        """Publish data format

        Returns
        -------
        True upon success else False
        """
        return self._publish(get_format, user, name, version)

    def get_unpublished_formats(self, comp_name, comp_version):
        """Get unpublished formats for given component

        Returns:
        --------
        List of unique data format name, version pairs
        """
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, comp_name, comp_version)

            dfs = comp.publishes + comp.subscribes
            dfs += [ p.req for p in comp.provides]
            dfs += [ p.resp for p in comp.provides]
            dfs += [ c.req for c in comp.calls]
            dfs += [ c.resp for c in comp.calls]

            def is_not_published(orm):
                return orm.when_published == None

            formats = [(df.name, df.version) for df in filter(is_not_published, dfs)]
            return list(set(formats))

    def publish_component(self, user, name, version):
        """Publish component

        Returns
        -------
        True upon success else False
        """
        return self._publish(_get_component, user, name, version)
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/schema.py b/mod/onboardingapi/dcae_cli/catalog/mock/schema.py
new file mode 100644
index 0000000..640d125
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/schema.py
@@ -0,0 +1,191 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+"""
+Provides jsonschema
+"""
+import json
+from functools import partial, reduce
+
+import six
+from jsonschema import validate, ValidationError
+import requests
+
+from dcae_cli.util import reraise_with_msg, fetch_file_from_web
+from dcae_cli.util import config as cli_config
+from dcae_cli.util.exc import DcaeException
+from dcae_cli.util.logger import get_logger
+
+
+log = get_logger('Schema')
+
+# UPDATE: This message applies to the component spec which has been moved on a
+# remote server.
+#
+# WARNING: The below has a "oneOf" for service provides, that will validate as long as any of them are chosen.
+# However, this is wrong because what we really want is something like:
+# if component_type == docker
+# provides = foo
+# elif component_type == cdap
+# provides = bar
+# The unlikely but problematic case is the cdap developer gets a hold of the docker documentation, uses that, it validates, and blows up at cdap runtime
+
+
+# TODO: The next step here is to decide how to manage the links to the schemas. Either:
+#
+# a) Manage the links in the dcae-cli tool here and thus need to ask if this
+# belongs in the config to point to some remote server or even point to local
+# machine.
+# UPDATE: This item has been mostly completed where at least the path is configurable now.
+
+# b) Read the links to the schemas from the spec - self-describing jsons. Is
+# this even feasible?
+
+# c) Both
+#
+
class FetchSchemaError(RuntimeError):
    """Raised when a schema cannot be retrieved from the configured server."""
    pass
+
def _fetch_schema(schema_path):
    """Fetch a schema file from the configured server.

    Parameters
    ----------
    schema_path: string - path to the schema on the server

    Returns
    -------
    Whatever fetch_file_from_web returns for the schema file
    (presumably a dict representation of the JSON schema - confirm
    against dcae_cli.util.fetch_file_from_web)

    Raises
    ------
    FetchSchemaError wrapping any underlying failure, HTTP or otherwise
    """
    try:
        server_url = cli_config.get_server_url()
        return fetch_file_from_web(server_url, schema_path)
    except requests.HTTPError as e:
        raise FetchSchemaError("HTTP error from fetching schema", e)
    except Exception as e:
        raise FetchSchemaError("Unexpected error from fetching schema", e)
+
+
+def _safe_dict(obj):
+ '''Returns a dict from a dict or json string'''
+ if isinstance(obj, str):
+ return json.loads(obj)
+ else:
+ return obj
+
def _validate(fetch_schema_func, schema_path, spec):
    '''Validate the given spec

    Fetch the schema and then validate. Upon an error from fetching or
    validation, a DcaeException is raised.

    Parameters
    ----------
    fetch_schema_func: function that takes schema_path -> dict representation of schema
        throws a FetchSchemaError upon any failure
    schema_path: string - path to schema
    spec: dict or string representation of JSON of schema instance

    Returns
    -------
    Nothing, silence is golden
    '''
    try:
        schema = fetch_schema_func(schema_path)
        validate(_safe_dict(spec), schema)
    # Both fetch and validation failures surface identically as a
    # DcaeException, so handle them in one clause
    except (ValidationError, FetchSchemaError) as e:
        reraise_with_msg(e, as_dcae=True)
+
+_validate_using_nexus = partial(_validate, _fetch_schema)
+
+
def apply_defaults(properties_definition, properties):
    """Utility method to enforce expected defaults

    This method is used to enforce properties that are *expected* to have at least
    the default if not set by a user. Expected properties are not required but
    have a default set. jsonschema does not provide this.

    Parameters
    ----------
    properties_definition: dict of the schema definition of the properties to use
        for verifying and applying defaults
    properties: dict of the target properties to verify and apply defaults to

    Return
    ------
    dict - a new version of properties that has the expected default values
    (note: the input dict is updated in place and also returned)
    """
    # Recursively process all inner objects. Look for more properties and not
    # match on type. Plain dict.items() replaces six.iteritems - this code
    # runs under Python 3.
    for key, definition in properties_definition.items():
        if "properties" in definition:
            properties[key] = apply_defaults(definition["properties"],
                    properties.get(key, {}))

    # Fill in any missing property that declares a default. Not doing data
    # type checking or casting - assuming that this was taken care of in
    # validation.
    for key, definition in properties_definition.items():
        if "default" in definition and key not in properties:
            properties[key] = definition["default"]

    return properties
+
def apply_defaults_docker_config(config):
    """Apply expected defaults to Docker config

    Fetches the component spec schema and applies the default values of the
    health check definition that matches the config's healthcheck type.

    Parameters
    ----------
    config: Docker config dict - must contain a "healthcheck" dict with a
        "type" key

    Return
    ------
    Updated Docker config dict (the input dict's "healthcheck" entry is
    replaced)
    """
    # Apply health check defaults
    healthcheck_type = config["healthcheck"]["type"]
    component_spec = _fetch_schema(cli_config.get_path_component_spec())

    if healthcheck_type in ["http", "https"]:
        apply_defaults_func = partial(apply_defaults,
            component_spec["definitions"]["docker_healthcheck_http"]["properties"])
    elif healthcheck_type in ["script"]:
        apply_defaults_func = partial(apply_defaults,
            component_spec["definitions"]["docker_healthcheck_script"]["properties"])
    else:
        # You should never get here - healthcheck type is presumably
        # constrained by prior schema validation; fall through as a no-op
        apply_defaults_func = lambda x: x

    config["healthcheck"] = apply_defaults_func(config["healthcheck"])

    return config
+
def validate_component(spec):
    """Validate a component spec against the component spec schema.

    Also enforces a constraint the json schema cannot express: a cdap
    component may not subscribe via data router.

    Raises
    ------
    DcaeException upon schema validation failure or an unsupported
    cdap / data router subscriber combination
    """
    _validate_using_nexus(cli_config.get_path_component_spec(), spec)

    # REVIEW: Could not determine how to do this nicely in json schema. This is
    # not ideal. We want json schema to be the "it" for validation.
    # (Removed the redundant double assignment that left an unused
    # "component_type" name.)
    ctype = spec["self"]["component_type"]

    if ctype == "cdap":
        invalid = [s for s in spec["streams"].get("subscribes", [])
                if s["type"] in ["data_router", "data router"]]
        if invalid:
            raise DcaeException("Cdap component as data router subscriber is not supported.")
+
def validate_format(spec):
    """Validate a data format spec against the data format schema."""
    _validate_using_nexus(cli_config.get_path_data_format(), spec)
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/tables.py b/mod/onboardingapi/dcae_cli/catalog/mock/tables.py
new file mode 100644
index 0000000..0e10b79
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/tables.py
@@ -0,0 +1,149 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+'''
+Provides a local mock catalog
+'''
+import uuid
+import json
+from datetime import datetime
+
+from sqlalchemy import UniqueConstraint, Table, Column, String, DateTime, ForeignKey, Boolean, Enum, Text
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+from sqlalchemy.schema import PrimaryKeyConstraint
+
+
+datetime_now = datetime.utcnow
+
+Base = declarative_base()
+
+
# Association tables linking components to the data formats / format pairs
# they reference. Each uses a composite primary key and cascades deletes so
# removing either side cleans up the association rows.

# component -> data formats it publishes
published = Table('published', Base.metadata,
    Column('component_id', String, ForeignKey('components.id', ondelete='CASCADE'), nullable=False),
    Column('format_id', String, ForeignKey('formats.id', ondelete='CASCADE'), nullable=False),
    PrimaryKeyConstraint('component_id', 'format_id')
)


# component -> data formats it subscribes to
subscribed = Table('subscribed', Base.metadata,
    Column('component_id', String, ForeignKey('components.id', ondelete='CASCADE'), nullable=False),
    Column('format_id', String, ForeignKey('formats.id', ondelete='CASCADE'), nullable=False),
    PrimaryKeyConstraint('component_id', 'format_id')
)


# component -> request/response format pairs it provides as services
provided = Table('provided', Base.metadata,
    Column('component_id', String, ForeignKey('components.id', ondelete='CASCADE'), nullable=False),
    Column('pair_id', String, ForeignKey('format_pairs.id', ondelete='CASCADE'), nullable=False),
    PrimaryKeyConstraint('component_id', 'pair_id')
)


# component -> request/response format pairs it calls as a client
called = Table('called', Base.metadata,
    Column('component_id', String, ForeignKey('components.id', ondelete='CASCADE'), nullable=False),
    Column('pair_id', String, ForeignKey('format_pairs.id', ondelete='CASCADE'), nullable=False),
    PrimaryKeyConstraint('component_id', 'pair_id')
)
+
+
def generate_uuid():
    """Return a fresh random UUID4 rendered as a string (primary key default)."""
    return '{}'.format(uuid.uuid4())
+
+
class Component(Base):
    """ORM for a component (docker or cdap) in the onboarding catalog."""
    __tablename__ = 'components'
    id = Column(String, primary_key=True, default=generate_uuid)
    created = Column(DateTime, default=datetime_now, nullable=False)
    modified = Column(DateTime, default=datetime_now, onupdate=datetime_now, nullable=False)
    owner = Column(String, nullable=False)
    # To be used for tracking and debugging
    cli_version = Column(String, nullable=False)
    schema_path = Column(String, nullable=False)

    name = Column(String(), nullable=False)
    component_type = Column(Enum('docker', 'cdap', name='component_types'), nullable=False)
    version = Column(String(), nullable=False)
    description = Column(Text(), nullable=False)
    spec = Column(Text(), nullable=False)

    # Lifecycle timestamps; None means the event has not happened
    when_added = Column(DateTime, default=datetime_now, nullable=True)
    when_published = Column(DateTime, default=None, nullable=True)
    when_revoked = Column(DateTime, default=None, nullable=True)

    # Many-to-many links through the association tables above
    publishes = relationship('Format', secondary=published)
    subscribes = relationship('Format', secondary=subscribed)
    provides = relationship('FormatPair', secondary=provided)
    calls = relationship('FormatPair', secondary=called)

    # BUG FIX: was misspelled "__tableargs__", which SQLAlchemy silently
    # ignores, so the (name, version) uniqueness was never enforced at the
    # database level (sibling FormatPair spells it correctly).
    __table_args__ = (UniqueConstraint(name, version), )

    def __repr__(self):
        return '<{:}>'.format((self.__class__.__name__, self.id, self.name, self.version))

    def is_published(self):
        return self.when_published is not None

    def get_spec_as_dict(self):
        # spec is stored as a JSON text column
        return json.loads(self.spec)
+
+
class Format(Base):
    """ORM for a data format in the onboarding catalog."""
    __tablename__ = 'formats'
    id = Column(String, primary_key=True, default=generate_uuid)
    created = Column(DateTime, default=datetime_now, nullable=False)
    modified = Column(DateTime, default=datetime_now, onupdate=datetime_now, nullable=False)
    owner = Column(String, nullable=False)
    # To be used for tracking and debugging
    cli_version = Column(String, nullable=False)
    schema_path = Column(String, nullable=False)

    name = Column(String(), nullable=False)
    version = Column(String(), nullable=False)
    description = Column(Text(), nullable=True)
    spec = Column(Text(), nullable=False)

    # Lifecycle timestamps; None means the event has not happened
    when_added = Column(DateTime, default=datetime_now, nullable=True)
    when_published = Column(DateTime, default=None, nullable=True)
    when_revoked = Column(DateTime, default=None, nullable=True)

    # BUG FIX: was misspelled "__tableargs__", which SQLAlchemy silently
    # ignores, so the (name, version) uniqueness was never enforced at the
    # database level (sibling FormatPair spells it correctly).
    __table_args__ = (UniqueConstraint(name, version), )

    def __repr__(self):
        return '<{:}>'.format((self.__class__.__name__, self.id, self.name, self.version))

    def is_published(self):
        return self.when_published is not None
+
+
class FormatPair(Base):
    """ORM for a (request format, response format) pair used by provides/calls."""
    __tablename__ = 'format_pairs'
    id = Column(String, primary_key=True, default=generate_uuid)
    req_id = Column(String, ForeignKey('formats.id', ondelete='CASCADE'))
    resp_id = Column(String, ForeignKey('formats.id', ondelete='CASCADE'))

    # Request and response data formats of the pair
    req = relationship('Format', foreign_keys=req_id, uselist=False)
    resp = relationship('Format', foreign_keys=resp_id, uselist=False)

    # A given request/response combination may only exist once
    __table_args__ = (UniqueConstraint(req_id, resp_id), )

    def __repr__(self):
        return '<{:}>'.format((self.__class__.__name__, self.id, self.req, self.resp))
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_mock_catalog.py b/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_mock_catalog.py
new file mode 100644
index 0000000..0859c44
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_mock_catalog.py
@@ -0,0 +1,786 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+'''
+Tests the mock catalog
+'''
+import json
+from copy import deepcopy
+from functools import partial
+
+import pytest
+
+from sqlalchemy.exc import IntegrityError
+
+from dcae_cli.catalog.mock.catalog import MockCatalog, MissingEntry, DuplicateEntry, _get_unique_format_things
+from dcae_cli.catalog.mock import catalog
+
+
+_c1_spec = {'self': {'name': 'std.comp_one',
+ 'version': '1.0.0',
+ 'description': 'comp1',
+ 'component_type': 'docker'},
+ 'streams': {'publishes': [{'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'config_key': 'pub1',
+ 'type': 'http'}],
+ 'subscribes': [{'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'route': '/sub1',
+ 'type': 'http'}]},
+ 'services': {'calls': [{'request': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'config_key': 'call1'}],
+ 'provides': [{'request': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'route': '/prov1'}]},
+ 'parameters': [{"name": "foo",
+ "value": 1,
+ "description": "the foo thing",
+ "designer_editable": False,
+ "sourced_at_deployment": False,
+ "policy_editable": False},
+ {"name": "bar",
+ "value": 2,
+ "description": "the bar thing",
+ "designer_editable": False,
+ "sourced_at_deployment": False,
+ "policy_editable": False}
+ ],
+ 'artifacts': [{ "uri": "foo-image", "type": "docker image" }],
+ 'auxilary': {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health",
+ "interval": "15s",
+ "timeout": "1s"
+ }
+ }
+ }
+
+_c2_spec = {'self': {'name': 'std.comp_two',
+ 'version': '1.0.0',
+ 'description': 'comp2',
+ 'component_type': 'docker'},
+ 'streams': {'publishes': [],
+ 'subscribes': [{'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'route': '/sub1',
+ 'type': 'http'}]},
+ 'services': {'calls': [],
+ 'provides': [{'request': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'route': '/prov1'}]},
+ 'parameters': [],
+ 'artifacts': [{ "uri": "bar-image", "type": "docker image" }],
+ 'auxilary': {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health",
+ "interval": "15s",
+ "timeout": "1s"
+ }
+ }
+ }
+
+
+_c2v2_spec = {'self': {'name': 'std.comp_two',
+ 'version': '2.0.0',
+ 'description': 'comp2',
+ 'component_type': 'docker'},
+ 'streams': {'publishes': [],
+ 'subscribes': [{'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'route': '/sub1',
+ 'type': 'http'}]},
+ 'services': {'calls': [],
+ 'provides': [{'request': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'route': '/prov1'}]},
+ 'parameters': [],
+ 'artifacts': [{ "uri": "baz-image", "type": "docker image" }],
+ 'auxilary': {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health",
+ "interval": "15s",
+ "timeout": "1s"
+ }
+ }
+ }
+
+
+_c3_spec = {'self': {'name': 'std.comp_three',
+ 'version': '3.0.0',
+ 'description': 'comp3',
+ 'component_type': 'docker'},
+ 'streams': {'publishes': [],
+ 'subscribes': [{'format': 'std.format_two',
+ 'version': '1.5.0',
+ 'route': '/sub1',
+ 'type': 'http'}]},
+ 'services': {'calls': [],
+ 'provides': [{'request': {'format': 'std.format_one',
+ 'version': '1.0.0'},
+ 'response': {'format': 'std.format_two',
+ 'version': '1.5.0'},
+ 'route': '/prov1'}]},
+ 'parameters': [],
+ 'artifacts': [{ "uri": "bazinga-image", "type": "docker image" }],
+ 'auxilary': {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health",
+ "interval": "15s",
+ "timeout": "1s"
+ }
+ }
+ }
+
+
+_df1_spec = {
+ "self": {
+ "name": "std.format_one",
+ "version": "1.0.0",
+ "description": "df1"
+ },
+ "dataformatversion": "1.0.0",
+ "jsonschema": {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "raw-text": {
+ "type": "string"
+ }
+ },
+ "required": ["raw-text"],
+ "additionalProperties": False
+ }
+ }
+_df2_spec = {
+ "self": {
+ "name": "std.format_two",
+ "version": "1.5.0",
+ "description": "df2"
+ },
+ "dataformatversion": "1.0.0",
+ "jsonschema": {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "raw-text": {
+ "type": "string"
+ }
+ },
+ "required": ["raw-text"],
+ "additionalProperties": False
+ }
+ }
+_df2v2_spec = {
+ "self": {
+ "name": "std.format_two",
+ "version": "2.0.0",
+ "description": "df2"
+ },
+ "dataformatversion": "1.0.0",
+ "jsonschema": {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "raw-text": {
+ "type": "string"
+ }
+ },
+ "required": ["raw-text"],
+ "additionalProperties": False
+ }
+ }
+
+_cdap_spec={
+ "self":{
+ "name":"std.cdap_comp",
+ "version":"0.0.0",
+ "description":"cdap test component",
+ "component_type":"cdap"
+ },
+ "streams":{
+ "publishes":[
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"pub1",
+ "type": "http"
+ }
+ ],
+ "subscribes":[
+ {
+ "format":"std.format_two",
+ "version":"1.5.0",
+ "route":"/sub1",
+ "type": "http"
+ }
+ ]
+ },
+ "services":{
+ "calls":[
+
+ ],
+ "provides":[
+ {
+ "request":{
+ "format":"std.format_one",
+ "version":"1.0.0"
+ },
+ "response":{
+ "format":"std.format_two",
+ "version":"1.5.0"
+ },
+ "service_name":"baphomet",
+ "service_endpoint":"rises",
+ "verb":"GET"
+ }
+ ]
+ },
+ "parameters": {
+ "app_config" : [],
+ "app_preferences" : [],
+ "program_preferences" : []
+ },
+ "artifacts": [{"uri": "bahpomet.com", "type": "jar"}],
+ "auxilary": {
+ "streamname":"streamname",
+ "artifact_version":"6.6.6",
+ "artifact_name": "test_name",
+ "programs" : [{"program_type" : "flows", "program_id" : "flow_id"}]
+ }
+
+}
+
+
+def test_component_basic(mock_cli_config, mock_db_url, catalog=None):
+ '''Tests basic component usage of MockCatalog'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ c1_spec = deepcopy(_c1_spec)
+ df1_spec = deepcopy(_df1_spec)
+ df2_spec = deepcopy(_df2_spec)
+
+ user = "test_component_basic"
+
+ # success
+ mc.add_format(df2_spec, user)
+
+ # duplicate
+ with pytest.raises(DuplicateEntry):
+ mc.add_format(df2_spec, user)
+
+ # component relies on df1_spec which hasn't been added
+ with pytest.raises(MissingEntry):
+ mc.add_component(user, c1_spec)
+
+ # add df1 and comp1
+ mc.add_format(df1_spec, user)
+ mc.add_component(user, c1_spec)
+
+ with pytest.raises(DuplicateEntry):
+ mc.add_component(user, c1_spec)
+
+ cname, cver = mc.verify_component('std.comp_one', version=None)
+ assert cver == '1.0.0'
+
+
+def test_format_basic(mock_cli_config, mock_db_url, catalog=None):
+ '''Tests basic data format usage of MockCatalog'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_format_basic"
+
+ df1_spec = deepcopy(_df1_spec)
+ df2_spec = deepcopy(_df2_spec)
+
+ # success
+ mc.add_format(df1_spec, user)
+
+ # duplicate is bad
+ with pytest.raises(DuplicateEntry):
+ mc.add_format(df1_spec, user)
+
+ # allow update of same version
+ new_descr = 'a new description'
+ df1_spec['self']['description'] = new_descr
+ mc.add_format(df1_spec, user, update=True)
+
+ # adding a new version is kosher
+ new_ver = '2.0.0'
+ df1_spec['self']['version'] = new_ver
+ mc.add_format(df1_spec, user)
+
+ # can't update a format that doesn't exist
+ with pytest.raises(MissingEntry):
+ mc.add_format(df2_spec, user, update=True)
+
+ # get spec and make sure it's updated
+ spec = mc.get_format_spec(df1_spec['self']['name'], version=None)
+ assert spec['self']['version'] == new_ver
+ assert spec['self']['description'] == new_descr
+
+
+def test_discovery(mock_cli_config, mock_db_url, catalog=None):
+ '''Tests creation of discovery objects'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_discovery"
+
+ c1_spec = deepcopy(_c1_spec)
+ df1_spec = deepcopy(_df1_spec)
+ c2_spec = deepcopy(_c2_spec)
+
+ mc.add_format(df1_spec, user)
+ mc.add_component(user, c1_spec)
+ mc.add_component(user, c2_spec)
+
+ params, interfaces = mc.get_discovery_for_docker(c1_spec['self']['name'], c1_spec['self']['version'])
+ assert params == {'bar': 2, 'foo': 1}
+ assert interfaces == {'call1': [('std.comp_two', '1.0.0')], 'pub1': [('std.comp_two', '1.0.0')]}
+
+
+def _spec_tuple(dd):
+ '''Returns a (name, version, component type) tuple from a given component spec dict'''
+ return dd['self']['name'], dd['self']['version'], dd['self']['component_type']
+
+
def _comp_tuple_set(*dds):
    '''Runs a set of component spec tuples'''
    return {_spec_tuple(dd) for dd in dds}
+
+
+def _format_tuple(dd):
+ '''Returns a (name, version) tuple from a given data format spec dict'''
+ return dd['self']['name'], dd['self']['version']
+
+
def _format_tuple_set(*dds):
    '''Runs a set of data format spec tuples'''
    return {_format_tuple(dd) for dd in dds}
+
+
+def test_comp_list(mock_cli_config, mock_db_url, catalog=None):
+ '''Tests the list functionality of the catalog'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_comp_list"
+
+ df1_spec = deepcopy(_df1_spec)
+ df2_spec = deepcopy(_df2_spec)
+ df2v2_spec = deepcopy(_df2v2_spec)
+
+ c1_spec = deepcopy(_c1_spec)
+ c2_spec = deepcopy(_c2_spec)
+ c2v2_spec = deepcopy(_c2v2_spec)
+ c3_spec = deepcopy(_c3_spec)
+
+ mc.add_format(df1_spec, user)
+ mc.add_format(df2_spec, user)
+ mc.add_format(df2v2_spec, user)
+ mc.add_component(user, c1_spec)
+ mc.add_component(user, c2_spec)
+ mc.add_component(user, c2v2_spec)
+ mc.add_component(user, c3_spec)
+
+ mc.add_component(user,_cdap_spec)
+
+ def components_to_specs(components):
+ return [ json.loads(c["spec"]) for c in components ]
+
+ # latest by default. only v2 of c2
+ components = mc.list_components()
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec, c2v2_spec, c3_spec, _cdap_spec)
+
+ # all components
+ components = mc.list_components(latest=False)
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec, c2_spec, c2v2_spec, c3_spec, _cdap_spec)
+
+ components = mc.list_components(subscribes=[('std.format_one', None)])
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec, c2v2_spec)
+
+ # no comps subscribe to latest std.format_two
+ components = mc.list_components(subscribes=[('std.format_two', None)])
+ assert not components
+
+ components = mc.list_components(subscribes=[('std.format_two', '1.5.0')])
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c3_spec, _cdap_spec)
+
+ # raise if format doesn't exist
+ with pytest.raises(MissingEntry):
+ mc.list_components(subscribes=[('std.format_two', '5.0.0')])
+
+ components = mc.list_components(publishes=[('std.format_one', None)])
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec, _cdap_spec)
+
+ components = mc.list_components(calls=[(('std.format_one', None), ('std.format_one', None)), ])
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec)
+
+ # raise if format doesn't exist
+ with pytest.raises(MissingEntry):
+ mc.list_components(calls=[(('std.format_one', '5.0.0'), ('std.format_one', None)), ])
+
+ components = mc.list_components(provides=[(('std.format_one', '1.0.0'), ('std.format_two', '1.5.0')), ])
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c3_spec, _cdap_spec)
+
+ # test for listing published components
+
+ name_pub = c1_spec["self"]["name"]
+ version_pub = c1_spec["self"]["version"]
+ mc.publish_component(user, name_pub, version_pub)
+ components = mc.list_components(only_published=True)
+ specs = components_to_specs(components)
+ assert _comp_tuple_set(*specs) == _comp_tuple_set(c1_spec)
+
+ components = mc.list_components(only_published=False)
+ assert len(components) == 4
+
+
+def test_format_list(mock_cli_config, mock_db_url, catalog=None):
+ '''Tests the list functionality of the catalog'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_format_list"
+
+ df1_spec = deepcopy(_df1_spec)
+ df2_spec = deepcopy(_df2_spec)
+ df2v2_spec = deepcopy(_df2v2_spec)
+
+ mc.add_format(df1_spec, user)
+ mc.add_format(df2_spec, user)
+ mc.add_format(df2v2_spec, user)
+
+ def formats_to_specs(components):
+ return [ json.loads(c["spec"]) for c in components ]
+
+ # latest by default. ensure only v2 of df2 makes it
+ formats = mc.list_formats()
+ specs = formats_to_specs(formats)
+ assert _format_tuple_set(*specs) == _format_tuple_set(df1_spec, df2v2_spec)
+
+ # list all
+ formats = mc.list_formats(latest=False)
+ specs = formats_to_specs(formats)
+ assert _format_tuple_set(*specs) == _format_tuple_set(df1_spec, df2_spec, df2v2_spec)
+
+ # test listing of published formats
+
+ name_pub = df1_spec["self"]["name"]
+ version_pub = df1_spec["self"]["version"]
+
+ mc.publish_format(user, name_pub, version_pub)
+ formats = mc.list_formats(only_published=True)
+ specs = formats_to_specs(formats)
+ assert _format_tuple_set(*specs) == _format_tuple_set(df1_spec)
+
+ formats = mc.list_formats(only_published=False)
+ assert len(formats) == 2
+
+
+def test_component_add_cdap(mock_cli_config, mock_db_url, catalog=None):
+ '''Adds a mock CDAP application'''
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_component_add_cdap"
+
+ df1_spec = deepcopy(_df1_spec)
+ df2_spec = deepcopy(_df2_spec)
+
+ mc.add_format(df1_spec, user)
+ mc.add_format(df2_spec, user)
+
+ mc.add_component(user, _cdap_spec)
+
+ name, version, _ = _spec_tuple(_cdap_spec)
+ jar_out, cdap_config_out, spec_out = mc.get_cdap(name, version)
+
+ assert _cdap_spec["artifacts"][0]["uri"] == jar_out
+ assert _cdap_spec["auxilary"] == cdap_config_out
+ assert _cdap_spec == spec_out
+
+
+def test_get_discovery_from_spec(mock_cli_config, mock_db_url):
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+
+ user = "test_get_discovery_from_spec"
+
+ c1_spec_updated = deepcopy(_c1_spec)
+ c1_spec_updated["streams"]["publishes"][0] = {
+ 'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'config_key': 'pub1',
+ 'type': 'http'
+ }
+ c1_spec_updated["streams"]["subscribes"][0] = {
+ 'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'route': '/sub1',
+ 'type': 'http'
+ }
+
+ # Case when c1 doesn't exist
+
+ mc.add_format(_df1_spec, user)
+ mc.add_component(user, _c2_spec)
+ actual_params, actual_interface_map, actual_dmaap_config_keys \
+ = mc.get_discovery_from_spec(user, c1_spec_updated, None)
+
+ assert actual_params == {'bar': 2, 'foo': 1}
+ assert actual_interface_map == { 'pub1': [('std.comp_two', '1.0.0')],
+ 'call1': [('std.comp_two', '1.0.0')] }
+ assert actual_dmaap_config_keys == ([], [])
+
+ # Case when c1 already exist
+
+ mc.add_component(user,_c1_spec)
+
+ c1_spec_updated["services"]["calls"][0]["config_key"] = "callme"
+ actual_params, actual_interface_map, actual_dmaap_config_keys \
+ = mc.get_discovery_from_spec(user, c1_spec_updated, None)
+
+ assert actual_params == {'bar': 2, 'foo': 1}
+ assert actual_interface_map == { 'pub1': [('std.comp_two', '1.0.0')],
+ 'callme': [('std.comp_two', '1.0.0')] }
+ assert actual_dmaap_config_keys == ([], [])
+
+ # Case where add in dmaap streams
+ # TODO: Add in subscribes test case after spec gets pushed
+
+ c1_spec_updated["streams"]["publishes"][0] = {
+ 'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'config_key': 'pub1',
+ 'type': 'message router'
+ }
+
+ actual_params, actual_interface_map, actual_dmaap_config_keys \
+ = mc.get_discovery_from_spec(user, c1_spec_updated, None)
+
+ assert actual_params == {'bar': 2, 'foo': 1}
+ assert actual_interface_map == { 'callme': [('std.comp_two', '1.0.0')] }
+ assert actual_dmaap_config_keys == (["pub1"], [])
+
+ # Case when cdap spec doesn't exist
+
+ cdap_spec = deepcopy(_cdap_spec)
+ cdap_spec["streams"]["publishes"][0] = {
+ 'format': 'std.format_one',
+ 'version': '1.0.0',
+ 'config_key': 'pub1',
+ 'type': 'http'
+ }
+ cdap_spec["streams"]["subscribes"][0] = {
+ 'format': 'std.format_two',
+ 'version': '1.5.0',
+ 'route': '/sub1',
+ 'type': 'http'
+ }
+
+ mc.add_format(_df2_spec, user)
+ actual_params, actual_interface_map, actual_dmaap_config_keys \
+ = mc.get_discovery_from_spec(user, cdap_spec, None)
+
+ assert actual_params == {'program_preferences': [], 'app_config': {}, 'app_preferences': {}}
+ assert actual_interface_map == {'pub1': [('std.comp_two', '1.0.0'), ('std.comp_one', '1.0.0')]}
+ assert actual_dmaap_config_keys == ([], [])
+
+
+def test_get_unpublished_formats(mock_cli_config, mock_db_url, catalog=None):
+ if catalog is None:
+ mc = MockCatalog(db_name='dcae_cli.test.db', purge_existing=True,
+ enforce_image=False, db_url=mock_db_url)
+ else:
+ mc = catalog
+
+ user = "test_get_unpublished_formats"
+
+ mc.add_format(_df1_spec, user)
+ mc.add_component(user, _c1_spec)
+
+ # detect unpublished formats
+
+ name_to_pub = _c1_spec["self"]["name"]
+ version_to_pub = _c1_spec["self"]["version"]
+ formats = mc.get_unpublished_formats(name_to_pub, version_to_pub)
+ assert [('std.format_one', '1.0.0')] == formats
+
+ # all formats published
+
+ mc.publish_format(user, _df1_spec["self"]["name"], _df1_spec["self"]["version"])
+ formats = mc.get_unpublished_formats(name_to_pub, version_to_pub)
+ assert len(formats) == 0
+
+
def test_get_unique_format_things():
    """Duplicated (name, version) entries collapse to one ORM per pair."""
    # Stand-ins for the tuple-maker and ORM-getter normally bound via partial
    def create_tuple(entry):
        return (entry["name"], entry["version"])

    def get_orm(name, version):
        return ("ORM", name, version)

    entries = [{"name": "abc", "version": 123},
            {"name": "abc", "version": 123},
            {"name": "abc", "version": 123},
            {"name": "def", "version": 456},
            {"name": "def", "version": 456}]

    get_unique_fake_format = partial(_get_unique_format_things, create_tuple,
        get_orm)
    expected = [("ORM", "abc", 123), ("ORM", "def", 456)]

    # sorted() because the result order is not guaranteed
    assert sorted(expected) == sorted(get_unique_fake_format(entries))
+
+
def test_filter_latest():
    """_filter_latest keeps only the highest version per name."""
    orms = [('std.empty.get', '1.0.0'), ('std.unknown', '1.0.0'),
        ('std.unknown', '1.0.1'), ('std.empty.get', '1.0.1')]

    assert list(catalog._filter_latest(orms)) == [('std.empty.get', '1.0.1'), \
        ('std.unknown', '1.0.1')]
+
+
def test_raise_if_duplicate():
    """_raise_if_duplicate maps db uniqueness errors to DuplicateEntry."""
    # sqlite reports uniqueness violations via the message in orig.args
    class FakeOrig(object):
        args = ["unique", "duplicate"]

    url = "sqlite"
    orig = FakeOrig()
    error = IntegrityError("Error about uniqueness", None, orig)

    with pytest.raises(catalog.DuplicateEntry):
        catalog._raise_if_duplicate(url, error)

    # Couldn't find psycopg2.IntegrityError constructor nor way
    # to set pgcode so decided to mock it.
    # postgres reports uniqueness violations via pgcode 23505 (unique_violation)
    class FakeOrigPostgres(object):
        pgcode = "23505"

    url = "postgres"
    orig = FakeOrigPostgres()
    error = IntegrityError("Error about uniqueness", None, orig)

    with pytest.raises(catalog.DuplicateEntry):
        catalog._raise_if_duplicate(url, error)
+
+
def test_get_docker_image_from_spec():
    """The docker image uri comes from the spec's artifacts list."""
    assert "foo-image" == catalog._get_docker_image_from_spec(_c1_spec)
+
def test_get_cdap_jar_from_spec():
    """The cdap jar uri comes from the spec's artifacts list."""
    assert "bahpomet.com" == catalog._get_cdap_jar_from_spec(_cdap_spec)
+
+
+def test_build_config_keys_map():
+ stub_spec = {
+ 'streams': {
+ 'publishes': [
+ {'format': 'std.format_one', 'version': '1.0.0',
+ 'config_key': 'pub1', 'type': 'http'},
+ {'format': 'std.format_one', 'version': '1.0.0',
+ 'config_key': 'pub2', 'type': 'message_router'}
+ ],
+ 'subscribes': [
+ {'format': 'std.format_one', 'version': '1.0.0', 'route': '/sub1',
+ 'type': 'http'},
+ {'format': 'std.format_one', 'version': '1.0.0',
+ 'config_key': 'sub2', 'type': 'message_router'}
+ ]
+ },
+ 'services': {
+ 'calls': [
+ {'request': {'format': 'std.format_one', 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one', 'version': '1.0.0'},
+ 'config_key': 'call1'}
+ ],
+ 'provides': [
+ {'request': {'format': 'std.format_one', 'version': '1.0.0'},
+ 'response': {'format': 'std.format_one', 'version': '1.0.0'},
+ 'route': '/prov1'}
+ ]
+ }
+ }
+
+ grouping = catalog.build_config_keys_map(stub_spec)
+ expected = {'call1': {'group': 'services_calls'}, 'pub1': {'type': 'http', 'group': 'streams_publishes'}, 'sub2': {'type': 'message_router', 'group': 'streams_subscribes'}, 'pub2': {'type': 'message_router', 'group': 'streams_publishes'}}
+ assert expected == grouping
+
+
def test_get_data_router_subscriber_route():
    """Only data_router subscribers have routes; others raise MissingEntry."""
    spec = {"streams": {"subscribes": [ { "type": "data_router", "config_key":
        "alpha", "route": "/alpha" }, { "type": "message_router", "config_key":
        "beta" } ]}}

    assert "/alpha" == catalog.get_data_router_subscriber_route(spec, "alpha")

    # message_router subscriber has no route
    with pytest.raises(catalog.MissingEntry):
        catalog.get_data_router_subscriber_route(spec, "beta")

    # unknown config_key
    with pytest.raises(catalog.MissingEntry):
        catalog.get_data_router_subscriber_route(spec, "gamma")
+
+
+if __name__ == '__main__':
+ '''Test area'''
+ pytest.main([__file__, ])
diff --git a/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_schema.py b/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_schema.py
new file mode 100644
index 0000000..90674d9
--- /dev/null
+++ b/mod/onboardingapi/dcae_cli/catalog/mock/tests/test_schema.py
@@ -0,0 +1,421 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# -*- coding: utf-8 -*-
+'''
+Tests the schema validation and defaults-application logic
+'''
+import pytest
+import json, copy
+
+from dcae_cli.catalog.mock.schema import validate_component, validate_format,apply_defaults_docker_config, apply_defaults
+from dcae_cli.catalog.mock import schema
+from dcae_cli.util.exc import DcaeException
+
+
+# Fixture: minimal valid data format spec (dataformatversion 1.0.0).
+# Used as the happy-path input for validate_format().
+format_test = r'''
+{
+ "self": {
+ "name": "asimov.format.integerClassification",
+ "version": "1.0.0",
+ "description": "Represents a single classification from a machine learning model - just a test version"
+ },
+ "dataformatversion": "1.0.0",
+ "jsonschema": {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "classification": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false
+ }
+}
+'''
+
+
+# Fixture: complete valid docker component spec (streams, services,
+# parameters, artifacts, auxilary healthcheck). Tests mutate the "auxilary"
+# section to exercise docker-config validation.
+component_test = r'''
+{
+ "self": {
+ "version": "1.0.0",
+ "name": "asimov.component.kpi_anomaly",
+ "description": "Classifies VNF KPI data as anomalous",
+ "component_type": "docker"
+ },
+ "streams": {
+ "subscribes": [
+ {
+ "format": "dcae.vnf.kpi",
+ "version": "1.0.0",
+ "route": "/data",
+ "type": "http"
+ },
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"sub2",
+ "type": "message router"
+ }
+ ],
+ "publishes": [
+ {
+ "format": "asimov.format.integerClassification",
+ "version": "1.0.0",
+ "config_key": "prediction",
+ "type": "http"
+ },
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"pub2",
+ "type": "message router"
+ }
+ ]
+ },
+ "services": {
+ "calls": [],
+ "provides": [
+ {
+ "route": "/score-vnf",
+ "request": {
+ "format": "dcae.vnf.kpi",
+ "version": "1.0.0"
+ },
+ "response": {
+ "format": "asimov.format.integerClassification",
+ "version": "1.0.0"
+ }
+ }
+ ]
+ },
+ "parameters": [
+ {
+ "name": "threshold",
+ "value": 0.75,
+ "description": "Probability threshold to exceed to be anomalous",
+ "designer_editable": false,
+ "sourced_at_deployment": false,
+ "policy_editable": false
+ }
+ ],
+ "artifacts": [
+ {
+ "uri": "somedockercontainerpath",
+ "type": "docker image"
+ }
+ ],
+ "auxilary": {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health"
+ }
+ }
+}
+'''
+
+# Fixture: complete valid cdap component spec. Tests mutate its streams and
+# "auxilary" section to exercise cdap-specific validation rules.
+cdap_component_test = r'''
+{
+ "self":{
+ "name":"std.cdap_comp",
+ "version":"0.0.0",
+ "description":"cdap test component",
+ "component_type":"cdap"
+ },
+ "streams":{
+ "publishes":[
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"pub1",
+ "type": "http"
+ },
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"pub2",
+ "type": "message router"
+ }
+ ],
+ "subscribes":[
+ {
+ "format":"std.format_two",
+ "version":"1.5.0",
+ "route":"/sub1",
+ "type": "http"
+ },
+ {
+ "format":"std.format_one",
+ "version":"1.0.0",
+ "config_key":"sub2",
+ "type": "message router"
+ }
+ ]
+ },
+ "services":{
+ "calls":[
+
+ ],
+ "provides":[
+ {
+ "request":{
+ "format":"std.format_one",
+ "version":"1.0.0"
+ },
+ "response":{
+ "format":"std.format_two",
+ "version":"1.5.0"
+ },
+ "service_name":"baphomet",
+ "service_endpoint":"rises",
+ "verb":"GET"
+ }
+ ]
+ },
+ "parameters":[
+
+ ],
+ "artifacts": [
+ {
+ "uri": "somecdapjarurl",
+ "type": "jar"
+ }
+ ],
+ "auxilary": {
+ "streamname":"who",
+ "artifact_name" : "HelloWorld",
+ "artifact_version" : "3.4.3",
+ "programs" : [
+ {"program_type" : "flows", "program_id" : "WhoFlow"},
+ {"program_type" : "services", "program_id" : "Greeting"}
+ ],
+ "namespace" : "hw"
+ }
+}
+'''
+
+
+def test_basic(mock_cli_config):
+ validate_component(json.loads(component_test))
+ validate_format(json.loads(format_test))
+ validate_component(json.loads(cdap_component_test))
+
+ # Test with DR publishes for cdap
+ dr_publishes = { "format":"std.format_one", "version":"1.0.0",
+ "config_key":"pub3", "type": "data router" }
+ cdap_valid = json.loads(cdap_component_test)
+ cdap_valid["streams"]["publishes"].append(dr_publishes)
+
+ # Test with DR subscribes for cdap
+ cdap_invalid = json.loads(cdap_component_test)
+ ss = cdap_invalid["streams"]["subscribes"][0]
+ ss["type"] = "data_router"
+ ss["config_key"] = "nada"
+ cdap_invalid["streams"]["subscribes"][0] = ss
+
+ with pytest.raises(DcaeException):
+ validate_component(cdap_invalid)
+
+
+
+def test_validate_docker_config(mock_cli_config):
+
+ def compose_spec(config):
+ spec = json.loads(component_test)
+ spec["auxilary"] = config
+ return spec
+
+ good_docker_configs = [
+ {
+ "healthcheck": {
+ "type": "http",
+ "endpoint": "/health",
+ "interval": "15s",
+ "timeout": "1s"
+ }
+ },
+ {
+ "healthcheck": {
+ "type": "script",
+ "script": "curl something"
+ }
+ }]
+
+ for good_config in good_docker_configs:
+ spec = compose_spec(good_config)
+ assert validate_component(spec) == None
+
+ bad_docker_configs = [
+ #{},
+ {
+ "healthcheck": {}
+ },
+ {
+ "healthcheck": {
+ "type": "http"
+ }
+ },
+ {
+ "healthcheck": {
+ "type": "http",
+ "script": "huh"
+ }
+ }]
+
+ for bad_config in bad_docker_configs:
+ with pytest.raises(DcaeException):
+ spec = compose_spec(bad_config)
+ validate_component(spec)
+
+
+def test_validate_cdap_config(mock_cli_config):
+
+ def compose_spec(config):
+ spec = json.loads(cdap_component_test)
+ spec["auxilary"] = config
+ return spec
+
+ good_cdap_configs = [
+ {
+ "streamname":"streamname",
+ "artifact_version":"6.6.6",
+ "artifact_name" : "testname",
+ "programs" : [],
+ },
+ {
+ "streamname":"streamname",
+ "artifact_version":"6.6.6",
+ "artifact_name" : "testname",
+ "programs" : [{"program_type" : "flows", "program_id" : "flow_id"}],
+ "program_preferences" : [{"program_type" : "flows", "program_id" : "flow_id", "program_pref" : {"he" : "shall rise"}}],
+ "namespace" : "this should be an optional field",
+ "app_preferences" : {"he" : "shall rise"}
+ }
+ ]
+
+ for good_config in good_cdap_configs:
+ spec = compose_spec(good_config)
+ assert validate_component(spec) == None
+
+ bad_cdap_configs = [
+ {},
+ {"YOU HAVE" : "ALWAYS FAILED ME"}
+ ]
+
+ for bad_config in bad_cdap_configs:
+ with pytest.raises(DcaeException):
+ spec = compose_spec(bad_config)
+ validate_component(bad_config)
+
+
+def test_apply_defaults():
+ definition = { "length": { "default": 10 }, "duration": { "default": "10s" } }
+
+ # Test: Add expected properties
+ properties = {}
+ actual = apply_defaults(definition, properties)
+ assert actual == { "length": 10, "duration": "10s" }
+
+ # Test: Don't mess with existing values
+ properties = { "length": 100, "duration": "100s" }
+ actual = apply_defaults(definition, properties)
+ assert actual == properties
+
+ # Test: No defaults to apply
+ definition = { "length": {}, "duration": {} }
+ properties = { "width": 100 }
+ actual = apply_defaults(definition, properties)
+ assert actual == properties
+
+ # Test: Nested object
+ definition = { "length": { "default": 10 }, "duration": { "default": "10s" },
+ "location": { "properties": { "lat": { "default": "40" },
+ "long": { "default": "75" }, "alt": {} } } }
+ actual = apply_defaults(definition, {})
+ assert actual == {'duration': '10s', 'length': 10,
+ 'location': {'lat': '40', 'long': '75'}}
+
+
+def test_apply_defaults_docker_config(mock_cli_config):
+ # Test: Adding of missing expected properties for http
+ dc = { "healthcheck": { "type": "http", "endpoint": "/foo" } }
+ actual = apply_defaults_docker_config(dc)
+
+ assert "interval" in actual["healthcheck"]
+ assert "timeout" in actual["healthcheck"]
+
+ # Test: Adding of missing expected properties for script
+ dc = { "healthcheck": { "type": "script", "script": "/bin/do-something" } }
+ actual = apply_defaults_docker_config(dc)
+
+ assert "interval" in actual["healthcheck"]
+ assert "timeout" in actual["healthcheck"]
+
+ # Test: Expected properties already exist
+ dc = { "healthcheck": { "type": "http", "endpoint": "/foo",
+ "interval": "10000s", "timeout": "100000s" } }
+ actual = apply_defaults_docker_config(dc)
+ assert dc == actual
+
+ # Test: Never should happen
+ dc = { "healthcheck": { "type": "bogus" } }
+ actual = apply_defaults_docker_config(dc)
+ assert dc == actual
+
+
+def test_validate():
+ fake_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "Test schema",
+ "type": "object",
+ "properties": {
+ "foo": { "type": "string" },
+ "bar": { "type": "integer" }
+ },
+ "required": ["foo", "bar"]
+ }
+
+ good_path = "/correct_path"
+
+ def fetch_schema(path):
+ if path == good_path:
+ return fake_schema
+ else:
+ raise schema.FetchSchemaError("Schema not found")
+
+ # Success case
+
+ good_instance = { "foo": "hello", "bar": 1776 }
+
+ schema._validate(fetch_schema, good_path, good_instance)
+
+ # Error from validating
+
+ bad_instance = {}
+
+ with pytest.raises(DcaeException):
+ schema._validate(fetch_schema, good_path, bad_instance)
+
+ # Error from fetching
+
+ bad_path = "/wrong_path"
+
+ with pytest.raises(DcaeException):
+ schema._validate(fetch_schema, bad_path, good_instance)