1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
package gojay
import (
"strconv"
"sync"
"time"
)
// MarshalerStream is the interface to implement
// to continuously encode of stream of data.
type MarshalerStream interface {
MarshalStream(enc *StreamEncoder)
}
// A StreamEncoder reads and encodes values to JSON from an input stream.
//
// It implements conext.Context and provide a channel to notify interruption.
type StreamEncoder struct {
mux *sync.RWMutex
*Encoder
nConsumer int
delimiter byte
deadline *time.Time
done chan struct{}
}
// EncodeStream spins up a defined number of non blocking consumers of the MarshalerStream m.
//
// m must implement MarshalerStream. Ideally m is a channel. See example for implementation.
//
// See the documentation for Marshal for details about the conversion of Go value to JSON.
func (s *StreamEncoder) EncodeStream(m MarshalerStream) {
// if a single consumer, just use this encoder
if s.nConsumer == 1 {
go consume(s, s, m)
return
}
// else use this Encoder only for first consumer
// and use new encoders for other consumers
// this is to avoid concurrent writing to same buffer
// resulting in a weird JSON
go consume(s, s, m)
for i := 1; i < s.nConsumer; i++ {
s.mux.RLock()
select {
case <-s.done:
default:
ss := Stream.borrowEncoder(s.w)
ss.mux.Lock()
ss.done = s.done
ss.buf = make([]byte, 0, 512)
ss.delimiter = s.delimiter
go consume(s, ss, m)
ss.mux.Unlock()
}
s.mux.RUnlock()
}
return
}
// LineDelimited sets the delimiter to a new line character.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) LineDelimited() *StreamEncoder {
s.delimiter = '\n'
return s
}
// CommaDelimited sets the delimiter to a comma.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) CommaDelimited() *StreamEncoder {
s.delimiter = ','
return s
}
// NConsumer sets the number of non blocking go routine to consume the stream.
func (s *StreamEncoder) NConsumer(n int) *StreamEncoder {
s.nConsumer = n
return s
}
// Release sends back a Decoder to the pool.
// If a decoder is used after calling Release
// a panic will be raised with an InvalidUsagePooledDecoderError error.
func (s *StreamEncoder) Release() {
s.isPooled = 1
streamEncPool.Put(s)
}
// Done returns a channel that's closed when work is done.
// It implements context.Context
func (s *StreamEncoder) Done() <-chan struct{} {
return s.done
}
// Err returns nil if Done is not yet closed.
// If Done is closed, Err returns a non-nil error explaining why.
// It implements context.Context
func (s *StreamEncoder) Err() error {
return s.err
}
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
func (s *StreamEncoder) Deadline() (time.Time, bool) {
if s.deadline != nil {
return *s.deadline, true
}
return time.Time{}, false
}
// SetDeadline sets the deadline
func (s *StreamEncoder) SetDeadline(t time.Time) {
s.deadline = &t
}
// Value implements context.Context
func (s *StreamEncoder) Value(key interface{}) interface{} {
return nil
}
// Cancel cancels the consumers of the stream, interrupting the stream encoding.
//
// After calling cancel, Done() will return a closed channel.
func (s *StreamEncoder) Cancel(err error) {
s.mux.Lock()
defer s.mux.Unlock()
select {
case <-s.done:
default:
s.err = err
close(s.done)
}
}
// AddObject adds an object to be encoded.
// value must implement MarshalerJSONObject.
func (s *StreamEncoder) AddObject(v MarshalerJSONObject) {
if v.IsNil() {
return
}
s.Encoder.writeByte('{')
v.MarshalJSONObject(s.Encoder)
s.Encoder.writeByte('}')
s.Encoder.writeByte(s.delimiter)
}
// AddString adds a string to be encoded.
func (s *StreamEncoder) AddString(v string) {
s.Encoder.writeByte('"')
s.Encoder.writeString(v)
s.Encoder.writeByte('"')
s.Encoder.writeByte(s.delimiter)
}
// AddArray adds an implementation of MarshalerJSONArray to be encoded.
func (s *StreamEncoder) AddArray(v MarshalerJSONArray) {
s.Encoder.writeByte('[')
v.MarshalJSONArray(s.Encoder)
s.Encoder.writeByte(']')
s.Encoder.writeByte(s.delimiter)
}
// AddInt adds an int to be encoded.
func (s *StreamEncoder) AddInt(value int) {
s.buf = strconv.AppendInt(s.buf, int64(value), 10)
s.Encoder.writeByte(s.delimiter)
}
// AddFloat64 adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat64(value float64) {
s.buf = strconv.AppendFloat(s.buf, value, 'f', -1, 64)
s.Encoder.writeByte(s.delimiter)
}
// AddFloat adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat(value float64) {
s.AddFloat64(value)
}
// Non exposed
func consume(init *StreamEncoder, s *StreamEncoder, m MarshalerStream) {
defer s.Release()
for {
select {
case <-init.Done():
return
default:
m.MarshalStream(s)
if s.Encoder.err != nil {
init.Cancel(s.Encoder.err)
return
}
i, err := s.Encoder.Write()
if err != nil || i == 0 {
init.Cancel(err)
return
}
}
}
}
|