// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jetstream
import (
"encoding/json"
"fmt"
"time"
)
type (
// ConsumerInfo is the detailed information about a JetStream consumer. It
// carries the consumer's configuration (Config) together with its delivery
// and acknowledgement state.
ConsumerInfo struct {
// Stream specifies the name of the stream that the consumer is bound
// to.
Stream string `json:"stream_name"`
// Name represents the unique identifier for the consumer. This can be
// either set explicitly by the client or generated automatically if not
// set.
Name string `json:"name"`
// Created is the timestamp when the consumer was created.
Created time.Time `json:"created"`
// Config contains the configuration settings of the consumer, set when
// creating or updating the consumer.
Config ConsumerConfig `json:"config"`
// Delivered holds information about the most recently delivered
// message, including its sequence numbers and timestamp.
Delivered SequenceInfo `json:"delivered"`
// AckFloor indicates the message before the first unacknowledged
// message.
AckFloor SequenceInfo `json:"ack_floor"`
// NumAckPending is the number of messages that have been delivered but
// not yet acknowledged.
NumAckPending int `json:"num_ack_pending"`
// NumRedelivered counts the number of messages that have been
// redelivered and not yet acknowledged. Each message is counted only
// once, even if it has been redelivered multiple times. This count is
// reset when the message is eventually acknowledged.
NumRedelivered int `json:"num_redelivered"`
// NumWaiting is the count of active pull requests. It is only relevant
// for pull-based consumers.
NumWaiting int `json:"num_waiting"`
// NumPending is the number of messages that match the consumer's
// filter, but have not been delivered yet.
NumPending uint64 `json:"num_pending"`
// Cluster contains information about the cluster to which this consumer
// belongs (if applicable). It is a pointer and is omitted from JSON
// when nil.
Cluster *ClusterInfo `json:"cluster,omitempty"`
// PushBound indicates whether at least one subscription exists for the
// delivery subject of this consumer. This is only applicable to
// push-based consumers.
PushBound bool `json:"push_bound,omitempty"`
// TimeStamp indicates when the info was gathered by the server.
TimeStamp time.Time `json:"ts"`
// PriorityGroups contains the state of the currently defined priority
// groups, one PriorityGroupState entry per group.
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
// Paused indicates whether the consumer is paused.
Paused bool `json:"paused,omitempty"`
// PauseRemaining contains the amount of time left until the consumer
// unpauses. It will only be non-zero if the consumer is currently paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}
// PriorityGroupState describes the state of a single priority group. It is
// the element type of ConsumerInfo.PriorityGroups.
PriorityGroupState struct {
// Group is the name of the priority group this state is for.
Group string `json:"group"`
// PinnedClientID is the generated ID of the pinned client.
PinnedClientID string `json:"pinned_client_id,omitempty"`
// PinnedTS is the timestamp when the client was pinned.
//
// NOTE(review): encoding/json's omitempty never treats a struct value such
// as time.Time as empty, so a zero PinnedTS is still written when this
// struct is marshaled — confirm whether that matters to callers.
PinnedTS time.Time `json:"pinned_ts,omitempty"`
}
// ConsumerConfig represents the configuration of a JetStream consumer,
// encompassing both push and pull consumer settings.
ConsumerConfig struct {
// Name is an optional name for the consumer. If not set, one is
// generated automatically.
//
// Name cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Name string `json:"name,omitempty"`
// Durable is an optional durable name for the consumer. If both Durable
// and Name are set, they have to be equal. Unless InactiveThreshold is set, a
// durable consumer will not be cleaned up automatically.
//
// Durable cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Durable string `json:"durable_name,omitempty"`
// Description provides an optional description of the consumer.
Description string `json:"description,omitempty"`
// DeliverPolicy defines from which point to start delivering messages
// from the stream. Defaults to DeliverAllPolicy.
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
// OptStartSeq is an optional sequence number from which to start
// message delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartSequencePolicy.
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
// OptStartTime is an optional time from which to start message
// delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartTimePolicy.
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
// AckPolicy defines the acknowledgement policy for the consumer.
// Defaults to AckExplicitPolicy.
AckPolicy AckPolicy `json:"ack_policy"`
// AckWait defines how long the server will wait for an acknowledgement
// before resending a message. If not set, server default is 30 seconds.
AckWait time.Duration `json:"ack_wait,omitempty"`
// MaxDeliver defines the maximum number of delivery attempts for a
// message. Applies to any message that is re-sent due to ack policy.
// If not set, server default is -1 (unlimited).
MaxDeliver int `json:"max_deliver,omitempty"`
// BackOff specifies the optional back-off intervals for retrying
// message delivery after a failed acknowledgement. It overrides
// AckWait.
//
// BackOff only applies to messages not acknowledged in specified time,
// not messages that were nack'ed.
//
// The number of intervals specified must be lower than or equal to
// MaxDeliver. If the number of intervals is lower, the last interval is
// used for all remaining attempts.
BackOff []time.Duration `json:"backoff,omitempty"`
// FilterSubject can be used to filter messages delivered from the
// stream. FilterSubject is exclusive with FilterSubjects.
FilterSubject string `json:"filter_subject,omitempty"`
// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`
// RateLimit specifies an optional maximum rate of message delivery in
// bits per second.
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
// SampleFrequency is an optional setting controlling how often
// acknowledgements are sampled for observability. See
// https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream
SampleFrequency string `json:"sample_freq,omitempty"`
// MaxWaiting is a maximum number of pull requests waiting to be
// fulfilled. If not set, this will inherit settings from stream's
// ConsumerLimits or (if those are not set) from account settings. If
// neither are set, server default is 512.
MaxWaiting int `json:"max_waiting,omitempty"`
// MaxAckPending is a maximum number of outstanding unacknowledged
// messages. Once this limit is reached, the server will suspend sending
// messages to the consumer. If not set, server default is 1000.
// Set to -1 for unlimited.
MaxAckPending int `json:"max_ack_pending,omitempty"`
// HeadersOnly indicates whether only headers of messages should be sent
// (and no payload). Defaults to false.
HeadersOnly bool `json:"headers_only,omitempty"`
// MaxRequestBatch is the optional maximum batch size a single pull
// request can make. When set with MaxRequestMaxBytes, the batch size
// will be constrained by whichever limit is hit first.
MaxRequestBatch int `json:"max_batch,omitempty"`
// MaxRequestExpires is the maximum duration a single pull request will
// wait for messages to be available to pull.
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
// MaxRequestMaxBytes is the optional maximum total bytes that can be
// requested in a given batch. When set with MaxRequestBatch, the batch
// size will be constrained by whichever limit is hit first.
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
// InactiveThreshold is a duration which instructs the server to clean
// up the consumer if it has been inactive for the specified duration.
// Durable consumers will not be cleaned up by default, but if
// InactiveThreshold is set, they will be. If not set, this will inherit
// settings from stream's ConsumerLimits. If neither are set, server
// default is 5 seconds.
//
// A consumer is considered inactive if there are no pull requests
// received by the server (for pull consumers), or no interest detected
// on deliver subject (for push consumers), not if there are no
// messages to be delivered.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// Replicas is the number of replicas for the consumer's state. By
// default, consumers inherit the number of replicas from the stream.
Replicas int `json:"num_replicas"`
// MemoryStorage is a flag to force the consumer to use memory storage
// rather than inherit the storage type from the stream.
MemoryStorage bool `json:"mem_storage,omitempty"`
// FilterSubjects allows filtering messages from a stream by subject.
// This field is exclusive with FilterSubject. Requires nats-server
// v2.10.0 or later.
FilterSubjects []string `json:"filter_subjects,omitempty"`
// Metadata is a set of application-defined key-value pairs for
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
// PriorityPolicy represents the priority policy the consumer is set to.
// Requires nats-server v2.11.0 or later.
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
// PinnedTTL represents the time after which the client will be unpinned
// if no new pull requests are sent. Used with PriorityPolicyPinned.
// Requires nats-server v2.11.0 or later. Note that the JSON key for
// this field is "priority_timeout".
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
// PriorityGroups is a list of priority groups this consumer supports.
PriorityGroups []string `json:"priority_groups,omitempty"`
// The fields below are specific to push consumers.

// DeliverSubject is the subject to deliver messages to for push
// consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
// DeliverGroup is the group name for push consumers.
DeliverGroup string `json:"deliver_group,omitempty"`
// FlowControl is a flag to enable flow control for the consumer.
// When set, server will regularly send an empty message with Status
// header 100 and a reply subject; consumers must reply to these
// messages to control the rate of message delivery.
FlowControl bool `json:"flow_control,omitempty"`
// IdleHeartbeat enables push consumer idle heartbeat messages.
// If the Consumer is idle for more than the set value, an empty message
// with Status header 100 will be sent indicating the consumer is still
// alive.
IdleHeartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}
// OrderedConsumerConfig is the configuration of an ordered JetStream
// consumer. For more information, see [Ordered Consumers] in README.
//
// [Ordered Consumers]: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md#ordered-consumers
OrderedConsumerConfig struct {
// FilterSubjects allows filtering messages from a stream by subject.
// Requires nats-server v2.10.0 or later.
FilterSubjects []string `json:"filter_subjects,omitempty"`
// DeliverPolicy defines from which point to start delivering messages
// from the stream. Defaults to DeliverAllPolicy.
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
// OptStartSeq is an optional sequence number from which to start
// message delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartSequencePolicy.
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
// OptStartTime is an optional time from which to start message
// delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartTimePolicy.
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`
// InactiveThreshold is a duration which instructs the server to clean
// up the consumer if it has been inactive for the specified duration.
// Defaults to 5m.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// HeadersOnly indicates whether only headers of messages should be sent
// (and no payload). Defaults to false.
HeadersOnly bool `json:"headers_only,omitempty"`
// MaxResetAttempts is the maximum number of attempts for the consumer to
// be recreated in a single recreation cycle. Defaults to unlimited.
//
// NOTE(review): unlike NamePrefix, this field carries no json tag, so
// encoding/json would emit it under its Go name if this struct were ever
// marshaled directly — confirm that is intended.
MaxResetAttempts int
// Metadata is a set of application-defined key-value pairs for
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// NamePrefix is an optional custom prefix for the consumer name.
// If provided, ordered consumer names will be generated as:
// {NamePrefix}_{sequence_number} (e.g., "custom_1", "custom_2").
// If not provided, a unique ID (NUID) will be used as the prefix.
// The field is excluded from JSON (tag "-").
NamePrefix string `json:"-"`
}
// DeliverPolicy determines from which point to start delivering messages.
// It is an integer enumeration that is encoded in JSON as a string by its
// MarshalJSON and UnmarshalJSON methods.
DeliverPolicy int
// AckPolicy determines how the consumer should acknowledge delivered
// messages. It is an integer enumeration that is encoded in JSON as a
// string by its MarshalJSON and UnmarshalJSON methods.
AckPolicy int
// ReplayPolicy determines how the consumer should replay messages it
// already has queued in the stream. It is an integer enumeration that is
// encoded in JSON as a string by its MarshalJSON and UnmarshalJSON methods.
ReplayPolicy int
// SequenceInfo has both the consumer and the stream sequence and last
// activity.
SequenceInfo struct {
// Consumer is the consumer sequence number (JSON key "consumer_seq").
Consumer uint64 `json:"consumer_seq"`
// Stream is the stream sequence number (JSON key "stream_seq").
Stream uint64 `json:"stream_seq"`
// Last is the time of last activity (JSON key "last_active"). It is a
// pointer and is omitted from JSON when nil.
Last *time.Time `json:"last_active,omitempty"`
}
// PriorityPolicy determines the priority policy the consumer is set to.
// It is an integer enumeration that is encoded in JSON as a string by its
// MarshalJSON and UnmarshalJSON methods.
PriorityPolicy int
)
// Supported PriorityPolicy values. The quoted names are the JSON string
// forms produced by PriorityPolicy.MarshalJSON.
const (
// PriorityPolicyNone is the default priority policy. Encoded in JSON as
// the empty string "".
PriorityPolicyNone PriorityPolicy = iota
// PriorityPolicyPinned is the priority policy that pins a consumer to a
// specific client. Encoded in JSON as "pinned_client".
PriorityPolicyPinned
// PriorityPolicyOverflow is the priority policy that allows for
// restricting when a consumer will receive messages based on the number of
// pending messages or acks. Encoded in JSON as "overflow".
PriorityPolicyOverflow
// PriorityPolicyPrioritized is the priority policy that allows for the
// server to deliver messages to clients based on their priority (instead
// of round-robin). Requires nats-server v2.12.0 or later. Encoded in JSON
// as "prioritized".
PriorityPolicyPrioritized
)
// UnmarshalJSON implements json.Unmarshaler for PriorityPolicy.
//
// data is compared against the encoded form (as produced by the package
// helper jsonString, presumably the name wrapped in double quotes) of each
// known policy name: "" (none), "pinned_client", "overflow" and
// "prioritized". On a match *p is set to the corresponding constant.
//
// A JSON null is treated as a no-op and leaves *p unchanged, following the
// json.Unmarshaler convention. Any other input returns an error and leaves
// *p unchanged.
func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
	// Per the encoding/json convention, null must not be reported as an
	// error by an Unmarshaler; it simply has no effect.
	if string(data) == "null" {
		return nil
	}
	switch string(data) {
	case jsonString(""):
		*p = PriorityPolicyNone
	case jsonString("pinned_client"):
		*p = PriorityPolicyPinned
	case jsonString("overflow"):
		*p = PriorityPolicyOverflow
	case jsonString("prioritized"):
		*p = PriorityPolicyPrioritized
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	return nil
}
// MarshalJSON implements json.Marshaler for PriorityPolicy. Each known
// policy is written as its JSON string name ("" for PriorityPolicyNone,
// "pinned_client", "overflow" or "prioritized"); any other value yields an
// error and no output.
func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch p {
	case PriorityPolicyNone:
		name = ""
	case PriorityPolicyPinned:
		name = "pinned_client"
	case PriorityPolicyOverflow:
		name = "overflow"
	case PriorityPolicyPrioritized:
		name = "prioritized"
	default:
		return nil, fmt.Errorf("nats: unknown priority policy %v", p)
	}
	// Single exit point for all recognised policies.
	return json.Marshal(name)
}
// Supported DeliverPolicy values. The quoted names are the string forms
// produced by DeliverPolicy.MarshalJSON and DeliverPolicy.String.
const (
// DeliverAllPolicy starts delivering messages from the very beginning of a
// stream. This is the default. Encoded as "all"; "undefined" is also
// accepted when decoding.
DeliverAllPolicy DeliverPolicy = iota
// DeliverLastPolicy will start the consumer with the last sequence
// received. Encoded as "last".
DeliverLastPolicy
// DeliverNewPolicy will only deliver new messages that are sent after the
// consumer is created. Encoded as "new".
DeliverNewPolicy
// DeliverByStartSequencePolicy will deliver messages starting from a given
// sequence configured with OptStartSeq in ConsumerConfig. Encoded as
// "by_start_sequence".
DeliverByStartSequencePolicy
// DeliverByStartTimePolicy will deliver messages starting from a given time
// configured with OptStartTime in ConsumerConfig. Encoded as
// "by_start_time".
DeliverByStartTimePolicy
// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received. Encoded as "last_per_subject".
DeliverLastPerSubjectPolicy
)
// UnmarshalJSON implements json.Unmarshaler for DeliverPolicy.
//
// data is compared against the encoded form (as produced by the package
// helper jsonString, presumably the name wrapped in double quotes) of each
// known policy name: "all" (or "undefined", which also maps to
// DeliverAllPolicy), "last", "new", "by_start_sequence", "by_start_time"
// and "last_per_subject". On a match *p is set to the corresponding
// constant.
//
// A JSON null is treated as a no-op and leaves *p unchanged, following the
// json.Unmarshaler convention. Any other input returns an error and leaves
// *p unchanged.
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
	// Per the encoding/json convention, null must not be reported as an
	// error by an Unmarshaler; it simply has no effect.
	if string(data) == "null" {
		return nil
	}
	switch string(data) {
	case jsonString("all"), jsonString("undefined"):
		*p = DeliverAllPolicy
	case jsonString("last"):
		*p = DeliverLastPolicy
	case jsonString("new"):
		*p = DeliverNewPolicy
	case jsonString("by_start_sequence"):
		*p = DeliverByStartSequencePolicy
	case jsonString("by_start_time"):
		*p = DeliverByStartTimePolicy
	case jsonString("last_per_subject"):
		*p = DeliverLastPerSubjectPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	return nil
}
// MarshalJSON implements json.Marshaler for DeliverPolicy. Each known
// policy is written as its JSON string name ("all", "last", "new",
// "by_start_sequence", "by_start_time" or "last_per_subject"); any other
// value yields an error and no output.
func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch p {
	case DeliverAllPolicy:
		name = "all"
	case DeliverLastPolicy:
		name = "last"
	case DeliverNewPolicy:
		name = "new"
	case DeliverByStartSequencePolicy:
		name = "by_start_sequence"
	case DeliverByStartTimePolicy:
		name = "by_start_time"
	case DeliverLastPerSubjectPolicy:
		name = "last_per_subject"
	default:
		return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
	}
	// Single exit point for all recognised policies.
	return json.Marshal(name)
}
// String returns the textual name of the deliver policy ("all", "last",
// "new", "by_start_sequence", "by_start_time" or "last_per_subject"). It
// returns the empty string for a value that is not one of the declared
// DeliverPolicy constants.
func (p DeliverPolicy) String() string {
	// Lookup table indexed by the policy constant itself.
	names := [...]string{
		DeliverAllPolicy:             "all",
		DeliverLastPolicy:            "last",
		DeliverNewPolicy:             "new",
		DeliverByStartSequencePolicy: "by_start_sequence",
		DeliverByStartTimePolicy:     "by_start_time",
		DeliverLastPerSubjectPolicy:  "last_per_subject",
	}
	// Anything outside the table is an unknown policy.
	if p < 0 || int(p) >= len(names) {
		return ""
	}
	return names[p]
}
// Supported AckPolicy values. The quoted names are the JSON string forms
// produced by AckPolicy.MarshalJSON; AckPolicy.String uses different,
// Go-style names ("AckExplicit", "AckAll", "AckNone").
const (
// AckExplicitPolicy requires ack or nack for all messages. Encoded in
// JSON as "explicit".
AckExplicitPolicy AckPolicy = iota
// AckAllPolicy when acking a sequence number, this implicitly acks all
// sequences below this one as well. Encoded in JSON as "all".
AckAllPolicy
// AckNonePolicy requires no acks for delivered messages. Encoded in JSON
// as "none".
AckNonePolicy
)
// UnmarshalJSON implements json.Unmarshaler for AckPolicy.
//
// data is compared against the encoded form (as produced by the package
// helper jsonString, presumably the name wrapped in double quotes) of each
// known policy name: "none", "all" and "explicit". On a match *p is set to
// the corresponding constant.
//
// A JSON null is treated as a no-op and leaves *p unchanged, following the
// json.Unmarshaler convention. Any other input returns an error and leaves
// *p unchanged.
func (p *AckPolicy) UnmarshalJSON(data []byte) error {
	// Per the encoding/json convention, null must not be reported as an
	// error by an Unmarshaler; it simply has no effect.
	if string(data) == "null" {
		return nil
	}
	switch string(data) {
	case jsonString("none"):
		*p = AckNonePolicy
	case jsonString("all"):
		*p = AckAllPolicy
	case jsonString("explicit"):
		*p = AckExplicitPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	return nil
}
// MarshalJSON implements json.Marshaler for AckPolicy. Each known policy is
// written as its JSON string name ("explicit", "all" or "none"); any other
// value yields an error and no output.
func (p AckPolicy) MarshalJSON() ([]byte, error) {
	if p == AckExplicitPolicy {
		return json.Marshal("explicit")
	}
	if p == AckAllPolicy {
		return json.Marshal("all")
	}
	if p == AckNonePolicy {
		return json.Marshal("none")
	}
	// Not one of the declared constants.
	return nil, fmt.Errorf("nats: unknown acknowledgement policy %v", p)
}
// String returns a human-readable name for the ack policy: "AckExplicit",
// "AckAll" or "AckNone". For a value that is not one of the declared
// AckPolicy constants it returns "Unknown AckPolicy".
func (p AckPolicy) String() string {
	if p == AckExplicitPolicy {
		return "AckExplicit"
	}
	if p == AckAllPolicy {
		return "AckAll"
	}
	if p == AckNonePolicy {
		return "AckNone"
	}
	return "Unknown AckPolicy"
}
// Supported ReplayPolicy values. The quoted names are the string forms
// produced by ReplayPolicy.MarshalJSON and ReplayPolicy.String.
const (
// ReplayInstantPolicy will replay messages as fast as possible. Encoded
// as "instant".
ReplayInstantPolicy ReplayPolicy = iota
// ReplayOriginalPolicy will maintain the same timing as the messages were
// received. Encoded as "original".
ReplayOriginalPolicy
)
// UnmarshalJSON implements json.Unmarshaler for ReplayPolicy.
//
// data is compared against the encoded form (as produced by the package
// helper jsonString, presumably the name wrapped in double quotes) of each
// known policy name: "instant" and "original". On a match *p is set to the
// corresponding constant.
//
// A JSON null is treated as a no-op and leaves *p unchanged, following the
// json.Unmarshaler convention. Any other input returns an error and leaves
// *p unchanged.
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
	// Per the encoding/json convention, null must not be reported as an
	// error by an Unmarshaler; it simply has no effect.
	if string(data) == "null" {
		return nil
	}
	switch string(data) {
	case jsonString("instant"):
		*p = ReplayInstantPolicy
	case jsonString("original"):
		*p = ReplayOriginalPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	return nil
}
// MarshalJSON implements json.Marshaler for ReplayPolicy. The two known
// policies are written as the JSON strings "original" and "instant"; any
// other value yields an error and no output.
func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
	if p == ReplayOriginalPolicy {
		return json.Marshal("original")
	}
	if p == ReplayInstantPolicy {
		return json.Marshal("instant")
	}
	// Not one of the declared constants.
	return nil, fmt.Errorf("nats: unknown replay policy %v", p)
}
// String returns the textual name of the replay policy, "original" or
// "instant". It returns the empty string for a value that is not one of the
// declared ReplayPolicy constants.
func (p ReplayPolicy) String() string {
	if p == ReplayOriginalPolicy {
		return "original"
	}
	if p == ReplayInstantPolicy {
		return "instant"
	}
	return ""
}
|