1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
package splunk // import "github.com/docker/docker/daemon/logger/splunk"
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"sync"
"testing"
)
// EventAsString returns the message's Event as a string. It returns an error
// when Event does not hold a string value.
func (message *splunkMessage) EventAsString() (string, error) {
	text, isString := message.Event.(string)
	if !isString {
		return "", fmt.Errorf("Cannot cast Event %v to string", message.Event)
	}
	return text, nil
}
// EventAsMap returns the message's Event as a map[string]interface{}. It
// returns an error when Event does not hold a value of that type.
func (message *splunkMessage) EventAsMap() (map[string]interface{}, error) {
	fields, isMap := message.Event.(map[string]interface{})
	if !isMap {
		return nil, fmt.Errorf("Cannot cast Event %v to map", message.Event)
	}
	return fields, nil
}
// HTTPEventCollectorMock is an in-process HTTP server used in tests in place of
// a Splunk HTTP Event Collector. It records what it receives and reports
// unexpected requests through the attached *testing.T.
type HTTPEventCollectorMock struct {
	// tcpListener is the listener the mock serves on; tcpAddr is the address
	// that was passed to net.ListenTCP when creating it.
	tcpListener *net.TCPListener
	tcpAddr     *net.TCPAddr

	// test receives the failures detected while handling requests.
	test *testing.T
	// token is the value expected after "Splunk " in the Authorization header.
	token string

	// mu is held by simulateErr, withBlock and ServeHTTP around accesses to
	// simulateServerError and blockingCtx.
	mu sync.Mutex
	// simulateServerError makes ServeHTTP answer with a 500 status when true.
	simulateServerError bool
	// blockingCtx, when non-nil, makes ServeHTTP wait until it is done.
	blockingCtx context.Context

	// connectionVerified is set once an OPTIONS request has been handled.
	connectionVerified bool
	// gzipEnabled records whether the first POST was gzip-encoded; it is nil
	// until a POST has been handled.
	gzipEnabled *bool
	// messages holds the messages parsed from POST bodies, in the order parsed.
	messages []*splunkMessage
	// numOfRequests counts the requests passed to ServeHTTP.
	numOfRequests int
}
// NewHTTPEventCollectorMock creates a mock collector with a TCP listener bound
// to 127.0.0.1 on a port chosen by the system (port 0 is requested). The test
// is failed immediately via t.Fatal if the listener cannot be opened. The
// returned mock does not handle requests until Serve is called.
func NewHTTPEventCollectorMock(t *testing.T) *HTTPEventCollectorMock {
	loopback := &net.TCPAddr{IP: net.IP{127, 0, 0, 1}}

	listener, err := net.ListenTCP("tcp", loopback)
	if err != nil {
		t.Fatal(err)
	}

	// simulateServerError and connectionVerified start out false (zero value).
	mock := &HTTPEventCollectorMock{
		test:        t,
		token:       "4642492F-D8BD-47F1-A005-0C08AE4657DF",
		tcpAddr:     loopback,
		tcpListener: listener,
	}
	return mock
}
// simulateErr switches simulated server failure on or off. While it is on,
// ServeHTTP answers requests with a 500 status.
func (hec *HTTPEventCollectorMock) simulateErr(b bool) {
	hec.mu.Lock()
	defer hec.mu.Unlock()

	hec.simulateServerError = b
}
// withBlock installs ctx as the blocking context. While a non-nil context is
// installed, ServeHTTP waits for it to be done before handling a request.
func (hec *HTTPEventCollectorMock) withBlock(ctx context.Context) {
	hec.mu.Lock()
	defer hec.mu.Unlock()

	hec.blockingCtx = ctx
}
// URL returns the mock's base URL: "http://" followed by the listener's
// address.
func (hec *HTTPEventCollectorMock) URL() string {
	listenAddr := hec.tcpListener.Addr()
	return "http://" + listenAddr.String()
}
// Serve accepts connections on the mock's listener and handles them with the
// mock itself as the http.Handler. It returns whatever http.Serve returns.
func (hec *HTTPEventCollectorMock) Serve() error {
	var handler http.Handler = hec
	return http.Serve(hec.tcpListener, handler)
}
// Close closes the mock's TCP listener and returns the listener's error, if
// any.
func (hec *HTTPEventCollectorMock) Close() error {
	err := hec.tcpListener.Close()
	return err
}
// ServeHTTP implements http.Handler. It handles an OPTIONS request (expected
// at most once) and POST requests carrying events for
// /services/collector/event/1.0; any other method is reported as a test error
// and answered with 400.
//
// Every request is counted. If a blocking context was installed with
// withBlock, the handler waits for it to be done before going on. If
// simulateErr(true) is in effect, the request is answered with 500 without
// being inspected.
//
// Handlers run on goroutines owned by net/http, so expectation failures are
// reported with Error/Errorf plus an error status rather than Fatal: FailNow
// must only be called from the goroutine running the test.
func (hec *HTTPEventCollectorMock) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	hec.mu.Lock()
	hec.numOfRequests++
	simErr := hec.simulateServerError
	ctx := hec.blockingCtx
	hec.mu.Unlock()

	if ctx != nil {
		// Wait on the snapshot taken under the lock; hec.blockingCtx itself
		// may be replaced concurrently by withBlock.
		<-ctx.Done()
	}

	if request.Body != nil {
		defer request.Body.Close()
	}

	if simErr {
		writer.WriteHeader(http.StatusInternalServerError)
		return
	}

	switch request.Method {
	case http.MethodOptions:
		// Verify that options method is getting called only once
		if hec.connectionVerified {
			hec.test.Errorf("Connection should not be verified more than once. Got second request with %s method.", request.Method)
		}
		hec.connectionVerified = true
		writer.WriteHeader(http.StatusOK)
	case http.MethodPost:
		hec.servePost(writer, request)
	default:
		hec.test.Errorf("Unexpected HTTP method %s", request.Method)
		writer.WriteHeader(http.StatusBadRequest)
	}
}

