File: compositedisposable.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (103 lines) | stat: -rw-r--r-- 2,816 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from threading import RLock
from typing import Any, List

from reactivex import abc


class CompositeDisposable(abc.DisposableBase):
    """Represents a group of disposable resources that are disposed
    together"""

    def __init__(self, *args: Any):
        if args and isinstance(args[0], list):
            self.disposable: List[abc.DisposableBase] = args[0]
        else:
            self.disposable = list(args)

        self.is_disposed = False
        self.lock = RLock()
        super(CompositeDisposable, self).__init__()

    def add(self, item: abc.DisposableBase) -> None:
        """Adds a disposable to the CompositeDisposable or disposes the
        disposable if the CompositeDisposable is disposed

        Args:
            item: Disposable to add."""

        should_dispose = False
        with self.lock:
            if self.is_disposed:
                should_dispose = True
            else:
                self.disposable.append(item)

        if should_dispose:
            item.dispose()

    def remove(self, item: abc.DisposableBase) -> bool:
        """Removes and disposes the first occurrence of a disposable
        from the CompositeDisposable."""

        if self.is_disposed:
            return False

        should_dispose = False
        with self.lock:
            if item in self.disposable:
                self.disposable.remove(item)
                should_dispose = True

        if should_dispose:
            item.dispose()

        return should_dispose

    def dispose(self) -> None:
        """Disposes all disposable in the group and removes them from
        the group."""

        if self.is_disposed:
            return

        with self.lock:
            self.is_disposed = True
            current_disposable = self.disposable
            self.disposable = []

        for disp in current_disposable:
            disp.dispose()

    def clear(self) -> None:
        """Removes and disposes all disposable from the
        CompositeDisposable, but does not dispose the
        CompositeDisposable."""

        with self.lock:
            current_disposable = self.disposable
            self.disposable = []

        for disposable in current_disposable:
            disposable.dispose()

    def contains(self, item: abc.DisposableBase) -> bool:
        """Determines whether the CompositeDisposable contains a specific
        disposable.

        Args:
            item: Disposable to search for

        Returns:
            True if the disposable was found; otherwise, False"""

        return item in self.disposable

    def to_list(self) -> List[abc.DisposableBase]:
        return self.disposable[:]

    def __len__(self) -> int:
        return len(self.disposable)

    @property
    def length(self) -> int:
        return len(self.disposable)