File: socketstream.go

package info (click to toggle)
mtail 3.2.24-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,384 kB
  • sloc: yacc: 647; makefile: 226; sh: 78; lisp: 77; awk: 17
file content (156 lines) | stat: -rw-r--r-- 3,916 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright 2020 Google Inc. All Rights Reserved.
// This file is available under the Apache license.

package logstream

import (
	"context"
	"fmt"
	"net"
	"sync"

	"github.com/golang/glog"
	"github.com/jaqx0r/mtail/internal/logline"
	"github.com/jaqx0r/mtail/internal/waker"
)

// socketStream is a log stream that listens on a socket and reads log lines
// from the connections it accepts.
type socketStream struct {
	streamBase

	// cancel cancels the context derived in newSocketStream; it is also
	// handed to the line reader created for each connection.
	cancel context.CancelFunc

	// oneShot, when set, makes the accept loop exit after the first
	// connection has been accepted.
	oneShot OneShotMode
	scheme  string // URL Scheme to listen with, either tcp or unix
	address string // Given name for the underlying socket path on the filesystem or host/port.
}

// newSocketStream creates a socketStream listening on address using the given
// scheme, and starts its accept and shutdown goroutines, which are registered
// on wg.
//
// It returns ErrEmptySocketAddress if address is empty, or the error from
// opening the listener.  On failure the derived context is cancelled so that
// it does not stay registered with its parent.
func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
	if address == "" {
		return nil, ErrEmptySocketAddress
	}
	ctx, cancel := context.WithCancel(ctx)
	ss := &socketStream{
		cancel:  cancel,
		oneShot: oneShot,
		scheme:  scheme,
		address: address,
		streamBase: streamBase{
			sourcename: fmt.Sprintf("%s://%s", scheme, address),
			lines:      make(chan *logline.LogLine),
		},
	}

	if err := ss.stream(ctx, wg, waker); err != nil {
		// No goroutines were started and ss is discarded, so nothing else
		// will ever call cancel; release the derived context here.
		cancel()
		return nil, err
	}
	return ss, nil
}

// stream opens a listener on the stream's scheme and address and starts two
// goroutines, both registered on wg: an accept loop that starts a handleConn
// goroutine per connection, and a shutdown routine that closes the listener,
// waits for all connection handlers to finish and then closes ss.lines.
//
// In oneshot mode the accept loop exits after the first connection and
// shutdown begins as soon as that connection has been accepted; otherwise
// shutdown begins when ctx is cancelled.  In both modes, cancelling ctx before
// any connection has arrived also triggers shutdown.
//
// It returns the error from net.Listen, if any, in which case no goroutines
// are started.
func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {
	l, err := net.Listen(ss.scheme, ss.address)
	if err != nil {
		logErrors.Add(ss.address, 1)
		return err
	}
	glog.V(2).Infof("stream(%s): opened new socket listener %+v", ss.sourcename, l)

	// signals when a connection has been opened
	started := make(chan struct{})
	// signals when the accept loop has exited, i.e. no more handlers will be
	// added to connWg
	acceptDone := make(chan struct{})
	// tracks connection handling routines
	var connWg sync.WaitGroup

	// Set up for shutdown
	wg.Add(1)
	go func() {
		defer wg.Done()
		// If oneshot, wait only for the one conn handler to start, otherwise
		// wait for context Done.  Cancellation before the first connection
		// must also unblock us, or the listener and both goroutines would
		// leak and wg would never complete.
		select {
		case <-started:
			if !ss.oneShot {
				<-ctx.Done()
			}
		case <-ctx.Done():
		}
		glog.V(2).Infof("stream(%s): closing listener", ss.sourcename)
		if err := l.Close(); err != nil {
			glog.Info(err)
		}
		// Closing the listener makes Accept fail, so the accept loop is
		// guaranteed to exit.  Wait for it so that every connWg.Add has
		// happened before connWg.Wait, and no handler can start after
		// ss.lines is closed.
		<-acceptDone
		connWg.Wait()
		close(ss.lines)
	}()

	var connOnce sync.Once

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(acceptDone)
		for {
			c, err := l.Accept()
			if err != nil {
				glog.Info(err)
				return
			}
			glog.V(2).Infof("stream(%s): got new conn %v", ss.sourcename, c)
			connWg.Add(1)
			go ss.handleConn(ctx, &connWg, waker, c)
			connOnce.Do(func() { close(started) })
			if ss.oneShot {
				glog.Infof("stream(%s): oneshot mode, exiting accept loop", ss.sourcename)
				return
			}
		}
	}()

	return nil
}

// handleConn reads from the connection c, passing what it reads to a line
// reader that feeds ss.lines, until a read returns an exitable error.  It
// calls wg.Done when it returns.
//
// Between unproductive reads it waits for either the waker or context
// cancellation.  On exit it closes the connection, asks the line reader to
// finish, and updates the error and close counters for ss.address.
func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) {
	defer wg.Done()

	lr := NewLineReader(ss.sourcename, ss.lines, c, defaultReadBufferSize, ss.cancel)
	var total int
	defer func() {
		glog.V(2).Infof("stream(%s): read total %d bytes from %v", ss.sourcename, total, c)
		glog.V(2).Infof("stream(%s): closing connection, %v", ss.sourcename, c)
		err := c.Close()
		if err != nil {
			logErrors.Add(ss.address, 1)
			glog.Info(err)
		}
		// NOTE(review): ctx is reassigned below to the derived context, and
		// the deferred cancel() runs before this function, so Finish receives
		// an already-cancelled context -- confirm Finish tolerates that.
		lr.Finish(ctx)
		logCloses.Add(ss.address, 1)
	}()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Presumably arranges for a blocked read on c to time out once ctx is
	// done, so the loop below can observe cancellation -- verify against the
	// helper's definition.
	SetReadDeadlineOnDone(ctx, c)

	for {
		n, err := lr.ReadAndSend(ctx)
		glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ss.sourcename, n, err)

		if n > 0 {
			total += n

			// No error implies more to read, so restart the loop.
			if err == nil && ctx.Err() == nil {
				continue
			}
		}

		if IsExitableError(err) {
			glog.V(2).Infof("stream(%s): exiting, conn has error %s", ss.sourcename, err)
			return
		}

		// Yield and wait
		glog.V(2).Infof("stream(%s): waiting", ss.sourcename)
		select {
		case <-ctx.Done():
			// Exit after next read attempt.
			glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address)
		case <-waker.Wake():
			// sleep until next Wake()
			glog.V(2).Infof("stream(%s): Wake received", ss.sourcename)
		}
	}
}