# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from __future__ import annotations
import multiprocessing
import os.path
import socket
import sys
import time
from twisted.application import service
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log
from twisted.spread import pb
import buildbot_worker
from buildbot_worker.commands import base
from buildbot_worker.commands import registry
from buildbot_worker.compat import bytes2unicode
from buildbot_worker.util import buffer_manager
from buildbot_worker.util import lineboundaries
class UnknownCommand(pb.Error):
    """Error raised when a requested command name has no registered factory."""
class ProtocolCommandBase:
    """Protocol-independent wrapper around one running worker command.

    The constructor looks up the command factory in the registry, creates the
    command object and sets up the buffering used to batch status updates.

    This class calls ``protocol_args_setup``, ``protocol_send_update_message``
    and ``protocol_complete`` on itself but does not define them; they have to
    be supplied by a subclass.
    """

    def __init__(
        self,
        unicode_encoding,
        worker_basedir,
        buffer_size,
        buffer_timeout,
        max_line_length,
        newline_re,
        builder_is_running,
        on_command_complete,
        on_lost_remote_step,
        command,
        command_id,
        args,
    ):
        """Create the command named ``command`` and prepare update buffering.

        All configuration arguments are stored on the instance under the same
        names. ``builder_is_running`` is kept as passed in and later tested
        for truthiness; it is never called.

        Raises:
            UnknownCommand: if the registry has no factory for ``command``.
        """
        self.unicode_encoding = unicode_encoding
        self.worker_basedir = worker_basedir
        self.buffer_size = buffer_size
        self.buffer_timeout = buffer_timeout
        self.max_line_length = max_line_length
        self.newline_re = newline_re
        self.builder_is_running = builder_is_running
        self.on_command_complete = on_command_complete
        self.on_lost_remote_step = on_lost_remote_step
        self.command_id = command_id

        # Protocol-specific argument handling runs before the factory lookup.
        self.protocol_args_setup(command, args)

        try:
            factory = registry.getFactory(command)
        except KeyError as e:
            raise UnknownCommand(
                f"(command {command_id}): unrecognized WorkerCommand '{command}'"
            ) from e

        # .command points to a WorkerCommand instance, and is set while the step is running.
        self.command = factory(self, command_id, args)

        # One line-boundary finder per stream / logfile name, created lazily.
        self._lbfs = {}
        self.buffer = buffer_manager.BufferManager(
            reactor, self.protocol_send_update_message, self.buffer_size, self.buffer_timeout
        )
        self.is_complete = False

    def log_msg(self, msg):
        """Write ``msg`` to the Twisted log, prefixed with this command's id."""
        log.msg(f"(command {self.command_id}): {msg}")

    def split_lines(self, stream, text, text_time):
        """Feed ``text`` to the line-boundary finder of ``stream``.

        A finder is created for the stream the first time it is seen. Returns
        whatever the finder's ``append`` returns; callers in this class treat
        ``None`` as "nothing to forward yet".
        """
        try:
            finder = self._lbfs[stream]
        except KeyError:
            finder = self._lbfs[stream] = lineboundaries.LineBoundaryFinder(
                self.max_line_length, self.newline_re
            )
        return finder.append(text, text_time)

    def flush_command_output(self):
        """Flush every line-boundary finder, then the update buffer.

        Streams are processed in sorted name order. Returns an already-fired
        Deferred with result ``None``.
        """
        for stream in sorted(self._lbfs):
            remainder = self._lbfs[stream].flush()
            if remainder is None:
                continue
            if stream in ('stdout', 'stderr', 'header'):
                self.buffer.append(stream, remainder)
            else:
                # Any other name is a custom logfile; send it as a 'log' update.
                self.buffer.append('log', (stream, remainder))

        self.buffer.flush()
        return defer.succeed(None)

    # sendUpdate is invoked by the Commands we spawn
    def send_update(self, data):
        """Queue status updates produced by the running command.

        ``data`` is an iterable of ``(key, value)`` pairs. Nothing is queued
        when the builder is not running or the command has already completed.
        """
        if not self.builder_is_running:
            # if builder is not running, do not send any status messages
            return
        if self.is_complete:
            return

        # All pairs of a single call share one timestamp.
        received_at = time.time()
        for key, value in data:
            if key in ('stdout', 'stderr', 'header'):
                whole_line = self.split_lines(key, value, received_at)
                if whole_line is not None:
                    self.buffer.append(key, whole_line)
            elif key == 'log':
                # value is a (logname, text) pair for a custom logfile
                logname, chunk = value
                whole_line = self.split_lines(logname, chunk, received_at)
                if whole_line is not None:
                    self.buffer.append('log', (logname, whole_line))
            else:
                # Any other key is forwarded unchanged.
                self.buffer.append(key, value)

    def _ack_failed(self, why, where):
        """Errback: log a failed acknowledgement together with its origin."""
        self.log_msg(f"ProtocolCommandBase._ack_failed: {where}")
        log.err(why)  # we don't really care

    # this is fired by the Deferred attached to each Command
    def command_complete(self, failure):
        """Handle completion of the command and report it via the protocol.

        ``failure`` is falsy on success; otherwise it is logged and wrapped in
        a ``pb.CopyableFailure`` before being handed to ``protocol_complete``.
        """
        if failure:
            self.log_msg(f"ProtocolCommandBase.command_complete (failure) {self.command}")
            log.err(failure)
            # failure, if present, is a failure.Failure. To send it across
            # the wire, we must turn it into a pb.CopyableFailure.
            failure = pb.CopyableFailure(failure)
            failure.unsafeTracebacks = True
        else:
            # failure is None
            self.log_msg(f"ProtocolCommandBase.command_complete (success) {self.command}")

        self.on_command_complete()

        if not self.builder_is_running:
            self.log_msg(" but we weren't running, quitting silently")
            return
        if self.is_complete:
            return

        d = self.protocol_complete(failure)
        d.addErrback(self._ack_failed, "ProtocolCommandBase.command_complete")
        self.is_complete = True
