File: fork_test.py

package info (click to toggle)
python-eventlet 0.19.0-6
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 2,120 kB
  • ctags: 3,551
  • sloc: python: 18,723; sh: 196; makefile: 102
file content (52 lines) | stat: -rw-r--r-- 1,329 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from tests.patcher_test import ProcessBase


class ForkTest(ProcessBase):
    def test_simple(self):
        newmod = '''
import eventlet
import os
import sys
import signal
from eventlet.support import bytes_to_str, six
mydir = %r
signal_file = os.path.join(mydir, "output.txt")
pid = os.fork()
if (pid != 0):
  eventlet.Timeout(10)
  try:
    port = None
    while True:
      try:
        contents = open(signal_file, "rb").read()
        port = int(contents.split()[0])
        break
      except (IOError, IndexError, ValueError, TypeError):
        eventlet.sleep(0.1)
    eventlet.connect(('127.0.0.1', port))
    while True:
      try:
        contents = open(signal_file, "rb").read()
        result = contents.split()[1]
        break
      except (IOError, IndexError):
        eventlet.sleep(0.1)
    print('result {0}'.format(bytes_to_str(result)))
  finally:
    os.kill(pid, signal.SIGTERM)
else:
  try:
    s = eventlet.listen(('', 0))
    fd = open(signal_file, "wb")
    fd.write(six.b(str(s.getsockname()[1])))
    fd.write(b"\\n")
    fd.flush()
    s.accept()
    fd.write(b"done")
    fd.flush()
  finally:
    fd.close()
'''
        self.write_to_tempfile("newmod", newmod % self.tempdir)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(lines[0], "result done", output)