// servePost validates a POST request (path, Authorization and
// Content-Encoding headers) and appends every JSON message found in its body
// to hec.messages. A body that cannot be decoded is reported as a test error
// and answered with 400; otherwise the response status is 200.
func (hec *HTTPEventCollectorMock) servePost(writer http.ResponseWriter, request *http.Request) {
	// Always verify that Driver is using correct path to HEC
	if request.URL.String() != "/services/collector/event/1.0" {
		hec.test.Errorf("Unexpected path %v", request.URL)
	}

	if request.Header.Get("Authorization") != "Splunk "+hec.token {
		hec.test.Error("Authorization header is invalid.")
	}

	gzipEnabled := request.Header.Get("Content-Encoding") == "gzip"
	if hec.gzipEnabled == nil {
		hec.gzipEnabled = &gzipEnabled
	} else if gzipEnabled != *hec.gzipEnabled {
		// Nothing wrong with that, but we just know that Splunk Logging Driver does not do that
		hec.test.Error("Driver should not change Content Encoding.")
	}

	var reader io.Reader = request.Body
	if gzipEnabled {
		gzipReader, err := gzip.NewReader(request.Body)
		if err != nil {
			hec.test.Error(err)
			writer.WriteHeader(http.StatusBadRequest)
			return
		}
		defer gzipReader.Close()
		reader = gzipReader
	}

	// The body is a sequence of JSON objects written back to back. A streaming
	// decoder finds the object boundaries itself, so a "}{" occurring inside
	// an event string cannot be mistaken for a message separator.
	decoder := json.NewDecoder(reader)
	for {
		var message splunkMessage
		err := decoder.Decode(&message)
		if err == io.EOF {
			break
		}
		if err != nil {
			hec.test.Errorf("decoding message near body offset %d: %v", decoder.InputOffset(), err)
			writer.WriteHeader(http.StatusBadRequest)
			return
		}
		hec.messages = append(hec.messages, &message)
	}

	writer.WriteHeader(http.StatusOK)
}
|