File: multiprocess_string_io.py

package info (click to toggle)
python-flaky 3.8.1-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 332 kB
  • sloc: python: 1,320; makefile: 11; sh: 10
file content (34 lines) | stat: -rw-r--r-- 919 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import multiprocessing


class MultiprocessingStringIO:
    """
    Provide a StringIO-like interface to the multiprocessing ListProxy. The
    multiprocessing ListProxy needs to be instantiated before the flaky plugin
    is configured, so the list is created as a class variable.

    Because ``proxy`` is a class attribute, every instance of this class
    appends to and reads from the same underlying list unless an instance
    rebinds ``proxy`` itself.
    """

    # Both objects are created once, when the class body is executed.
    _manager = multiprocessing.Manager()
    proxy = _manager.list()  # pylint:disable=no-member

    def getvalue(self):
        """
        Shadow the StringIO.getvalue method.

        :return:
            Every chunk written so far, concatenated in insertion order.
            An empty string if nothing has been written.
        """
        return ''.join(self.proxy)

    def writelines(self, content_list):
        """
        Shadow the StringIO.writelines method. Writes each item of an
        iterable of strings, in order, by delegating to :meth:`write`.

        :param content_list:
            Iterable of strings to append. No separators are added
            between the items.
        """
        for item in content_list:
            self.write(item)

    def write(self, content):
        """
        Shadow the StringIO.write method.

        :param content:
            The string to append. It is stored exactly as given.
        """
        # The previous implementation called ``content.strip('\n')`` and
        # discarded the result; since strings are immutable that was a
        # no-op. The call is dropped so the code says what it does:
        # newlines in ``content`` are preserved.
        self.proxy.append(content)