1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
# Copyright 2014 Tycho Andersen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Not strictly necessary to be included with the binding, but may be useful for
# others who want to test things using xcffib.
import errno
import fcntl
import os
import subprocess
import time
from . import Connection, ConnectionException
def lock_path(display):
return '/tmp/.X%d-lock' % display
def find_display():
display = 10
while True:
try:
f = open(lock_path(display), "w+")
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
f.close()
raise
except OSError:
display += 1
continue
return display, f
class XvfbTest:
""" A helper class for testing things with nosetests. This class will run
each test in its own fresh xvfb, leaving you with an xcffib connection to
that X session as `self.conn` for use in testing. """
# Set this to true if you'd like to get xtrace output to stdout of each
# test.
xtrace = False
def __init__(self, width=800, height=600, depth=16):
self.width = width
self.height = height
self.depth = depth
def spawn(self, cmd):
""" Spawn a command but swallow its output. """
return subprocess.Popen(cmd)
def _restore_display(self):
if self._old_display is None:
del os.environ['DISPLAY']
else:
os.environ['DISPLAY'] = self._old_display
def setUp(self):
self._old_display = os.environ.get('DISPLAY')
self._display, self._display_lock = find_display()
os.environ['DISPLAY'] = ':%d' % self._display
self._xvfb = self.spawn(self._xvfb_command())
if self.xtrace:
subprocess.Popen(['xtrace', '-n'])
# xtrace's default display is :9; obviously this won't work
# concurrently, but it's not the default so...
os.environ['DISPLAY'] = ':9'
try:
self.conn = self._connect_to_xvfb()
except AssertionError:
self._restore_display()
raise
def tearDown(self):
try:
self.conn.disconnect()
except ConnectionException:
# We don't care if the connection was in an invalid state, maybe
# the test failed.
pass
finally:
self.conn = None
self._xvfb.kill()
self._xvfb.wait()
self._xvfb = None
# Delete our X lock file too, since we .kill() the process so it won't
# clean up after itself.
try:
self._display_lock.close()
os.remove(lock_path(self._display))
except OSError as e:
# we don't care if it doesn't exist, maybe something crashed and
# cleaned it up during a test.
if e.errno != errno.ENOENT:
raise
finally:
self._restore_display()
def __enter__(self):
self.setUp()
return self
def __exit__(self, type, value, traceback):
self.tearDown()
def _xvfb_command(self):
""" You can override this if you have some extra args for Xvfb or
whatever. At this point, os.environ['DISPLAY'] is set to something Xvfb
can use. """
screen = '%sx%sx%s' % (self.width, self.height, self.depth)
return ['Xvfb', os.environ['DISPLAY'], '-screen', '0', screen]
def _connect_to_xvfb(self):
# sometimes it takes a while for Xvfb to start
for _ in range(100):
try:
conn = Connection(os.environ['DISPLAY'])
conn.invalid()
return conn
except ConnectionException:
time.sleep(0.2)
assert False, "couldn't connect to xvfb"
|