1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
package gelf
import (
"bufio"
"encoding/json"
"fmt"
"net"
"time"
)
// TCPReader owns a TCP listener and the queue of raw frames that its
// connection handlers have received.
type TCPReader struct {
	// listener is the socket on which incoming connections are accepted.
	listener *net.TCPListener
	// conn is not referenced by any of the methods in this part of the file.
	conn net.Conn
	// messages carries the frames read from client connections; every frame
	// still ends with the delimiter byte it was terminated by on the wire.
	messages chan []byte
}
// connChannels bundles the two control channels that belong to a single
// connection handler.
type connChannels struct {
	// drop is where the listener goroutine sends "drop" to ask the handler
	// to finish.
	drop chan string
	// confirm is where the handler reports "done" once it has returned.
	confirm chan string
}
// newTCPReader opens a TCP listener on addr and starts the goroutine that
// serves it (listenUntilCloseSignal).
//
// Besides the reader it returns two control channels, each buffered with a
// capacity of one:
//   - the first is the close-signal channel; the serving goroutine reacts to
//     the values "stop" and "drop" sent on it;
//   - the second is the done-signal channel on which the serving goroutine
//     reports "done".
//
// An error is returned when addr cannot be resolved or the listener cannot be
// opened. The underlying error is wrapped, so callers can inspect it with
// errors.Is / errors.As; in that case all other results are nil.
func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %w", addr, err)
	}

	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("ListenTCP: %w", err)
	}

	r := &TCPReader{
		listener: listener,
		// Up to 100 frames may be queued before handlers block on sending.
		messages: make(chan []byte, 100),
	}

	closeSignal := make(chan string, 1)
	doneSignal := make(chan string, 1)
	go r.listenUntilCloseSignal(closeSignal, doneSignal)

	return r, closeSignal, doneSignal, nil
}
// accepter hands every connection accepted on r.listener to the connections
// channel. It returns as soon as Accept reports an error; the channel is left
// open.
func (r *TCPReader) accepter(connections chan net.Conn) {
	for {
		accepted, acceptErr := r.listener.Accept()
		if acceptErr != nil {
			return
		}
		connections <- accepted
	}
}
// listenUntilCloseSignal is the serving loop of the reader. It starts the
// accept loop, spawns one handleConnection goroutine per accepted connection,
// and reacts to the control values received on closeSignal:
//
//   - "drop": every current handler is asked to finish and waited for, then
//     "done" is sent on doneSignal and the loop keeps serving.
//   - "stop": every current handler is asked to finish and waited for, then
//     the function returns. If no connection has been registered yet, the
//     stop is deferred until the first connection arrives, which is then
//     drained before returning.
//
// Any other value is ignored. On return the listener is closed and "done" is
// sent on doneSignal.
func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
	defer func() { doneSignal <- "done" }()
	defer r.listener.Close()

	var conns []connChannels

	// dropAll asks every registered handler to finish, waits for each
	// confirmation, and then forgets them. The slice is only reset after the
	// loop so that no handler is skipped or signalled twice.
	dropAll := func() {
		for _, c := range conns {
			c.drop <- "drop"
			<-c.confirm
		}
		conns = nil
	}

	connectionsChannel := make(chan net.Conn, 1)
	go r.accepter(connectionsChannel)

	// stopPending records a "stop" that arrived before any connection did.
	stopPending := false

	for {
		// A single blocking select: the goroutine sleeps until there is a
		// new connection or a control signal instead of polling.
		select {
		case conn := <-connectionsChannel:
			channels := connChannels{
				drop:    make(chan string, 1),
				confirm: make(chan string, 1),
			}
			go handleConnection(conn, r.messages, channels.drop, channels.confirm)
			conns = append(conns, channels)

			if stopPending {
				dropAll()
				return
			}

		case sig := <-closeSignal:
			switch sig {
			case "stop":
				if len(conns) == 0 {
					stopPending = true
					continue
				}
				dropAll()
				return
			case "drop":
				dropAll()
				doneSignal <- "done"
			}
		}
	}
}
// addr reports the listener's network address in string form.
func (r *TCPReader) addr() string {
	bound := r.listener.Addr()
	return bound.String()
}
// handleConnection reads zero-byte-delimited frames from conn and forwards
// each complete frame, delimiter included, to messages.
//
// The handler finishes when
//   - the read fails with anything other than a timeout (peer closed the
//     connection, connection broken), or
//   - "drop" is received on dropSignal: it then waits one second and returns
//     immediately if at least one frame has been forwarded, otherwise it makes
//     one more read attempt and returns afterwards.
//
// On the way out conn is closed and "done" is sent on dropConfirm. Because the
// handler may finish before anyone asks it to, dropSignal and dropConfirm are
// expected to be buffered (the listener creates them with capacity one) so
// that a later drop request / confirmation exchange does not block.
//
// Bytes of a frame whose delimiter has not arrived when the handler finishes
// are discarded.
func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
	defer func() { dropConfirm <- "done" }()
	defer conn.Close()

	reader := bufio.NewReader(conn)

	var pending []byte // start of a frame that was cut off by a read timeout
	drop := false      // a drop request has been received
	canDrop := false   // at least one frame has been forwarded

	for {
		// The deadline bounds each read so the drop signal is polled
		// regularly. Its error is deliberately ignored: an unusable
		// connection shows up as a read error right below.
		_ = conn.SetDeadline(time.Now().Add(2 * time.Second))

		chunk, err := reader.ReadBytes(0)
		if err != nil {
			if ne, ok := err.(net.Error); !ok || !ne.Timeout() {
				// The connection is gone; retrying would fail immediately
				// every time and spin the CPU.
				return
			}
			// ReadBytes has already consumed these bytes from the reader;
			// keep them so the frame is complete once its delimiter arrives.
			pending = append(pending, chunk...)
		} else {
			frame := chunk
			if len(pending) > 0 {
				frame = append(pending, chunk...)
				pending = nil
			}
			messages <- frame
			canDrop = true
		}

		if drop {
			return
		}

		select {
		case sig := <-dropSignal:
			if sig == "drop" {
				drop = true
				// Grace period for data that is still in flight.
				time.Sleep(1 * time.Second)
				if canDrop {
					return
				}
			}
		default:
		}
	}
}
// readMessage blocks until a frame is available on r.messages, removes the
// frame's last byte (the delimiter that handleConnection leaves in place) and
// decodes the remainder as JSON into a Message.
//
// It returns an error for an empty frame and when the JSON cannot be decoded;
// the decoding error is wrapped so callers can inspect it with errors.As.
func (r *TCPReader) readMessage() (*Message, error) {
	b := <-r.messages
	if len(b) == 0 {
		// Without this guard b[:len(b)-1] would panic.
		return nil, fmt.Errorf("json.Unmarshal: empty message")
	}

	var msg Message
	if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
		return nil, fmt.Errorf("json.Unmarshal: %w", err)
	}
	return &msg, nil
}
// Close closes the reader's TCP listener. The error reported by the listener
// is discarded because the method has no way to return it.
func (r *TCPReader) Close() {
	_ = r.listener.Close()
}
|