1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
Description: Replace pkg_resources with importlib
Author: Martin Durant, Alexandre Detiste
Forwarded: not-needed
Last-Update: 2025-11-02
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -166,7 +166,7 @@
def register_plugin_entry_point(cls, entry_point, modifier=identity):
if hasattr(cls, entry_point.name):
raise ValueError(
- f"Can't add {entry_point.name} from {entry_point.module_name} "
+ f"Can't add {entry_point.name} "
f"to {cls.__name__}: duplicate method name."
)
@@ -176,7 +176,6 @@
if not issubclass(node, Stream):
raise TypeError(
f"Error loading {entry_point.name} "
- f"from module {entry_point.module_name}: "
f"{node.__class__.__name__} must be a subclass of Stream"
)
if getattr(cls, entry_point.name).__name__ == "stub":
--- a/streamz/plugins.py
+++ b/streamz/plugins.py
@@ -1,6 +1,6 @@
import warnings
-import pkg_resources
+import importlib.metadata
def try_register(cls, entry_point, *modifier):
@@ -13,10 +13,18 @@
)
+def get_entry_point(eps, group):
+ if hasattr(eps, "select"): # Python 3.10+ / importlib_metadata >= 3.9.0
+ return eps.select(group=group)
+ else:
+ return eps.get(group, [])
+
def load_plugins(cls):
- for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
+ eps = importlib.metadata.entry_points()
+
+ for entry_point in get_entry_point(eps, "streamz.sources"):
try_register(cls, entry_point, staticmethod)
- for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
+ for entry_point in get_entry_point(eps, "streamz.nodes"):
try_register(cls, entry_point)
- for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
+ for entry_point in get_entry_point(eps, "streamz.sinks"):
try_register(cls, entry_point)
--- a/streamz/sources.py
+++ b/streamz/sources.py
@@ -896,12 +896,15 @@
:param client_kwargs:
Passed to the client's ``connect()`` method
"""
- def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
+ def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None,
+ user=None, pw=None, **kwargs):
self.host = host
self.port = port
self.keepalive = keepalive
self.topic = topic
self.client_kwargs = client_kwargs
+ self.user = user
+ self.pw = pw
super().__init__(q=queue.Queue(), **kwargs)
def _on_connect(self, client, userdata, flags, rc):
@@ -913,6 +916,8 @@
async def run(self):
import paho.mqtt.client as mqtt
client = mqtt.Client()
+ if self.user:
+ client.username_pw_set(self.user, self.pw)
client.on_connect = self._on_connect
client.on_message = self._on_message
client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))
--- a/streamz/tests/test_kafka.py
+++ b/streamz/tests/test_kafka.py
@@ -51,7 +51,7 @@
def launch_kafka():
stop_docker(let_fail=True)
- subprocess.call(shlex.split("docker pull spotify/kafka"))
+ subprocess.call(shlex.split("docker pull spotify/kafka"), stderr=subprocess.DEVNULL)
cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
"ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
"--name streamz-kafka spotify/kafka")
|