1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
package transfer
import (
"bytes"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"math"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"google.golang.org/protobuf/proto"
)
var errInvalidChecksum = errors.New("invalid checksum")
// blockReadStream implements io.Reader for reading a packet stream for a single
// block from a single datanode.
type blockReadStream struct {
	reader      io.Reader    // raw packet stream from the datanode
	checksumTab *crc32.Table // CRC32 table used to verify each chunk
	chunkSize   int          // size of a checksummed chunk, in bytes

	checksums bytes.Buffer // per-packet chunk checksums, 4 bytes each
	chunk     bytes.Buffer // buffered partial chunk, used for reads smaller than chunkSize

	packetLength int  // data length of the current packet, in bytes
	chunkIndex   int  // index of the next chunk to consume within the current packet
	numChunks    int  // total number of chunks in the current packet
	lastPacket   bool // whether the current packet is the last one in the block
}
// newBlockReadStream returns a blockReadStream that reads packets from reader,
// verifying chunks of chunkSize bytes against CRC32 checksums computed with
// checksumTab.
func newBlockReadStream(reader io.Reader, chunkSize int, checksumTab *crc32.Table) *blockReadStream {
	s := new(blockReadStream)
	s.reader = reader
	s.chunkSize = chunkSize
	s.checksumTab = checksumTab
	return s
}
// Read implements io.Reader. It consumes the packet stream chunk by chunk,
// validating every chunk against the checksums read at the start of its
// packet, and transparently starts the next packet when the current one is
// exhausted. After the final chunk of the last packet is consumed, it
// returns io.EOF.
func (s *blockReadStream) Read(b []byte) (int, error) {
	// The current packet is exhausted: either the block is finished, or we
	// need to pull in the next packet's header and checksums.
	if s.chunkIndex == s.numChunks {
		if s.lastPacket {
			return 0, io.EOF
		}

		err := s.startPacket()
		if err != nil {
			return 0, err
		}
	}

	remainingInPacket := (s.packetLength - (s.chunkIndex * s.chunkSize))

	// For small reads, we need to buffer a single chunk. If we did that
	// previously, read the rest of the buffer, so we're aligned back on a
	// chunk boundary.
	if s.chunk.Len() > 0 {
		// The buffered chunk was already checksum-validated when it was
		// filled (in the branch below), so just drain it. Read on a
		// non-empty bytes.Buffer cannot fail, so the error is ignored.
		n, _ := s.chunk.Read(b)
		return n, nil
	} else if len(b) < s.chunkSize {
		// b is smaller than one chunk: buffer a whole chunk (clamped to
		// what's left in the packet, for the final short chunk), validate
		// it, then serve b from the buffer.
		chunkSize := s.chunkSize
		if chunkSize > remainingInPacket {
			chunkSize = remainingInPacket
		}

		_, err := io.CopyN(&s.chunk, s.reader, int64(chunkSize))
		if err != nil {
			return 0, err
		}

		err = s.validateChecksum(s.chunk.Bytes())
		if err != nil {
			return 0, err
		}

		s.chunkIndex++
		n, _ := s.chunk.Read(b)
		return n, nil
	}

	// Always align reads to a chunk boundary. This makes the code much simpler,
	// and with readers that pick sane read sizes (like io.Copy), should be
	// efficient.
	var amountToRead int
	var chunksToRead int
	if len(b) > remainingInPacket {
		// b can hold the rest of the packet; read it all.
		chunksToRead = s.numChunks - s.chunkIndex
		amountToRead = remainingInPacket
	} else {
		// Read only as many whole chunks as fit in b.
		chunksToRead = len(b) / s.chunkSize
		amountToRead = chunksToRead * s.chunkSize
	}

	n, err := io.ReadFull(s.reader, b[:amountToRead])
	if err != nil {
		return n, err
	}

	// Validate the bytes we just read into b against the packet checksums.
	for i := 0; i < chunksToRead; i++ {
		chunkOff := i * s.chunkSize
		chunkEnd := chunkOff + s.chunkSize
		// The final chunk of the packet may be shorter than chunkSize, so
		// clamp the slice to the bytes actually read.
		if chunkEnd >= n {
			chunkEnd = n
		}

		err := s.validateChecksum(b[chunkOff:chunkEnd])
		if err != nil {
			return n, err
		}

		s.chunkIndex++
	}

	// EOF would be returned by the next call to Read anyway, but it's nice to
	// return it here.
	if s.chunkIndex == s.numChunks && s.lastPacket {
		err = io.EOF
	}

	return n, err
}
// validateChecksum verifies b against the stored checksum for the current
// chunk (s.chunkIndex), returning errInvalidChecksum on a mismatch.
func (s *blockReadStream) validateChecksum(b []byte) error {
	// Checksums are stored back to back, 4 bytes per chunk.
	off := s.chunkIndex * 4
	want := binary.BigEndian.Uint32(s.checksums.Bytes()[off : off+4])

	if got := crc32.Checksum(b, s.checksumTab); got != want {
		return errInvalidChecksum
	}

	return nil
}
// startPacket reads the next packet header and the packet's checksums off the
// wire, and resets the stream's per-packet bookkeeping accordingly.
func (s *blockReadStream) startPacket() error {
	header, err := s.readPacketHeader()
	if err != nil {
		return err
	}

	dataLength := int(header.GetDataLen())
	numChunks := int(math.Ceil(float64(dataLength) / float64(s.chunkSize)))

	// TODO don't assume checksum size is 4
	checksumsLength := 4 * numChunks
	s.checksums.Reset()
	s.checksums.Grow(checksumsLength)
	if _, err = io.CopyN(&s.checksums, s.reader, int64(checksumsLength)); err != nil {
		return err
	}

	s.packetLength = dataLength
	s.numChunks = numChunks
	s.chunkIndex = 0
	s.lastPacket = header.GetLastPacketInBlock()

	return nil
}
// readPacketHeader reads the next packet header from the stream. The wire
// format is a 4-byte total packet length, a 2-byte big-endian header length,
// and then the serialized PacketHeaderProto itself.
func (s *blockReadStream) readPacketHeader() (*hdfs.PacketHeaderProto, error) {
	lengthBytes := make([]byte, 6)
	_, err := io.ReadFull(s.reader, lengthBytes)
	if err != nil {
		return nil, err
	}

	// We don't actually care about the total length.
	packetHeaderLength := binary.BigEndian.Uint16(lengthBytes[4:])
	packetHeaderBytes := make([]byte, packetHeaderLength)
	_, err = io.ReadFull(s.reader, packetHeaderBytes)
	if err != nil {
		return nil, err
	}

	packetHeader := &hdfs.PacketHeaderProto{}
	// Bug fix: the unmarshal error was previously discarded (the function
	// always returned nil), so a corrupt header would silently yield a
	// zero-valued proto. Propagate it instead.
	err = proto.Unmarshal(packetHeaderBytes, packetHeader)
	if err != nil {
		return nil, err
	}

	return packetHeader, nil
}
|