1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
"""Logging
"""
import sys
import os
import logging
from pip._vendor import colorama, pkg_resources
def _color_wrap(*colors):
    """Build a function that wraps text in the given color codes.

    The returned callable prepends every entry of ``colors`` to its input
    and appends ``colorama.Style.RESET_ALL``.
    """
    def wrapped(inp):
        pieces = colors + (inp, colorama.Style.RESET_ALL)
        return "".join(pieces)

    return wrapped
def should_color(consumer, environ, std=(sys.stdout, sys.stderr)):
    """Return whether output written to ``consumer`` should be colorized.

    ``consumer`` may be a ``colorama.AnsiToWin32`` wrapper, in which case
    the stream it wraps is the one that gets inspected.  ``environ`` is a
    mapping that is queried for ``TERM``.  ``std`` holds the streams that
    are eligible for color at all.
    """
    # Look through the colorama wrapper to the underlying stream.
    if isinstance(consumer, colorama.AnsiToWin32):
        real_consumer = consumer.wrapped
    else:
        real_consumer = consumer

    # Anything that is not one of the standard streams is never colorized.
    if real_consumer not in std:
        return False

    # A tty gets color.
    if hasattr(real_consumer, "isatty") and real_consumer.isatty():
        return True

    # Otherwise color only when the terminal is declared as ANSI.
    return environ.get("TERM") == "ANSI"
def should_warn(current_version, removal_version):
    """Return True if ``current_version`` is below the warning threshold.

    Only the first two dot-separated places of each version are
    significant.  The threshold is ``removal_version`` with its minor
    component decremented by one, so ``removal_version`` must contain at
    least a major and a minor component.
    """
    def _first_two_places(version):
        return ".".join(version.split(".")[:2])

    current_version = _first_two_places(current_version)
    removal_version = _first_two_places(removal_version)

    # The warning threshold sits one minor version before removal.
    major, minor = removal_version.split(".")
    warn_version = "%s.%d" % (major, int(minor) - 1)

    current = pkg_resources.parse_version(current_version)
    threshold = pkg_resources.parse_version(warn_version)
    return current < threshold
