summaryrefslogtreecommitdiffstats
path: root/mod/onboardingapi/dcae_cli/catalog/mock/catalog.py
blob: dcebdca0d0daf16b133f98f44f1b405025ccde95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# -*- coding: utf-8 -*-
"""
Provides the mock catalog
"""
import os
import json
import contextlib
import itertools
from functools import partial
from datetime import datetime

import six

from sqlalchemy import create_engine as create_engine_, event, and_, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy_utils import database_exists, create_database, drop_database

from dcae_cli import _version as cli_version
from dcae_cli.catalog.mock.tables import Component, Format, FormatPair, Base
from dcae_cli.catalog.mock.schema import validate_component, validate_format, apply_defaults_docker_config
from dcae_cli.util import reraise_with_msg, get_app_dir
from dcae_cli.util.config import get_config, get_path_component_spec, \
    get_path_data_format
from dcae_cli.util.logger import get_logger
from dcae_cli.util.docker_util import image_exists
from dcae_cli.catalog.exc import CatalogError, DuplicateEntry, MissingEntry, FrozenEntry, ForbiddenRequest
from dcae_cli.util.cdap_util import normalize_cdap_params


# Module-level logger used by the catalog code in this file
logger = get_logger('Catalog')


#INTERNAL HELPERS
def _get_component(session, name, version):
    '''Fetch exactly one Component ORM by name and optional version.

    When version is falsy, the rows matching name are ordered by version
    descending and only the first one is considered.

    Raises MissingEntry when no row matches.
    '''
    base = session.query(Component)
    try:
        if version:
            matches = base.filter(Component.name==name, Component.version==version)
        else:
            matches = base.filter(Component.name==name) \
                    .order_by(Component.version.desc()) \
                    .limit(1)
        return matches.one()
    except NoResultFound:
        label = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Component '{}' was not found in the catalog".format(label))

def _get_component_by_id(session, component_id):
    '''Look up a component by its id and return the ORM object's __dict__.

    Raises MissingEntry when no row has the given id.
    '''
    lookup = session.query(Component).filter(Component.id==component_id)
    try:
        return lookup.one().__dict__
    except NoResultFound:
        raise MissingEntry("Component '{0}' was not found in the catalog" \
                .format(component_id))


def _get_docker_image_from_spec(spec):
    '''Return the uri of the first "docker image" artifact listed in the spec.

    An IndexError propagates when the spec has no artifact of that type.
    '''
    uris = []
    for artifact in spec["artifacts"]:
        if artifact["type"] == "docker image":
            uris.append(artifact["uri"])
    return uris[0]

def _get_docker_image(session, name, version):
    '''Look up a component and return the docker image uri from its stored spec'''
    stored_spec = _get_component(session, name, version).get_spec_as_dict()
    return _get_docker_image_from_spec(stored_spec)

def _add_docker_component(session, user, spec, update, enforce_image=True):
    '''Add or update a docker component in the catalog, then commit.

    When enforce_image is set, a CatalogError is raised if image_exists
    reports that the spec's docker image is not available.
    '''
    docker_image = _get_docker_image_from_spec(spec)

    image_missing = enforce_image and not image_exists(docker_image)
    if image_missing:
        raise CatalogError("Specified image '{}' does not exist locally.".format(docker_image))

    build_generic_component(session, user, spec, update)
    session.commit()

def _get_cdap_jar_from_spec(spec):
    '''Return the uri of the first "jar" artifact listed in the spec.

    An IndexError propagates when the spec has no artifact of that type.
    '''
    uris = []
    for artifact in spec["artifacts"]:
        if artifact["type"] == "jar":
            uris.append(artifact["uri"])
    return uris[0]

def _add_cdap_component(session, user, spec, update):
    '''Add or update a cdap component in the catalog, then commit'''
    build_generic_component(session, user, spec, update)
    session.commit()