class WorkerForBuilderBase(service.Service):
    """Base Twisted service for the worker-side per-builder object."""

    # Command wrapper class associated with this builder type.
    # NOTE(review): presumably overridden by protocol-specific subclasses and
    # instantiated for each command -- confirm against the callers.
    ProtocolCommand: type[ProtocolCommandBase] = ProtocolCommandBase
class BotBase(service.MultiService):
    """I represent the worker-side bot."""

    name: str | None = "bot"  # type: ignore[assignment]
    WorkerForBuilder: type[WorkerForBuilderBase] = WorkerForBuilderBase

    # Location of the os-release file; can be replaced via setOsReleaseFile().
    os_release_file = "/etc/os-release"

    def __init__(self, basedir, unicode_encoding=None, delete_leftover_dirs=False):
        """Initialise the bot service.

        Args:
            basedir: worker base directory; must exist by the time the
                service is started.
            unicode_encoding: encoding name; falls back to the filesystem
                encoding and finally to ``'ascii'`` when not given.
            delete_leftover_dirs: stored on the instance and reported to the
                master as part of the worker info.
        """
        service.MultiService.__init__(self)
        self.basedir = basedir
        self.numcpus = None
        self.unicode_encoding = unicode_encoding or sys.getfilesystemencoding() or 'ascii'
        self.delete_leftover_dirs = delete_leftover_dirs
        self.builders = {}

        # Don't send any data until at least buffer_size bytes have been collected
        # or buffer_timeout elapsed
        self.buffer_size = 64 * 1024
        self.buffer_timeout = 5
        self.max_line_length = 4096
        self.newline_re = r'(\r\n|\r(?=.)|\033\[u|\033\[[0-9]+;[0-9]+[Hf]|\033\[2J|\x08+)'

    # for testing purposes
    def setOsReleaseFile(self, os_release_file):
        """Override the path of the os-release file read for worker info."""
        self.os_release_file = os_release_file

    def startService(self):
        """Start the service; the base directory must already exist."""
        assert os.path.isdir(self.basedir)
        service.MultiService.startService(self)

    def remote_getCommands(self):
        """Return a dict mapping every registered command name to the command version."""
        commands = {n: base.command_version for n in registry.getAllCommandNames()}
        return commands

    def remote_print(self, message):
        """Log a message sent by the master."""
        log.msg("message from master:", message)

    @staticmethod
    def _read_os_release(os_release_file, props):
        """Copy ``KEY=VALUE`` entries of an os-release file into ``props``.

        Each entry with a non-empty value is stored under the key
        ``os_<key lowercased>`` with surrounding double quotes removed from
        the value. A missing file is silently ignored. Empty lines, comment
        lines and lines that contain no ``=`` at all are skipped.

        Args:
            os_release_file: path of the file to read.
            props: dict that is updated in place.
        """
        if not os.path.exists(os_release_file):
            return

        with open(os_release_file) as fin:
            for line in fin:
                line = line.strip("\r\n")
                # as per man page: Lines beginning with "#" shall be ignored as comments.
                if not line or line.startswith('#'):
                    continue

                # parse key-values; partition() never fails, so a malformed
                # line without "=" (e.g. whitespace only) is skipped instead
                # of raising ValueError and aborting the worker info lookup.
                key, separator, value = line.partition("=")
                if not separator:
                    continue
                if value:
                    key = f'os_{key.lower()}'
                    props[key] = value.strip('"')

    def remote_getWorkerInfo(self):
        """This command retrieves data from the files in WORKERDIR/info/* and
        sends the contents to the buildmaster. These are used to describe
        the worker and its configuration, and should be created and
        maintained by the worker administrator. They will be retrieved each
        time the master-worker connection is established.

        In addition to one entry per info file (keyed by file name), the
        returned dict contains the ``os_*`` entries read from the os-release
        file and the keys ``environ``, ``system``, ``basedir``, ``numcpus``,
        ``version``, ``worker_commands`` and ``delete_leftover_dirs``. Those
        fixed keys are assigned last, so they win over an info file of the
        same name.
        """
        files = {}
        basedir = os.path.join(self.basedir, "info")
        if os.path.isdir(basedir):
            for f in os.listdir(basedir):
                filename = os.path.join(basedir, f)
                if os.path.isfile(filename):
                    with open(filename) as fin:
                        try:
                            files[f] = bytes2unicode(fin.read())
                        except UnicodeDecodeError as e:
                            # An undecodable info file is logged and left out.
                            log.err(e, f'error while reading file: {filename}')

        self._read_os_release(self.os_release_file, files)

        # The CPU count is detected once and then cached on the instance.
        if not self.numcpus:
            try:
                self.numcpus = multiprocessing.cpu_count()
            except NotImplementedError:
                log.msg(
                    "warning: could not detect the number of CPUs for this worker. Assuming 1 CPU."
                )
                self.numcpus = 1
        files['environ'] = os.environ.copy()
        files['system'] = os.name
        files['basedir'] = self.basedir
        files['numcpus'] = self.numcpus

        files['version'] = self.remote_getVersion()
        files['worker_commands'] = self.remote_getCommands()
        files['delete_leftover_dirs'] = self.delete_leftover_dirs
        return files

    def remote_getVersion(self):
        """Send our version back to the Master"""
        return buildbot_worker.version

    def remote_shutdown(self):
        """Stop the reactor shortly after the master asked us to shut down."""
        log.msg("worker shutting down on command from master")
        # there's no good way to learn that the PB response has been delivered,
        # so we'll just wait a bit, in hopes the master hears back. Masters are
        # resilient to workers dropping their connections, so there is no harm
        # if this timeout is too short.
        reactor.callLater(0.2, reactor.stop)
