summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/storage/filesystem_rapi.py
blob: b425fa21a7e50b4b299b45978d2cfacc1f949c86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
File system implementation of the storage resource API ("RAPI").
"""

import os
import shutil
from multiprocessing import RLock
from contextlib import contextmanager
from functools import partial
from distutils import dir_util                                # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module

from aria.storage import (
    api,
    exceptions
)


class FileSystemResourceAPI(api.ResourceAPI):
    """
    File system implementation of the storage resource API ("RAPI").
    """

    def __init__(self, directory, **kwargs):
        """
        :param directory: root dir for storage
        """
        super(FileSystemResourceAPI, self).__init__(**kwargs)
        self.directory = directory
        self.base_path = os.path.join(self.directory, self.name)
        self._join_path = partial(os.path.join, self.base_path)
        self._lock = RLock()

    @contextmanager
    def connect(self):
        """
        Establishes a connection and destroys it after use.
        """
        try:
            self._establish_connection()
            yield self
        except BaseException as e:
            raise exceptions.StorageError(str(e))
        finally:
            self._destroy_connection()

    def _establish_connection(self):
        """
        Establishes a connection. Used in the ``connect`` context manager.
        """
        self._lock.acquire()

    def _destroy_connection(self):
        """
        Destroys a connection. Used in the ``connect`` context manager.
        """
        self._lock.release()

    def __repr__(self):
        return '{cls.__name__}(directory={self.directory})'.format(
            cls=self.__class__, self=self)

    def create(self, **kwargs):
        """
        Creates a directory in by path. Tries to create the root directory as well.

        :param name: path of directory
        """
        try:
            os.makedirs(self.directory)
        except (OSError, IOError):
            pass
        try:
            os.makedirs(self.base_path)
        except (OSError, IOError):
            pass

    def read(self, entry_id, path, **_):
        """
        Retrieves the contents of a file.

        :param entry_id: entry ID
        :param path: path to resource
        :return: contents of the file
        :rtype: bytes
        """
        resource_relative_path = os.path.join(self.name, entry_id, path or '')
        resource = os.path.join(self.directory, resource_relative_path)
        if not os.path.exists(resource):
            raise exceptions.StorageError("Resource {0} does not exist".
                                          format(resource_relative_path))
        if not os.path.isfile(resource):
            resources = os.listdir(resource)
            if len(resources) != 1:
                raise exceptions.StorageError(
                    'Failed to read {0}; Reading a directory is '
                    'only allowed when it contains a single resource'.format(resource))
            resource = os.path.join(resource, resources[0])
        with open(resource, 'rb') as resource_file:
            return resource_file.read()

    def download(self, entry_id, destination, path=None, **_):
        """
        Downloads a file or directory.

        :param entry_id: entry ID
        :param destination: download destination
        :param path: path to download relative to the root of the entry (otherwise all)
        """
        resource_relative_path = os.path.join(self.name, entry_id, path or '')
        resource = os.path.join(self.directory, resource_relative_path)
        if not os.path.exists(resource):
            raise exceptions.StorageError("Resource {0} does not exist".
                                          format(resource_relative_path))
        if os.path.isfile(resource):
            shutil.copy2(resource, destination)
        else:
            dir_util.copy_tree(resource, destination)  # pylint: disable=no-member

    def upload(self, entry_id, source, path=None, **_):
        """
        Uploads a file or directory.

        :param entry_id: entry ID
        :param source: source of the files to upload
        :param path: the destination of the file/s relative to the entry root dir.
        """
        resource_directory = os.path.join(self.directory, self.name, entry_id)
        if not os.path.exists(resource_directory):
            os.makedirs(resource_directory)
        destination = os.path.join(resource_directory, path or '')
        if os.path.isfile(source):
            shutil.copy2(source, destination)
        else:
            dir_util.copy_tree(source, destination)                                       # pylint: disable=no-member

    def delete(self, entry_id, path=None, **_):
        """
        Deletes a file or directory.

        :param entry_id: entry ID
        :param path: path to delete relative to the root of the entry (otherwise all)
        """
        destination = os.path.join(self.directory, self.name, entry_id, path or '')
        if os.path.exists(destination):
            if os.path.isfile(destination):
                os.remove(destination)
            else:
                shutil.rmtree(destination)
            return True
        return False