#PUBLIC FUNCTIONS
@contextlib.contextmanager
def SessionTransaction(engine):
    '''Provides a transactional scope around a series of operations

    Yields a new session bound to engine. The session is committed when the
    with-body finishes normally and rolled back when it raises; it is closed
    in either case.

    An IntegrityError that _raise_if_duplicate recognizes as a unique
    violation surfaces as DuplicateEntry; any other exception is re-raised
    unchanged after the rollback.
    '''
    Session = sessionmaker(engine)
    # Create the session before entering the try block. If construction fails
    # there is nothing to roll back or close, and the handlers below must not
    # reference an unbound name (which would mask the original error).
    session = Session()
    try:
        yield session
        session.commit()
    except IntegrityError as e:
        session.rollback()
        _raise_if_duplicate(str(engine.url), e)
        raise
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Message used for every duplicate name:version error
_DUP_MSG = 'name:version already exists. To update: [In CLI: Use the --update flag]  [For HTTP request: Use PUT]'

# Kept so that existing references to the shared instance keep working.
# _raise_if_duplicate itself no longer raises this object.
_dup_e = DuplicateEntry(_DUP_MSG)

def _raise_if_duplicate(url, e):
    '''Raises DuplicateEntry if the exception relates to duplicate entries

    Args
    ----
    url: Database url as a string; used to tell sqlite from postgres
    e: The IntegrityError whose "orig" attribute holds the driver error

    Returns without raising when the url is for neither backend or when the
    error is not a unique violation.
    '''
    # A fresh exception is created for every raise so that traceback/context
    # state is not shared between unrelated failures.
    if 'sqlite' in url:
        if 'UNIQUE' in e.orig.args[0].upper():
            raise DuplicateEntry(_DUP_MSG)
    elif 'postgres' in url:
        # e.orig is of type psycopg2.IntegrityError that has
        # pgcode which uses the following:
        #
        # https://www.postgresql.org/docs/current/static/errcodes-appendix.html#ERRCODES-TABLE
        #
        # 23505 means "unique_violation"
        if e.orig.pgcode == "23505":
            raise DuplicateEntry(_DUP_MSG)

def create_engine(base, db_name=None, purge_existing=False, db_url=None):
    '''Build, configure and return a database engine with base's tables created.

    The database url is resolved from the most explicit input available:

    1. db_url, used as given
    2. db_name, interpreted as a sqlite db file in the app dir (this keeps
       backwards compatibility with existing tests)
    3. the "db_url" entry of the tool's configuration

    A missing database is created. An existing one is dropped and re-created
    only when purge_existing is set.
    '''
    if db_url is not None:
        url = db_url
    elif db_name is not None:
        sqlite_file = os.path.join(get_app_dir(), db_name)
        url = ''.join(('sqlite:///', sqlite_file))
    else:
        url = get_config()['db_url']

    if database_exists(url):
        if purge_existing:
            drop_database(url)
            create_database(url)
    else:
        create_database(url)

    new_engine = create_engine_(url)
    _configure_engine(new_engine)
    base.metadata.create_all(new_engine)
    return new_engine


def _configure_engine(engine):
    '''Apply db-specific settings to the engine.

    For sqlite urls, registers a "connect" listener that executes
    "pragma foreign_keys=ON" on each new connection. Other urls are left
    untouched.
    '''
    if 'sqlite' not in str(engine.url):
        return

    def _enable_foreign_keys(conn, record):
        return conn.execute('pragma foreign_keys=ON')

    event.listen(engine, 'connect', _enable_foreign_keys)


def get_format(session, name, version):
    '''Fetch exactly one Format ORM by name and optional version.

    When version is falsy, the rows matching name are ordered by version
    descending and only the first one is considered.

    Raises MissingEntry when no row matches.
    '''
    base = session.query(Format)
    try:
        if version:
            matches = base.filter(Format.name==name, Format.version==version)
        else:
            matches = base.filter(Format.name==name) \
                    .order_by(Format.version.desc()) \
                    .limit(1)
        return matches.one()
    except NoResultFound:
        label = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Data format '{}' was not found in the catalog.".format(label))


def _get_format_by_id(session, dataformat_id):
    '''Look up a data format by its id and return the ORM object's __dict__.

    Raises MissingEntry when no row has the given id.
    '''
    lookup = session.query(Format).filter(Format.id==dataformat_id)
    try:
        return lookup.one().__dict__
    except NoResultFound:
        raise MissingEntry("Dataformat '{0}' was not found in the catalog" \
                .format(dataformat_id))