class Logger(object):
    """
    Logging object for use in command-line script.  Allows ranges of
    levels, to avoid some redundancy of displayed information.

    Messages are delivered to registered consumers, which are
    ``(level, consumer)`` pairs.  A consumer is either an object with a
    ``write`` method or a callable that receives the rendered message.
    """

    VERBOSE_DEBUG = logging.DEBUG - 1
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    # Floor division keeps NOTIFY an integer like every other level; true
    # division would make it a float on Python 3.
    NOTIFY = (logging.INFO + logging.WARN) // 2
    WARN = WARNING = logging.WARN
    ERROR = logging.ERROR
    FATAL = logging.FATAL

    # Levels ordered from most to least verbose; indexed by
    # level_for_integer().
    LEVELS = [VERBOSE_DEBUG, DEBUG, INFO, NOTIFY, WARN, ERROR, FATAL]

    # Colorizers applied by log() when the consumer supports color; levels
    # missing from this mapping are written uncolored.
    COLORS = {
        WARN: _color_wrap(colorama.Fore.YELLOW),
        ERROR: _color_wrap(colorama.Fore.RED),
        FATAL: _color_wrap(colorama.Fore.RED),
    }

    def __init__(self):
        self.consumers = []
        self.indent = 0
        self.explicit_levels = False
        # Message of the progress scope currently open, or None.
        self.in_progress = None
        # True while the progress line on stdout has not been terminated
        # by a newline yet.
        self.in_progress_hanging = False
        # Last message passed to show_progress(); initialised here so the
        # attribute always exists.
        self.last_message = None

    def add_consumers(self, *consumers):
        """Register ``(level, consumer)`` pairs.

        On platforms whose name starts with "win", consumers that have a
        ``write`` attribute are wrapped in ``colorama.AnsiToWin32``.
        """
        if sys.platform.startswith("win"):
            for level, consumer in consumers:
                if hasattr(consumer, "write"):
                    self.consumers.append(
                        (level, colorama.AnsiToWin32(consumer)),
                    )
                else:
                    self.consumers.append((level, consumer))
        else:
            self.consumers.extend(consumers)

    def debug(self, msg, *args, **kw):
        """Log ``msg`` at DEBUG level."""
        self.log(self.DEBUG, msg, *args, **kw)

    def info(self, msg, *args, **kw):
        """Log ``msg`` at INFO level."""
        self.log(self.INFO, msg, *args, **kw)

    def notify(self, msg, *args, **kw):
        """Log ``msg`` at NOTIFY level."""
        self.log(self.NOTIFY, msg, *args, **kw)

    def warn(self, msg, *args, **kw):
        """Log ``msg`` at WARN level."""
        self.log(self.WARN, msg, *args, **kw)

    def error(self, msg, *args, **kw):
        """Log ``msg`` at ERROR level."""
        self.log(self.ERROR, msg, *args, **kw)

    def fatal(self, msg, *args, **kw):
        """Log ``msg`` at FATAL level."""
        self.log(self.FATAL, msg, *args, **kw)

    def deprecated(self, removal_version, msg, *args, **kwargs):
        """
        Logs deprecation message which is log level WARN if the
        ``removal_version`` is > 1 minor release away and log level ERROR
        otherwise.

        removal_version should be the version that the deprecated feature is
        expected to be removed in, so something that will not exist in
        version 1.7, but will in 1.6 would have a removal_version of 1.7.
        """
        from pip import __version__

        if should_warn(__version__, removal_version):
            self.warn(msg, *args, **kwargs)
        else:
            self.error(msg, *args, **kwargs)

    def log(self, level, msg, *args, **kw):
        """Render ``msg`` and deliver it to every matching consumer.

        ``msg`` is %-formatted with either the positional or the keyword
        arguments.

        Raises TypeError if both positional and keyword arguments are
        given.
        """
        if args and kw:
            raise TypeError(
                "You may give positional or keyword arguments, not both")
        args = args or kw

        # render
        if args:
            rendered = msg % args
        else:
            rendered = msg
        rendered = ' ' * self.indent + rendered
        if self.explicit_levels:
            # FIXME: should this be a name, not a level number?
            rendered = '%02i %s' % (level, rendered)

        for consumer_level, consumer in self.consumers:
            if not self.level_matches(level, consumer_level):
                continue
            if (self.in_progress_hanging
                    and consumer in (sys.stdout, sys.stderr)):
                # Terminate the unfinished progress line before writing
                # the message.
                self.in_progress_hanging = False
                sys.stdout.write('\n')
                sys.stdout.flush()
            if hasattr(consumer, 'write'):
                write_content = rendered + '\n'
                if should_color(consumer, os.environ):
                    # We are printing to stdout or stderr and it supports
                    # colors so render our text colored
                    colorizer = self.COLORS.get(level, lambda x: x)
                    write_content = colorizer(write_content)

                consumer.write(write_content)
                if hasattr(consumer, 'flush'):
                    consumer.flush()
            else:
                # Plain callables receive the message without a newline.
                consumer(rendered)

    def _show_progress(self):
        """Should we display download progress?"""
        return (self.stdout_level_matches(self.NOTIFY) and sys.stdout.isatty())

    def start_progress(self, msg):
        """Open a progress scope, writing ``msg`` to stdout if progress
        output is enabled."""
        assert not self.in_progress, (
            "Tried to start_progress(%r) while in_progress %r"
            % (msg, self.in_progress))
        if self._show_progress():
            sys.stdout.write(' ' * self.indent + msg)
            sys.stdout.flush()
            self.in_progress_hanging = True
        else:
            self.in_progress_hanging = False
        self.in_progress = msg
        self.last_message = None

    def end_progress(self, msg='done.'):
        """Close the progress scope opened by start_progress()."""
        assert self.in_progress, (
            "Tried to end_progress without start_progress")
        if self._show_progress():
            if not self.in_progress_hanging:
                # Some message has been printed out since start_progress
                sys.stdout.write('...' + self.in_progress + msg + '\n')
                sys.stdout.flush()
            else:
                # These erase any messages shown with show_progress
                # (besides .'s).  Use this instance rather than the
                # module-level logger so that any Logger cleans up its
                # own progress line.
                self.show_progress('')
                self.show_progress('')
                sys.stdout.write(msg + '\n')
                sys.stdout.flush()
        self.in_progress = None
        self.in_progress_hanging = False

    def show_progress(self, message=None):
        """If we are in a progress scope, and no log messages have been
        shown, write out another '.'

        When ``message`` is given, the progress line is rewritten with it
        instead, padded with spaces to cover the previous message."""
        if not self.in_progress_hanging:
            return
        if message is None:
            sys.stdout.write('.')
            sys.stdout.flush()
            return
        if self.last_message:
            padding = ' ' * max(
                0,
                len(self.last_message) - len(message)
            )
        else:
            padding = ''
        sys.stdout.write(
            '\r%s%s%s%s' %
            (' ' * self.indent, self.in_progress, message, padding)
        )
        sys.stdout.flush()
        self.last_message = message

    def stdout_level_matches(self, level):
        """Returns true if a message at this level will go to stdout"""
        return self.level_matches(level, self._stdout_level())

    def _stdout_level(self):
        """Returns the level that stdout runs at"""
        # NOTE(review): this is an identity check, so a stdout consumer
        # that add_consumers() wrapped in AnsiToWin32 is not recognised
        # here -- confirm whether that is intended.
        for level, consumer in self.consumers:
            if consumer is sys.stdout:
                return level
        return self.FATAL

    def level_matches(self, level, consumer_level):
        """
        Return whether ``level`` matches ``consumer_level``.

        A plain level matches when it is at least ``consumer_level``.  A
        slice matches when ``consumer_level`` lies in ``[start, stop)``;
        a bound that is None is not checked.

        >>> l = Logger()
        >>> l.level_matches(3, 4)
        False
        >>> l.level_matches(3, 2)
        True
        >>> l.level_matches(slice(None, 3), 3)
        False
        >>> l.level_matches(slice(None, 3), 2)
        True
        >>> l.level_matches(slice(1, 3), 1)
        True
        >>> l.level_matches(slice(2, 3), 1)
        False
        """
        if isinstance(level, slice):
            start, stop = level.start, level.stop
            if start is not None and start > consumer_level:
                return False
            # Both conditions must hold: with ``or`` every slice that has
            # a stop was rejected, and a None stop was compared against
            # the consumer level.
            if stop is not None and stop <= consumer_level:
                return False
            return True
        else:
            return level >= consumer_level

    @classmethod
    def level_for_integer(cls, level):
        """Map an integer index onto LEVELS, clamping to its first and
        last entries."""
        levels = cls.LEVELS
        if level < 0:
            return levels[0]
        if level >= len(levels):
            return levels[-1]
        return levels[level]

    def move_stdout_to_stderr(self):
        """Replace every consumer equal to sys.stdout with sys.stderr at
        the same level; the replacements are appended to the end of the
        consumer list."""
        to_remove = []
        to_add = []
        for consumer_level, consumer in self.consumers:
            if consumer == sys.stdout:
                to_remove.append((consumer_level, consumer))
                to_add.append((consumer_level, sys.stderr))
        # Mutate only after the scan so the list is not modified while it
        # is being iterated.
        for item in to_remove:
            self.consumers.remove(item)
        self.consumers.extend(to_add)
# Module-level Logger instance; it is created with no consumers registered.
logger = Logger()
|