1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
|
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import base64
import logging
import os
import re
import subprocess
class FindDeviceError(RuntimeError):
pass
class DeviceNotFoundError(FindDeviceError):
def __init__(self, serial):
self.serial = serial
super(DeviceNotFoundError, self).__init__(
'No device with serial {}'.format(serial))
class NoUniqueDeviceError(FindDeviceError):
def __init__(self):
super(NoUniqueDeviceError, self).__init__('No unique device')
class ShellError(RuntimeError):
def __init__(self, cmd, stdout, stderr, exit_code):
super(ShellError, self).__init__(
'`{0}` exited with code {1}'.format(cmd, exit_code))
self.cmd = cmd
self.stdout = stdout
self.stderr = stderr
self.exit_code = exit_code
def get_devices(adb_path='adb'):
with open(os.devnull, 'wb') as devnull:
subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
stderr=devnull)
out = split_lines(subprocess.check_output([adb_path, 'devices']))
# The first line of `adb devices` just says "List of attached devices", so
# skip that.
devices = []
for line in out[1:]:
if not line.strip():
continue
if 'offline' in line:
continue
serial, _ = re.split(r'\s+', line, maxsplit=1)
devices.append(serial)
return devices
def _get_unique_device(product=None, adb_path='adb'):
devices = get_devices(adb_path=adb_path)
if len(devices) != 1:
raise NoUniqueDeviceError()
return AndroidDevice(devices[0], product, adb_path)
def _get_device_by_serial(serial, product=None, adb_path='adb'):
for device in get_devices(adb_path=adb_path):
if device == serial:
return AndroidDevice(serial, product, adb_path)
raise DeviceNotFoundError(serial)
def get_device(serial=None, product=None, adb_path='adb'):
"""Get a uniquely identified AndroidDevice if one is available.
Raises:
DeviceNotFoundError:
The serial specified by `serial` or $ANDROID_SERIAL is not
connected.
NoUniqueDeviceError:
Neither `serial` nor $ANDROID_SERIAL was set, and the number of
devices connected to the system is not 1. Having 0 connected
devices will also result in this error.
Returns:
An AndroidDevice associated with the first non-None identifier in the
following order of preference:
1) The `serial` argument.
2) The environment variable $ANDROID_SERIAL.
3) The single device connnected to the system.
"""
if serial is not None:
return _get_device_by_serial(serial, product, adb_path)
android_serial = os.getenv('ANDROID_SERIAL')
if android_serial is not None:
return _get_device_by_serial(android_serial, product, adb_path)
return _get_unique_device(product, adb_path=adb_path)
def _get_device_by_type(flag, adb_path):
with open(os.devnull, 'wb') as devnull:
subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
stderr=devnull)
try:
serial = subprocess.check_output(
[adb_path, flag, 'get-serialno']).strip()
except subprocess.CalledProcessError:
raise RuntimeError('adb unexpectedly returned nonzero')
if serial == 'unknown':
raise NoUniqueDeviceError()
return _get_device_by_serial(serial, adb_path=adb_path)
def get_usb_device(adb_path='adb'):
"""Get the unique USB-connected AndroidDevice if it is available.
Raises:
NoUniqueDeviceError:
0 or multiple devices are connected via USB.
Returns:
An AndroidDevice associated with the unique USB-connected device.
"""
return _get_device_by_type('-d', adb_path=adb_path)
def get_emulator_device(adb_path='adb'):
"""Get the unique emulator AndroidDevice if it is available.
Raises:
NoUniqueDeviceError:
0 or multiple emulators are running.
Returns:
An AndroidDevice associated with the unique running emulator.
"""
return _get_device_by_type('-e', adb_path=adb_path)
# If necessary, modifies subprocess.check_output() or subprocess.Popen() args
# to run the subprocess via Windows PowerShell to work-around an issue in
# Python 2's subprocess class on Windows where it doesn't support Unicode.
def _get_subprocess_args(args):
# Only do this slow work-around if Unicode is in the cmd line on Windows.
# PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is
# very slow.
if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]):
return args
def escape_arg(arg):
# Escape for the parsing that the C Runtime does in Windows apps. In
# particular, this will take care of double-quotes.
arg = subprocess.list2cmdline([arg])
# Escape single-quote with another single-quote because we're about
# to...
arg = arg.replace(u"'", u"''")
# ...put the arg in a single-quoted string for PowerShell to parse.
arg = u"'" + arg + u"'"
return arg
# Escape command line args.
argv = map(escape_arg, args[0])
# Cause script errors (such as adb not found) to stop script immediately
# with an error.
ps_code = u'$ErrorActionPreference = "Stop"\r\n'
# Add current directory to the PATH var, to match cmd.exe/CreateProcess()
# behavior.
ps_code += u'$env:Path = ".;" + $env:Path\r\n'
# Precede by &, the PowerShell call operator, and separate args by space.
ps_code += u'& ' + u' '.join(argv)
# Make the PowerShell exit code the exit code of the subprocess.
ps_code += u'\r\nExit $LastExitCode'
# Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively
# understands.
ps_code = ps_code.encode('utf-16le')
# Encode the PowerShell command as base64 and use the special
# -EncodedCommand option that base64 decodes. Base64 is just plain ASCII,
# so it should have no problem passing through Win32 CreateProcessA()
# (which python erroneously calls instead of CreateProcessW()).
return (['powershell.exe', '-NoProfile', '-NonInteractive',
'-EncodedCommand', base64.b64encode(ps_code)],) + args[1:]
# Call this instead of subprocess.check_output() to work-around issue in Python
# 2's subprocess class on Windows where it doesn't support Unicode.
def _subprocess_check_output(*args, **kwargs):
try:
return subprocess.check_output(*_get_subprocess_args(args), **kwargs)
except subprocess.CalledProcessError as e:
# Show real command line instead of the powershell.exe command line.
raise subprocess.CalledProcessError(e.returncode, args[0],
output=e.output)
# Call this instead of subprocess.Popen(). Like _subprocess_check_output().
def _subprocess_Popen(*args, **kwargs):
return subprocess.Popen(*_get_subprocess_args(args), **kwargs)
def split_lines(s):
"""Splits lines in a way that works even on Windows and old devices.
Windows will see \r\n instead of \n, old devices do the same, old devices
on Windows will see \r\r\n.
"""
# rstrip is used here to workaround a difference between splineslines and
# re.split:
# >>> 'foo\n'.splitlines()
# ['foo']
# >>> re.split(r'\n', 'foo\n')
# ['foo', '']
return re.split(r'[\r\n]+', s.rstrip())
def version(adb_path=None):
"""Get the version of adb (in terms of ADB_SERVER_VERSION)."""
adb_path = adb_path if adb_path is not None else ['adb']
version_output = subprocess.check_output(adb_path + ['version'])
pattern = r'^Android Debug Bridge version 1.0.(\d+)$'
result = re.match(pattern, version_output.splitlines()[0])
if not result:
return 0
return int(result.group(1))
class AndroidDevice(object):
# Delimiter string to indicate the start of the exit code.
_RETURN_CODE_DELIMITER = 'x'
# Follow any shell command with this string to get the exit
# status of a program since this isn't propagated by adb.
#
# The delimiter is needed because `printf 1; echo $?` would print
# "10", and we wouldn't be able to distinguish the exit code.
_RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)]
# Maximum search distance from the output end to find the delimiter.
# adb on Windows returns \r\n even if adbd returns \n. Some old devices
# seem to actually return \r\r\n.
_RETURN_CODE_SEARCH_LENGTH = len(
'{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER))
def __init__(self, serial, product=None, adb_path='adb'):
self.serial = serial
self.product = product
self.adb_cmd = [adb_path]
if self.serial is not None:
self.adb_cmd.extend(['-s', serial])
if self.product is not None:
self.adb_cmd.extend(['-p', product])
self._linesep = None
self._features = None
@property
def linesep(self):
if self._linesep is None:
self._linesep = subprocess.check_output(self.adb_cmd +
['shell', 'echo'])
return self._linesep
@property
def features(self):
if self._features is None:
try:
self._features = split_lines(self._simple_call(['features']))
except subprocess.CalledProcessError:
self._features = []
return self._features
def has_shell_protocol(self):
return version(self.adb_cmd) >= 35 and 'shell_v2' in self.features
def _make_shell_cmd(self, user_cmd):
command = self.adb_cmd + ['shell'] + user_cmd
if not self.has_shell_protocol():
command += self._RETURN_CODE_PROBE
return command
def _parse_shell_output(self, out):
"""Finds the exit code string from shell output.
Args:
out: Shell output string.
Returns:
An (exit_code, output_string) tuple. The output string is
cleaned of any additional stuff we appended to find the
exit code.
Raises:
RuntimeError: Could not find the exit code in |out|.
"""
search_text = out
if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
# We don't want to search over massive amounts of data when we know
# the part we want is right at the end.
search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
if partition[1] == '':
raise RuntimeError('Could not find exit status in shell output.')
result = int(partition[2])
# partition[0] won't contain the full text if search_text was
# truncated, pull from the original string instead.
out = out[:-len(partition[1]) - len(partition[2])]
return result, out
def _simple_call(self, cmd):
logging.info(' '.join(self.adb_cmd + cmd))
return _subprocess_check_output(
self.adb_cmd + cmd, stderr=subprocess.STDOUT)
def shell(self, cmd):
"""Calls `adb shell`
Args:
cmd: command to execute as a list of strings.
Returns:
A (stdout, stderr) tuple. Stderr may be combined into stdout
if the device doesn't support separate streams.
Raises:
ShellError: the exit code was non-zero.
"""
exit_code, stdout, stderr = self.shell_nocheck(cmd)
if exit_code != 0:
raise ShellError(cmd, stdout, stderr, exit_code)
return stdout, stderr
def shell_nocheck(self, cmd):
"""Calls `adb shell`
Args:
cmd: command to execute as a list of strings.
Returns:
An (exit_code, stdout, stderr) tuple. Stderr may be combined
into stdout if the device doesn't support separate streams.
"""
cmd = self._make_shell_cmd(cmd)
logging.info(' '.join(cmd))
p = _subprocess_Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if self.has_shell_protocol():
exit_code = p.returncode
else:
exit_code, stdout = self._parse_shell_output(stdout)
return exit_code, stdout, stderr
def shell_popen(self, cmd, kill_atexit=True, preexec_fn=None,
creationflags=0, **kwargs):
"""Calls `adb shell` and returns a handle to the adb process.
This function provides direct access to the subprocess used to run the
command, without special return code handling. Users that need the
return value must retrieve it themselves.
Args:
cmd: Array of command arguments to execute.
kill_atexit: Whether to kill the process upon exiting.
preexec_fn: Argument forwarded to subprocess.Popen.
creationflags: Argument forwarded to subprocess.Popen.
**kwargs: Arguments forwarded to subprocess.Popen.
Returns:
subprocess.Popen handle to the adb shell instance
"""
command = self.adb_cmd + ['shell'] + cmd
# Make sure a ctrl-c in the parent script doesn't kill gdbserver.
if os.name == 'nt':
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
else:
if preexec_fn is None:
preexec_fn = os.setpgrp
elif preexec_fn is not os.setpgrp:
fn = preexec_fn
def _wrapper():
fn()
os.setpgrp()
preexec_fn = _wrapper
p = _subprocess_Popen(command, creationflags=creationflags,
preexec_fn=preexec_fn, **kwargs)
if kill_atexit:
atexit.register(p.kill)
return p
def install(self, filename, replace=False):
cmd = ['install']
if replace:
cmd.append('-r')
cmd.append(filename)
return self._simple_call(cmd)
def push(self, local, remote, sync=False):
"""Transfer a local file or directory to the device.
Args:
local: The local file or directory to transfer.
remote: The remote path to which local should be transferred.
sync: If True, only transfers files that are newer on the host than
those on the device. If False, transfers all files.
Returns:
Exit status of the push command.
"""
cmd = ['push']
if sync:
cmd.append('--sync')
cmd.extend([local, remote])
return self._simple_call(cmd)
def pull(self, remote, local):
return self._simple_call(['pull', remote, local])
def sync(self, directory=None):
cmd = ['sync']
if directory is not None:
cmd.append(directory)
return self._simple_call(cmd)
def tcpip(self, port):
return self._simple_call(['tcpip', port])
def usb(self):
return self._simple_call(['usb'])
def reboot(self):
return self._simple_call(['reboot'])
def remount(self):
return self._simple_call(['remount'])
def root(self):
return self._simple_call(['root'])
def unroot(self):
return self._simple_call(['unroot'])
def connect(self, host):
return self._simple_call(['connect', host])
def disconnect(self, host):
return self._simple_call(['disconnect', host])
def forward(self, local, remote):
return self._simple_call(['forward', local, remote])
def forward_list(self):
return self._simple_call(['forward', '--list'])
def forward_no_rebind(self, local, remote):
return self._simple_call(['forward', '--no-rebind', local, remote])
def forward_remove(self, local):
return self._simple_call(['forward', '--remove', local])
def forward_remove_all(self):
return self._simple_call(['forward', '--remove-all'])
def reverse(self, remote, local):
return self._simple_call(['reverse', remote, local])
def reverse_list(self):
return self._simple_call(['reverse', '--list'])
def reverse_no_rebind(self, local, remote):
return self._simple_call(['reverse', '--no-rebind', local, remote])
def reverse_remove_all(self):
return self._simple_call(['reverse', '--remove-all'])
def reverse_remove(self, remote):
return self._simple_call(['reverse', '--remove', remote])
def wait(self):
return self._simple_call(['wait-for-device'])
def get_prop(self, prop_name):
output = split_lines(self.shell(['getprop', prop_name])[0])
if len(output) != 1:
raise RuntimeError('Too many lines in getprop output:\n' +
'\n'.join(output))
value = output[0]
if not value.strip():
return None
return value
def set_prop(self, prop_name, value):
self.shell(['setprop', prop_name, value])
|