def _create_format_tuple(entry):
    '''Build the (format, version) tuple that identifies a data format entry'''
    format_name = entry['format']
    format_version = entry['version']
    return (format_name, format_version)


def _get_format_pair(session, req_name, req_version, resp_name, resp_version, create=True):
    '''Return the FormatPair ORM for the given request and response formats.

    Both data formats are resolved through get_format first. When no pair
    exists for them, a new one is added to the session and returned, unless
    create is false, in which case MissingEntry is raised.
    '''
    req = get_format(session, req_name, req_version)
    resp = get_format(session, resp_name, resp_version)

    same_pair = and_(FormatPair.req == req, FormatPair.resp == resp)
    existing = session.query(FormatPair).filter(same_pair)
    try:
        return existing.one()
    except NoResultFound:
        if not create:
            raise MissingEntry("Data format pair with request '{}:{}' and response '{}:{}' was not found in the catalog.".format(req.name, req.version, resp.name, resp.version))

    # Nothing stored yet and creation is allowed
    new_pair = FormatPair(req=req, resp=resp)
    session.add(new_pair)
    return new_pair

def _create_format_pair_tuple(entry):
    '''Build the 4-tuple that identifies a request/response format pair entry

    The tuple is (request format, request version, response format,
    response version).
    '''
    request = entry['request']
    request_part = (request['format'], request['version'])
    response = entry['response']
    response_part = (response['format'], response['version'])
    return request_part + response_part

def _get_unique_format_things(create_tuple, get_func, entries):
    '''Resolve spec entries into a de-duplicated list of orms

    Each entry is reduced to an identifying tuple, duplicates are dropped
    and each remaining tuple is resolved by unpacking it into get_func.

    Args
    ----
    create_tuple: Function that has the signature dict->tuple
    get_func: Function that has the signature *tuple->orm
    entries: list of dicts that have data format details that come from
    streams.publishes, streams.subscribes, services.calls, services.provides

    Return
    ------
    List of unique orms
    '''
    unique_keys = set()
    for entry in entries:
        unique_keys.add(create_tuple(entry))

    resolved = []
    for key in unique_keys:
        resolved.append(get_func(*key))
    return resolved


def verify_component(session, name, version):
    '''Look up a component and return its name and version as stored in the orm'''
    found = _get_component(session, name, version)
    return (found.name, found.version)


def get_component_type(session, name, version):
    '''Look up a component and return its component_type'''
    found = _get_component(session, name, version)
    return found.component_type


def get_component_spec(session, name, version):
    '''Look up a component and return its stored spec decoded from json'''
    found = _get_component(session, name, version)
    return json.loads(found.spec)


def get_component_id(session, name, version):
    '''Look up a component and return its id'''
    found = _get_component(session, name, version)
    return found.id


def get_format_spec(session, name, version):
    '''Look up a data format and return its stored spec decoded from json'''
    found = get_format(session, name, version)
    return json.loads(found.spec)


def get_dataformat_id(session, name, version):
    '''Look up a data format and return its id'''
    found = get_format(session, name, version)
    return found.id


