1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
package rpc
import (
"errors"
"io"
hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
"google.golang.org/protobuf/proto"
)
// errUnexpectedSequenceNumber is returned by basicTransport.readResponse when
// the call ID carried in a response header does not equal the request ID the
// caller expected.
var errUnexpectedSequenceNumber = errors.New("unexpected sequence number")
// transport is the pair of operations used to send one RPC request and read
// back its response. basicTransport in this file is one implementation.
type transport interface {
// writeRequest writes the request req for the given method to w, tagged
// with requestID.
writeRequest(w io.Writer, method string, requestID int32, req proto.Message) error
// readResponse reads a response from r into resp. method and requestID
// describe the call the response is expected to answer.
readResponse(r io.Reader, method string, requestID int32, resp proto.Message) error
}
// basicTransport implements plain RPC.
//
// Its methods match the transport interface; the only state it carries is the
// client ID handed to every outgoing request header.
type basicTransport struct {
// clientID is the client ID of this writer. It is passed to
// newRPCRequestHeader for each request written by writeRequest.
clientID []byte
}
// writeRequest builds the packet for a single RPC call and writes it to w in
// one Write call.
//
// Layout of a request packet, in order:
//
//	uint32 length of the next three parts
//	varint length + RpcRequestHeaderProto
//	varint length + RequestHeaderProto
//	varint length + Request
//
// The RPC request header is built from requestID and the transport's client
// ID, the request header from method. Any error from assembling the packet or
// from the underlying writer is returned unchanged.
func (t *basicTransport) writeRequest(w io.Writer, method string, requestID int32, req proto.Message) error {
	packet, err := makeRPCPacket(
		newRPCRequestHeader(requestID, t.clientID),
		newRequestHeader(method),
		req,
	)
	if err != nil {
		return err
	}

	if _, err := w.Write(packet); err != nil {
		return err
	}
	return nil
}
// readResponse reads one response packet from r and decodes its payload into
// resp.
//
// Layout of a response from the namenode, in order:
//
//	uint32 length of the next two parts
//	varint length + RpcResponseHeaderProto
//	varint length + Response
//
// Outcomes, checked in this order:
//   - an error from readRPCPacket is returned unchanged;
//   - a header call ID that differs from requestID yields
//     errUnexpectedSequenceNumber;
//   - a header status other than SUCCESS yields a *NamenodeError filled in
//     from the header's error message, error detail and exception class name;
//   - otherwise nil.
func (t *basicTransport) readResponse(r io.Reader, method string, requestID int32, resp proto.Message) error {
	header := &hadoop.RpcResponseHeaderProto{}
	if err := readRPCPacket(r, header, resp); err != nil {
		return err
	}

	// The response must answer the call we are waiting on.
	if int32(header.GetCallId()) != requestID {
		return errUnexpectedSequenceNumber
	}

	if header.GetStatus() == hadoop.RpcResponseHeaderProto_SUCCESS {
		return nil
	}

	return &NamenodeError{
		method:    method,
		message:   header.GetErrorMsg(),
		code:      int(header.GetErrorDetail()),
		exception: header.GetExceptionClassName(),
	}
}
|