class WorkerBase(service.MultiService):
    """Top-level worker service that creates and parents the bot service."""

    def __init__(
        self,
        name,
        basedir,
        bot_class,
        umask=None,
        unicode_encoding=None,
        delete_leftover_dirs=False,
    ):
        """Build the bot from ``bot_class`` and attach it as a child service.

        ``umask``, when not ``None``, is applied to the process when the
        service starts. ``unicode_encoding`` and ``delete_leftover_dirs`` are
        passed through to ``bot_class``.
        """
        service.MultiService.__init__(self)
        self.name = name

        bot_service = bot_class(
            basedir,
            unicode_encoding=unicode_encoding,
            delete_leftover_dirs=delete_leftover_dirs,
        )
        bot_service.setServiceParent(self)
        self.bot = bot_service

        self.umask = umask
        self.basedir = basedir

    def startService(self):
        """Log the version, apply the umask, record the hostname, then start."""
        log.msg(f"Starting Worker -- version: {buildbot_worker.version}")

        requested_umask = self.umask
        if requested_umask is not None:
            os.umask(requested_umask)

        self.recordHostname(self.basedir)

        service.MultiService.startService(self)

    def recordHostname(self, basedir):
        """Record my hostname in twistd.hostname, for user convenience.

        Writing the file is best effort: any exception raised while writing
        is logged and otherwise ignored.
        """
        log.msg("recording hostname in twistd.hostname")
        hostname_path = os.path.join(basedir, "twistd.hostname")

        try:
            hostname = os.uname()[1]  # only on unix
        except AttributeError:
            # this tends to fail on non-connected hosts, e.g., laptops
            # on planes
            hostname = socket.getfqdn()

        try:
            with open(hostname_path, "w") as out:
                out.write(f"{hostname}\n")
        except Exception:
            log.msg("failed - ignoring")
|