1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
package sideband
import (
"errors"
"fmt"
"io"
"github.com/go-git/go-git/v5/plumbing/format/pktline"
)
// ErrMaxPackedExceeded is returned by Read when a received packet is larger
// than the maximum packed size configured for the Demuxer.
var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")
// Progress is the destination for progress information: a Demuxer writes the
// payload of each ProgressMessage packet it reads to its Progress field, when
// that field is set.
type Progress interface {
io.Writer
}
// Demuxer demultiplexes the progress reports and error info interleaved with
// the packfile itself.
//
// A sideband has three different channels: the main one, called PackData,
// contains the packfile data; the ErrorMessage channel contains server errors;
// and the last one, the ProgressMessage channel, contains information about the
// ongoing task happening in the server (optional, can be suppressed by sending
// the NoProgress or Quiet capabilities to the server).
//
// In order to demultiplex the data stream, the method `Read` should be called
// to retrieve the PackData channel. Incoming data from the ProgressMessage
// channel is written to `Progress` (if set). If any message is retrieved from
// the ErrorMessage channel, an error is returned and we can assume that the
// connection has been closed.
type Demuxer struct {
// t is the sideband type this Demuxer was created for.
t Type
// r is the underlying multiplexed stream handed to NewDemuxer.
r io.Reader
// s scans r packet by packet.
s *pktline.Scanner
// max is the largest packet length accepted; longer packets make Read fail
// with ErrMaxPackedExceeded.
max int
// pending holds pack data from the last packet that did not fit in the
// caller's buffer; it is handed out before the next packet is scanned.
pending []byte
// Progress is where the progress messages are written; when nil they are
// discarded.
Progress Progress
}
// NewDemuxer builds a Demuxer of sideband type t that reads the multiplexed
// stream from r.
//
// The maximum accepted packet size is MaxPackedSize when t is Sideband and
// MaxPackedSize64k for any other type.
func NewDemuxer(t Type, r io.Reader) *Demuxer {
	d := &Demuxer{
		t:   t,
		r:   r,
		s:   pktline.NewScanner(r),
		max: MaxPackedSize64k,
	}

	// The plain Sideband type uses its own packet size limit.
	if t == Sideband {
		d.max = MaxPackedSize
	}

	return d
}
// Read fills b with bytes taken from the PackData channel and returns how many
// bytes were stored. It keeps consuming packets until b is full or an error
// occurs; the error may come from the underlying reader, be io.EOF at the end
// of the stream, or report a message received on the ErrorMessage channel.
//
// ProgressMessage payloads are never copied into b; they are forwarded to
// Progress instead.
func (d *Demuxer) Read(b []byte) (n int, err error) {
	for n < len(b) {
		var got int
		got, err = d.doRead(b[n:])
		n += got

		if err != nil {
			return n, err
		}
	}

	return n, nil
}
// doRead fetches the next chunk of pack data and copies as much of it as fits
// into b, returning the number of bytes copied together with any error
// reported by nextPackData. Whatever does not fit is kept in d.pending so the
// following call can hand it out first.
func (d *Demuxer) doRead(b []byte) (int, error) {
	data, err := d.nextPackData()

	// copy stops at the shorter of the two slices.
	n := copy(b, data)
	if n < len(data) {
		d.pending = data[n:]
	}

	return n, err
}
// nextPackData returns the next slice of PackData bytes available.
//
// Data left over from a previous packet is returned first. Otherwise one
// packet is scanned and dispatched on its first byte, the channel identifier:
//   - PackData: the payload after the channel byte is returned.
//   - ProgressMessage: the payload is written to Progress (if set) and no data
//     is returned; the error is whatever the write reported.
//   - ErrorMessage: the payload is returned wrapped in an error.
//   - anything else: an "unknown channel" error is returned.
//
// io.EOF is returned when the scanner stops without an error or yields an
// empty packet, and ErrMaxPackedExceeded when the packet is longer than d.max.
func (d *Demuxer) nextPackData() ([]byte, error) {
	if leftover := d.getPending(); len(leftover) != 0 {
		return leftover, nil
	}

	if !d.s.Scan() {
		if scanErr := d.s.Err(); scanErr != nil {
			return nil, scanErr
		}

		return nil, io.EOF
	}

	pkt := d.s.Bytes()
	switch {
	case len(pkt) == 0:
		return nil, io.EOF
	case len(pkt) > d.max:
		return nil, ErrMaxPackedExceeded
	}

	// The first byte selects the channel; the rest is the payload.
	payload := pkt[1:]
	switch Channel(pkt[0]) {
	case PackData:
		return payload, nil
	case ProgressMessage:
		if d.Progress == nil {
			return nil, nil
		}

		_, err := d.Progress.Write(payload)
		return nil, err
	case ErrorMessage:
		return nil, fmt.Errorf("unexpected error: %s", payload)
	default:
		return nil, fmt.Errorf("unknown channel %s", pkt)
	}
}
// getPending hands back the bytes stored in d.pending and clears the field.
// It returns nil, leaving the field untouched, when nothing is pending.
func (d *Demuxer) getPending() (b []byte) {
	if len(d.pending) == 0 {
		return nil
	}

	b, d.pending = d.pending, nil
	return b
}
|