1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
package kafka
import (
"errors"
"sync"
)
var (
	// errUnknownCodec is the error resolveCodec reports when no codec is
	// stored under the requested code.
	errUnknownCodec = errors.New("invalid codec")

	// codecs is the package-level codec registry, keyed by the value each
	// codec returns from Code.
	codecs = map[int8]CompressionCodec{}

	// codecsMutex guards codecs: writers take the exclusive lock, readers
	// the shared one.
	codecsMutex sync.RWMutex
)
// RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
//
// codec is a constructor: it is invoked exactly once, and the value it returns
// is stored in the package-level registry under the code reported by its Code
// method. Registering another codec with the same code replaces the earlier
// entry.
//
// Both the constructor and Code run before the registry lock is acquired, and
// the lock is released with defer, so a panic in caller-supplied code (for
// example a constructor that returns nil) cannot leave the registry locked.
func RegisterCompressionCodec(codec func() CompressionCodec) {
	c := codec()
	// Evaluate Code outside the critical section: it is caller-supplied and
	// may panic or block.
	code := c.Code()

	codecsMutex.Lock()
	defer codecsMutex.Unlock()
	codecs[code] = c
}
// resolveCodec returns the codec stored in the registry under code.
//
// The lookup happens under the registry read lock. When the registry holds no
// (non-nil) codec for code, the result is a nil codec and errUnknownCodec.
func resolveCodec(code int8) (codec CompressionCodec, err error) {
	codecsMutex.RLock()
	defer codecsMutex.RUnlock()

	if found := codecs[code]; found != nil {
		return found, nil
	}
	return nil, errUnknownCodec
}
// CompressionCodec is the contract a message compression scheme implements:
// it identifies itself with a numeric code and can encode and decode message
// payloads.
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// Implementations must be safe for concurrent use by multiple goroutines.
type CompressionCodec interface {
	// Code reports the numeric code that identifies this codec.
	Code() int8

	// Encode returns the encoded form of src, or an error.
	Encode(src []byte) (dst []byte, err error)

	// Decode returns the decoded form of src, or an error.
	Decode(src []byte) (dst []byte, err error)
}
const (
	// compressionCodecMask has only its two lowest bits set (0x03).
	// NOTE(review): presumably used to extract the codec code from a
	// message attributes byte; confirm against the call sites.
	compressionCodecMask int8 = 0x03

	// DefaultCompressionLevel is the sentinel level -1.
	// NOTE(review): presumably asks a codec to use its own default level;
	// confirm against the codec implementations.
	DefaultCompressionLevel int = -1

	// CompressionNoneCode is the untyped constant 0.
	// NOTE(review): presumably the codec code meaning "no compression";
	// confirm against the callers.
	CompressionNoneCode = 0
)
|