diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/storage')
4 files changed, 803 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py new file mode 100644 index 0000000..8a4d613 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/__init__.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from shutil import rmtree +from tempfile import mkdtemp + +from sqlalchemy import ( + create_engine, + orm, + pool, + MetaData +) + + +class TestFileSystem(object): + + def setup_method(self): + self.path = mkdtemp('{0}'.format(self.__class__.__name__)) + + def teardown_method(self): + rmtree(self.path, ignore_errors=True) + + +def release_sqlite_storage(storage): + """ + Drops the tables and clears the session + :param storage: + :return: + """ + storage._all_api_kwargs['session'].close() + MetaData(bind=storage._all_api_kwargs['engine']).drop_all() + + +def init_inmemory_model_storage(): + uri = 'sqlite:///:memory:' + engine_kwargs = dict(connect_args={'check_same_thread': False}, poolclass=pool.StaticPool) + + engine = create_engine(uri, **engine_kwargs) + session_factory = orm.sessionmaker(bind=engine) + + return dict(engine=engine, session=session_factory()) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py new file mode 100644 index 0000000..e915421 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_collection_instrumentation.py @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.modeling import models +from aria.storage import collection_instrumentation + + +class MockActor(object): + def __init__(self): + self.dict_ = {} + self.list_ = [] + + +class MockMAPI(object): + + def __init__(self): + pass + + def put(self, *args, **kwargs): + pass + + def update(self, *args, **kwargs): + pass + + +class CollectionInstrumentation(object): + + @pytest.fixture + def actor(self): + return MockActor() + + @pytest.fixture + def model(self): + return MockMAPI() + + @pytest.fixture + def dict_(self, actor, model): + return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute) + + @pytest.fixture + def list_(self, actor, model): + return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute) + + +class TestDict(CollectionInstrumentation): + + def test_keys(self, actor, dict_): + dict_.update( + { + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key2', 'value2') + } + ) + assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_values(self, actor, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert (sorted(dict_.values()) == + sorted(['value1', 'value2']) == + sorted(v.value for v in actor.dict_.values())) + + def test_items(self, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) + + def test_iter(self, actor, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_bool(self, dict_): + assert not dict_ + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert dict_ + + def test_set_item(self, actor, dict_): + dict_['key1'] = models.Attribute.wrap('key1', 'value1') + assert dict_['key1'] == 'value1' == actor.dict_['key1'].value + assert isinstance(actor.dict_['key1'], models.Attribute) + + def test_nested(self, actor, dict_): + dict_['key'] = {} + assert isinstance(actor.dict_['key'], models.Attribute) + assert dict_['key'] == actor.dict_['key'].value == {} + + dict_['key']['inner_key'] = 'value' + + assert len(dict_) == 1 + assert 'inner_key' in dict_['key'] + assert dict_['key']['inner_key'] == 'value' + assert dict_['key'].keys() == ['inner_key'] + assert dict_['key'].values() == ['value'] + assert dict_['key'].items() == [('inner_key', 'value')] + assert isinstance(actor.dict_['key'], models.Attribute) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_['key'].update({'updated_key': 'updated_value'}) + assert len(dict_) == 1 + assert 'updated_key' in dict_['key'] + assert dict_['key']['updated_key'] == 'updated_value' + assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key']) + assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value']) + assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'), + ('updated_key', 'updated_value')]) + assert isinstance(actor.dict_['key'], models.Attribute) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_.update({'key': 'override_value'}) + assert len(dict_) == 1 + assert 'key' in dict_ + assert dict_['key'] == 'override_value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value == 'override_value' + + def test_get_item(self, actor, dict_): + dict_['key1'] = models.Attribute.wrap('key1', 'value1') + assert isinstance(actor.dict_['key1'], models.Attribute) + + def test_update(self, actor, dict_): + dict_['key1'] = 'value1' + + new_dict = {'key2': 'value2'} + dict_.update(new_dict) + assert len(dict_) == 2 + assert dict_['key2'] == 'value2' + assert isinstance(actor.dict_['key2'], models.Attribute) + + new_dict = {} + new_dict.update(dict_) + assert new_dict['key1'] == dict_['key1'] + + def test_copy(self, dict_): + dict_['key1'] = 'value1' + + new_dict = dict_.copy() + assert new_dict is not dict_ + assert new_dict == dict_ + + dict_['key1'] = 'value2' + assert new_dict['key1'] == 'value1' + assert dict_['key1'] == 'value2' + + def test_clear(self, dict_): + dict_['key1'] = 'value1' + dict_.clear() + + assert len(dict_) == 0 + + +class TestList(CollectionInstrumentation): + + def test_append(self, actor, list_): + list_.append(models.Attribute.wrap('name', 'value1')) + list_.append('value2') + assert len(actor.list_) == 2 + assert len(list_) == 2 + assert isinstance(actor.list_[0], models.Attribute) + assert list_[0] == 'value1' + + assert isinstance(actor.list_[1], models.Attribute) + assert list_[1] == 'value2' + + list_[0] = 'new_value1' + list_[1] = 'new_value2' + assert isinstance(actor.list_[1], models.Attribute) + assert isinstance(actor.list_[1], models.Attribute) + assert list_[0] == 'new_value1' + assert list_[1] == 'new_value2' + + def test_iter(self, list_): + list_.append('value1') + list_.append('value2') + assert sorted(list_) == sorted(['value1', 'value2']) + + def test_insert(self, actor, list_): + list_.append('value1') + list_.insert(0, 'value2') + list_.insert(2, 'value3') + list_.insert(10, 'value4') + assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4']) + assert len(actor.list_) == 4 + + def test_set(self, list_): + list_.append('value1') + list_.append('value2') + + list_[1] = 'value3' + assert len(list_) == 2 + assert sorted(list_) == sorted(['value1', 'value3']) + + def test_insert_into_nested(self, actor, list_): + list_.append([]) + + list_[0].append('inner_item') + assert isinstance(actor.list_[0], models.Attribute) + assert len(list_) == 1 + assert list_[0][0] == 'inner_item' + + list_[0].append('new_item') + assert isinstance(actor.list_[0], models.Attribute) + assert len(list_) == 1 + assert list_[0][1] == 'new_item' + + assert list_[0] == ['inner_item', 'new_item'] + assert ['inner_item', 'new_item'] == list_[0] + + +class TestDictList(CollectionInstrumentation): + def test_dict_in_list(self, actor, list_): + list_.append({}) + assert len(list_) == 1 + assert isinstance(actor.list_[0], models.Attribute) + assert actor.list_[0].value == {} + + list_[0]['key'] = 'value' + assert list_[0]['key'] == 'value' + assert len(actor.list_) == 1 + assert isinstance(actor.list_[0], models.Attribute) + assert actor.list_[0].value['key'] == 'value' + + def test_list_in_dict(self, actor, dict_): + dict_['key'] = [] + assert len(dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value == [] + + dict_['key'].append('value') + assert dict_['key'][0] == 'value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value[0] == 'value' diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py new file mode 100644 index 0000000..518d624 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_model_storage.py @@ -0,0 +1,213 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from sqlalchemy import ( + Column, + Integer, + Text +) + +from aria import ( + application_model_storage, + modeling +) +from aria.storage import ( + ModelStorage, + exceptions, + sql_mapi, +) + +from tests import ( + mock, + storage as tests_storage, + modeling as tests_modeling +) + + +@pytest.fixture +def storage(): + base_storage = ModelStorage(sql_mapi.SQLAlchemyModelAPI, + initiator=tests_storage.init_inmemory_model_storage) + base_storage.register(tests_modeling.MockModel) + yield base_storage + tests_storage.release_sqlite_storage(base_storage) + + +@pytest.fixture(scope='module', autouse=True) +def module_cleanup(): + modeling.models.aria_declarative_base.metadata.remove(tests_modeling.MockModel.__table__) #pylint: disable=no-member + + +def test_storage_base(storage): + with pytest.raises(AttributeError): + storage.non_existent_attribute() + + +def test_model_storage(storage): + mock_model = tests_modeling.MockModel(value=0, name='model_name') + storage.mock_model.put(mock_model) + + assert storage.mock_model.get_by_name('model_name') == mock_model + + assert [mm_from_storage for mm_from_storage in storage.mock_model.iter()] == [mock_model] + assert [mm_from_storage for mm_from_storage in storage.mock_model] == [mock_model] + + storage.mock_model.delete(mock_model) + with pytest.raises(exceptions.StorageError): + storage.mock_model.get(mock_model.id) + + +def test_application_storage_factory(): + storage = application_model_storage(sql_mapi.SQLAlchemyModelAPI, + initiator=tests_storage.init_inmemory_model_storage) + + assert storage.service_template + assert storage.node_template + assert storage.group_template + assert storage.policy_template + assert storage.substitution_template + assert storage.substitution_template_mapping + assert storage.requirement_template + assert storage.relationship_template + assert storage.capability_template + assert storage.interface_template + assert storage.operation_template + assert storage.artifact_template + + assert storage.service + assert storage.node + assert storage.group + assert storage.policy + assert storage.substitution + assert storage.substitution_mapping + assert storage.relationship + assert storage.capability + assert storage.interface + assert storage.operation + assert storage.artifact + + assert storage.execution + assert storage.service_update + assert storage.service_update_step + assert storage.service_modification + assert storage.plugin + assert storage.task + + assert storage.input + assert storage.output + assert storage.property + assert storage.attribute + + assert storage.type + assert storage.metadata + + tests_storage.release_sqlite_storage(storage) + + +def test_cascade_deletion(context): + service = context.model.service.list()[0] + + assert len(context.model.service_template.list()) == 1 + assert len(service.nodes) == len(context.model.node.list()) == 2 + + context.model.service.delete(service) + + assert len(context.model.service_template.list()) == 1 + assert len(context.model.service.list()) == 0 + assert len(context.model.node.list()) == 0 + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + tests_storage.release_sqlite_storage(result.model) + + +def test_mapi_include(context): + service1 = context.model.service.list()[0] + service1.name = 'service1' + service1.service_template.name = 'service_template1' + context.model.service.update(service1) + + service_template2 = mock.models.create_service_template('service_template2') + service2 = mock.models.create_service(service_template2, 'service2') + context.model.service.put(service2) + + assert service1 != service2 + assert service1.service_template != service2.service_template + + def assert_include(service): + st_name = context.model.service.get(service.id, include=('service_template_name',)) + st_name_list = context.model.service.list(filters={'id': service.id}, + include=('service_template_name', )) + assert len(st_name) == len(st_name_list) == 1 + assert st_name[0] == st_name_list[0][0] == service.service_template.name + + assert_include(service1) + assert_include(service2) + + +class MockModel(modeling.models.aria_declarative_base, modeling.mixins.ModelMixin): #pylint: disable=abstract-method + __tablename__ = 'op_mock_model' + + name = Column(Text) + value = Column(Integer) + + +class TestFilterOperands(object): + + @pytest.fixture() + def storage(self): + model_storage = application_model_storage( + sql_mapi.SQLAlchemyModelAPI, initiator=tests_storage.init_inmemory_model_storage) + model_storage.register(MockModel) + for value in (1, 2, 3, 4): + model_storage.op_mock_model.put(MockModel(value=value)) + yield model_storage + tests_storage.release_sqlite_storage(model_storage) + + def test_gt(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=3)))) == 1 + assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=4)))) == 0 + + def test_ge(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(ge=3)))) == 2 + assert len(storage.op_mock_model.list(filters=dict(value=dict(ge=5)))) == 0 + + def test_lt(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(lt=2)))) == 1 + assert len(storage.op_mock_model.list(filters=dict(value=dict(lt=1)))) == 0 + + def test_le(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(le=2)))) == 2 + assert len(storage.op_mock_model.list(filters=dict(value=dict(le=0)))) == 0 + + def test_eq(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=2)))) == 1 + assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=0)))) == 0 + + def test_neq(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(ne=2)))) == 3 + + def test_gt_and_lt(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=1, lt=3)))) == 1 + assert len(storage.op_mock_model.list(filters=dict(value=dict(gt=2, lt=2)))) == 0 + + def test_eq_and_ne(self, storage): + assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=1, ne=3)))) == 1 + assert len(storage.op_mock_model.list(filters=dict(value=dict(eq=1, ne=1)))) == 0 diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py new file mode 100644 index 0000000..efacb2e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/storage/test_resource_storage.py @@ -0,0 +1,280 @@ +# Licensed to the Apache ftware Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import tempfile + +import pytest + +from aria.storage.filesystem_rapi import FileSystemResourceAPI +from aria.storage import ( + exceptions, + ResourceStorage +) +from . import TestFileSystem + + +class TestResourceStorage(TestFileSystem): + def _create(self, storage): + storage.register('service_template') + + def _upload(self, storage, tmp_path, id): + with open(tmp_path, 'w') as f: + f.write('fake context') + + storage.service_template.upload(entry_id=id, source=tmp_path) + + def _upload_dir(self, storage, tmp_dir, tmp_file_name, id): + file_source = os.path.join(tmp_dir, tmp_file_name) + with open(file_source, 'w') as f: + f.write('fake context') + + storage.service_template.upload(entry_id=id, source=tmp_dir) + + def _create_storage(self): + return ResourceStorage(FileSystemResourceAPI, + api_kwargs=dict(directory=self.path)) + + def test_name(self): + api = FileSystemResourceAPI + storage = ResourceStorage(FileSystemResourceAPI, + items=['service_template'], + api_kwargs=dict(directory=self.path)) + assert repr(storage) == 'ResourceStorage(api={api})'.format(api=api) + assert 'directory={resource_dir}'.format(resource_dir=self.path) in \ + repr(storage.registered['service_template']) + + def test_create(self): + storage = self._create_storage() + self._create(storage) + assert os.path.exists(os.path.join(self.path, 'service_template')) + + def test_upload_file(self): + storage = ResourceStorage(FileSystemResourceAPI, api_kwargs=dict(directory=self.path)) + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile_path, id='service_template_id') + + storage_path = os.path.join( + self.path, + 'service_template', + 'service_template_id', + os.path.basename(tmpfile_path)) + assert os.path.exists(storage_path) + + with open(storage_path, 'rb') as f: + assert f.read() == 'fake context' + + def test_download_file(self): + storage = self._create_storage() + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + tmpfile_name = os.path.basename(tmpfile_path) + self._upload(storage, tmpfile_path, 'service_template_id') + + temp_dir = tempfile.mkdtemp(dir=self.path) + storage.service_template.download( + entry_id='service_template_id', + destination=temp_dir, + path=tmpfile_name) + + with open(os.path.join(self.path, os.path.join(temp_dir, tmpfile_name))) as f: + assert f.read() == 'fake context' + + def test_download_non_existing_file(self): + storage = self._create_storage() + self._create(storage) + with pytest.raises(exceptions.StorageError): + storage.service_template.download(entry_id='service_template_id', destination='', + path='fake_path') + + def test_data_non_existing_file(self): + storage = self._create_storage() + self._create(storage) + with pytest.raises(exceptions.StorageError): + storage.service_template.read(entry_id='service_template_id', path='fake_path') + + def test_data_file(self): + storage = self._create_storage() + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile_path, 'service_template_id') + + assert storage.service_template.read(entry_id='service_template_id', + path=os.path.basename(tmpfile_path)) == 'fake context' + + def test_upload_dir(self): + storage = self._create_storage() + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id') + + destination = os.path.join( + self.path, + 'service_template', + 'service_template_id', + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + assert os.path.isfile(destination) + + def test_upload_path_in_dir(self): + storage = self._create_storage() + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id') + + second_update_file = tempfile.mkstemp(dir=self.path)[1] + with open(second_update_file, 'w') as f: + f.write('fake context2') + + storage.service_template.upload( + entry_id='service_template_id', + source=second_update_file, + path=os.path.basename(second_level_tmp_dir)) + + assert os.path.isfile(os.path.join( + self.path, + 'service_template', + 'service_template_id', + os.path.basename(second_level_tmp_dir), + os.path.basename(second_update_file))) + + def test_download_dir(self): + storage = self._create_storage() + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id') + + temp_destination_dir = tempfile.mkdtemp(dir=self.path) + storage.service_template.download( + entry_id='service_template_id', + destination=temp_destination_dir) + + destination_file_path = os.path.join( + temp_destination_dir, + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + assert os.path.isfile(destination_file_path) + + with open(destination_file_path) as f: + assert f.read() == 'fake context' + + def test_data_dir(self): + storage = self._create_storage() + self._create(storage) + + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + tempfile.mkstemp(dir=tmp_dir) + tempfile.mkstemp(dir=tmp_dir) + + storage.service_template.upload(entry_id='service_template_id', source=tmp_dir) + + with pytest.raises(exceptions.StorageError): + storage.service_template.read(entry_id='service_template_id', path='') + + def test_delete_resource(self): + storage = self._create_storage() + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile_path, 'service_template_id') + tmpfile2_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile2_path, 'service_template_id') + + # deleting the first resource and expecting an error on read + storage.service_template.delete(entry_id='service_template_id', + path=os.path.basename(tmpfile_path)) + with pytest.raises(exceptions.StorageError): + storage.service_template.read(entry_id='service_template_id', + path=os.path.basename(tmpfile_path)) + # the second resource should still be available for reading + assert storage.service_template.read( + entry_id='service_template_id', + path=os.path.basename(tmpfile2_path)) == 'fake context' + + def test_delete_directory(self): + storage = self._create_storage() + self._create(storage) + temp_destination_dir = tempfile.mkdtemp(dir=self.path) + + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id') + file_path_in_dir = os.path.join( + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + # should be able to read the file and download the directory.. + assert storage.service_template.read( + entry_id='service_template_id', + path=file_path_in_dir) == 'fake context' + storage.service_template.download( + entry_id='service_template_id', + path=os.path.basename(second_level_tmp_dir), + destination=temp_destination_dir) + + # after deletion, the file and directory should both be gone + storage.service_template.delete( + entry_id='service_template_id', + path=os.path.basename(second_level_tmp_dir)) + with pytest.raises(exceptions.StorageError): + assert storage.service_template.read( + entry_id='service_template_id', + path=file_path_in_dir) == 'fake context' + with pytest.raises(exceptions.StorageError): + storage.service_template.download( + entry_id='service_template_id', + path=os.path.basename(second_level_tmp_dir), + destination=temp_destination_dir) + + def test_delete_all_resources(self): + storage = self._create_storage() + self._create(storage) + temp_destination_dir = tempfile.mkdtemp(dir=self.path) + + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='service_template_id') + file_path_in_dir = os.path.join( + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + # deleting without specifying a path - delete all resources of this entry + storage.service_template.delete(entry_id='service_template_id') + with pytest.raises(exceptions.StorageError): + assert storage.service_template.read( + entry_id='service_template_id', + path=file_path_in_dir) == 'fake context' + with pytest.raises(exceptions.StorageError): + storage.service_template.download( + entry_id='service_template_id', + path=os.path.basename(second_level_tmp_dir), + destination=temp_destination_dir) + + def test_delete_nonexisting_resource(self): + storage = self._create_storage() + self._create(storage) + # deleting a nonexisting resource - no effect is expected to happen + assert storage.service_template.delete(entry_id='service_template_id', + path='fake-file') is False |