1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
# Copyright 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import logging
import tempfile
import traceback
import urllib
import zipfile
logger = logging.getLogger(__name__)
def make_dirs(path):
    """
    Create a directory (and any missing parents) unless the path already exists.

    :param path: directory path to create
    :return: None
    """
    if os.path.exists(path):
        # Whatever already lives at this path is left untouched.
        return
    os.makedirs(path, 0o777)
def delete_dirs(path):
    """
    Delete a directory tree if it exists (best effort).

    Any ``Exception`` raised while checking or removing the path is logged
    instead of being propagated to the caller.

    :param path: directory to remove
    :return: None
    """
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
    except Exception as e:
        logger.error(traceback.format_exc())
        # Not every exception carries arguments; indexing args[0] blindly
        # would raise IndexError from inside this handler and defeat the
        # best-effort contract.
        reason = e.args[0] if e.args else e
        logger.error("Failed to delete %s:%s", path, reason)
def download_file_from_http(url, local_dir, file_name):
    """
    Download the resource at ``url`` and save it under ``local_dir``.

    The target directory is created when missing. Failures are logged and
    reported through the returned flag; they are not raised.

    :param url: URL to fetch (anything ``urllib.request.urlopen`` accepts)
    :param local_dir: directory the file is saved into
    :param file_name: name of the saved file inside ``local_dir``
    :return: tuple ``(is_download_ok, local_file_name)`` where
        ``local_file_name`` is ``os.path.join(local_dir, file_name)``
    """
    # A bare ``import urllib`` does not guarantee that the ``request``
    # submodule is loaded, so import it explicitly here.
    import urllib.request

    local_file_name = os.path.join(local_dir, file_name)
    is_download_ok = False
    try:
        os.makedirs(local_dir, 0o777, exist_ok=True)
        # Context managers close both handles even when the transfer fails.
        with urllib.request.urlopen(url) as req:
            # The response body is bytes, so the file must be opened in
            # binary mode; text mode raises TypeError on write.
            with open(local_file_name, 'wb') as save_file:
                # Stream in chunks instead of loading the payload in memory.
                shutil.copyfileobj(req, save_file)
        is_download_ok = True
    except Exception:
        # ``Exception`` (not a bare except) so KeyboardInterrupt and
        # SystemExit still propagate.
        logger.error(traceback.format_exc())
        logger.error("Failed to download %s to %s.", url, local_file_name)
    return is_download_ok, local_file_name
def unzip_file(zip_src, dst_dir, csar_path):
    """
    Extract every member of a zip archive into ``dst_dir``.

    :param zip_src: path of the zip archive
    :param dst_dir: directory the archive is extracted into
    :param csar_path: path joined onto ``dst_dir`` to build the return value
    :return: ``os.path.join(dst_dir, csar_path)`` after extraction, or ``""``
        when ``zip_src`` does not exist
    """
    logger.debug("unzip_file %s to %s.", zip_src, dst_dir)
    if not os.path.exists(zip_src):
        return ""
    logger.debug("unzip_file %s.", zip_src)
    # The context manager closes the archive handle even if extraction fails.
    with zipfile.ZipFile(zip_src, 'r') as fz:
        fz.extractall(dst_dir)
    return os.path.join(dst_dir, csar_path)
def unzip_csar(zip_src, dst_dir):
    """
    Extract a csar (zip) package into ``dst_dir``.

    :param zip_src: path of the csar package
    :param dst_dir: directory the package is extracted into
    :return: ``dst_dir`` after extraction, or ``""`` (with an error logged)
        when ``zip_src`` does not exist
    """
    if not os.path.exists(zip_src):
        logger.error("%s doesn't exist", zip_src)
        return ""
    # The context manager closes the archive handle even if extraction fails.
    with zipfile.ZipFile(zip_src, 'r') as fz:
        fz.extractall(dst_dir)
    return dst_dir
def unzip_csar_to_tmp(zip_src):
    """
    Extract a csar (zip) package into a freshly created temporary directory.

    The caller owns the returned directory and is responsible for removing it.

    :param zip_src: path of the csar package
    :return: path of the temporary directory holding the extracted content
    :raises: whatever ``zipfile.ZipFile`` raises when ``zip_src`` is missing
        or is not a valid zip archive
    """
    # Open the archive before creating the temp dir so an unreadable package
    # does not leave an orphaned directory behind; the context manager also
    # guarantees the archive handle is closed.
    with zipfile.ZipFile(zip_src, 'r') as zip_ref:
        dirpath = tempfile.mkdtemp()
        zip_ref.extractall(dirpath)
    return dirpath
def get_artifact_path(vnf_path, artifact_file):
    """
    Locate an artifact file somewhere below ``vnf_path``.

    :param vnf_path: root directory that is walked (via ``os.walk``)
    :param artifact_file: file name to look for
    :return: full path of the first match encountered, or None if no
        directory contains a file with that name
    """
    hits = (
        os.path.join(folder, artifact_file)
        for folder, _subdirs, names in os.walk(vnf_path)
        if artifact_file in names
    )
    # The generator is lazy, so the walk stops at the first match.
    return next(hits, None)
def end_with(_s_in, *suffix):
    """
    Tell whether a string ends with at least one of the given suffixes.

    :param _s_in: string to test
    :param suffix: candidate suffixes, each passed to ``str.endswith``
    :return: True if any candidate matches, False otherwise (including when
        no candidate is given)
    """
    matches = _s_in.endswith
    return any(matches(candidate) for candidate in suffix)
def filter_files(search_path, suffix):
    """
    List the entries of a directory whose names end with ``suffix``.

    :param search_path: directory whose entries are listed (not recursive)
    :param suffix: suffix forwarded to ``end_with`` as a single candidate
    :return: list of matching entry names (names only, not full paths), in
        the order ``os.listdir`` yields them
    """
    return [
        entry
        for entry in os.listdir(search_path)
        if end_with(entry, suffix)
    ]
def recreate_dir(path):
    """
    Replace a directory with a fresh, empty one.

    :param path: directory to wipe (when present) and create again
    :return: None
    """
    already_present = os.path.exists(path)
    if already_present:
        # Drop the old tree first so the new directory starts out empty.
        shutil.rmtree(path)
    os.makedirs(path, mode=0o777)
def copy(src_file, dest_dir, new_file_name=None):
    """
    Copy a file, together with its permission bits, into a directory.

    :param src_file: path of the file to copy
    :param dest_dir: destination directory; created when it does not exist
    :param new_file_name: name for the copy; when None the base name of
        ``src_file`` is kept
    :return: None
    """
    if new_file_name is None:
        target_name = os.path.basename(src_file)
    else:
        target_name = new_file_name
    dst = os.path.join(dest_dir, target_name)

    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    # Content first, then the permission bits of the source.
    shutil.copyfile(src_file, dst)
    shutil.copymode(src_file, dst)
|