1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
|
// Package tailfile provides helper functions to read the last n lines of a
// file, or of any SizeReaderAt.
package tailfile // import "github.com/docker/docker/pkg/tailfile"
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"os"
)
// blockSize is the largest window, in bytes, that the backwards scanner reads
// at a time, unless a long delimiter forces a bigger one.
const blockSize = 1024

// eol is the line delimiter used by NewTailReader.
var eol = []byte("\n")

// ErrNonPositiveLinesNumber is returned when the requested number of lines is
// less than one (that is, zero or negative).
var ErrNonPositiveLinesNumber = errors.New("The number of lines to extract from the file must be positive")
// TailFile returns the last n lines of f, as selected by NewTailReader, with
// the "\n" terminators stripped. Each returned line is an independent copy.
//
// f is read with ReadAt after seeking to its end, so f's file offset is left
// at the end of the file. An error is returned if n is less than one, if f
// cannot be read, or if a line exceeds bufio.MaxScanTokenSize.
func TailFile(f *os.File, n int) ([][]byte, error) {
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	rAt := io.NewSectionReader(f, 0, size)
	r, nLines, err := NewTailReader(context.Background(), rAt, n)
	if err != nil {
		return nil, err
	}

	lines := make([][]byte, 0, nLines)
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		// scanner.Bytes() aliases the Scanner's internal buffer, which later
		// calls to Scan overwrite; copy so earlier lines stay intact.
		b := scanner.Bytes()
		line := make([]byte, len(b))
		copy(line, b)
		lines = append(lines, line)
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return lines, nil
}
// SizeReaderAt combines random-access reads (io.ReaderAt) with a way to learn
// how many bytes the underlying data holds. Implementations are expected to
// report a size that stays fixed for as long as the value is in use.
type SizeReaderAt interface {
	Size() int64
	io.ReaderAt
}
// NewTailReader limits r to its final reqLines lines, where each line is
// terminated by "\n". It is shorthand for NewTailReaderWithDelimiter with a
// newline delimiter and returns exactly what that function returns.
func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) {
	lineDelimiter := eol
	return NewTailReaderWithDelimiter(ctx, r, reqLines, lineDelimiter)
}
// NewTailReaderWithDelimiter scopes the passed in reader to just its last
// reqLines lines, where a "line" is any run of bytes terminated by delimiter.
//
// The returned int is the number of lines actually found, which is less than
// reqLines when r holds fewer lines. Bytes after the final delimiter (an
// unterminated trailing line) are not part of the tail. When r contains no
// delimiter, an empty reader and 0 are returned. A non-empty tail is served
// straight from r, so r must stay readable until the reader is consumed.
//
// An error is returned if reqLines is less than one, if delimiter is empty,
// if reading r fails, or if ctx is done before the tail has been located.
//
// Delimiter lengths should be generally small, no more than 12 bytes.
func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) {
	if reqLines < 1 {
		return nil, 0, ErrNonPositiveLinesNumber
	}
	if len(delimiter) == 0 {
		return nil, 0, errors.New("must provide a delimiter")
	}

	size := r.Size()
	if int64(len(delimiter)) >= size {
		return bytes.NewReader(nil), 0, nil
	}

	var (
		sc      = newScanner(r, delimiter)
		tailEnd int64
		found   int
	)
	for sc.Scan(ctx) {
		found++
		if found == 1 {
			// The tail ends just past the last delimiter in r.
			tailEnd = sc.End()
		}
		if found == reqLines {
			break
		}
	}
	// Scan returns false both at the start of r and on failure; only Err
	// tells them apart, so check it before trusting a short count.
	if err := sc.Err(); err != nil {
		return nil, 0, err
	}
	if found == 0 {
		return bytes.NewReader(nil), 0, nil
	}

	// With fewer lines than requested the tail is the whole prefix of r.
	var tailStart int64
	if found == reqLines {
		tailStart = sc.Start(ctx)
		if tailStart == 0 {
			// Start also yields 0 when it cannot look further back; report a
			// cancellation or read error instead of silently returning a tail
			// that covers the entire input.
			if err := sc.Err(); err != nil {
				return nil, 0, err
			}
			if err := ctx.Err(); err != nil {
				return nil, 0, err
			}
		}
	}
	return io.NewSectionReader(r, tailStart, tailEnd-tailStart), found, nil
}
// newScanner returns a scanner that searches r backwards for delim, starting
// at the end of r. Its read window is blockSize bytes, reduced to the size of
// r for small inputs, and enlarged to 2*len(delim)+2 when the delimiter would
// otherwise occupy half of the window or more.
func newScanner(r SizeReaderAt, delim []byte) *scanner {
	size := r.Size()
	readSize := blockSize
	// Compare as int64: converting size to int first overflows on 32-bit
	// platforms for inputs larger than 2 GiB.
	if size < int64(readSize) {
		readSize = int(size)
	}
	// silly case: keep the window comfortably larger than the delimiter.
	if len(delim) >= readSize/2 {
		readSize = len(delim)*2 + 2
	}
	return &scanner{
		r:     r,
		pos:   size,
		buf:   make([]byte, readSize),
		delim: delim,
	}
}
// scanner searches a SizeReaderAt backwards, from its end towards offset 0,
// for occurrences of delim. Data is loaded into buf one window at a time.
type scanner struct {
	r     SizeReaderAt // data being searched
	pos   int64        // offset in r of buf[0]; r.Size() until the first read
	buf   []byte       // current window of data; len(buf) is the maximum read size
	delim []byte       // delimiter being searched for
	err   error        // first error hit (read failure or ctx done); stops scanning

	// idx is the index in buf of the most recently found delimiter; only
	// buf[:idx] is left to search in the current window. A value <= 0 (0 is
	// also the initial state) means the window is used up and the preceding
	// one must be read.
	idx int
}

