package kafka
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"time"
)
// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
// been closed.
var ErrGroupClosed = errors.New("consumer group is closed")
// ErrGenerationEnded is returned by the Err method of the context.Context
// passed to functions launched via the Generation's Start function once the
// generation has ended.
var ErrGenerationEnded = errors.New("consumer group generation has ended")
const (
// defaultProtocolType holds the default protocol type documented in the
// kafka protocol
//
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
defaultProtocolType = "consumer"
// defaultHeartbeatInterval contains the default time between heartbeats. If
// the coordinator does not receive a heartbeat within the session timeout interval,
// the consumer will be considered dead and the coordinator will rebalance the
// group.
//
// As a rule, the heartbeat interval should be no greater than 1/3 the session timeout.
defaultHeartbeatInterval = 3 * time.Second
// defaultSessionTimeout contains the default interval the coordinator will wait
// for a heartbeat before marking a consumer as dead.
defaultSessionTimeout = 30 * time.Second
// defaultRebalanceTimeout contains the amount of time the coordinator will wait
// for consumers to issue a join group once a rebalance has been requested.
defaultRebalanceTimeout = 30 * time.Second
// defaultJoinGroupBackoff is the amount of time to wait after a failed
// consumer group generation before attempting to re-join.
defaultJoinGroupBackoff = 5 * time.Second
// defaultRetentionTime holds the default length of time that the consumer
// group will be saved by kafka. This value (-1) tells the broker to use its
// configured value.
defaultRetentionTime = -1 * time.Millisecond
// defaultPartitionWatchTime contains the amount of time kafka-go will wait
// between queries to the brokers when looking for partition changes.
defaultPartitionWatchTime = 5 * time.Second
// defaultTimeout is the deadline to set when interacting with the
// consumer group coordinator.
defaultTimeout = 5 * time.Second
)
// ConsumerGroupConfig is a configuration object used to create new instances of
// ConsumerGroup.
type ConsumerGroupConfig struct {
// ID is the consumer group ID. It must not be empty.
ID string
// The list of broker addresses used to connect to the kafka cluster. It
// must not be empty.
Brokers []string
// Dialer is the dialer used to open connections to the kafka server. This
// field is optional; if nil, the default dialer is used instead.
Dialer *Dialer
// Topics is the list of topics that will be consumed by this group. It
// will usually have a single value, but it is permitted to have multiple
// for more complex use cases.
Topics []string
// GroupBalancers is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
//
// Default: [Range, RoundRobin]
GroupBalancers []GroupBalancer
// HeartbeatInterval optionally sets the frequency at which the reader sends
// the consumer group heartbeat update.
//
// Default: 3s
HeartbeatInterval time.Duration
// PartitionWatchInterval indicates how often a reader checks for partition changes.
// If a reader sees a partition change (such as a partition add), it will
// rebalance the group, picking up new partitions.
//
// Default: 5s
PartitionWatchInterval time.Duration
// WatchPartitionChanges is used to inform kafka-go that a consumer group
// should poll the brokers and rebalance if any partition changes happen to
// the topic.
WatchPartitionChanges bool
// SessionTimeout optionally sets the length of time that may pass without a heartbeat
// before the coordinator considers the consumer dead and initiates a rebalance.
//
// Default: 30s
SessionTimeout time.Duration
// RebalanceTimeout optionally sets the length of time the coordinator will wait
// for members to join as part of a rebalance. For kafka servers under higher
// load, it may be useful to set this value higher.
//
// Default: 30s
RebalanceTimeout time.Duration
// JoinGroupBackoff optionally sets the length of time to wait before re-joining
// the consumer group after an error.
//
// Default: 5s
JoinGroupBackoff time.Duration
// RetentionTime optionally sets the length of time the consumer group will
// be saved by the broker. -1 will disable the setting and leave the
// retention up to the broker's offsets.retention.minutes property. By
// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
// 2.0.
//
// Default: -1
RetentionTime time.Duration
// StartOffset determines where the consumer group should begin consuming
// when it finds a partition without a committed offset. If non-zero, it
// must be set to one of FirstOffset or LastOffset.
//
// Default: FirstOffset
StartOffset int64
// Logger is an optional logger used to report internal changes within the
// reader.
Logger Logger
// ErrorLogger is the logger used to report errors. If nil, the reader falls
// back to using Logger instead.
ErrorLogger Logger
// Timeout is the network timeout used when communicating with the consumer
// group coordinator. This value should not be too small since errors
// communicating with the broker will generally cause a consumer group
// rebalance, and it's undesirable that a transient network error introduce
// that overhead. Similarly, it should not be too large or the consumer
// group may be slow to respond to the coordinator failing over to another
// broker.
//
// Default: 5s
Timeout time.Duration
// connect is a function for dialing the coordinator. This is provided for
// unit testing to mock broker connections.
connect func(dialer *Dialer, brokers ...string) (coordinator, error)
}
// Validate validates the ConsumerGroupConfig properties and sets relevant
// defaults.
func (config *ConsumerGroupConfig) Validate() error {
if len(config.Brokers) == 0 {
return errors.New("cannot create a consumer group with an empty list of broker addresses")
}
if len(config.Topics) == 0 {
return errors.New("cannot create a consumer group without a topic")
}
if config.ID == "" {
return errors.New("cannot create a consumer group without an ID")
}
if config.Dialer == nil {
config.Dialer = DefaultDialer
}
if len(config.GroupBalancers) == 0 {
config.GroupBalancers = []GroupBalancer{
RangeGroupBalancer{},
RoundRobinGroupBalancer{},
}
}
if config.HeartbeatInterval == 0 {
config.HeartbeatInterval = defaultHeartbeatInterval
}
if config.SessionTimeout == 0 {
config.SessionTimeout = defaultSessionTimeout
}
if config.PartitionWatchInterval == 0 {
config.PartitionWatchInterval = defaultPartitionWatchTime
}
if config.RebalanceTimeout == 0 {
config.RebalanceTimeout = defaultRebalanceTimeout
}
if config.JoinGroupBackoff == 0 {
config.JoinGroupBackoff = defaultJoinGroupBackoff
}
if config.RetentionTime == 0 {
config.RetentionTime = defaultRetentionTime
}
if config.HeartbeatInterval < 0 || (config.HeartbeatInterval/time.Millisecond) >= math.MaxInt32 {
return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
}
if config.SessionTimeout < 0 || (config.SessionTimeout/time.Millisecond) >= math.MaxInt32 {
return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
}
if config.RebalanceTimeout < 0 || (config.RebalanceTimeout/time.Millisecond) >= math.MaxInt32 {
return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
}
if config.JoinGroupBackoff < 0 || (config.JoinGroupBackoff/time.Millisecond) >= math.MaxInt32 {
return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
}
if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
}
if config.PartitionWatchInterval < 0 || (config.PartitionWatchInterval/time.Millisecond) >= math.MaxInt32 {
return fmt.Errorf("PartitionWachInterval out of bounds %d", config.PartitionWatchInterval)
}
if config.StartOffset == 0 {
config.StartOffset = FirstOffset
}
if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
return fmt.Errorf("StartOffset is not valid %d", config.StartOffset)
}
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.connect == nil {
config.connect = makeConnect(*config)
}
return nil
}
// PartitionAssignment represents the starting state of a partition that has
// been assigned to a consumer.
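//
// A consumer typically branches on the sentinel values when deciding where
// to seek. A minimal sketch (the assignment variable and the surrounding
// handling are hypothetical):
//
//	switch assignment.Offset {
//	case FirstOffset, LastOffset:
//		// no offset has been committed yet; seek relative to the log.
//	default:
//		// resume from the absolute committed offset.
//	}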
type PartitionAssignment struct {
// ID is the partition ID.
ID int
// Offset is the initial offset at which this assignment begins. It will
// either be an absolute offset if one has previously been committed for
// the consumer group or a relative offset such as FirstOffset when this
// is the first time the partition has been assigned to a member of the
// group.
Offset int64
}
// genCtx adapts the done channel of the generation to a context.Context. This
// is used by Generation.Start so that we can pass a context to go routines
// instead of passing around channels.
type genCtx struct {
gen *Generation
}
func (c genCtx) Done() <-chan struct{} {
return c.gen.done
}
func (c genCtx) Err() error {
select {
case <-c.gen.done:
return ErrGenerationEnded
default:
return nil
}
}
func (c genCtx) Deadline() (time.Time, bool) {
return time.Time{}, false
}
func (c genCtx) Value(interface{}) interface{} {
return nil
}
// Generation represents a single consumer group generation. The generation
// carries the topic+partition assignments for the given member. It also
// provides facilities for committing offsets and for running functions whose
// lifecycles are bound to the generation.
type Generation struct {
// ID is the generation ID as assigned by the consumer group coordinator.
ID int32
// GroupID is the name of the consumer group.
GroupID string
// MemberID is the ID assigned to this consumer by the consumer group
// coordinator.
MemberID string
// Assignments is the initial state of this Generation. The partition
// assignments are grouped by topic.
Assignments map[string][]PartitionAssignment
conn coordinator
// the following fields are used for process accounting to synchronize
// between Start and close. lock protects all of them. done is closed
// when the generation is ending in order to signal that the generation
// should start self-destructing. closed protects against double-closing
// the done chan. routines is a count of running go routines that have been
// launched by Start. joined will be closed by the last go routine to exit.
lock sync.Mutex
done chan struct{}
closed bool
routines int
joined chan struct{}
retentionMillis int64
log func(func(Logger))
logError func(func(Logger))
}
// close stops the generation and waits for all functions launched via Start to
// terminate.
func (g *Generation) close() {
g.lock.Lock()
if !g.closed {
close(g.done)
g.closed = true
}
// determine whether any go routines are running that we need to wait for.
// waiting needs to happen outside of the critical section.
r := g.routines
g.lock.Unlock()
// NOTE: r will be zero if no go routines were ever launched. no need to
// wait in that case.
if r > 0 {
<-g.joined
}
}
// Start launches the provided function in a go routine and adds accounting such
// that when the function exits, it stops the current generation (if not
// already in the process of doing so).
//
// The provided function MUST support cancellation via the ctx argument and exit
// in a timely manner once the ctx is complete. When the context is closed,
// the context's Err() function will return ErrGenerationEnded.
//
// When closing out a generation, the consumer group will wait for all functions
// launched by Start to exit before the group can move on and join the next
// generation. If the function does not exit promptly, it will stop forward
// progress for this consumer and potentially cause consumer group membership
// churn.
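//
// A minimal sketch of a well-behaved function (the per-iteration work is
// elided and hypothetical):
//
//	gen.Start(func(ctx context.Context) {
//		for {
//			select {
//			case <-ctx.Done():
//				// ctx.Err() now returns ErrGenerationEnded.
//				return
//			default:
//				// perform one bounded unit of work...
//			}
//		}
//	})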
func (g *Generation) Start(fn func(ctx context.Context)) {
g.lock.Lock()
defer g.lock.Unlock()
// this is an edge case: if the generation has already closed, then it's
// possible that the close func has already waited on outstanding go
// routines and exited.
//
// nonetheless, it's important to honor that the fn is invoked in case the
// calling function is waiting e.g. on a channel send or a WaitGroup. in
// such a case, fn should immediately exit because ctx.Err() will return
// ErrGenerationEnded.
if g.closed {
go fn(genCtx{g})
return
}
// register that there is one more go routine that's part of this gen.
g.routines++
go func() {
fn(genCtx{g})
g.lock.Lock()
// shut down the generation as soon as one function exits. this is
// different from close() in that it doesn't wait for all go routines in
// the generation to exit.
if !g.closed {
close(g.done)
g.closed = true
}
g.routines--
// if this was the last go routine in the generation, close the joined
// chan so that close() can exit if it's waiting.
if g.routines == 0 {
close(g.joined)
}
g.lock.Unlock()
}()
}
// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
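//
// The offsets argument maps topic => partition => offset. For example
// (topic name and offset values here are purely illustrative):
//
//	err := gen.CommitOffsets(map[string]map[int]int64{
//		"my-topic": {0: 43, 1: 101},
//	})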
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
if len(offsets) == 0 {
return nil
}
topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
for topic, partitions := range offsets {
t := offsetCommitRequestV2Topic{Topic: topic}
for partition, offset := range partitions {
t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
Partition: int32(partition),
Offset: offset,
})
}
topics = append(topics, t)
}
request := offsetCommitRequestV2{
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
RetentionTime: g.retentionMillis,
Topics: topics,
}
_, err := g.conn.offsetCommit(request)
if err == nil {
// if logging is enabled, print out the partitions that were committed.
g.log(func(l Logger) {
var report []string
for _, t := range request.Topics {
report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
for _, p := range t.Partitions {
report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
}
}
l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
})
}
return err
}
// heartbeatLoop checks in with the consumer group coordinator at the provided
// interval. It exits if it ever encounters an error, which would signal the
// end of the generation.
func (g *Generation) heartbeatLoop(interval time.Duration) {
g.Start(func(ctx context.Context) {
g.log(func(l Logger) {
l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
})
defer g.log(func(l Logger) {
l.Printf("stopped heartbeat for group %s\n", g.GroupID)
})
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, err := g.conn.heartbeat(heartbeatRequestV0{
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
})
if err != nil {
return
}
}
}
})
}
// partitionWatcher queries kafka and watches for partition changes, triggering
// a rebalance if changes are found. Similar to the heartbeat loop, it's okay
// to return on error here: if you are unable to ask a broker for basic
// metadata, you're in a bad spot and should rebalance. Commonly you will see
// an error here when there is a problem with the connection to the
// coordinator; a rebalance will establish a new connection to the
// coordinator.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
g.Start(func(ctx context.Context) {
g.log(func(l Logger) {
l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
})
defer g.log(func(l Logger) {
l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
})
ticker := time.NewTicker(interval)
defer ticker.Stop()
ops, err := g.conn.readPartitions(topic)
if err != nil {
g.logError(func(l Logger) {
l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
})
return
}
oParts := len(ops)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ops, err := g.conn.readPartitions(topic)
switch {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
})
return
}
default:
g.logError(func(l Logger) {
l.Printf("Problem getting partitions while checking for changes, %v", err)
})
var kafkaError Error
if errors.As(err, &kafkaError) {
continue
}
// other errors imply that we lost the connection to the coordinator, so we
// should abort and reconnect.
return
}
}
}
})
}
// coordinator is a subset of the functionality in Conn in order to facilitate
// testing the consumer group...especially for error conditions that are
// difficult to instigate with a live broker running in docker.
type coordinator interface {
io.Closer
findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
joinGroup(joinGroupRequest) (joinGroupResponse, error)
syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
readPartitions(...string) ([]Partition, error)
}
// timeoutCoordinator wraps the Conn to ensure that every operation has a
// deadline. Otherwise, it would be possible for requests to block indefinitely
// if the remote server never responds. There are many spots where the consumer
// group needs to interact with the broker, so it feels less error prone to
// factor all of the deadline management into this shared location as opposed to
// peppering it all through where the code actually interacts with the broker.
type timeoutCoordinator struct {
timeout time.Duration
sessionTimeout time.Duration
rebalanceTimeout time.Duration
conn *Conn
}
func (t *timeoutCoordinator) Close() error {
return t.conn.Close()
}
func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return findCoordinatorResponseV0{}, err
}
return t.conn.findCoordinator(req)
}
func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
// in the case of join group, the consumer group coordinator may wait up
// to the rebalance timeout in order to wait for all members to join.
if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
return joinGroupResponse{}, err
}
return t.conn.joinGroup(req)
}
func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
// in the case of sync group, the consumer group leader is given up to
// the session timeout to respond before the coordinator will give up.
if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.sessionTimeout)); err != nil {
return syncGroupResponseV0{}, err
}
return t.conn.syncGroup(req)
}
func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return leaveGroupResponseV0{}, err
}
return t.conn.leaveGroup(req)
}
func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return heartbeatResponseV0{}, err
}
return t.conn.heartbeat(req)
}
func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return offsetFetchResponseV1{}, err
}
return t.conn.offsetFetch(req)
}
func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return offsetCommitResponseV2{}, err
}
return t.conn.offsetCommit(req)
}
func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
return nil, err
}
return t.conn.ReadPartitions(topics...)
}
// NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
// provided configuration is invalid. It does not attempt to connect to the
// Kafka cluster. That happens asynchronously, and any errors will be reported
// by Next.
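//
// A minimal construction sketch (the broker address, group ID, and topic
// are placeholders):
//
//	group, err := NewConsumerGroup(ConsumerGroupConfig{
//		ID:      "my-group",
//		Brokers: []string{"localhost:9092"},
//		Topics:  []string{"my-topic"},
//	})
//	if err != nil {
//		// handle the configuration error...
//	}
//	defer group.Close()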
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
if err := config.Validate(); err != nil {
return nil, err
}
cg := &ConsumerGroup{
config: config,
next: make(chan *Generation),
errs: make(chan error),
done: make(chan struct{}),
}
cg.wg.Add(1)
go func() {
cg.run()
cg.wg.Done()
}()
return cg, nil
}
// ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
// the group directly. Rather, they interact with a Generation. Every time a
// member enters or exits the group, it results in a new Generation. The
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
config ConsumerGroupConfig
next chan *Generation
errs chan error
closeOnce sync.Once
wg sync.WaitGroup
done chan struct{}
}
// Close terminates the current generation, if one is still active, by
// causing this member to leave the group, and it releases all local
// resources used to participate in the consumer group.
func (cg *ConsumerGroup) Close() error {
cg.closeOnce.Do(func() {
close(cg.done)
})
cg.wg.Wait()
return nil
}
// Next waits for the next consumer group generation. There will never be two
// active generations. Next will never return a new generation until the
// previous one has completed.
//
// If there are errors setting up the next generation, they will be surfaced
// here.
//
// If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
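//
// A typical generation loop looks roughly like the following (assuming
// group was built via NewConsumerGroup; consumePartition is a hypothetical
// helper, not part of this package):
//
//	for {
//		gen, err := group.Next(context.TODO())
//		if err != nil {
//			break // e.g. ErrGroupClosed once the group is closed
//		}
//		for topic, assignments := range gen.Assignments {
//			for _, pa := range assignments {
//				topic, partition, offset := topic, pa.ID, pa.Offset
//				gen.Start(func(ctx context.Context) {
//					// hypothetical: read topic/partition from
//					// offset until ctx is done.
//					consumePartition(ctx, topic, partition, offset)
//				})
//			}
//		}
//	}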
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-cg.done:
return nil, ErrGroupClosed
case err := <-cg.errs:
return nil, err
case next := <-cg.next:
return next, nil
}
}
func (cg *ConsumerGroup) run() {
// the memberID is the only piece of information that is maintained across
// generations. it starts empty and will be assigned on the first nextGeneration
// when the joinGroup request is processed. it may change again later if
// the CG coordinator fails over or if the member is evicted. otherwise, it
// will be constant for the lifetime of this group.
var memberID string
var err error
for {
memberID, err = cg.nextGeneration(memberID)
// backoff will be set if this go routine should sleep before continuing
// to the next generation. it will be non-nil in the case of an error
// joining or syncing the group.
var backoff <-chan time.Time
switch {
case err == nil:
// no error...the previous generation finished normally.
continue
case errors.Is(err, ErrGroupClosed):
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
case errors.Is(err, RebalanceInProgress):
// in case of a RebalanceInProgress, don't leave the group or
// change the member ID, but report the error. the next attempt
// to join the group will then be subject to the rebalance
// timeout, so the broker will be responsible for throttling
// this loop.
default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
// so we don't attempt to use it again. in order to avoid
// a tight error loop, backoff before the next attempt to join
// the group.
_ = cg.leaveGroup(memberID)
memberID = ""
backoff = time.After(cg.config.JoinGroupBackoff)
}
// ensure that we exit cleanly in case the CG is done and no one is
// waiting to receive on the unbuffered error channel.
select {
case <-cg.done:
return
case cg.errs <- err:
}
// backoff if needed, being sure to exit cleanly if the CG is done.
if backoff != nil {
select {
case <-cg.done:
// exit cleanly if the group is closed.
return
case <-backoff:
}
}
}
}
func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
// get a new connection to the coordinator on each loop. the previous
// generation could have exited due to losing the connection, so this
// ensures that we always have a clean starting point. it means we will
// re-connect in certain cases, but that shouldn't be an issue given that
// rebalances are relatively infrequent under normal operating
// conditions.
conn, err := cg.coordinator()
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
})
return memberID, err // a prior memberID may still be valid, so don't return ""
}
defer conn.Close()
var generationID int32
var groupAssignments GroupMemberAssignments
var assignments map[string][]int32
// join group. this will join the group and prepare assignments if our
// consumer is elected leader. it may also change or assign the member ID.
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to join group %s: %v", cg.config.ID, err)
})
return memberID, err
}
cg.withLogger(func(log Logger) {
log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
})
// sync group
assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
})
return memberID, err
}
// fetch initial offsets.
var offsets map[string]map[int]int64
offsets, err = cg.fetchOffsets(conn, assignments)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
})
return memberID, err
}
// create the generation.
gen := Generation{
ID: generationID,
GroupID: cg.config.ID,
MemberID: memberID,
Assignments: cg.makeAssignments(assignments, offsets),
conn: conn,
done: make(chan struct{}),
joined: make(chan struct{}),
retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
log: cg.withLogger,
logError: cg.withErrorLogger,
}
// spawn all of the go routines required to facilitate this generation. if
// any of these functions exit, then the generation is determined to be
// complete.
gen.heartbeatLoop(cg.config.HeartbeatInterval)
if cg.config.WatchPartitionChanges {
for _, topic := range cg.config.Topics {
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
}
}
// make this generation available for retrieval. if the CG is closed before
// we can send it on the channel, exit. that case is required b/c the next
// channel is unbuffered. if the caller to Next has already bailed because
// its own teardown logic has been invoked, this would deadlock otherwise.
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case cg.next <- &gen:
}
// wait for generation to complete. if the CG is closed before the
// generation is finished, exit and leave the group.
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case <-gen.done:
// time for next generation! make sure all the current go routines exit
// before continuing onward.
gen.close()
return memberID, nil
}
}
// makeConnect returns a connect function that opens a connection to ANY of
// the provided brokers.
func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
return func(dialer *Dialer, brokers ...string) (coordinator, error) {
var err error
for _, broker := range brokers {
var conn *Conn
if conn, err = dialer.Dial("tcp", broker); err == nil {
return &timeoutCoordinator{
conn: conn,
timeout: config.Timeout,
sessionTimeout: config.SessionTimeout,
rebalanceTimeout: config.RebalanceTimeout,
}, nil
}
}
return nil, err // err will be non-nil
}
}
// coordinator establishes a connection to the coordinator for this consumer
// group.
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// NOTE : could try to cache the coordinator to avoid the double connect
// here. since consumer group balances happen infrequently and are
// an expensive operation, we're not currently optimizing that case
// in order to keep the code simpler.
conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
if err != nil {
return nil, err
}
defer conn.Close()
out, err := conn.findCoordinator(findCoordinatorRequestV0{
CoordinatorKey: cg.config.ID,
})
if err == nil && out.ErrorCode != 0 {
err = Error(out.ErrorCode)
}
if err != nil {
return nil, err
}
address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
return cg.config.connect(cg.config.Dialer, address)
}
// joinGroup attempts to join the reader to the consumer group.
// Returns GroupMemberAssignments if this Reader was selected as
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequest(memberID)
if err != nil {
return "", 0, nil, err
}
response, err := conn.joinGroup(request)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
if err != nil {
return "", 0, nil, err
}
memberID = response.MemberID
generationID := response.GenerationID
cg.withLogger(func(l Logger) {
l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
})
var assignments GroupMemberAssignments
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
v, err := cg.assignTopicPartitions(conn, response)
if err != nil {
return memberID, 0, nil, err
}
assignments = v
cg.withLogger(func(l Logger) {
for memberID, assignment := range assignments {
for topic, partitions := range assignment {
l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
}
}
})
}
cg.withLogger(func(l Logger) {
l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
})
return memberID, generationID, assignments, nil
}
// makeJoinGroupRequest handles the logic of constructing a joinGroup
// request.
func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
request := joinGroupRequest{
GroupID: cg.config.ID,
MemberID: memberID,
SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond),
RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
ProtocolType: defaultProtocolType,
}
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
ProtocolMetadata: groupMetadata{
Version: 1,
Topics: cg.config.Topics,
UserData: userData,
}.bytes(),
})
}
return request, nil
}
// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
cg.withLogger(func(l Logger) {
l.Printf("selected as leader for group, %s\n", cg.config.ID)
})
balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
if !ok {
// NOTE : this shouldn't happen in practice...the broker should not
// return successfully from joinGroup unless all members support
// at least one common protocol.
return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
}
members, err := cg.makeMemberProtocolMetadata(group.Members)
if err != nil {
return nil, err
}
topics := extractTopics(members)
partitions, err := conn.readPartitions(topics...)
// it's not a failure if the topic doesn't exist yet. it results in no
// assignments for the topic. this matches the behavior of the official
// clients: java, python, and librdkafka.
// a topic watcher can trigger a rebalance when the topic comes into being.
if err != nil && !errors.Is(err, UnknownTopicOrPartition) {
return nil, err
}
cg.withLogger(func(l Logger) {
l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
for _, member := range members {
l.Printf("found member: %v/%#v", member.ID, member.UserData)
}
for _, partition := range partitions {
l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
}
})
return balancer.AssignGroups(members, partitions), nil
}
// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
members := make([]GroupMember, 0, len(in))
for _, item := range in {
metadata := groupMetadata{}
reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
}
members = append(members, GroupMember{
ID: item.MemberID,
Topics: metadata.Topics,
UserData: metadata.UserData,
})
}
return members, nil
}
// syncGroup completes the consumer group nextGeneration by accepting the
// memberAssignments (if this Reader is the leader) and returning this
// Reader's subscriptions as a topic => partitions mapping.
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
if err != nil {
return nil, err
}
assignments := groupAssignment{}
reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
return nil, err
}
if len(assignments.Topics) == 0 {
cg.withLogger(func(l Logger) {
l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
})
}
cg.withLogger(func(l Logger) {
l.Printf("sync group finished for group, %v", cg.config.ID)
})
return assignments.Topics, nil
}
func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
request := syncGroupRequestV0{
GroupID: cg.config.ID,
GenerationID: generationID,
MemberID: memberID,
}
if memberAssignments != nil {
request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)
for memberID, topics := range memberAssignments {
topics32 := make(map[string][]int32)
for topic, partitions := range topics {
partitions32 := make([]int32, len(partitions))
for i := range partitions {
partitions32[i] = int32(partitions[i])
}
topics32[topic] = partitions32
}
request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
MemberID: memberID,
MemberAssignments: groupAssignment{
Version: 1,
Topics: topics32,
}.bytes(),
})
}
cg.withLogger(func(logger Logger) {
logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
})
}
return request
}
func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
req := offsetFetchRequestV1{
GroupID: cg.config.ID,
Topics: make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics)),
}
for _, topic := range cg.config.Topics {
req.Topics = append(req.Topics, offsetFetchRequestV1Topic{
Topic: topic,
Partitions: subs[topic],
})
}
offsets, err := conn.offsetFetch(req)
if err != nil {
return nil, err
}
offsetsByTopic := make(map[string]map[int]int64)
for _, res := range offsets.Responses {
offsetsByPartition := map[int]int64{}
offsetsByTopic[res.Topic] = offsetsByPartition
for _, pr := range res.PartitionResponses {
for _, partition := range subs[res.Topic] {
if partition == pr.Partition {
offset := pr.Offset
if offset < 0 {
offset = cg.config.StartOffset
}
offsetsByPartition[int(partition)] = offset
}
}
}
}
return offsetsByTopic, nil
}
func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
topicAssignments := make(map[string][]PartitionAssignment)
for _, topic := range cg.config.Topics {
topicPartitions := assignments[topic]
topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions))
for _, partition := range topicPartitions {
var offset int64
partitionOffsets, ok := offsets[topic]
if ok {
offset, ok = partitionOffsets[int(partition)]
}
if !ok {
offset = cg.config.StartOffset
}
topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{
ID: int(partition),
Offset: offset,
})
}
}
return topicAssignments
}
func (cg *ConsumerGroup) leaveGroup(memberID string) error {
// don't attempt to leave the group if no memberID was ever assigned.
if memberID == "" {
return nil
}
cg.withLogger(func(log Logger) {
log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
})
// IMPORTANT : leaveGroup establishes its own connection to the coordinator
// because it is often called after some other operation failed.
// said failure could be the result of connection-level issues,
// so we want to re-establish the connection to ensure that we
// are able to process the cleanup step.
coordinator, err := cg.coordinator()
if err != nil {
return err
}
_, err = coordinator.leaveGroup(leaveGroupRequestV0{
GroupID: cg.config.ID,
MemberID: memberID,
})
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
})
}
_ = coordinator.Close()
return err
}
func (cg *ConsumerGroup) withLogger(do func(Logger)) {
if cg.config.Logger != nil {
do(cg.config.Logger)
}
}
func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
if cg.config.ErrorLogger != nil {
do(cg.config.ErrorLogger)
} else {
cg.withLogger(do)
}
}