summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/storage
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/storage')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py53
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py257
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py213
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py280
4 files changed, 803 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py
new file mode 100644
index 0000000..8a4d613
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from shutil import rmtree
+from tempfile import mkdtemp
+
+from sqlalchemy import (
+ create_engine,
+ orm,
+ pool,
+ MetaData
+)
+
+
+class TestFileSystem(object):
+    # Base class for tests that need a scratch directory: creates a unique
+    # temporary dir before each test method and removes it afterwards.
+
+    def setup_method(self):
+        # Suffix the temp dir with the test class name to ease debugging.
+        self.path = mkdtemp('{0}'.format(self.__class__.__name__))
+
+    def teardown_method(self):
+        # ignore_errors: best-effort cleanup; leftovers must not fail the test.
+        rmtree(self.path, ignore_errors=True)
+
+
+def release_sqlite_storage(storage):
+    """
+    Closes the storage's shared SQLAlchemy session and drops tables bound to
+    its engine, releasing the in-memory SQLite database between tests.
+
+    :param storage: a storage object whose APIs share a ``session`` and an
+                    ``engine`` (exposed via ``_all_api_kwargs``)
+    :return: None
+    """
+    storage._all_api_kwargs['session'].close()
+    # NOTE(review): this MetaData is freshly constructed (not reflected), so
+    # drop_all() only drops tables registered on *it* -- confirm this call
+    # actually clears the schema rather than being a no-op.
+    MetaData(bind=storage._all_api_kwargs['engine']).drop_all()
+
+
+def init_inmemory_model_storage():
+    # Build an in-memory SQLite engine + session pair for model storage.
+    uri = 'sqlite:///:memory:'
+    # check_same_thread=False + StaticPool: share the single in-memory
+    # connection across threads instead of opening new (empty) databases.
+    engine_kwargs = dict(connect_args={'check_same_thread': False}, poolclass=pool.StaticPool)
+
+    engine = create_engine(uri, **engine_kwargs)
+    session_factory = orm.sessionmaker(bind=engine)
+
+    return dict(engine=engine, session=session_factory())
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py
new file mode 100644
index 0000000..e915421
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling import models
+from aria.storage import collection_instrumentation
+
+
+class MockActor(object):
+    # Minimal stand-in for a model object that owns the instrumented
+    # collections (one dict, one list).
+    def __init__(self):
+        self.dict_ = {}
+        self.list_ = []
+
+
+class MockMAPI(object):
+    # No-op model API: instrumentation calls put/update on it, and the tests
+    # only care about the in-memory actor state, not persistence.
+
+    def __init__(self):
+        pass
+
+    def put(self, *args, **kwargs):
+        pass
+
+    def update(self, *args, **kwargs):
+        pass
+
+
+class CollectionInstrumentation(object):
+    # Shared fixtures: an actor with raw collections plus instrumented
+    # dict/list facades that wrap values as models.Attribute on write.
+
+    @pytest.fixture
+    def actor(self):
+        return MockActor()
+
+    @pytest.fixture
+    def model(self):
+        return MockMAPI()
+
+    @pytest.fixture
+    def dict_(self, actor, model):
+        # Instruments actor.dict_ ('dict_' is the attribute name on the actor).
+        return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute)
+
+    @pytest.fixture
+    def list_(self, actor, model):
+        # Instruments actor.list_ analogously to dict_.
+        return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute)
+
+
+class TestDict(CollectionInstrumentation):
+
+ def test_keys(self, actor, dict_):
+ dict_.update(
+ {
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ }
+ )
+ assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+ def test_values(self, actor, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key1', 'value2')
+ })
+ assert (sorted(dict_.values()) ==
+ sorted(['value1', 'value2']) ==
+ sorted(v.value for v in actor.dict_.values()))
+
+ def test_items(self, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key1', 'value2')
+ })
+ assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+ def test_iter(self, actor, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key1', 'value2')
+ })
+ assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+ def test_bool(self, dict_):
+ assert not dict_
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key1', 'value2')
+ })
+ assert dict_
+
+ def test_set_item(self, actor, dict_):
+ dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+ assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
+ assert isinstance(actor.dict_['key1'], models.Attribute)
+
+ def test_nested(self, actor, dict_):
+ dict_['key'] = {}
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert dict_['key'] == actor.dict_['key'].value == {}
+
+ dict_['key']['inner_key'] = 'value'
+
+ assert len(dict_) == 1
+ assert 'inner_key' in dict_['key']
+ assert dict_['key']['inner_key'] == 'value'
+ assert dict_['key'].keys() == ['inner_key']
+ assert dict_['key'].values() == ['value']
+ assert dict_['key'].items() == [('inner_key', 'value')]
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+ dict_['key'].update({'updated_key': 'updated_value'})
+ assert len(dict_) == 1
+ assert 'updated_key' in dict_['key']
+ assert dict_['key']['updated_key'] == 'updated_value'
+ assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
+ assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
+ assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
+ ('updated_key', 'updated_value')])
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+ dict_.update({'key': 'override_value'})
+ assert len(dict_) == 1
+ assert 'key' in dict_
+ assert dict_['key'] == 'override_value'
+ assert len(actor.dict_) == 1
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert actor.dict_['key'].value == 'override_value'
+
+ def test_get_item(self, actor, dict_):
+ dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+ assert isinstance(actor.dict_['key1'], models.Attribute)
+
+ def test_update(self, actor, dict_):
+ dict_['key1'] = 'value1'
+
+ new_dict = {'key2': 'value2'}
+ dict_.update(new_dict)
+ assert len(dict_) == 2
+ assert dict_['key2'] == 'value2'
+ assert isinstance(actor.dict_['key2'], models.Attribute)
+
+ new_dict = {}
+ new_dict.update(dict_)
+ assert new_dict['key1'] == dict_['key1']
+
+ def test_copy(self, dict_):
+ dict_['key1'] = 'value1'
+
+ new_dict = dict_.copy()
+ assert new_dict is not dict_
+ assert new_dict == dict_
+
+ dict_['key1'] = 'value2'
+ assert new_dict['key1'] == 'value1'
+ assert dict_['key1'] == 'value2'
+
+ def test_clear(self, dict_):
+ dict_['key1'] = 'value1'
+ dict_.clear()
+
+ assert len(dict_) == 0
+
+
+class TestList(CollectionInstrumentation):
+
+ def test_append(self, actor, list_):
+ list_.append(models.Attribute.wrap('name', 'value1'))
+ list_.append('value2')
+ assert len(actor.list_) == 2
+ assert len(list_) == 2
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert list_[0] == 'value1'
+
+ assert isinstance(actor.list_[1], models.Attribute)
+ assert list_[1] == 'value2'
+
+ list_[0] = 'new_value1'
+ list_[1] = 'new_value2'
+ assert isinstance(actor.list_[1], models.Attribute)
+ assert isinstance(actor.list_[1], models.Attribute)
+ assert list_[0] == 'new_value1'
+ assert list_[1] == 'new_value2'
+
+ def test_iter(self, list_):
+ list_.append('value1')
+ list_.append('value2')
+ assert sorted(list_) == sorted(['value1', 'value2'])
+
+ def test_insert(self, actor, list_):
+ list_.append('value1')
+ list_.insert(0, 'value2')
+ list_.insert(2, 'value3')
+ list_.insert(10, 'value4')
+ assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
+ assert len(actor.list_) == 4
+
+ def test_set(self, list_):
+ list_.append('value1')
+ list_.append('value2')
+
+ list_[1] = 'value3'
+ assert len(list_) == 2
+ assert sorted(list_) == sorted(['value1', 'value3'])
+
+ def test_insert_into_nested(self, actor, list_):
+ list_.append([])
+
+ list_[0].append('inner_item')
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert len(list_) == 1
+ assert list_[0][0] == 'inner_item'
+
+ list_[0].append('new_item')
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert len(list_) == 1
+ assert list_[0][1] == 'new_item'
+
+ assert list_[0] == ['inner_item', 'new_item']
+ assert ['inner_item', 'new_item'] == list_[0]
+
+
+class TestDictList(CollectionInstrumentation):
+    # Mixed nesting: a dict inside an instrumented list and a list inside an
+    # instrumented dict both stay instrumented and write through to the actor.
+    def test_dict_in_list(self, actor, list_):
+        list_.append({})
+        assert len(list_) == 1
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert actor.list_[0].value == {}
+
+        list_[0]['key'] = 'value'
+        assert list_[0]['key'] == 'value'
+        assert len(actor.list_) == 1
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert actor.list_[0].value['key'] == 'value'
+
+    def test_list_in_dict(self, actor, dict_):
+        dict_['key'] = []
+        assert len(dict_) == 1
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert actor.dict_['key'].value == []
+
+        dict_['key'].append('value')
+        assert dict_['key'][0] == 'value'
+        assert len(actor.dict_) == 1
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert actor.dict_['key'].value[0] == 'value'
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py
new file mode 100644
index 0000000..518d624
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from sqlalchemy import (
+ Column,
+ Integer,
+ Text
+)
+
+from aria import (
+ application_model_storage,
+ modeling
+)
+from aria.storage import (
+ ModelStorage,
+ exceptions,
+ sql_mapi,
+)
+
+from tests import (
+ mock,
+ storage as tests_storage,
+ modeling as tests_modeling
+)
+
+
+@pytest.fixture
+def storage():
+    # Fresh in-memory model storage with MockModel registered; released after
+    # each test so the shared SQLite connection does not leak across tests.
+    base_storage = ModelStorage(sql_mapi.SQLAlchemyModelAPI,
+                                initiator=tests_storage.init_inmemory_model_storage)
+    base_storage.register(tests_modeling.MockModel)
+    yield base_storage
+    tests_storage.release_sqlite_storage(base_storage)
+
+
+@pytest.fixture(scope='module', autouse=True)
+def module_cleanup():
+    # NOTE(review): there is no yield, so this runs once at module *setup*,
+    # detaching MockModel's table from the shared declarative metadata --
+    # confirm this ordering is intended (vs. running as module teardown).
+    modeling.models.aria_declarative_base.metadata.remove(tests_modeling.MockModel.__table__) #pylint: disable=no-member
+
+
+def test_storage_base(storage):
+    # Accessing an unregistered model name must raise AttributeError.
+    with pytest.raises(AttributeError):
+        storage.non_existent_attribute()
+
+
+def test_model_storage(storage):
+    # Round-trip: put, get_by_name, iteration, then delete + failing get.
+    mock_model = tests_modeling.MockModel(value=0, name='model_name')
+    storage.mock_model.put(mock_model)
+
+    assert storage.mock_model.get_by_name('model_name') == mock_model
+
+    # Both explicit iter() and direct iteration yield the stored models.
+    assert [mm_from_storage for mm_from_storage in storage.mock_model.iter()] == [mock_model]
+    assert [mm_from_storage for mm_from_storage in storage.mock_model] == [mock_model]
+
+    storage.mock_model.delete(mock_model)
+    with pytest.raises(exceptions.StorageError):
+        storage.mock_model.get(mock_model.id)
+
+
+def test_application_storage_factory():
+    # The application factory must register every template, instance,
+    # orchestration and parameter model as a storage attribute.
+    storage = application_model_storage(sql_mapi.SQLAlchemyModelAPI,
+                                        initiator=tests_storage.init_inmemory_model_storage)
+
+    # Template models.
+    assert storage.service_template
+    assert storage.node_template
+    assert storage.group_template
+    assert storage.policy_template
+    assert storage.substitution_template
+    assert storage.substitution_template_mapping
+    assert storage.requirement_template
+    assert storage.relationship_template
+    assert storage.capability_template
+    assert storage.interface_template
+    assert storage.operation_template
+    assert storage.artifact_template
+
+    # Instance models.
+    assert storage.service
+    assert storage.node
+    assert storage.group
+    assert storage.policy
+    assert storage.substitution
+    assert storage.substitution_mapping
+    assert storage.relationship
+    assert storage.capability
+    assert storage.interface
+    assert storage.operation
+    assert storage.artifact
+
+    # Orchestration models.
+    assert storage.execution
+    assert storage.service_update
+    assert storage.service_update_step
+    assert storage.service_modification
+    assert storage.plugin
+    assert storage.task
+
+    # Parameter models.
+    assert storage.input
+    assert storage.output
+    assert storage.property
+    assert storage.attribute
+
+    assert storage.type
+    assert storage.metadata
+
+    tests_storage.release_sqlite_storage(storage)
+
+
+def test_cascade_deletion(context):
+    # Deleting a service cascades to its nodes but leaves the template intact.
+    service = context.model.service.list()[0]
+
+    assert len(context.model.service_template.list()) == 1
+    assert len(service.nodes) == len(context.model.node.list()) == 2
+
+    context.model.service.delete(service)
+
+    assert len(context.model.service_template.list()) == 1
+    assert len(context.model.service.list()) == 0
+    assert len(context.model.node.list()) == 0
+
+
+@pytest.fixture
+def context(tmpdir):
+    # Simple mock workflow context backed by SQLite; released after the test.
+    result = mock.context.simple(str(tmpdir))
+    yield result
+    tests_storage.release_sqlite_storage(result.model)
+
+
+def test_mapi_include(context):
+    # The 'include' argument projects a single association-proxy column
+    # (service_template_name) for both get() and list().
+    service1 = context.model.service.list()[0]
+    service1.name = 'service1'
+    service1.service_template.name = 'service_template1'
+    context.model.service.update(service1)
+
+    service_template2 = mock.models.create_service_template('service_template2')
+    service2 = mock.models.create_service(service_template2, 'service2')
+    context.model.service.put(service2)
+
+    assert service1 != service2
+    assert service1.service_template != service2.service_template
+
+    def assert_include(service):
+        # Both access paths must return the same single projected value.
+        st_name = context.model.service.get(service.id, include=('service_template_name',))
+        st_name_list = context.model.service.list(filters={'id': service.id},
+                                                  include=('service_template_name', ))
+        assert len(st_name) == len(st_name_list) == 1
+        assert st_name[0] == st_name_list[0][0] == service.service_template.name
+
+    assert_include(service1)
+    assert_include(service2)
+
+
+class MockModel(modeling.models.aria_declarative_base, modeling.mixins.ModelMixin): #pylint: disable=abstract-method
+    # Simple mapped model used to exercise filter operands against a real table.
+    __tablename__ = 'op_mock_model'
+
+    name = Column(Text)
+    value = Column(Integer)
+
+
+class TestFilterOperands(object):
+    # Comparison-operator filters (gt/ge/lt/le/eq/ne) on MAPI list(), run
+    # against four rows with values 1..4.
+
+    @pytest.fixture()
+    def storage(self):
+        model_storage = application_model_storage(
+            sql_mapi.SQLAlchemyModelAPI, initiator=tests_storage.init_inmemory_model_storage)
+        model_storage.register(MockModel)
+        for value in (1, 2, 3, 4):
+            model_storage.op_mock_model.put(MockModel(value=value))
+        yield model_storage
+        tests_storage.release_sqlite_storage(model_storage)
+
+    def test_gt(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=3)))) == 1
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=4)))) == 0
+
+    def test_ge(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(ge=3)))) == 2
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(ge=5)))) == 0
+
+    def test_lt(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(lt=2)))) == 1
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(lt=1)))) == 0
+
+    def test_le(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(le=2)))) == 2
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(le=0)))) == 0
+
+    def test_eq(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=2)))) == 1
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=0)))) == 0
+
+    def test_neq(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(ne=2)))) == 3
+
+    def test_gt_and_lt(self, storage):
+        # Multiple operators on the same column combine with AND.
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=1, lt=3)))) == 1
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=2, lt=2)))) == 0
+
+    def test_eq_and_ne(self, storage):
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=1, ne=3)))) == 1
+        assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=1, ne=1)))) == 0
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py
new file mode 100644
index 0000000..efacb2e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+
+import pytest
+
+from aria.storage.filesystem_rapi import FileSystemResourceAPI
+from aria.storage import (
+ exceptions,
+ ResourceStorage
+)
+from . import TestFileSystem
+
+
+class TestResourceStorage(TestFileSystem):
+    # Filesystem-backed resource storage: upload/download/read/delete of
+    # files and directories under a per-entry directory tree rooted at
+    # self.path (provided by TestFileSystem).
+
+    def _create(self, storage):
+        # Registering an item creates its top-level directory.
+        storage.register('service_template')
+
+    def _upload(self, storage, tmp_path, id):
+        # Write a known payload, then upload the file under the given entry id.
+        with open(tmp_path, 'w') as f:
+            f.write('fake context')
+
+        storage.service_template.upload(entry_id=id, source=tmp_path)
+
+    def _upload_dir(self, storage, tmp_dir, tmp_file_name, id):
+        # Populate a file inside tmp_dir, then upload the whole directory.
+        file_source = os.path.join(tmp_dir, tmp_file_name)
+        with open(file_source, 'w') as f:
+            f.write('fake context')
+
+        storage.service_template.upload(entry_id=id, source=tmp_dir)
+
+    def _create_storage(self):
+        return ResourceStorage(FileSystemResourceAPI,
+                               api_kwargs=dict(directory=self.path))
+
+    def test_name(self):
+        # repr() of the storage and of a registered API exposes the backing
+        # API class and its directory.
+        api = FileSystemResourceAPI
+        storage = ResourceStorage(FileSystemResourceAPI,
+                                  items=['service_template'],
+                                  api_kwargs=dict(directory=self.path))
+        assert repr(storage) == 'ResourceStorage(api={api})'.format(api=api)
+        assert 'directory={resource_dir}'.format(resource_dir=self.path) in \
+               repr(storage.registered['service_template'])
+
+    def test_create(self):
+        storage = self._create_storage()
+        self._create(storage)
+        assert os.path.exists(os.path.join(self.path, 'service_template'))
+
+    def test_upload_file(self):
+        storage = ResourceStorage(FileSystemResourceAPI, api_kwargs=dict(directory=self.path))
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
+        self._upload(storage, tmpfile_path, id='service_template_id')
+
+        # Uploaded file lands under <root>/<item>/<entry_id>/<basename>.
+        storage_path = os.path.join(
+            self.path,
+            'service_template',
+            'service_template_id',
+            os.path.basename(tmpfile_path))
+        assert os.path.exists(storage_path)
+
+        # NOTE(review): 'rb' read compared against a str literal -- this only
+        # holds on Python 2 (bytes is str); confirm target interpreter.
+        with open(storage_path, 'rb') as f:
+            assert f.read() == 'fake context'
+
+    def test_download_file(self):
+        storage = self._create_storage()
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
+        tmpfile_name = os.path.basename(tmpfile_path)
+        self._upload(storage, tmpfile_path, 'service_template_id')
+
+        temp_dir = tempfile.mkdtemp(dir=self.path)
+        storage.service_template.download(
+            entry_id='service_template_id',
+            destination=temp_dir,
+            path=tmpfile_name)
+
+        with open(os.path.join(self.path, os.path.join(temp_dir, tmpfile_name))) as f:
+            assert f.read() == 'fake context'
+
+    def test_download_non_existing_file(self):
+        storage = self._create_storage()
+        self._create(storage)
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.download(entry_id='service_template_id', destination='',
+                                              path='fake_path')
+
+    def test_data_non_existing_file(self):
+        storage = self._create_storage()
+        self._create(storage)
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.read(entry_id='service_template_id', path='fake_path')
+
+    def test_data_file(self):
+        storage = self._create_storage()
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
+        self._upload(storage, tmpfile_path, 'service_template_id')
+
+        assert storage.service_template.read(entry_id='service_template_id',
+                                             path=os.path.basename(tmpfile_path)) == 'fake context'
+
+    def test_upload_dir(self):
+        # Uploading a directory preserves its nested structure under the entry.
+        storage = self._create_storage()
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id')
+
+        destination = os.path.join(
+            self.path,
+            'service_template',
+            'service_template_id',
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        assert os.path.isfile(destination)
+
+    def test_upload_path_in_dir(self):
+        # A later upload with path= places the file inside an existing
+        # sub-directory of the entry.
+        storage = self._create_storage()
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id')
+
+        second_update_file = tempfile.mkstemp(dir=self.path)[1]
+        with open(second_update_file, 'w') as f:
+            f.write('fake context2')
+
+        storage.service_template.upload(
+            entry_id='service_template_id',
+            source=second_update_file,
+            path=os.path.basename(second_level_tmp_dir))
+
+        assert os.path.isfile(os.path.join(
+            self.path,
+            'service_template',
+            'service_template_id',
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(second_update_file)))
+
+    def test_download_dir(self):
+        storage = self._create_storage()
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id')
+
+        temp_destination_dir = tempfile.mkdtemp(dir=self.path)
+        storage.service_template.download(
+            entry_id='service_template_id',
+            destination=temp_destination_dir)
+
+        destination_file_path = os.path.join(
+            temp_destination_dir,
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        assert os.path.isfile(destination_file_path)
+
+        with open(destination_file_path) as f:
+            assert f.read() == 'fake context'
+
+    def test_data_dir(self):
+        # read() on a directory path (empty path -> entry root) must fail.
+        storage = self._create_storage()
+        self._create(storage)
+
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        tempfile.mkstemp(dir=tmp_dir)
+        tempfile.mkstemp(dir=tmp_dir)
+
+        storage.service_template.upload(entry_id='service_template_id', source=tmp_dir)
+
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.read(entry_id='service_template_id', path='')
+
+    def test_delete_resource(self):
+        storage = self._create_storage()
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
+        self._upload(storage, tmpfile_path, 'service_template_id')
+        tmpfile2_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
+        self._upload(storage, tmpfile2_path, 'service_template_id')
+
+        # deleting the first resource and expecting an error on read
+        storage.service_template.delete(entry_id='service_template_id',
+                                        path=os.path.basename(tmpfile_path))
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.read(entry_id='service_template_id',
+                                          path=os.path.basename(tmpfile_path))
+        # the second resource should still be available for reading
+        assert storage.service_template.read(
+            entry_id='service_template_id',
+            path=os.path.basename(tmpfile2_path)) == 'fake context'
+
+    def test_delete_directory(self):
+        storage = self._create_storage()
+        self._create(storage)
+        temp_destination_dir = tempfile.mkdtemp(dir=self.path)
+
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id')
+        file_path_in_dir = os.path.join(
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        # should be able to read the file and download the directory..
+        assert storage.service_template.read(
+            entry_id='service_template_id',
+            path=file_path_in_dir) == 'fake context'
+        storage.service_template.download(
+            entry_id='service_template_id',
+            path=os.path.basename(second_level_tmp_dir),
+            destination=temp_destination_dir)
+
+        # after deletion, the file and directory should both be gone
+        storage.service_template.delete(
+            entry_id='service_template_id',
+            path=os.path.basename(second_level_tmp_dir))
+        with pytest.raises(exceptions.StorageError):
+            assert storage.service_template.read(
+                entry_id='service_template_id',
+                path=file_path_in_dir) == 'fake context'
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.download(
+                entry_id='service_template_id',
+                path=os.path.basename(second_level_tmp_dir),
+                destination=temp_destination_dir)
+
+    def test_delete_all_resources(self):
+        storage = self._create_storage()
+        self._create(storage)
+        temp_destination_dir = tempfile.mkdtemp(dir=self.path)
+
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id')
+        file_path_in_dir = os.path.join(
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        # deleting without specifying a path - delete all resources of this entry
+        storage.service_template.delete(entry_id='service_template_id')
+        with pytest.raises(exceptions.StorageError):
+            assert storage.service_template.read(
+                entry_id='service_template_id',
+                path=file_path_in_dir) == 'fake context'
+        with pytest.raises(exceptions.StorageError):
+            storage.service_template.download(
+                entry_id='service_template_id',
+                path=os.path.basename(second_level_tmp_dir),
+                destination=temp_destination_dir)
+
+    def test_delete_nonexisting_resource(self):
+        storage = self._create_storage()
+        self._create(storage)
+        # deleting a nonexisting resource - no effect is expected to happen
+        assert storage.service_template.delete(entry_id='service_template_id',
+                                               path='fake-file') is False