File: storage.go

package info (click to toggle)
burrow 1.2.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 864 kB
  • sloc: sh: 59; makefile: 6
file content (185 lines) | stat: -rw-r--r-- 8,032 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/* Copyright 2017 LinkedIn Corp. Licensed under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

package protocol

import "encoding/json"

// StorageRequestConstant is used in StorageRequest to indicate the type of request. Numeric ordering is not important
type StorageRequestConstant int

const (
	// StorageSetBrokerOffset is the request type to store a broker offset. Requires Cluster, Topic, Partition,
	// TopicPartitionCount, and Offset fields
	StorageSetBrokerOffset StorageRequestConstant = 0

	// StorageSetConsumerOffset is the request type to store a consumer offset. Requires Cluster, Group, Topic,
	// Partition, Offset, and Timestamp fields
	StorageSetConsumerOffset StorageRequestConstant = 1

	// StorageSetConsumerOwner is the request type to store a consumer owner. Requires Cluster, Group, Topic, Partition,
	// and Owner fields
	StorageSetConsumerOwner StorageRequestConstant = 2

	// StorageSetDeleteTopic is the request type to remove a topic from the broker and all consumers. Requires Cluster,
	// Group, and Topic fields
	StorageSetDeleteTopic StorageRequestConstant = 3

	// StorageSetDeleteGroup is the request type to remove a consumer group. Requires Cluster and Group fields
	StorageSetDeleteGroup StorageRequestConstant = 4

	// StorageFetchClusters is the request type to retrieve a list of clusters. Requires Reply. Returns a []string
	StorageFetchClusters StorageRequestConstant = 5

	// StorageFetchConsumers is the request type to retrieve a list of consumer groups in a cluster. Requires Reply and
	// Cluster fields. Returns a []string
	StorageFetchConsumers StorageRequestConstant = 6

	// StorageFetchTopics is the request type to retrieve a list of topics in a cluster. Requires Reply and Cluster
	// fields. Returns a []string
	StorageFetchTopics StorageRequestConstant = 7

	// StorageFetchConsumer is the request type to retrieve all stored information for a single consumer group. Requires
	// Reply, Cluster, and Group fields. Returns a ConsumerTopics object
	StorageFetchConsumer StorageRequestConstant = 8

	// StorageFetchTopic is the request type to retrieve the current broker offsets (one per partition) for a topic.
	// Requires Reply, Cluster, and Topic fields.
	// Returns a []int64
	StorageFetchTopic StorageRequestConstant = 9

	// StorageClearConsumerOwners is the request type to remove all partition owner information for a single group.
	// Requires Cluster and Group fields
	StorageClearConsumerOwners StorageRequestConstant = 10

	// StorageFetchConsumersForTopic is the request type to obtain a list of all consumer groups consuming from a topic.
	// Returns a []string
	StorageFetchConsumersForTopic StorageRequestConstant = 11
)

var storageRequestStrings = [...]string{
	"StorageSetBrokerOffset",
	"StorageSetConsumerOffset",
	"StorageSetConsumerOwner",
	"StorageSetDeleteTopic",
	"StorageSetDeleteGroup",
	"StorageFetchClusters",
	"StorageFetchConsumers",
	"StorageFetchTopics",
	"StorageFetchConsumer",
	"StorageFetchTopic",
	"StorageClearConsumerOwners",
	"StorageFetchConsumersForTopic",
}

// String returns a string representation of a StorageRequestConstant for logging
func (c StorageRequestConstant) String() string {
	if (c >= 0) && (c < StorageRequestConstant(len(storageRequestStrings))) {
		return storageRequestStrings[c]
	}
	return "UNKNOWN"
}

// MarshalText implements the encoding.TextMarshaler interface. The status is the string representation of
// StorageRequestConstant
func (c StorageRequestConstant) MarshalText() ([]byte, error) {
	return []byte(c.String()), nil
}

// MarshalJSON implements the json.Marshaler interface. The status is the string representation of
// StorageRequestConstant
func (c StorageRequestConstant) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}

// StorageRequest is sent over the StorageChannel that is stored in the application context. It is a query to either
// send information to the storage subsystem, or retrieve information from it . The RequestType indiciates the
// particular type of request. "Set" and "Clear" requests do not get a response. "Fetch" requests will send a response
// over the Reply channel supplied in the request
type StorageRequest struct {
	// The type of request that this struct encapsulates
	RequestType StorageRequestConstant

	// If the RequestType is a "Fetch" request, Reply must contain a channel to receive the response on
	Reply chan interface{}

	// The name of the cluster to which the request applies. Required for all request types except StorageFetchClusters
	Cluster string

	// The name of the consumer group to which the request applies
	Group string

	// The name of the topic to which the request applies
	Topic string

	// The ID of the partition to which the request applies
	Partition int32

	// For StorageSetBrokerOffset requests, TopicPartitionCount indiciates the total number of partitions for the topic
	TopicPartitionCount int32

	// For StorageSetBrokerOffset and StorageSetConsumerOffset requests, the offset to store
	Offset int64

	// For StorageSetConsumerOffset requests, the timestamp of the offset being stored
	Timestamp int64

	// For StorageSetConsumerOwner requests, a string describing the consumer host that owns the partition
	Owner string

	// For StorageSetConsumerOwner requests, a string containing the client_id set by the consumer
	ClientID string
}

// ConsumerPartition represents the information stored for a group for a single partition. It is used as part of the
// response to a StorageFetchConsumer request
type ConsumerPartition struct {
	// A slice containing a ConsumerOffset object for each offset Burrow has stored for this partition. This can be any
	// length up to the number of intervals Burrow has been configured to store, depending on how many offset commits
	// have been seen for this partition
	Offsets []*ConsumerOffset `json:"offsets"`

	// A slice containing the history of broker offsets stored for this partition. This is used for evaluation only,
	// and as such it is not provided when encoding to JSON (for HTTP responses)
	BrokerOffsets []int64 `json:"-"`

	// A string that describes the consumer host that currently owns this partition, if the information is available
	// (for active new consumers)
	Owner string `json:"owner"`

	// A string containing the client_id set by the consumer (for active new consumers)
	ClientID string `json:"client_id"`

	// The current number of messages that the consumer is behind for this partition. This is calculated using the
	// last committed offset and the current broker end offset
	CurrentLag uint64 `json:"current-lag"`
}

// ConsumerOffset represents a single offset stored. It is used as part of the response to a StorageFetchConsumer
// request
type ConsumerOffset struct {
	// The offset that is stored
	Offset int64 `json:"offset"`

	// The timestamp at which the offset was committed
	Timestamp int64 `json:"timestamp"`

	// The number of messages that the consumer was behind at the time that the offset was committed. This number is
	// not updated after the offset was committed, so it does not represent the current lag of the consumer.
	Lag uint64 `json:"lag"`
}

// ConsumerTopics is the response that is sent for a StorageFetchConsumer request. It is a map of topic names to
// ConsumerPartitions objects that describe that topic
type ConsumerTopics map[string]ConsumerPartitions

// ConsumerPartitions describes all partitions for a single topic. The index indicates the partition ID, and the value
// is a pointer to a ConsumerPartition object with the offset information for that partition.
type ConsumerPartitions []*ConsumerPartition