def build_generic_component(session, user, spec, update):
    '''Populate a Component ORM from a spec and return it. Does not commit changes.

    With update set, the existing component named in spec['self'] is fetched
    and FrozenEntry is raised if it has been published. Otherwise a new
    Component is created and added to the session.

    The publishes/subscribes/provides/calls relationships are rebuilt from
    the spec. A MissingEntry raised while resolving them is handed to
    reraise_with_msg together with the name of the list being traversed.
    '''
    fields = spec['self'].copy()
    fields['spec'] = json.dumps(spec)

    # TODO: This should really come from the spec too
    fields['owner'] = user

    # grab existing or create a new component
    name, version = fields['name'], fields['version']
    if not update:
        comp = Component()
        session.add(comp)
    else:
        comp = _get_component(session, name, version)
        if comp.is_published():
            raise FrozenEntry("Component '{}:{}' has been published and cannot be updated".format(name, version))

    # REVIEW: Inject these parameters as function arguments instead of this
    # hidden approach?
    # WATCH: This has to be done here before the code below because there is a
    # commit somewhere below and since these fields are not nullable, you'll get a
    # violation.
    comp.cli_version = cli_version.__version__
    comp.schema_path = get_path_component_spec()

    # copy the plain attributes onto the ORM
    for attr, val in six.iteritems(fields):
        setattr(comp, attr, val)

    # resolvers for the two kinds of relationship targets
    resolve_formats = partial(_get_unique_format_things, _create_format_tuple,
            partial(get_format, session))
    resolve_format_pairs = partial(_get_unique_format_things,
            _create_format_pair_tuple, partial(_get_format_pair, session))

    # (spec section, relationship, resolver, message on missing entry),
    # processed in this order
    relationships = (
        ('streams', 'publishes', resolve_formats,
            'Add failed while traversing "publishes"'),
        ('streams', 'subscribes', resolve_formats,
            'Add failed while traversing "subscribes"'),
        ('services', 'provides', resolve_format_pairs,
            'Add failed while traversing "provides"'),
        ('services', 'calls', resolve_format_pairs,
            'Add failed while traversing "calls"'),
    )

    for section, relation, resolve, failure_msg in relationships:
        try:
            setattr(comp, relation, resolve(spec[section][relation]))
        except MissingEntry as e:
            reraise_with_msg(e, failure_msg)

    return comp


def add_format(session, spec, user, update):
    '''Add or update a data format in the catalog from its spec, then commit.

    With update set, the existing data format named in spec['self'] is
    fetched and FrozenEntry is raised if it has been published. Otherwise a
    new Format is created and added to the session.
    '''
    fields = spec['self'].copy()
    fields['spec'] = json.dumps(spec)
    name, version = fields['name'], fields['version']

    # TODO: This should really come from the spec too
    fields['owner'] = user

    if not update:
        data_format = Format()
        session.add(data_format)
    else:
        data_format = get_format(session, name, version)
        if data_format.is_published():
            raise FrozenEntry("Data format {}:{} has been published and cannot be updated".format(name, version))

    # copy the plain attributes onto the ORM
    for attr, val in six.iteritems(fields):
        setattr(data_format, attr, val)

    # REVIEW: Inject these parameters as function arguments instead of this
    # hidden approach?
    data_format.cli_version = cli_version.__version__
    data_format.schema_path = get_path_data_format()

    session.commit()


def _filter_neighbors(session, neighbors=None):
    '''Build a Component query, optionally restricted to the given neighbors

    neighbors is an iterable of (name, version) pairs. None means no
    restriction.
    '''
    if neighbors is None:
        return session.query(Component)

    is_neighbor = or_(and_(Component.name==n, Component.version==v) for n,v in neighbors)
    return session.query(Component).filter(is_neighbor)


def get_subscribers(session, orm, neighbors=None):
    '''List the component ORMs, among neighbors if given, whose subscribes contain orm'''
    candidates = _filter_neighbors(session, neighbors)
    subscribed = candidates.filter(Component.subscribes.contains(orm))
    return subscribed.all()


def get_providers(session, orm, neighbors=None):
    '''List the component ORMs, among neighbors if given, whose provides contain orm'''
    candidates = _filter_neighbors(session, neighbors)
    providing = candidates.filter(Component.provides.contains(orm))
    return providing.all()


def _match_pub(entries, orms):
    '''Pair each http publishes entry's config key with its format orm

    Entries whose type does not contain "http" are skipped. Yields
    (config_key, orm) tuples where orm is found by the entry's format name
    and version.
    '''
    by_name_version = dict(((orm.name, orm.version), orm) for orm in orms)
    for entry in entries:
        if "http" in entry["type"]:
            format_key = (entry['format'], entry['version'])
            yield entry['config_key'], by_name_version[format_key]


def _match_call(entries, orms):
    '''Pair each calls entry's config key with its format pair orm

    Yields (config_key, orm) tuples where orm is found by the entry's request
    and response format names and versions.
    '''
    by_pair = dict(
        ((orm.req.name, orm.req.version, orm.resp.name, orm.resp.version), orm)
        for orm in orms)
    for entry in entries:
        pair_key = (entry['request']['format'], entry['request']['version'],
                entry['response']['format'], entry['response']['version'])
        yield entry['config_key'], by_pair[pair_key]

