File: compress.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (124 lines) | stat: -rw-r--r-- 2,733 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package compress

import (
	"encoding"
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/segmentio/kafka-go/compress/gzip"
	"github.com/segmentio/kafka-go/compress/lz4"
	"github.com/segmentio/kafka-go/compress/snappy"
	"github.com/segmentio/kafka-go/compress/zstd"
)

// Compression represents the compression applied to a record set.
type Compression int8

const (
	None   Compression = 0
	Gzip   Compression = 1
	Snappy Compression = 2
	Lz4    Compression = 3
	Zstd   Compression = 4
)

func (c Compression) Codec() Codec {
	if i := int(c); i >= 0 && i < len(Codecs) {
		return Codecs[i]
	}
	return nil
}

func (c Compression) String() string {
	if codec := c.Codec(); codec != nil {
		return codec.Name()
	}
	return "uncompressed"
}

func (c Compression) MarshalText() ([]byte, error) {
	return []byte(c.String()), nil
}

func (c *Compression) UnmarshalText(b []byte) error {
	switch string(b) {
	case "none", "uncompressed":
		*c = None
		return nil
	}

	for _, codec := range Codecs[None+1:] {
		if codec.Name() == string(b) {
			*c = Compression(codec.Code())
			return nil
		}
	}

	i, err := strconv.ParseInt(string(b), 10, 64)
	if err == nil && i >= 0 && i < int64(len(Codecs)) {
		*c = Compression(i)
		return nil
	}

	s := &strings.Builder{}
	s.WriteString("none, uncompressed")

	for i, codec := range Codecs[None+1:] {
		if i < (len(Codecs) - 1) {
			s.WriteString(", ")
		} else {
			s.WriteString(", or ")
		}
		s.WriteString(codec.Name())
	}

	return fmt.Errorf("compression format must be one of %s, not %q", s, b)
}

var (
	_ encoding.TextMarshaler   = Compression(0)
	_ encoding.TextUnmarshaler = (*Compression)(nil)
)

// Codec represents a compression codec to encode and decode the messages.
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// A Codec must be safe for concurrent access by multiple go routines.
type Codec interface {
	// Code returns the compression codec code
	Code() int8

	// Human-readable name for the codec.
	Name() string

	// Constructs a new reader which decompresses data from r.
	NewReader(r io.Reader) io.ReadCloser

	// Constructs a new writer which writes compressed data to w.
	NewWriter(w io.Writer) io.WriteCloser
}

var (
	// The global gzip codec installed on the Codecs table.
	GzipCodec gzip.Codec

	// The global snappy codec installed on the Codecs table.
	SnappyCodec snappy.Codec

	// The global lz4 codec installed on the Codecs table.
	Lz4Codec lz4.Codec

	// The global zstd codec installed on the Codecs table.
	ZstdCodec zstd.Codec

	// The global table of compression codecs supported by the kafka protocol.
	Codecs = [...]Codec{
		None:   nil,
		Gzip:   &GzipCodec,
		Snappy: &SnappyCodec,
		Lz4:    &Lz4Codec,
		Zstd:   &ZstdCodec,
	}
)