// Start returns the offset in r just past the delimiter that precedes the
// most recently found one, i.e. where the line terminated by that delimiter
// begins. It returns 0 when there is no earlier delimiter or the search fails;
// a failure is recorded and reported by Err. The scanner is not advanced.
func (s *scanner) Start(ctx context.Context) int64 {
	if s.idx > 0 {
		if i := bytes.LastIndex(s.buf[:s.idx], s.delim); i >= 0 {
			return s.pos + int64(i) + int64(len(s.delim))
		}
	}

	// Slow path: the earlier delimiter is not fully inside the loaded window.
	// Continue with a copy of the scanner (including its own buffer) so that
	// s keeps its position.
	buf := make([]byte, len(s.buf))
	copy(buf, s.buf)
	readAhead := &scanner{
		r:     s.r,
		pos:   s.pos,
		buf:   buf,
		delim: s.delim,
		idx:   s.idx,
	}
	if !readAhead.Scan(ctx) {
		if readAhead.err != nil && s.err == nil {
			s.err = readAhead.err
		}
		return 0
	}
	return readAhead.End()
}

// End returns the offset in r just past the most recently found delimiter.
func (s *scanner) End() int64 {
	return s.pos + int64(s.idx) + int64(len(s.delim))
}

// Err returns the first error encountered while scanning, if any.
func (s *scanner) Err() error {
	return s.err
}

// Scan moves to the previous occurrence of the delimiter, searching backwards
// from the last one found (or from the end of r on the first call). It
// reports whether one was found. After it returns false, Err is nil when the
// start of r was reached, or holds the read error or ctx error that stopped it.
func (s *scanner) Scan(ctx context.Context) bool {
	if s.err != nil {
		return false
	}
	delimLen := len(s.delim)

	for {
		select {
		case <-ctx.Done():
			s.err = ctx.Err()
			return false
		default:
		}

		// Search all of buf[:s.idx]: a delimiter ending exactly where the
		// previous one starts (an empty line) must still be found.
		limit := s.idx
		if limit <= 0 {
			// The current window is used up; load the one ending at s.pos.
			readSize := int64(len(s.buf))
			if s.pos < readSize {
				readSize = s.pos
			}
			if readSize < int64(delimLen) {
				return false // too few bytes left to hold a delimiter
			}
			offset := s.pos - readSize
			n, err := s.r.ReadAt(s.buf[:readSize], offset)
			if err != nil && err != io.EOF {
				s.err = err
				return false
			}
			// Anchor the window at the read offset so buf[i] is the byte at
			// offset+i even if ReadAt returned fewer bytes than requested.
			s.pos = offset
			limit = n
		}

		s.idx = bytes.LastIndex(s.buf[:limit], s.delim)
		if s.idx >= 0 {
			return true
		}

		if delimLen > 1 && s.pos > 0 {
			// A delimiter may straddle the front of the window, e.g. with
			// delimiter `####`:
			//   [##asdfqwerty]
			//     ^
			// Move the position forward so the next window ends at the arrow
			// and re-reads the bytes that could hold the rest of it. At most
			// len(delim)-1 bytes can belong to such a partial delimiter, and
			// never more than the unsearched prefix, so the next window can
			// not overlap the delimiter found previously.
			back := delimLen - 1
			if back > limit {
				back = limit
			}
			s.pos += int64(back)
		}
	}
}
|