def get_discovery(get_params_func, session, name, version,  neighbors=None):
    '''Compute the parameters and interface map for a component

    The interface map has one item per config key taken from the component's
    http publishes and its calls. Each value is the list of (name, version)
    of the components, among neighbors if given, that subscribe to or provide
    the matching format. The component itself is left out.

    Returns a tuple of (get_params_func(spec), interface map).
    '''
    comp = _get_component(session, name, version)
    spec = json.loads(comp.spec)

    interfaces = {}
    for key, orm in _match_pub(spec['streams']['publishes'], comp.publishes):
        subscribers = get_subscribers(session, orm, neighbors)
        interfaces[key] = [(c.name, c.version) for c in subscribers if c is not comp]

    for key, orm in _match_call(spec['services']['calls'], comp.calls):
        providers = get_providers(session, orm, neighbors)
        interfaces[key] = [(c.name, c.version) for c in providers if c is not comp]

    return get_params_func(spec), interfaces

# Discovery for cdap components: parameters are derived from the spec by
# normalize_cdap_params
_get_discovery_for_cdap = partial(get_discovery, normalize_cdap_params)
# Discovery for docker components: parameters are a dict of each entry in the
# spec's "parameters" list, keyed by its name and holding its value
_get_discovery_for_docker = partial(get_discovery,
        lambda spec: {param['name']: param['value'] for param in spec['parameters']})


def _get_discovery_for_dmaap(get_component_spec_func, name, version):
    """Collect the config keys of all dmaap streams in a component spec

    The spec is obtained by calling get_component_spec_func(name, version).
    Both publishes and subscribes streams are considered, in that order.

    Returns:
    --------
    Tuple of message router config keys list, data router config keys list
    """
    streams = get_component_spec_func(name, version)["streams"]
    all_streams = streams.get("publishes", []) + streams.get("subscribes", [])

    def config_keys_of(stream_types):
        return [ s["config_key"] for s in all_streams if s["type"] in stream_types ]

    # Both spellings of each type are accepted
    mr_keys = config_keys_of(("message router", "message_router"))
    dr_keys = config_keys_of(("data router", "data_router"))
    return mr_keys, dr_keys


def _filter_latest(orms):
    '''Yield, for each name, the (name, version, *) tuple with the highest version

    Tuples are grouped by their first item and compared by their second item.
    Groups are yielded in sorted name order.
    '''
    def name_of(row):
        return row[0]

    def version_of(row):
        return row[1]

    # itertools.groupby only groups adjacent items, hence the sort by name
    rows_by_name = sorted(orms, key=name_of)
    for _, same_name in itertools.groupby(rows_by_name, name_of):
        yield max(same_name, key=version_of)


def list_components(session, user, only_published, subscribes=None, publishes=None,
        provides=None, calls=None, latest=True):
    """List components, optionally narrowed by formats, owner and publication

    subscribes and publishes are sequences of (name, version) of data
    formats. provides and calls are sequences of ((request name, request
    version), (response name, response version)). A component matching any
    one of these criteria is included. user and only_published narrow the
    result further, and latest keeps only the highest version per name.

    Returns:
    --------
    List of component orms as dicts
    """
    criteria = []
    if subscribes:
        for n, v in subscribes:
            criteria.append(Component.subscribes.contains(get_format(session, n, v)))
    if publishes:
        for n, v in publishes:
            criteria.append(Component.publishes.contains(get_format(session, n, v)))
    if provides:
        for (reqn, reqv), (respn, respv) in provides:
            pair = _get_format_pair(session, reqn, reqv, respn, respv, create=False)
            criteria.append(Component.provides.contains(pair))
    if calls:
        for (reqn, reqv), (respn, respv) in calls:
            pair = _get_format_pair(session, reqn, reqv, respn, respv, create=False)
            criteria.append(Component.calls.contains(pair))

    query = session.query(Component)
    if criteria:
        query = query.filter(or_(*criteria))
    if user:
        query = query.filter(Component.owner==user)
    if only_published:
        query = query.filter(Component.when_published!=None)

    # _filter_latest works on (name, version, *) tuples
    rows = ((orm.name, orm.version, orm.component_type, orm) for orm in query)
    selected = _filter_latest(rows) if latest else rows

    return [ orm.__dict__ for _, _, _, orm in selected ]


