1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
import functools
import sys
import time
from dcos import emitting, util
from dcos.errors import DCOSException
# Module-level logger; `_stream_files` uses it to record files that could
# not be read before dropping them.
logger = util.get_logger(__name__)
# Emitter through which `_output` publishes every header and file line.
emitter = emitting.FlatEmitter()
def _no_file_exception():
    """Build the error raised once no readable files remain.

    :returns: exception stating that there are no files left to read
    :rtype: DCOSException
    """
    message = 'No files exist. Exiting.'
    return DCOSException(message)
def log_files(mesos_files, follow, lines):
    """Print the tail of each file in `mesos_files`, like unix `tail`.

    The last `lines` lines of every file are printed first. When `follow`
    is true, the remaining files are then polled about once per second and
    whatever has been written past the previous read position is printed
    (like `tail -f`). In follow mode the loop exits only via an exception.

    :param mesos_files: file objects to print
    :type mesos_files: [MesosFile]
    :param follow: keep printing new data as it arrives (unix tail's -f)
    :type follow: bool
    :param lines: number of trailing lines to print initially
    :type lines: int
    :raises DCOSException: when none of the files can be read
    :rtype: None
    """
    read_tail = functools.partial(_read_last_lines, lines)
    header, live_files = _stream_files(None, read_tail, mesos_files)
    if not live_files:
        raise _no_file_exception()

    if not follow:
        return

    while True:
        # stdout is fully buffered (not line-buffered) when redirected to a
        # pipe, so flush explicitly; otherwise the --follow tests, which read
        # our output through a pipe, would never see the data.
        sys.stdout.flush()
        header, live_files = _stream_files(header, _read_rest, live_files)
        if not live_files:
            raise _no_file_exception()
        time.sleep(1)
def _stream_files(curr_header, fn, mesos_files):
    """Apply `fn` to each file in `mesos_files` (via `util.stream`, which
    this function's contract describes as parallel) and print the returned
    lines one per line.

    Every printed line belongs to a header (the `str()` of the MesosFile
    it came from). A header is printed only when it differs from the
    previously printed one, so consecutive lines from the same file are
    grouped under a single header. `curr_header` carries that state
    between calls.

    :param curr_header: most recently printed header
    :type curr_header: str
    :param fn: function that reads a sequence of lines from a MesosFile
    :type fn: MesosFile -> [str]
    :param mesos_files: files to read
    :type mesos_files: [MesosFile]
    :returns: the most recently printed header, and the files that are
              still reachable. A file whose read fails is dropped and not
              read again.
    :rtype: (str, [MesosFile])
    """
    still_reachable = list(mesos_files)
    # TODO switch to map
    for task, mesos_file in util.stream(fn, mesos_files):
        try:
            new_lines = task.result()
        except DCOSException as exc:
            # `fn` may fail if read.json is unavailable or the file is not
            # in the sandbox. Either way, log it and stop reading this file.
            logger.exception("Error reading file: {}".format(exc))
            still_reachable.remove(mesos_file)
        else:
            if new_lines:
                # Headers are only printed while more than one file remains.
                curr_header = _output(curr_header,
                                      len(still_reachable) > 1,
                                      str(mesos_file),
                                      new_lines)
    return curr_header, still_reachable
def _output(curr_header, output_header, header, lines):
    """Print a sequence of lines, preceded by `header` when it differs
    from `curr_header`.

    :param curr_header: header of the most recently printed lines
    :type curr_header: str
    :param output_header: whether or not to output the header
    :type output_header: bool
    :param header: header for `lines`
    :type header: str
    :param lines: lines to print
    :type lines: [str]
    :returns: `header` if any lines were printed, otherwise `curr_header`
    :rtype: str
    """
    if not lines:
        # Nothing was printed, so the current header is unchanged. Returning
        # `header` here would suppress it before that file's next real
        # output, attaching those lines to the wrong group.
        return curr_header

    if output_header and header != curr_header:
        emitter.publish('===> {} <==='.format(header))
    for line in lines:
        emitter.publish(line)
    return header
# A liberal estimate of the length of one line. `_read_last_lines` multiplies
# it by the number of requested lines to size each backwards fetch window; if
# the estimate is too small, it simply fetches additional windows.
# NOTE(review): units follow MesosFile.size()/read() (presumably bytes).
LINE_SIZE = 200
def _read_last_lines(num_lines, mesos_file):
    """Return the last `num_lines` lines of a file, or fewer if the file is
    shorter, and leave the file positioned at EOF.

    The file is read backwards in windows of ``LINE_SIZE * num_lines``
    until more than `num_lines` lines have been collected or the start of
    the file is reached.

    :param num_lines: number of lines to read; values <= 0 yield no lines
    :type num_lines: int
    :param mesos_file: file to read
    :type mesos_file: MesosFile
    :returns: lines read, without their trailing newlines; empty for an
              empty file
    :rtype: [str]
    """
    file_size = mesos_file.size()

    # `lines[-0:]` is the whole list, and a negative count would produce a
    # negative fetch window, so handle non-positive requests up front.
    if num_lines <= 0:
        mesos_file.seek(file_size)
        return []

    # Estimate how much data we need to fetch to read `num_lines`.
    fetch_size = LINE_SIZE * num_lines
    end = file_size
    start = max(end - fetch_size, 0)
    data = ''
    while True:
        mesos_file.seek(start)
        # Prepend: we walk backwards through the file.
        data = mesos_file.read(end - start) + data

        # A trailing newline terminates the last line rather than starting
        # a new, empty one.
        lines = _strip_trailing_newline(data).split('\n')
        if len(lines) > num_lines:
            # The first collected line may be partial (the window can start
            # mid-line), but it is never among the last `num_lines`.
            ret = lines[-num_lines:]
            break
        if start == 0:
            ret = lines
            break

        # Otherwise shift the read window back and repeat.
        end = start
        start = max(end - fetch_size, 0)

    mesos_file.seek(file_size)
    # ''.split('\n') == [''], which would print a spurious blank line for an
    # empty file; report no lines instead, matching `_read_rest`.
    return ret if data else []
def _read_rest(mesos_file):
    """Read from the current position to EOF and split the data into lines.

    :param mesos_file: file to read
    :type mesos_file: MesosFile
    :returns: lines read, without their trailing newlines; empty if there
              was nothing new to read
    :rtype: [str]
    """
    remainder = mesos_file.read()
    if remainder != '':
        return _strip_trailing_newline(remainder).split('\n')
    return []
def _strip_trailing_newline(s):
"""Returns a modified version of the string with the last character
truncated if it's a newline.
:param s: string to trim
:type s: str
:returns: modified string
:rtype: str
"""
if s == "":
return s
else:
return s[:-1] if s[-1] == '\n' else s
|