1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
import threading
from typing import TYPE_CHECKING, Optional, TypeVar
from .. import abc
if TYPE_CHECKING:
from .subject import Subject
_T = TypeVar("_T")
class InnerSubscription(abc.DisposableBase):
def __init__(
self, subject: "Subject[_T]", observer: Optional[abc.ObserverBase[_T]] = None
):
self.subject = subject
self.observer = observer
self.lock = threading.RLock()
def dispose(self) -> None:
with self.lock:
if not self.subject.is_disposed and self.observer:
if self.observer in self.subject.observers:
self.subject.observers.remove(self.observer)
self.observer = None
|