def _list_formats(session, user, only_published, latest=True):
    """List data formats, optionally narrowed by owner and publication

    The query is ordered by modified, descending. latest keeps only the
    highest version per name.

    Returns
    -------
    List of data format orms as dicts
    """
    query = session.query(Format).order_by(Format.modified.desc())
    if user:
        query = query.filter(Format.owner==user)
    if only_published:
        query = query.filter(Format.when_published!=None)

    # _filter_latest works on (name, version, *) tuples
    rows = [ (fmt.name, fmt.version, fmt) for fmt in query ]
    selected = _filter_latest(rows) if latest else rows

    return [ fmt.__dict__ for _, _, fmt in selected ]


def build_config_keys_map(spec):
    """Map every config key in the spec to its group and, for streams, its type

    Return
    ------
    Dict where each item:

        <config_key>: { "group": <grouping>, "type": <http|message_router|data_router> }

    where grouping includes "streams_publishes", "streams_subscribes", "services_calls".
    Items for "services_calls" carry no "type".
    """
    keys_map = {}

    # subscribing as http doesn't have config key
    for sub in spec["streams"]["subscribes"]:
        if "config_key" in sub:
            keys_map[sub["config_key"]] = { "group": "streams_subscribes", "type": sub["type"] }

    for pub in spec["streams"]["publishes"]:
        keys_map[pub["config_key"]] = { "group": "streams_publishes", "type": pub["type"] }

    for call in spec["services"]["calls"]:
        keys_map[call["config_key"]] = { "group": "services_calls" }

    return keys_map


def get_data_router_subscriber_route(spec, config_key):
    """Find the route of the data router subscriber with the given config key

    Utility method that parses the component spec. Only subscribes streams of
    type "data_router" or "data router" are considered.

    Raises MissingEntry when no such subscriber has the config key.
    """
    data_router_types = ("data_router", "data router")

    for sub in spec["streams"].get("subscribes", []):
        if sub["type"] not in data_router_types:
            continue
        if sub["config_key"] == config_key:
            return sub["route"]

    raise MissingEntry("No data router subscriber for {0}".format(config_key))


