1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
//go:build linux
package ringbuf
import (
"errors"
"fmt"
"os"
"sync"
"time"
"unsafe"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/epoll"
"github.com/cilium/ebpf/internal/sys"
"github.com/cilium/ebpf/internal/unix"
)
var (
	// ErrClosed is returned by Read and ReadInto once the Reader has been
	// closed. It is the same value as os.ErrClosed.
	ErrClosed = os.ErrClosed

	// ErrFlushed is returned by Read and ReadInto after Flush, once the
	// samples that were pending at the time of the flush have been returned.
	ErrFlushed = epoll.ErrFlushed

	// errBusy marks a sample that has been reserved but not committed yet.
	errBusy = errors.New("sample not committed yet")

	// errEOR signals that there are currently no more records in the ring.
	errEOR = errors.New("end of ring")
)
// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
//
// Its field layout must match the kernel struct exactly; do not reorder,
// resize or add fields.
type ringbufHeader struct {
	// Len is the sample length with the BPF_RINGBUF_BUSY_BIT and
	// BPF_RINGBUF_DISCARD_BIT flags OR-ed in; mask the flag bits off to get
	// the actual length in bytes.
	Len uint32
	_   uint32 // pg_off, only used by kernel internals
}

// ringbufHeaderSize is the size in bytes of ringbufHeader (two uint32 fields).
const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{}))
// isBusy reports whether BPF_RINGBUF_BUSY_BIT is set in the header, i.e.
// the producer has reserved the sample but not committed it yet.
func (rh *ringbufHeader) isBusy() bool {
	busy := rh.Len & sys.BPF_RINGBUF_BUSY_BIT
	return busy != 0
}
// isDiscard reports whether BPF_RINGBUF_DISCARD_BIT is set in the header,
// i.e. the producer discarded the sample instead of submitting it.
func (rh *ringbufHeader) isDiscard() bool {
	discarded := rh.Len & sys.BPF_RINGBUF_DISCARD_BIT
	return discarded != 0
}
// dataLen returns the sample length in bytes, with the busy and discard
// flag bits cleared from the header's length word.
func (rh *ringbufHeader) dataLen() int {
	const flagBits = uint32(sys.BPF_RINGBUF_BUSY_BIT | sys.BPF_RINGBUF_DISCARD_BIT)
	return int(rh.Len &^ flagBits)
}
// Record is a single sample read from the BPF ring buffer.
type Record struct {
	// RawSample holds the bytes of the sample. When the same Record is passed
	// to ReadInto repeatedly, this buffer may be reused, so copy it if it
	// must outlive the next read.
	RawSample []byte

	// The minimum number of bytes remaining in the ring buffer after this Record has been read.
	Remaining int
}
// Reader allows reading bpf_ringbuf_output
// from user space.
//
// Create a Reader with NewReader and release its resources with Close.
type Reader struct {
	// poller waits for the ring buffer map's FD to become readable; Close
	// and Flush act on it to interrupt a blocked read.
	poller *epoll.Poller

	// mu protects read/write access to the Reader structure
	// (exceptions: AvailableBytes reads ring without it, and bufferSize is
	// only written by NewReader).
	mu sync.Mutex

	// ring gives access to the ring buffer contents; it is nil after Close.
	ring *ringbufEventRing

	// epollEvents is the scratch buffer passed to poller.Wait.
	epollEvents []unix.EpollEvent

	// haveData is true while ReadInto should keep draining ring before it
	// waits on poller again.
	haveData bool

	// deadline bounds how long poller.Wait blocks; the zero value means no
	// deadline.
	deadline time.Time

	// bufferSize caches ring.size() as computed in NewReader.
	bufferSize int

	// pendingErr holds a deadline or flush error from poller.Wait that is
	// only returned once the ring has been drained.
	pendingErr error
}
// NewReader creates a new BPF ringbuf reader.
//
// ringbufMap must be of type ebpf.RingBuf, and its MaxEntries must be a
// non-zero power of two. The returned Reader owns an epoll poller and the
// ring mapping; call Close to release them.
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
	if mapType := ringbufMap.Type(); mapType != ebpf.RingBuf {
		return nil, fmt.Errorf("invalid Map type: %s", mapType)
	}

	maxEntries := int(ringbufMap.MaxEntries())
	if isPowerOfTwo := maxEntries != 0 && maxEntries&(maxEntries-1) == 0; !isPowerOfTwo {
		return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
	}

	poller, err := epoll.New()
	if err != nil {
		return nil, err
	}

	// abort releases the poller on every failure path after it was created.
	abort := func(cause error) (*Reader, error) {
		poller.Close()
		return nil, cause
	}

	fd := ringbufMap.FD()
	if err = poller.Add(fd, 0); err != nil {
		return abort(err)
	}

	ring, err := newRingBufEventRing(fd, maxEntries)
	if err != nil {
		return abort(fmt.Errorf("failed to create ringbuf ring: %w", err))
	}

	reader := &Reader{
		poller:      poller,
		ring:        ring,
		epollEvents: make([]unix.EpollEvent, 1),
		bufferSize:  ring.size(),
	}
	return reader, nil
}
// Close frees resources used by the reader.
//
// It interrupts calls to Read. If the poller reports os.ErrClosed
// (presumably because Close already ran), nil is returned.
func (r *Reader) Close() error {
	// Close the poller before taking the lock: this wakes a Read blocked in
	// the poller so that it releases r.mu.
	switch err := r.poller.Close(); {
	case errors.Is(err, os.ErrClosed):
		return nil
	case err != nil:
		return err
	}

	// Holding the lock guarantees that no Read is using the ring while it is
	// being released.
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.ring == nil {
		return nil
	}
	r.ring.Close()
	r.ring = nil
	return nil
}
// SetDeadline sets the time after which Read and ReadInto stop blocking for
// new samples; once the ring has been drained they return
// os.ErrDeadlineExceeded.
//
// It takes the same lock as ReadInto, so it waits for an in-progress read to
// return. Passing a zero time.Time will remove the deadline.
func (r *Reader) SetDeadline(t time.Time) {
	r.mu.Lock()
	r.deadline = t
	r.mu.Unlock()
}
// Read the next record from the BPF ringbuf.
//
// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush]
// makes it return all records currently in the ring buffer, followed by [ErrFlushed].
//
// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records
// have been read from the ring.
//
// Each call returns a freshly allocated Record. See [ReadInto] for a more
// efficient version of this method.
func (r *Reader) Read() (Record, error) {
	var rec Record
	// Call ReadInto in its own statement. In `return rec, r.ReadInto(&rec)`
	// the Go spec leaves unspecified whether rec is read before or after the
	// call, so that form could return an empty Record.
	err := r.ReadInto(&rec)
	return rec, err
}
// ReadInto is like Read except that it allows reusing Record and associated buffers.
//
// It holds r.mu for the whole call, including while blocked in the poller,
// so concurrent ReadInto/Read/SetDeadline calls are serialized.
//
// Records already in the ring take priority over a deadline or flush error:
// such an error is stashed in r.pendingErr and returned only after the ring
// has been drained.
func (r *Reader) ReadInto(rec *Record) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Close sets r.ring to nil after releasing the ring.
	if r.ring == nil {
		return fmt.Errorf("ringbuffer: %w", ErrClosed)
	}

	for {
		// haveData is false once the ring was drained (errEOR). Only then is a
		// pending error reported, or the poller waited on for more samples.
		if !r.haveData {
			if pe := r.pendingErr; pe != nil {
				r.pendingErr = nil
				return pe
			}

			_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline)
			if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
				// Ignoring this for reading a valid entry after timeout or flush.
				// This can occur if the producer submitted to the ring buffer
				// with BPF_RB_NO_WAKEUP.
				r.pendingErr = err
			} else if err != nil {
				// Any other poller error is returned immediately.
				return err
			}
			// Drain the ring even after a deadline/flush error; pendingErr is
			// reported once the drain hits errEOR.
			r.haveData = true
		}

		for {
			err := r.ring.readRecord(rec)
			// Not using errors.Is which is quite a bit slower
			// For a tight loop it might make a difference
			if err == errBusy {
				// Sample reserved but not committed yet: retry immediately,
				// still holding r.mu.
				continue
			}
			if err == errEOR {
				// Ring drained: fall back to the outer loop to report
				// pendingErr or wait for more data.
				r.haveData = false
				break
			}
			// nil after a record was read into rec, or another error from
			// readRecord.
			return err
		}
	}
}
// BufferSize returns the size in bytes of the ring buffer.
//
// The value is computed once by NewReader and never modified afterwards, so
// it is read without taking r.mu.
func (r *Reader) BufferSize() int {
	size := r.bufferSize
	return size
}
// Flush unblocks Read/ReadInto. Subsequent Read/ReadInto calls return the
// samples that were pending at this point, followed by an ErrFlushed error.
func (r *Reader) Flush() error {
	err := r.poller.Flush()
	return err
}
// AvailableBytes returns the amount of data available to read in the ring buffer in bytes.
//
// It returns 0 once the Reader has been closed.
func (r *Reader) AvailableBytes() int {
	// Don't need to acquire the lock here since the implementation of AvailableBytes
	// performs atomic loads on the producer and consumer positions. Taking r.mu
	// would also stall this call while ReadInto is blocked in the poller.
	//
	// Snapshot the field once: Close sets r.ring to nil, and calling through a
	// nil ring would crash instead of reporting an empty buffer.
	// NOTE(review): this unsynchronized read still races with a concurrent
	// Close; avoid calling AvailableBytes concurrently with Close.
	ring := r.ring
	if ring == nil {
		return 0
	}
	return int(ring.AvailableBytes())
}
|