1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
package nats
import (
"context"
"encoding/json"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/transport"
"github.com/go-kit/log"
"github.com/nats-io/nats.go"
)
// Subscriber wraps an endpoint and provides nats.MsgHandler.
//
// A Subscriber is configured once by NewSubscriber and its options; the
// handler returned by ServeMsg then drives each incoming message through the
// before hooks, the decoder, the endpoint, the after hooks and the encoder.
type Subscriber struct {
// e is the wrapped endpoint, invoked with the decoded request.
e endpoint.Endpoint
// dec turns an incoming *nats.Msg into the endpoint's request value.
dec DecodeRequestFunc
// enc is called with the reply subject, the connection and the endpoint's
// response; it is only called when the message carries a reply subject.
enc EncodeResponseFunc
// before hooks run, in order, before the message is decoded; each one
// returns the context used from then on.
before []RequestFunc
// after hooks run, in order, once the endpoint has returned without error
// and before the response is encoded.
after []SubscriberResponseFunc
// errorEncoder is called for decode, endpoint and encode errors when the
// message carries a reply subject.
errorEncoder ErrorEncoder
// finalizer functions run when the message handler returns.
finalizer []SubscriberFinalizerFunc
// errorHandler is notified of every decode, endpoint and encode error.
errorHandler transport.ErrorHandler
}
// NewSubscriber constructs a new subscriber, which provides nats.MsgHandler
// and wraps the provided endpoint.
//
// The subscriber starts out with DefaultErrorEncoder and an error handler
// that logs to a no-op logger; the supplied options are then applied in the
// order given, so a later option overrides an earlier one that sets the same
// field.
func NewSubscriber(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	enc EncodeResponseFunc,
	options ...SubscriberOption,
) *Subscriber {
	sub := new(Subscriber)
	sub.e, sub.dec, sub.enc = e, dec, enc

	// Defaults; options below may replace them.
	sub.errorEncoder = DefaultErrorEncoder
	sub.errorHandler = transport.NewLogErrorHandler(log.NewNopLogger())

	for i := range options {
		options[i](sub)
	}
	return sub
}
// SubscriberOption sets an optional parameter for subscribers. NewSubscriber
// applies each option, in the order given, to the Subscriber it constructs
// after the defaults have been set.
type SubscriberOption func(*Subscriber)
// SubscriberBefore registers functions that are executed on the incoming
// NATS message before the request is decoded. Each function returns the
// context used for the rest of the message's processing. Functions are
// appended, so the option may be supplied more than once.
func SubscriberBefore(before ...RequestFunc) SubscriberOption {
	return func(sub *Subscriber) {
		sub.before = append(sub.before, before...)
	}
}
// SubscriberAfter registers functions that are executed after the endpoint
// has been invoked successfully, but before anything is published to the
// reply subject. Functions are appended, so the option may be supplied more
// than once.
func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
	return func(sub *Subscriber) {
		sub.after = append(sub.after, after...)
	}
}
// SubscriberErrorEncoder sets the encoder used to publish errors to the
// subscriber reply whenever they are encountered while processing a request.
// Clients can use this to provide custom error formatting. By default,
// errors are published with DefaultErrorEncoder.
func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
	return func(sub *Subscriber) {
		sub.errorEncoder = ee
	}
}
// SubscriberErrorLogger is used to log non-terminal errors. By default, no
// errors are logged. This is intended as a diagnostic measure. Finer-grained
// control of error handling, including logging in more detail, should be
// performed in a custom SubscriberErrorEncoder which has access to the
// context.
//
// Deprecated: Use SubscriberErrorHandler instead.
func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
	return func(sub *Subscriber) {
		// Wrap the logger so it satisfies transport.ErrorHandler.
		sub.errorHandler = transport.NewLogErrorHandler(logger)
	}
}
// SubscriberErrorHandler sets the handler that is notified of non-terminal
// errors. By default, such errors are sent to a handler backed by a no-op
// logger, i.e. they are ignored. This is intended as a diagnostic measure.
// Finer-grained control of error handling, including logging in more detail,
// should be performed in a custom SubscriberErrorEncoder which has access to
// the context.
func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
	return func(sub *Subscriber) {
		sub.errorHandler = errorHandler
	}
}
// SubscriberFinalizer registers functions that are executed at the end of
// every request received from a publisher through NATS. By default, no
// finalizer is registered.
//
// Finalizers are appended to any that are already registered, matching
// SubscriberBefore and SubscriberAfter, so supplying the option more than
// once accumulates finalizers instead of discarding the earlier ones.
func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
	return func(s *Subscriber) {
		// append (rather than assign) also avoids aliasing the caller's slice.
		s.finalizer = append(s.finalizer, f...)
	}
}
// ServeMsg provides nats.MsgHandler: it returns a handler that processes one
// message at a time using the connection nc for replies.
//
// For each message the handler:
//  1. creates a fresh cancellable context, cancelled when the handler returns;
//  2. runs the before hooks, each of which may replace the context;
//  3. decodes the message and invokes the endpoint;
//  4. runs the after hooks;
//  5. encodes the response to msg.Reply, if a reply subject is set.
//
// Any decode, endpoint or encode error is passed to the error handler and,
// when the message has a reply subject, to the error encoder. Finalizers run
// when the handler returns, on every path, and observe the final context.
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
	return func(msg *nats.Msg) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Deferred after cancel, so finalizers run before the context is
		// cancelled. The closure reads ctx when it runs, not when deferred.
		if len(s.finalizer) > 0 {
			defer func() {
				for i := range s.finalizer {
					s.finalizer[i](ctx, msg)
				}
			}()
		}

		for i := range s.before {
			ctx = s.before[i](ctx, msg)
		}

		// reject reports a decode/endpoint failure: always to the error
		// handler, and to the requester only if there is somewhere to reply.
		reject := func(cause error) {
			s.errorHandler.Handle(ctx, cause)
			if msg.Reply != "" {
				s.errorEncoder(ctx, cause, msg.Reply, nc)
			}
		}

		request, err := s.dec(ctx, msg)
		if err != nil {
			reject(err)
			return
		}

		response, err := s.e(ctx, request)
		if err != nil {
			reject(err)
			return
		}

		for i := range s.after {
			ctx = s.after[i](ctx, nc)
		}

		// Nothing to publish for fire-and-forget messages.
		if msg.Reply == "" {
			return
		}

		encErr := s.enc(ctx, msg.Reply, nc, response)
		if encErr == nil {
			return
		}
		s.errorHandler.Handle(ctx, encErr)
		s.errorEncoder(ctx, encErr, msg.Reply, nc)
	}
}
// ErrorEncoder is responsible for encoding an error to the subscriber reply.
// Users are encouraged to use custom ErrorEncoders to encode errors to
// their replies, and will likely want to pass and check for their own error
// types.
//
// It receives the error, the reply subject to publish to and the connection
// to publish on. It returns nothing, so any failure while encoding or
// publishing has to be dealt with inside the encoder itself.
type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
// SubscriberFinalizerFunc can be used to perform work at the end of a request
// from a publisher, after the response (if any) has been written to the
// publisher. Finalizers also run when processing stopped early because of an
// error or because the message had no reply subject. The principal intended
// use is for request logging.
type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
// NopRequestDecoder is a DecodeRequestFunc for requests that do not need to
// be decoded. It ignores both arguments and always returns a nil request and
// a nil error.
func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
	var noRequest interface{} // zero value: a nil interface
	return noRequest, nil
}
// EncodeJSONResponse is an EncodeResponseFunc that serializes the response
// as JSON and publishes it on the reply subject. Many JSON-over-NATS services
// can use it as a sensible default.
//
// It returns the marshalling error if the response cannot be encoded, and
// otherwise whatever nc.Publish returns.
func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
	var (
		payload []byte
		err     error
	)
	if payload, err = json.Marshal(response); err != nil {
		return err
	}
	return nc.Publish(reply, payload)
}
// DefaultErrorEncoder writes the error to the subscriber reply as a JSON
// object of the form {"err": "<error message>"}.
//
// Encoding is best effort. ErrorEncoder has no return value and no logger is
// available here, so a failure to marshal or publish the payload is dropped.
// Use a custom ErrorEncoder (see SubscriberErrorEncoder) if such failures
// must be observed.
func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
	payload, marshalErr := json.Marshal(struct {
		Error string `json:"err"`
	}{Error: err.Error()})
	if marshalErr != nil {
		// Nothing sensible can be published without a payload.
		return
	}

	// Deliberately ignored: there is no caller to return the error to and no
	// logger to report it on (see the doc comment above).
	_ = nc.Publish(reply, payload)
}
|