|

Starting with **confluent-kafka-python 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ is **production-ready**.

**Note:** The new consumer group protocol defined in `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ is not enabled by default. There are a few contract changes associated with the new protocol that might cause breaking changes. The ``group.protocol`` configuration property dictates whether to use the new ``consumer`` protocol or the older ``classic`` protocol. It defaults to ``classic`` if not provided.

********
Overview
********

- **What changed:**
  The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**.
- **Requirements:**

  - Broker version **4.0.0+**
  - confluent-kafka-python version **2.12.0+**: GA (production-ready)

- **Enablement (client-side):**

  - ``group.protocol=consumer``
  - ``group.remote.assignor=<assignor>`` (optional; broker-controlled
    if unset; default broker assignor is ``uniform``)

******************
Available Features
******************

All `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ features are supported, including:

- Subscription to one or more topics, including **regular expression
  (regex) subscriptions**
- Rebalance callbacks (**incremental only**)
- Static group membership
- Configurable remote assignor
- Enforced max poll interval
- Upgrade from ``classic`` protocol or downgrade from ``consumer``
  protocol
- AdminClient changes as defined in the KIP

****************
Contract Changes
****************

Client Configuration changes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

+------------------------------------+-----------------------------------------+
| Classic Protocol (Deprecated       | KIP-848 / Next-Gen Replacement          |
| Configs in KIP-848)                |                                         |
+====================================+=========================================+
| ``partition.assignment.strategy``  | ``group.remote.assignor``               |
+------------------------------------+-----------------------------------------+
| ``session.timeout.ms``             | Broker config:                          |
|                                    | ``group.consumer.session.timeout.ms``   |
+------------------------------------+-----------------------------------------+
| ``heartbeat.interval.ms``          | Broker config:                          |
|                                    | ``group.consumer.heartbeat.interval.ms``|
+------------------------------------+-----------------------------------------+
| ``group.protocol.type``            | Not used in the new protocol            |
+------------------------------------+-----------------------------------------+

**Note:** The properties listed under "Classic Protocol (Deprecated
Configs in KIP-848)" are **no longer used** when using the KIP-848
consumer protocol.

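
When migrating an existing configuration dict, it can help to flag the classic-only keys before switching protocols. The sketch below is illustrative only: ``find_classic_only_keys()`` is a hypothetical helper (not part of the confluent-kafka-python API) whose key list is copied from the table above, and the broker address and group name are placeholders.

.. code:: python

    # Hypothetical helper (not part of confluent-kafka-python): lists the
    # classic-protocol settings from the table above that KIP-848 does not use.
    CLASSIC_ONLY_KEYS = (
        "partition.assignment.strategy",
        "session.timeout.ms",
        "heartbeat.interval.ms",
        "group.protocol.type",
    )


    def find_classic_only_keys(conf):
        """Return classic-only keys present in conf when group.protocol=consumer."""
        if conf.get("group.protocol", "classic") != "consumer":
            # Under the classic protocol these keys are still honored.
            return []
        return [key for key in CLASSIC_ONLY_KEYS if key in conf]


    conf = {
        "bootstrap.servers": "localhost:9092",  # placeholder broker address
        "group.id": "example-group",            # placeholder group name
        "group.protocol": "consumer",
        "session.timeout.ms": 45000,
    }
    print(find_classic_only_keys(conf))  # ['session.timeout.ms']
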
Rebalance Callback Changes
^^^^^^^^^^^^^^^^^^^^^^^^^^

- The **protocol is fully incremental** in KIP-848.
- In the **rebalance callbacks**, if you assign or revoke partitions yourself, you **must only use**:

  - ``consumer.incremental_assign(partitions)`` to assign new
    partitions
  - ``consumer.incremental_unassign(partitions)`` to revoke partitions

- Calling these methods is optional: if you don't call them inside the rebalance callbacks, the client performs the incremental assign/unassign internally.
- **Do not** use ``consumer.assign()`` or ``consumer.unassign()`` when
  using ``group.protocol='consumer'`` (KIP-848).
- ⚠️ The ``partitions`` list passed to ``incremental_assign()`` and
  ``incremental_unassign()`` contains only the **incremental changes**
  (partitions being **added** or **revoked**), **not the full
  assignment** as was the case with ``assign()`` in the classic
  protocol.
- All assignors under KIP-848 are now **sticky**, including ``range``,
  which was **not sticky** in the classic protocol.

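
Because each callback receives only the delta, an application that needs the full current assignment has to accumulate it itself. A minimal sketch, assuming partitions are hashable values (``(topic, partition)`` tuples here); the ``AssignmentTracker`` class and the ``FakeConsumer`` stand-in are hypothetical and exist only so the example runs without a broker.

.. code:: python

    class AssignmentTracker:
        """Accumulates the full assignment from incremental rebalance callbacks."""

        def __init__(self):
            self.owned = set()

        def on_assign(self, consumer, partitions):
            # 'partitions' holds only the newly added partitions.
            consumer.incremental_assign(partitions)
            self.owned.update(partitions)

        def on_revoke(self, consumer, partitions):
            # 'partitions' holds only the partitions being taken away.
            consumer.incremental_unassign(partitions)
            self.owned.difference_update(partitions)


    class FakeConsumer:
        """Stand-in for confluent_kafka.Consumer so the sketch runs offline."""

        def incremental_assign(self, partitions):
            pass

        def incremental_unassign(self, partitions):
            pass


    tracker = AssignmentTracker()
    consumer = FakeConsumer()
    tracker.on_assign(consumer, [("orders", 0), ("orders", 1)])
    tracker.on_assign(consumer, [("orders", 2)])  # a delta, not the full list
    tracker.on_revoke(consumer, [("orders", 0)])
    print(sorted(tracker.owned))  # [('orders', 1), ('orders', 2)]

With a real consumer, the bound methods would be passed as ``on_assign=tracker.on_assign`` and ``on_revoke=tracker.on_revoke`` to ``consumer.subscribe()``.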
Static Group Membership
^^^^^^^^^^^^^^^^^^^^^^^

- Duplicate ``group.instance.id`` handling:

  - **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID
    (fatal)**.
  - (Classic protocol fenced the **existing** member instead.)

- Implications:

  - Ensure only **one active instance per** ``group.instance.id``.
  - Consumers must shut down cleanly to avoid blocking replacements
    until session timeout expires.

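
A minimal configuration sketch for static membership under KIP-848. Deriving ``group.instance.id`` from a stable per-instance identifier (here a hypothetical ``INSTANCE_NAME`` environment variable, falling back to the host name) is an assumption about how your deployment names instances; the only requirement stated above is that no two live consumers share the same value.

.. code:: python

    import os
    import socket


    def static_member_config(bootstrap_servers, group_id):
        """Build a KIP-848 consumer config with a stable group.instance.id."""
        # INSTANCE_NAME is a hypothetical per-replica variable (for example a
        # pod name); the host name is used when it is not set.
        instance_id = os.environ.get("INSTANCE_NAME") or socket.gethostname()
        return {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "group.protocol": "consumer",
            "group.instance.id": instance_id,
        }


    conf = static_member_config("localhost:9092", "example-group")
    print(conf["group.instance.id"])
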
Session Timeout & Fetching
^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Session timeout is broker-controlled**:

  - If the Coordinator is unreachable, a consumer **continues fetching
    messages** but cannot commit offsets.
  - The consumer is fenced once a heartbeat response is received from the
    Coordinator.
  - In the classic protocol, the client stopped fetching when the session
    timeout expired.

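
Because a consumer can keep fetching while commits fail, commit errors can reasonably be treated as recoverable rather than fatal. A minimal sketch, assuming a synchronous ``consumer.commit(asynchronous=False)`` call, which raises ``KafkaException`` on failure in confluent-kafka-python; ``commit_or_log()`` and ``FakeConsumer`` are hypothetical, and catching ``Exception`` stands in for ``KafkaException`` so the example runs without the library installed.

.. code:: python

    def commit_or_log(consumer, log):
        """Try a synchronous commit; on failure, log and keep consuming."""
        try:
            consumer.commit(asynchronous=False)
            return True
        except Exception as exc:  # confluent_kafka.KafkaException with the real client
            # The consumer may still be fetching while the Coordinator is
            # unreachable; a later commit attempt may succeed.
            log(f"commit failed, will retry later: {exc}")
            return False


    class FakeConsumer:
        """Simulates commits failing while the Coordinator is unreachable."""

        def __init__(self, coordinator_reachable):
            self.coordinator_reachable = coordinator_reachable

        def commit(self, asynchronous=True):
            if not self.coordinator_reachable:
                raise RuntimeError("coordinator not available")


    messages = []
    print(commit_or_log(FakeConsumer(False), messages.append))  # False
    print(commit_or_log(FakeConsumer(True), messages.append))   # True
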
Closing / Auto-Commit
^^^^^^^^^^^^^^^^^^^^^

- On ``close()`` or unsubscribe with auto-commit enabled:

  - The member retries committing offsets until a timeout expires.
  - That timeout is currently the **default remote session timeout**.
  - A future change, **KIP-1092**, will allow custom commit timeouts.

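
Both the static-membership and auto-commit behavior described above depend on the consumer being closed explicitly. A common pattern is a poll loop with ``close()`` in a ``finally`` block. The sketch assumes an object with the ``poll()`` and ``close()`` methods of ``confluent_kafka.Consumer``; ``consume_until_stopped()`` and the ``Fake*`` classes are hypothetical so it runs without a broker.

.. code:: python

    def consume_until_stopped(consumer, handle, should_stop):
        """Poll messages until should_stop() is true, then always close."""
        try:
            while not should_stop():
                msg = consumer.poll(1.0)  # timeout in seconds
                if msg is None:
                    continue
                if msg.error():
                    # Application-specific error handling goes here.
                    continue
                handle(msg)
        finally:
            # close() is the clean shutdown path; with auto-commit enabled it
            # retries committing offsets until the timeout described above.
            consumer.close()


    class FakeMessage:
        def __init__(self, value):
            self._value = value

        def error(self):
            return None

        def value(self):
            return self._value


    class FakeConsumer:
        def __init__(self, values):
            self._messages = [FakeMessage(v) for v in values]
            self.closed = False

        def poll(self, timeout):
            return self._messages.pop(0) if self._messages else None

        def close(self):
            self.closed = True


    received = []
    consumer = FakeConsumer([b"a", b"b"])
    consume_until_stopped(consumer, lambda m: received.append(m.value()),
                          lambda: len(received) == 2)
    print(received, consumer.closed)  # [b'a', b'b'] True
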
Error Handling Changes
^^^^^^^^^^^^^^^^^^^^^^

- ``UNKNOWN_TOPIC_OR_PART`` (**subscription case**):

  - No longer returned if a topic is missing in the **local cache**
    when subscribing; the subscription proceeds.

- ``TOPIC_AUTHORIZATION_FAILED``:

  - Reported once per heartbeat or subscription change, even if only
    one topic is unauthorized.

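
Consumer errors surfaced through ``msg.error()`` are ``KafkaError`` objects, which expose ``name()`` and ``fatal()``. A minimal sketch of branching on the error names mentioned in this section; the suggested actions and the ``describe_consumer_error()`` helper are hypothetical, and ``FakeError`` stands in for ``KafkaError`` so the example runs offline.

.. code:: python

    def describe_consumer_error(err):
        """Map a KafkaError-like object to a suggested action (illustrative)."""
        name = err.name()
        if name == "TOPIC_AUTHORIZATION_FAILED":
            # Under KIP-848 this is reported once per heartbeat or subscription
            # change, even if only one subscribed topic is unauthorized.
            return "check ACLs for the subscribed topics"
        if name == "UNKNOWN_TOPIC_OR_PART":
            # Under KIP-848 this is not returned at subscribe time for a topic
            # missing from the local cache, so do not rely on it for typos.
            return "verify the topic exists"
        if err.fatal():
            # For example UNRELEASED_INSTANCE_ID for a duplicate static member.
            return "close and recreate the consumer"
        return "log and continue"


    class FakeError:
        def __init__(self, name, fatal=False):
            self._name, self._fatal = name, fatal

        def name(self):
            return self._name

        def fatal(self):
            return self._fatal


    print(describe_consumer_error(FakeError("TOPIC_AUTHORIZATION_FAILED")))
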
Summary of Key Differences (Classic vs Next-Gen)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Assignment:** Classic: calculated by the **Group Leader
  (consumer)**; KIP-848: calculated by the **Group Coordinator (broker)**
- **Assignors:** Classic range assignor was **not sticky**; KIP-848
  assignors are **sticky**, including range
- **Deprecated configs:** Classic client configs are replaced by
  ``group.remote.assignor`` and broker-controlled session/heartbeat
  configs
- **Static membership fencing:** KIP-848 fences the **new member** on a
  duplicate ``group.instance.id``
- **Session timeout:** Classic: enforced on the client; KIP-848: enforced
  on the broker
- **Auto-commit on close:** Classic stops at the client session timeout;
  KIP-848 retries until the remote timeout
- **Unknown topics:** KIP-848 does not return an error on subscription if
  a topic is missing
- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to
  ``classic`` and ``consumer`` protocols

**********************
Minimal Example Config
**********************

Classic Protocol
^^^^^^^^^^^^^^^^

.. code:: properties

    # Optional; default is 'classic'
    group.protocol=classic
    partition.assignment.strategy=<range,roundrobin,cooperative-sticky>
    session.timeout.ms=45000
    heartbeat.interval.ms=15000

Next-Gen Protocol / KIP-848
^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: properties

    group.protocol=consumer

    # Optional: select a remote assignor
    # Valid options currently: 'uniform' or 'range'
    # group.remote.assignor=<uniform,range>
    # If unset, broker chooses the assignor (default: 'uniform')

    # Session & heartbeat now controlled by broker:
    # group.consumer.session.timeout.ms
    # group.consumer.heartbeat.interval.ms

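
In confluent-kafka-python these properties are passed to ``Consumer`` as a ``dict``. A minimal sketch, assuming a broker at ``localhost:9092`` and a group named ``example-group`` (both placeholders); the hypothetical ``next_gen_config()`` helper only sets ``group.remote.assignor`` when one is requested, leaving the choice to the broker otherwise.

.. code:: python

    VALID_REMOTE_ASSIGNORS = ("uniform", "range")


    def next_gen_config(bootstrap_servers, group_id, remote_assignor=None):
        """Return a KIP-848 consumer config; the broker picks the assignor if None."""
        conf = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "group.protocol": "consumer",
            # No session.timeout.ms / heartbeat.interval.ms: these are now the
            # broker's group.consumer.* settings.
        }
        if remote_assignor is not None:
            if remote_assignor not in VALID_REMOTE_ASSIGNORS:
                raise ValueError(f"unsupported remote assignor: {remote_assignor!r}")
            conf["group.remote.assignor"] = remote_assignor
        return conf


    conf = next_gen_config("localhost:9092", "example-group")
    # With the client installed:
    #   from confluent_kafka import Consumer
    #   consumer = Consumer(conf)
    print(conf)
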
****************************
Rebalance Callback Migration
****************************

Range Assignor (Classic)
^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: python

    # Rebalance callbacks for the range assignor (classic protocol)
    def on_assign(consumer, partitions):
        # The full partition list is provided under the classic protocol
        print(f"[Classic] Assigned partitions: {partitions}")
        consumer.assign(partitions)  # Optional: client handles if not used

    def on_revoke(consumer, partitions):
        print(f"[Classic] Revoked partitions: {partitions}")
        consumer.unassign()  # Optional: client handles if not used

Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: python

    # Rebalance callbacks for an incremental assignor
    def on_assign(consumer, partitions):
        # Only the newly added partitions are passed here (not the full list)
        print(f"[KIP-848] Incrementally assigning: {partitions}")
        consumer.incremental_assign(partitions)  # Optional: client handles if not used

    def on_revoke(consumer, partitions):
        print(f"[KIP-848] Incrementally revoking: {partitions}")
        consumer.incremental_unassign(partitions)  # Optional: client handles if not used

**Note:** The ``partitions`` list contains **only partitions being added or revoked**, not the full partition list as in the classic ``consumer.assign()``.

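
The callbacks are registered through the ``on_assign`` and ``on_revoke`` keyword arguments of ``Consumer.subscribe()``, which also accepts ``on_lost`` for partitions lost without a clean revocation (for example after fencing). A minimal sketch; the topic name ``orders``, the ``subscribe_incremental()`` helper and the ``FakeConsumer`` are placeholders so it runs without a broker, and not committing in ``on_lost`` is one reasonable choice rather than a requirement.

.. code:: python

    def on_assign(consumer, partitions):
        consumer.incremental_assign(partitions)


    def on_revoke(consumer, partitions):
        # Flush or commit work for 'partitions' here if needed, then release them.
        consumer.incremental_unassign(partitions)


    def on_lost(consumer, partitions):
        # The partitions may already belong to another member,
        # so avoid committing offsets for them here.
        consumer.incremental_unassign(partitions)


    def subscribe_incremental(consumer, topics):
        """Register the incremental callbacks with the consumer."""
        consumer.subscribe(topics, on_assign=on_assign, on_revoke=on_revoke,
                           on_lost=on_lost)


    class FakeConsumer:
        """Records subscribe() arguments so the sketch runs without a broker."""

        def subscribe(self, topics, on_assign=None, on_revoke=None, on_lost=None):
            self.subscription = (list(topics), on_assign, on_revoke, on_lost)


    consumer = FakeConsumer()
    subscribe_incremental(consumer, ["orders"])
    print(consumer.subscription[0])  # ['orders']
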
*********************
Upgrade and Downgrade
*********************

- A group made up entirely of ``classic`` consumers runs under the
  classic protocol.
- The group is **upgraded to the consumer protocol** as soon as at
  least one ``consumer`` protocol member joins.
- The group is **downgraded back to the classic protocol** if the last
  ``consumer`` protocol member leaves while ``classic`` members remain.
- Both **rolling upgrade** (classic → consumer) and **rolling
  downgrade** (consumer → classic) are supported.

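
The rules above can be summarized as a deliberately simplified model: it ignores broker-side migration settings and join order, and only captures which protocol a group with a given mix of members runs under. The ``effective_group_protocol()`` function is hypothetical and purely illustrative.

.. code:: python

    def effective_group_protocol(member_protocols):
        """Return the protocol a group runs under, per the rules above (simplified)."""
        protocols = set(member_protocols)
        if not protocols:
            return None  # empty group: not covered by the rules above
        if "consumer" in protocols:
            # At least one KIP-848 member means the group is upgraded.
            return "consumer"
        return "classic"


    # Rolling upgrade: replace classic members one at a time.
    steps = [
        ["classic", "classic", "classic"],
        ["consumer", "classic", "classic"],
        ["consumer", "consumer", "classic"],
        ["consumer", "consumer", "consumer"],
    ]
    print([effective_group_protocol(s) for s in steps])
    # ['classic', 'consumer', 'consumer', 'consumer']
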
**************************************************************************************************************************************************************************
Migration Checklist (Next-Gen Protocol / `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_)
**************************************************************************************************************************************************************************

1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release)
2. Run against **Kafka brokers ≥ 4.0.0**
3. Set ``group.protocol=consumer``
4. Optionally set ``group.remote.assignor``; leave it unspecified for
   broker-controlled assignment (default: ``uniform``). Valid options:
   ``uniform`` or ``range``
5. Replace deprecated classic configs with their KIP-848 equivalents
6. Update rebalance callbacks to **incremental APIs only** (if used)
7. Review static membership handling (``group.instance.id``)
8. Ensure proper shutdown to avoid fencing issues
9. Adjust error handling for unknown topics and authorization failures

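
For step 1, the installed client version can be checked at startup. ``confluent_kafka.version()`` returns a ``(version_string, version_int)`` tuple; the sketch below compares only the string form. The helpers ``parse_version()`` and ``meets_minimum_version()`` are hypothetical, and they read only the leading ``X.Y.Z`` part, so pre-release suffixes are ignored.

.. code:: python

    import re

    MINIMUM_CLIENT_VERSION = (2, 12, 0)  # GA release for group.protocol=consumer


    def parse_version(version_string):
        """Parse the leading 'X.Y.Z' of a version string into an int tuple."""
        match = re.match(r"(\d+)\.(\d+)\.(\d+)", version_string)
        if match is None:
            raise ValueError(f"unrecognized version: {version_string!r}")
        return tuple(int(part) for part in match.groups())


    def meets_minimum_version(version_string, minimum=MINIMUM_CLIENT_VERSION):
        return parse_version(version_string) >= minimum


    # With the client installed:
    #   import confluent_kafka
    #   assert meets_minimum_version(confluent_kafka.version()[0])
    print(meets_minimum_version("2.12.0"), meets_minimum_version("2.11.1"))
    # True False
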
|