File: protocol.py

package info (click to toggle)
python-kafka 2.0.2-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,740 kB
  • sloc: python: 20,457; makefile: 210; sh: 76
file content (33 lines) | stat: -rw-r--r-- 1,041 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from __future__ import absolute_import

from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
from kafka.structs import TopicPartition


class ConsumerProtocolMemberMetadata(Struct):
    """Struct holding the metadata portion of the consumer protocol.

    Fields, in schema order:
        version: Int16 value.
        subscription: array of utf-8 strings (presumably the subscribed
            topic names -- confirm against callers).
        user_data: raw bytes.
    """

    SCHEMA = Schema(
        ('version', Int16),
        ('subscription', Array(String('utf-8'))),
        ('user_data', Bytes),
    )


class ConsumerProtocolMemberAssignment(Struct):
    """Struct holding the assignment portion of the consumer protocol.

    Fields, in schema order:
        version: Int16 value.
        assignment: array of (topic, partitions) entries, where topic is a
            utf-8 string and partitions is an array of Int32.
        user_data: raw bytes.
    """

    SCHEMA = Schema(
        ('version', Int16),
        (
            'assignment',
            Array(
                ('topic', String('utf-8')),
                ('partitions', Array(Int32)),
            ),
        ),
        ('user_data', Bytes),
    )

    def partitions(self):
        """Flatten the assignment into a list of TopicPartition tuples.

        Returns:
            A list with one TopicPartition(topic, partition) per partition
            of every (topic, partitions) entry in ``self.assignment``,
            in the order the entries appear.
        """
        flattened = []
        # ``assignment`` is set dynamically from SCHEMA, so pylint cannot see it.
        for topic_name, partition_ids in self.assignment:  # pylint: disable=no-member
            flattened.extend(
                TopicPartition(topic_name, partition_id)
                for partition_id in partition_ids
            )
        return flattened


class ConsumerProtocol(object):
    """Constants describing the 'consumer' protocol type.

    Bundles the protocol type name, the names of the assignment strategies,
    and the Struct classes used for member metadata and member assignment.
    """

    # Name of the protocol type.
    PROTOCOL_TYPE = 'consumer'

    # Assignment strategy names.
    ASSIGNMENT_STRATEGIES = (
        'range',
        'roundrobin',
    )

    # Struct class for member metadata.
    METADATA = ConsumerProtocolMemberMetadata

    # Struct class for member assignment.
    ASSIGNMENT = ConsumerProtocolMemberAssignment