class MockCatalog(object):
    '''Catalog of components and data formats stored through a database engine

    Every public method opens its own SessionTransaction on self.engine.
    '''

    def __init__(self, purge_existing=False, enforce_image=False, db_name=None, engine=None, db_url=None):
        '''Set up the catalog's engine

        A given engine is used as is. Otherwise one is built with
        create_engine from db_name, purge_existing and db_url.
        enforce_image is passed on when docker components are added.
        '''
        self.engine = create_engine(Base, db_name=db_name, purge_existing=purge_existing, db_url=db_url) if engine is None else engine
        self.enforce_image = enforce_image

    def add_component(self, user, spec, update=False):
        '''Validates component specification and adds component to the mock catalog

        Raises CatalogError when the spec's component type is neither "cdap"
        nor "docker".
        '''
        validate_component(spec)

        component_type = spec["self"]["component_type"]

        with SessionTransaction(self.engine) as session:
            if component_type == "cdap":
                _add_cdap_component(session, user, spec, update)
            elif component_type == "docker":
                _add_docker_component(session, user, spec, update, enforce_image=self.enforce_image)
            else:
                raise CatalogError("Unknown component type: {0}".format(component_type))

    def get_docker_image(self, name, version):
        '''Returns the docker image name associated with this component'''
        with SessionTransaction(self.engine) as session:
            return _get_docker_image(session, name, version)

    def get_docker(self, name, version):
        '''Returns a tuple of (docker image, docker config, spec) for this component

        The docker config is the spec's "auxilary" section with defaults
        applied.
        '''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            # NOTE: Defaults are being applied for docker config here at read
            # time. Not completely sure that this is the correct approach. The
            # benefit is that defaults can be changed without altering the stored
            # specs. It's a nice layering.
            docker_config = apply_defaults_docker_config(spec["auxilary"])
            return _get_docker_image_from_spec(spec), docker_config, spec

    def get_docker_config(self, name, version):
        '''Returns only the docker config part of get_docker'''
        _, docker_config, _ = self.get_docker(name, version)
        return docker_config

    def get_cdap(self, name, version):
        '''Returns a tuple representing this cdap component

        Returns
        -------
        tuple(jar, config, spec)
            jar: string
                URL where the CDAP jar is located.
            config: dict
                A dictionary loaded from the CDAP JSON configuration file.
            spec: dict
                The dcae-cli component specification file.
        '''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            cdap_config = spec["auxilary"]
            return _get_cdap_jar_from_spec(spec), cdap_config, spec

    def get_component_type(self, name, version):
        '''Returns the component type associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_type(session, name, version)

    def get_component_spec(self, name, version):
        '''Returns the spec dict associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_spec(session, name, version)

    def get_component_id(self, name, version):
        '''Returns the id associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_id(session, name, version)

    def get_component_by_id(self, component_id):
        '''Returns the component associated with this id'''
        with SessionTransaction(self.engine) as session:
            return _get_component_by_id(session, component_id)

    def get_format_spec(self, name, version):
        '''Returns the spec dict associated with this data format'''
        with SessionTransaction(self.engine) as session:
            return get_format_spec(session, name, version)

    def get_dataformat_id(self, name, version):
        '''Returns the id associated with this data format'''
        with SessionTransaction(self.engine) as session:
            return get_dataformat_id(session, name, version)

    def get_dataformat_by_id(self, dataformat_id):
        '''Returns the dataformat associated with this id'''
        with SessionTransaction(self.engine) as session:
            return _get_format_by_id(session, dataformat_id)

    def add_format(self, spec, user, update=False):
        '''Validates data format specification and adds data format to the mock catalog'''
        validate_format(spec)
        with SessionTransaction(self.engine) as session:
            add_format(session, spec, user, update)

    def get_discovery_for_cdap(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_cdap(session, name, version, neighbors)

    def get_discovery_for_docker(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_docker(session, name, version, neighbors)

    def get_discovery_for_dmaap(self, name, version):
        '''Returns the message router and data router config keys lists of a stored component'''
        with SessionTransaction(self.engine) as session:
            get_component_spec_func = partial(get_component_spec, session)
            return _get_discovery_for_dmaap(get_component_spec_func, name, version)

    def get_discovery_from_spec(self, user, target_spec, neighbors=None):
        '''Get pieces to generate configuration for the given target spec

        This function is used to obtain the pieces needed to generate
        the application configuration json: parameters map, interfaces map, dmaap
        map. Where the input is a provided specification that hasn't been added to
        the catalog - prospective specs - which includes a component that doesn't
        exist or a new version of an existing spec.

        Returns
        -------
        Tuple of three elements:

        - Dict of parameter name to parameter value
        - Dict of "config_key" to list of (component.name, component.version)
          known as "interface_map"
        - Tuple of lists of "config_key" the first for message router the second
          for data router known as "dmaap_map"

        Raises
        ------
        CatalogError when the spec's component type is neither "cdap" nor
        "docker"
        '''
        validate_component(target_spec)

        with SessionTransaction(self.engine) as session:
            # The following approach was taken in order to:
            # 1. Re-use existing functionality e.g. implement fast
            # 2. In order to make ORM-specific queries, I need the entire ORM
            # in SQLAlchemy meaning I cannot do arbitrary DataFormatPair queries
            # without Component.
            name = target_spec["self"]["name"]
            version = target_spec["self"]["version"]

            try:
                # Build a component with update to True first because you may
                # want to run this for an existing component
                build_generic_component(session, user, target_spec, True)
            except MissingEntry:
                # Since it doesn't exist already, build a new component
                build_generic_component(session, user, target_spec, False)

            # This is needed so that subsequent queries will "see" the component
            session.flush()

            ctype = target_spec["self"]["component_type"]

            if ctype == "cdap":
                params, interface_map = _get_discovery_for_cdap(session, name,
                        version, neighbors)
            elif ctype == "docker":
                params, interface_map = _get_discovery_for_docker(session, name,
                        version, neighbors)
            else:
                # Same error as add_component. Without this branch the return
                # below would fail on the unbound params/interface_map. The
                # surrounding SessionTransaction rolls the session back.
                raise CatalogError("Unknown component type: {0}".format(ctype))

            # Don't want to commit these changes so rollback.
            session.rollback()

            # Use the target spec as the source to compile the config keys from
            dmaap_config_keys = _get_discovery_for_dmaap(
                    lambda name, version: target_spec, name, version)

            return params, interface_map, dmaap_config_keys

    def verify_component(self, name, version):
        '''Returns the component's name and version if it exists and raises an exception otherwise'''
        with SessionTransaction(self.engine) as session:
            return verify_component(session, name, version)

    def list_components(self, subscribes=None, publishes=None, provides=None,
            calls=None, latest=True, user=None, only_published=False):
        '''Returns a list of components, as dicts, which match the specified filter sequences'''
        with SessionTransaction(self.engine) as session:
            return list_components(session, user, only_published, subscribes,
                    publishes, provides, calls, latest)

    def list_formats(self, latest=True, user=None, only_published=False):
        """Get list of data formats

        Returns
        -------
        List of data formats as dicts
        """
        with SessionTransaction(self.engine) as session:
            return _list_formats(session, user, only_published, latest)

    def get_format(self, name, version):
        """Get data format

        Throws MissingEntry exception if no matches found.

        Returns
        -------
        Dict representation of data format
        """
        with SessionTransaction(self.engine) as session:
            return get_format(session, name, version).__dict__

    def _publish(self, get_func, user, name, version):
        """Publish a component or data format

        Sets when_published of the object to the current UTC time and commits.

        Args:
        -----
        get_func: Function that takes a session, name, version and outputs a data
            object either Component or Format

        Returns:
        --------
        True upon success. Failures are reported by raising, not by returning
        False.

        Raises:
        -------
        ForbiddenRequest when user is not the owner of the object
        CatalogError when the object has already been published
        MissingEntry when the object is not found
        """
        # TODO: To make function composeable, it should take in the data format
        # object
        with SessionTransaction(self.engine) as session:
            obj = get_func(session, name, version)

            if obj:
                if obj.owner != user:
                    errorMsg = "Not authorized to modify {0}:{1}".format(name, version)
                    logger.error(errorMsg)
                    raise ForbiddenRequest(errorMsg)
                elif obj.when_published:
                    errorMsg = "{0}:{1} has already been published".format(name, version)
                    logger.error(errorMsg)
                    raise CatalogError(errorMsg)
                else:
                    obj.when_published = datetime.utcnow()
                    session.commit()
            else:
                # Defensive: covers a get_func that returns a falsy value
                # instead of raising
                errorMsg = "{0}:{1} not found".format(name, version)
                logger.error(errorMsg)
                raise MissingEntry(errorMsg)

        return True

    def publish_format(self, user, name, version):
        """Publish data format

        Returns
        -------
        True upon success; raises otherwise (see _publish)
        """
        return self._publish(get_format, user, name, version)

    def get_unpublished_formats(self, comp_name, comp_version):
        """Get unpublished formats for given component

        Looks at the formats the component publishes and subscribes to, and at
        the request and response formats of what it provides and calls.

        Returns:
        --------
        List of unique data format name, version pairs
        """
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, comp_name, comp_version)

            dfs = comp.publishes + comp.subscribes
            dfs += [ p.req for p in comp.provides]
            dfs += [ p.resp for p in comp.provides]
            dfs += [ c.req for c in comp.calls]
            dfs += [ c.resp for c in comp.calls]

            def is_not_published(orm):
                # This is a loaded attribute value, not a column expression,
                # so an identity check against None is the right comparison
                return orm.when_published is None

            formats = [(df.name, df.version) for df in filter(is_not_published, dfs)]
            return list(set(formats))

    def publish_component(self, user, name, version):
        """Publish component

        Returns
        -------
        True upon success; raises otherwise (see _publish)
        """
        return self._publish(_get_component, user, name, version)