diff --git a/bpf/include/api.h b/bpf/include/api.h index 48ded18dc89..c04571bc703 100644 --- a/bpf/include/api.h +++ b/bpf/include/api.h @@ -227,7 +227,7 @@ static __u64 BPF_FUNC(get_attach_cookie, void *ctx); static long BPF_FUNC(loop, __u32 nr_loops, void *callback_fn, void *callback_ctx, __u64 flags); -static long BPF_FUNC(ringbuf_output, void *data, uint64_t size, uint64_t flags); +static long BPF_FUNC(ringbuf_output, void *ringbuf, void *data, uint64_t size, uint64_t flags); static void BPF_FUNC(ringbuf_reserve, void *ringbuf, uint64_t size, uint64_t flags); static void BPF_FUNC(ringbuf_submit, void *data, uint64_t flags); static void BPF_FUNC(ringbuf_discard, void *data, uint64_t flags); diff --git a/bpf/lib/bpf_event.h b/bpf/lib/bpf_event.h index 52ade8e9104..0775aa214fa 100644 --- a/bpf/lib/bpf_event.h +++ b/bpf/lib/bpf_event.h @@ -11,9 +11,8 @@ struct event { }; struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __type(key, int); - __type(value, struct event); + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 8LU * 1024LU * 1024LU); // 8MB } tcpmon_map SEC(".maps"); #endif // __BPF_EVENT_H diff --git a/bpf/lib/process.h b/bpf/lib/process.h index bdadea67115..0a86b31edb0 100644 --- a/bpf/lib/process.h +++ b/bpf/lib/process.h @@ -582,7 +582,7 @@ static inline __attribute__((always_inline)) void perf_event_output_metric(void __u32 zero = 0; long err; - err = perf_event_output(ctx, map, flags, data, size); + err = ringbuf_output(map, data, size, 0); if (err < 0) { valp = map_lookup_elem(&tg_stats_map, &zero); if (valp) diff --git a/bpf/test/bpf_lseek.c b/bpf/test/bpf_lseek.c index dc71e48b107..a8b28dce183 100644 --- a/bpf/test/bpf_lseek.c +++ b/bpf/test/bpf_lseek.c @@ -49,8 +49,7 @@ test_lseek(struct sys_enter_lseek_args *ctx) msg.common.ktime = ktime_get_ns(); msg.common.size = size; msg.arg0 = get_smp_processor_id(); - perf_event_output(ctx, &tcpmon_map, BPF_F_CURRENT_CPU, &msg, - size); + ringbuf_output(&tcpmon_map, &msg, size, 0); } return 0; diff --git a/pkg/bpf/perf_linux.go b/pkg/bpf/perf_linux.go index 23a530336e9..f2035e6131c 100644 --- a/pkg/bpf/perf_linux.go +++ b/pkg/bpf/perf_linux.go @@ -8,10 +8,7 @@ import ( "io" "os" "path/filepath" - "runtime" "strings" - - "golang.org/x/sys/unix" ) const ( @@ -19,13 +16,7 @@ const ( ) type PerfEventConfig struct { - NumCpus int - NumPages int - MapName string - Type int - Config int - SampleType int - WakeupEvents int + MapName string } // GetNumPossibleCPUs returns a total number of possible CPUS, i.e. CPUs that @@ -74,30 +65,7 @@ func getNumPossibleCPUsFromReader(r io.Reader) int { // DefaultPerfEventConfig returns the default perf event configuration. It // relies on the map root to be set. 
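Note on the BPF-side changes above: bpf_ringbuf_output takes the ring buffer map as its first argument and returns a negative errno on failure (hence the unchanged err < 0 handling in perf_event_output_metric). The kernel also requires a ringbuf map's max_entries to be a power-of-two multiple of the page size, which the 8 MiB value satisfies. A minimal Go-side sketch of creating an equivalent map with cilium/ebpf; this is not part of the patch and the names are illustrative:

```go
package main

import (
	"fmt"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/rlimit"
)

func main() {
	// On older kernels BPF memory is accounted against RLIMIT_MEMLOCK.
	if err := rlimit.RemoveMemlock(); err != nil {
		panic(err)
	}

	// Mirrors the C definition in bpf_event.h: BPF_MAP_TYPE_RINGBUF, 8 MiB.
	// MaxEntries is the ring size in bytes and must be a power-of-two
	// multiple of the page size.
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Name:       "tcpmon_map",
		Type:       ebpf.RingBuf,
		MaxEntries: 8 * 1024 * 1024,
	})
	if err != nil {
		panic(err)
	}
	defer m.Close()
	fmt.Printf("created %s ring of %d bytes\n", m.Type(), m.MaxEntries())
}
```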
func DefaultPerfEventConfig() *PerfEventConfig { - numCpus := GetNumPossibleCPUs() - if numCpus == 0 { - numCpus = runtime.NumCPU() - } return &PerfEventConfig{ - MapName: filepath.Join(MapPrefixPath(), eventsMapName), - Type: PERF_TYPE_SOFTWARE, - Config: PERF_COUNT_SW_BPF_OUTPUT, - SampleType: PERF_SAMPLE_RAW, - WakeupEvents: 1, - NumCpus: numCpus, - NumPages: 128, - } -} - -func UpdateElementFromPointers(fd int, structPtr, sizeOfStruct uintptr) error { - ret, _, err := unix.Syscall( - unix.SYS_BPF, - BPF_MAP_UPDATE_ELEM, - structPtr, - sizeOfStruct, - ) - if ret != 0 || err != 0 { - return fmt.Errorf("Unable to update element for map with file descriptor %d: %s", fd, err) + MapName: filepath.Join(MapPrefixPath(), eventsMapName), } - return nil } diff --git a/pkg/observer/observer.go b/pkg/observer/observer.go index b05dd2165f7..fb86e8aecce 100644 --- a/pkg/observer/observer.go +++ b/pkg/observer/observer.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "fmt" - "math" "os" "runtime" "strings" @@ -16,7 +15,7 @@ import ( "time" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/cilium/tetragon/pkg/api/ops" "github.com/cilium/tetragon/pkg/api/readyapi" "github.com/cilium/tetragon/pkg/bpf" @@ -33,10 +32,6 @@ import ( "github.com/sirupsen/logrus" ) -const ( - perCPUBufferBytes = 65535 -) - var ( eventHandler = make(map[uint8]func(r *bytes.Reader) ([]Event, error)) @@ -144,23 +139,6 @@ func (k *Observer) receiveEvent(data []byte) { } } -// Gets final size for single perf ring buffer rounded from -// passed size argument (kindly borrowed from ebpf/cilium) -func perfBufferSize(perCPUBuffer int) int { - pageSize := os.Getpagesize() - - // Smallest whole number of pages - nPages := (perCPUBuffer + pageSize - 1) / pageSize - - // Round up to nearest power of two number of pages - nPages = int(math.Pow(2, math.Ceil(math.Log2(float64(nPages))))) - - // Add one for metadata - nPages++ - - return nPages * pageSize -} - func sizeWithSuffix(size int) string { suffix := [4]string{"", "K", "M", "G"} @@ -173,26 +151,6 @@ func sizeWithSuffix(size int) string { return fmt.Sprintf("%d%s", size, suffix[i]) } -func (k *Observer) getRBSize(cpus int) int { - var size int - - if option.Config.RBSize == 0 && option.Config.RBSizeTotal == 0 { - size = perCPUBufferBytes - } else if option.Config.RBSize != 0 { - size = option.Config.RBSize - } else { - size = option.Config.RBSizeTotal / int(cpus) - } - - cpuSize := perfBufferSize(size) - totalSize := cpuSize * cpus - - k.log.WithField("percpu", sizeWithSuffix(cpuSize)). - WithField("total", sizeWithSuffix(totalSize)). - Info("Perf ring buffer size (bytes)") - return size -} - func (k *Observer) getRBQueueSize() int { size := option.Config.RBQueueSize if size == 0 { @@ -211,11 +169,9 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { } defer perfMap.Close() - rbSize := k.getRBSize(int(perfMap.MaxEntries())) - perfReader, err := perf.NewReader(perfMap, rbSize) - + rd, err := ringbuf.NewReader(perfMap) if err != nil { - return fmt.Errorf("creating perf array reader failed: %w", err) + return fmt.Errorf("opening ringbuf reader: %w", err) } // Inform caller that we're about to start processing events. @@ -224,7 +180,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { // We spawn go routine to read and process perf events, // connected with main app through eventsQueue channel. 
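With a single shared ring, all the per-CPU sizing machinery (NumCpus, NumPages, perfBufferSize, getRBSize) becomes dead code: the buffer size is fixed when the map is created, so PerfEventConfig shrinks to just the pinned map path. A hedged sketch of how the reader is now constructed from that pinned map; the pin path parameter is illustrative, the real one comes from MapPrefixPath():

```go
package sketch

import (
	"fmt"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

// openRingBufReader loads a pinned ringbuf map and attaches a reader to it.
// The returned map must stay open for as long as the reader is in use,
// mirroring the defer perfMap.Close() ordering in RunEvents.
func openRingBufReader(pinPath string) (*ebpf.Map, *ringbuf.Reader, error) {
	m, err := ebpf.LoadPinnedMap(pinPath, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("loading pinned map %s: %w", pinPath, err)
	}
	rd, err := ringbuf.NewReader(m)
	if err != nil {
		m.Close()
		return nil, nil, fmt.Errorf("opening ringbuf reader: %w", err)
	}
	return m, rd, nil
}
```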
- eventsQueue := make(chan *perf.Record, k.getRBQueueSize()) + eventsQueue := make(chan *ringbuf.Record, k.getRBQueueSize()) // Listeners are ready and about to start reading from perf reader, tell // user everything is ready. @@ -237,7 +193,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { go func() { defer wg.Done() for stopCtx.Err() == nil { - record, err := perfReader.Read() + record, err := rd.Read() if err != nil { // NOTE(JM and Djalal): count and log errors while excluding the stopping context if stopCtx.Err() == nil { @@ -256,11 +212,6 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { k.recvCntr++ ringbufmetrics.PerfEventReceived.Inc() } - - if record.LostSamples > 0 { - atomic.AddUint64(&k.lostCntr, uint64(record.LostSamples)) - ringbufmetrics.PerfEventLost.Add(float64(record.LostSamples)) - } } } }() @@ -290,7 +241,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { // Wait for context to be cancelled and then stop. <-stopCtx.Done() - return perfReader.Close() + return rd.Close() } // Observer represents the link between the BPF perf ring and the listeners. It diff --git a/pkg/testutils/perfring/perfring.go b/pkg/testutils/perfring/perfring.go index 58ee4eb46b7..21f256f8afd 100644 --- a/pkg/testutils/perfring/perfring.go +++ b/pkg/testutils/perfring/perfring.go @@ -15,7 +15,7 @@ import ( "testing" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/cilium/tetragon/pkg/bpf" testapi "github.com/cilium/tetragon/pkg/grpc/test" "github.com/cilium/tetragon/pkg/logger" @@ -69,7 +69,7 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted } defer perfMap.Close() - perfReader, err := perf.NewReader(perfMap, 65535) + rd, err := ringbuf.NewReader(perfMap) if err != nil { t.Fatalf("creating perf array reader failed: %v", err) } @@ -96,7 +96,7 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted break } - record, err := perfReader.Read() + record, err := rd.Read() if err != nil { if ctx.Err() == nil { errChan <- fmt.Errorf("error reading perfring data: %v", err) @@ -127,11 +127,11 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted case err := <-errChan: t.Fatal(err) case <-complChan: - perfReader.Close() + rd.Close() return case <-ctx.Done(): // Wait for context cancel. - perfReader.Close() + rd.Close() return } } diff --git a/vendor/github.com/cilium/ebpf/perf/doc.go b/vendor/github.com/cilium/ebpf/perf/doc.go deleted file mode 100644 index b92bc56af5a..00000000000 --- a/vendor/github.com/cilium/ebpf/perf/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Package perf allows reading from BPF perf event arrays. -// -// A perf event array contains multiple perf event ringbuffers which can be used -// to exchange sample like data with user space. 
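The read loop above keeps its shape, but note what quietly disappears: ringbuf.Record has no LostSamples field. With a ring buffer, drops happen on the producer side when bpf_ringbuf_output fails, which is exactly what the err < 0 branch in perf_event_output_metric now counts into tg_stats_map; user space never sees inline loss records. A minimal sketch of the resulting drain loop, with the queueing and error counting elided:

```go
package sketch

import (
	"errors"

	"github.com/cilium/ebpf/ringbuf"
)

// drain reads records until the reader is closed and forwards them to out.
func drain(rd *ringbuf.Reader, out chan<- *ringbuf.Record) {
	for {
		record, err := rd.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				return // rd.Close() was called: normal shutdown
			}
			continue // count/log and retry, as RunEvents does
		}
		if len(record.RawSample) > 0 {
			out <- &record
		}
	}
}
```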
-package perf diff --git a/vendor/github.com/cilium/ebpf/perf/reader.go b/vendor/github.com/cilium/ebpf/perf/reader.go deleted file mode 100644 index 3c820708c72..00000000000 --- a/vendor/github.com/cilium/ebpf/perf/reader.go +++ /dev/null @@ -1,474 +0,0 @@ -package perf - -import ( - "encoding/binary" - "errors" - "fmt" - "io" - "os" - "runtime" - "sync" - "time" - - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/internal" - "github.com/cilium/ebpf/internal/epoll" - "github.com/cilium/ebpf/internal/unix" -) - -var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") -) - -var perfEventHeaderSize = binary.Size(perfEventHeader{}) - -// perfEventHeader must match 'struct perf_event_header` in . -type perfEventHeader struct { - Type uint32 - Misc uint16 - Size uint16 -} - -func cpuForEvent(event *unix.EpollEvent) int { - return int(event.Pad) -} - -// Record contains either a sample or a counter of the -// number of lost samples. -type Record struct { - // The CPU this record was generated on. - CPU int - - // The data submitted via bpf_perf_event_output. - // Due to a kernel bug, this can contain between 0 and 7 bytes of trailing - // garbage from the ring depending on the input sample's length. - RawSample []byte - - // The number of samples which could not be output, since - // the ring buffer was full. - LostSamples uint64 - - // The minimum number of bytes remaining in the per-CPU buffer after this Record has been read. - // Negative for overwritable buffers. - Remaining int -} - -// Read a record from a reader and tag it as being from the given CPU. -// -// buf must be at least perfEventHeaderSize bytes long. -func readRecord(rd io.Reader, rec *Record, buf []byte, overwritable bool) error { - // Assert that the buffer is large enough. - buf = buf[:perfEventHeaderSize] - _, err := io.ReadFull(rd, buf) - if errors.Is(err, io.EOF) { - return errEOR - } else if err != nil { - return fmt.Errorf("read perf event header: %v", err) - } - - header := perfEventHeader{ - internal.NativeEndian.Uint32(buf[0:4]), - internal.NativeEndian.Uint16(buf[4:6]), - internal.NativeEndian.Uint16(buf[6:8]), - } - - switch header.Type { - case unix.PERF_RECORD_LOST: - rec.RawSample = rec.RawSample[:0] - rec.LostSamples, err = readLostRecords(rd) - return err - - case unix.PERF_RECORD_SAMPLE: - rec.LostSamples = 0 - // We can reuse buf here because perfEventHeaderSize > perfEventSampleSize. - rec.RawSample, err = readRawSample(rd, buf, rec.RawSample) - return err - - default: - return &unknownEventError{header.Type} - } -} - -func readLostRecords(rd io.Reader) (uint64, error) { - // lostHeader must match 'struct perf_event_lost in kernel sources. - var lostHeader struct { - ID uint64 - Lost uint64 - } - - err := binary.Read(rd, internal.NativeEndian, &lostHeader) - if err != nil { - return 0, fmt.Errorf("can't read lost records header: %v", err) - } - - return lostHeader.Lost, nil -} - -var perfEventSampleSize = binary.Size(uint32(0)) - -// This must match 'struct perf_event_sample in kernel sources. 
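For contrast with the ringbuf framing added later in this patch: the deleted perf reader had to demultiplex record types, because the kernel interleaves PERF_RECORD_LOST entries with PERF_RECORD_SAMPLE in the same per-CPU ring. A standalone sketch of that header decode; the type values come from the perf_event.h UAPI, and the real code uses the host's native endianness:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	perfRecordLost   = 2 // PERF_RECORD_LOST
	perfRecordSample = 9 // PERF_RECORD_SAMPLE
)

// decodeHeader parses the 8-byte struct perf_event_header
// (u32 type, u16 misc, u16 size); assumes a little-endian host.
func decodeHeader(buf []byte) string {
	typ := binary.LittleEndian.Uint32(buf[0:4])
	size := binary.LittleEndian.Uint16(buf[6:8])
	switch typ {
	case perfRecordLost:
		return fmt.Sprintf("lost-samples record, %d bytes", size)
	case perfRecordSample:
		return fmt.Sprintf("sample record, %d bytes", size)
	default:
		return fmt.Sprintf("unknown record type %d", typ)
	}
}

func main() {
	fmt.Println(decodeHeader([]byte{9, 0, 0, 0, 0, 0, 16, 0}))
}
```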
-type perfEventSample struct { - Size uint32 -} - -func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) { - buf = buf[:perfEventSampleSize] - if _, err := io.ReadFull(rd, buf); err != nil { - return nil, fmt.Errorf("read sample size: %w", err) - } - - sample := perfEventSample{ - internal.NativeEndian.Uint32(buf), - } - - var data []byte - if size := int(sample.Size); cap(sampleBuf) < size { - data = make([]byte, size) - } else { - data = sampleBuf[:size] - } - - if _, err := io.ReadFull(rd, data); err != nil { - return nil, fmt.Errorf("read sample: %w", err) - } - return data, nil -} - -// Reader allows reading bpf_perf_event_output -// from user space. -type Reader struct { - poller *epoll.Poller - deadline time.Time - - // mu protects read/write access to the Reader structure with the - // exception of 'pauseFds', which is protected by 'pauseMu'. - // If locking both 'mu' and 'pauseMu', 'mu' must be locked first. - mu sync.Mutex - - // Closing a PERF_EVENT_ARRAY removes all event fds - // stored in it, so we keep a reference alive. - array *ebpf.Map - rings []*perfEventRing - epollEvents []unix.EpollEvent - epollRings []*perfEventRing - eventHeader []byte - - // pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'. - // These allow Pause/Resume to be executed independently of any ongoing - // Read calls, which would otherwise need to be interrupted. - pauseMu sync.Mutex - pauseFds []int - - paused bool - overwritable bool - - bufferSize int -} - -// ReaderOptions control the behaviour of the user -// space reader. -type ReaderOptions struct { - // The number of written bytes required in any per CPU buffer before - // Read will process data. Must be smaller than PerCPUBuffer. - // The default is to start processing as soon as data is available. - Watermark int - // This perf ring buffer is overwritable, once full the oldest event will be - // overwritten by newest. - Overwritable bool -} - -// NewReader creates a new reader with default options. -// -// array must be a PerfEventArray. perCPUBuffer gives the size of the -// per CPU buffer in bytes. It is rounded up to the nearest multiple -// of the current page size. -func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) { - return NewReaderWithOptions(array, perCPUBuffer, ReaderOptions{}) -} - -// NewReaderWithOptions creates a new reader with the given options. -func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) { - if perCPUBuffer < 1 { - return nil, errors.New("perCPUBuffer must be larger than 0") - } - - var ( - fds []int - nCPU = int(array.MaxEntries()) - rings = make([]*perfEventRing, 0, nCPU) - pauseFds = make([]int, 0, nCPU) - ) - - poller, err := epoll.New() - if err != nil { - return nil, err - } - - defer func() { - if err != nil { - poller.Close() - for _, fd := range fds { - unix.Close(fd) - } - for _, ring := range rings { - if ring != nil { - ring.Close() - } - } - } - }() - - // bpf_perf_event_output checks which CPU an event is enabled on, - // but doesn't allow using a wildcard like -1 to specify "all CPUs". - // Hence we have to create a ring for each CPU. - bufferSize := 0 - for i := 0; i < nCPU; i++ { - ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable) - if errors.Is(err, unix.ENODEV) { - // The requested CPU is currently offline, skip it. 
- rings = append(rings, nil) - pauseFds = append(pauseFds, -1) - continue - } - - if err != nil { - return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err) - } - - bufferSize = ring.size() - rings = append(rings, ring) - pauseFds = append(pauseFds, ring.fd) - - if err := poller.Add(ring.fd, i); err != nil { - return nil, err - } - } - - array, err = array.Clone() - if err != nil { - return nil, err - } - - pr = &Reader{ - array: array, - rings: rings, - poller: poller, - deadline: time.Time{}, - epollEvents: make([]unix.EpollEvent, len(rings)), - epollRings: make([]*perfEventRing, 0, len(rings)), - eventHeader: make([]byte, perfEventHeaderSize), - pauseFds: pauseFds, - overwritable: opts.Overwritable, - bufferSize: bufferSize, - } - if err = pr.Resume(); err != nil { - return nil, err - } - runtime.SetFinalizer(pr, (*Reader).Close) - return pr, nil -} - -// Close frees resources used by the reader. -// -// It interrupts calls to Read. -// -// Calls to perf_event_output from eBPF programs will return -// ENOENT after calling this method. -func (pr *Reader) Close() error { - if err := pr.poller.Close(); err != nil { - if errors.Is(err, os.ErrClosed) { - return nil - } - return fmt.Errorf("close poller: %w", err) - } - - // Trying to poll will now fail, so Read() can't block anymore. Acquire the - // lock so that we can clean up. - pr.mu.Lock() - defer pr.mu.Unlock() - - for _, ring := range pr.rings { - if ring != nil { - ring.Close() - } - } - pr.rings = nil - pr.pauseFds = nil - pr.array.Close() - - return nil -} - -// SetDeadline controls how long Read and ReadInto will block waiting for samples. -// -// Passing a zero time.Time will remove the deadline. Passing a deadline in the -// past will prevent the reader from blocking if there are no records to be read. -func (pr *Reader) SetDeadline(t time.Time) { - pr.mu.Lock() - defer pr.mu.Unlock() - - pr.deadline = t -} - -// Read the next record from the perf ring buffer. -// -// The function blocks until there are at least Watermark bytes in one -// of the per CPU buffers. Records from buffers below the Watermark -// are not returned. -// -// Records can contain between 0 and 7 bytes of trailing garbage from the ring -// depending on the input sample's length. -// -// Calling Close interrupts the function. -// -// Returns os.ErrDeadlineExceeded if a deadline was set. -func (pr *Reader) Read() (Record, error) { - var r Record - - return r, pr.ReadInto(&r) -} - -var errMustBePaused = fmt.Errorf("perf ringbuffer: must have been paused before reading overwritable buffer") - -// ReadInto is like Read except that it allows reusing Record and associated buffers. -func (pr *Reader) ReadInto(rec *Record) error { - pr.mu.Lock() - defer pr.mu.Unlock() - - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.overwritable && !pr.paused { - return errMustBePaused - } - - if pr.rings == nil { - return fmt.Errorf("perf ringbuffer: %w", ErrClosed) - } - - for { - if len(pr.epollRings) == 0 { - // NB: The deferred pauseMu.Unlock will panic if Wait panics, which - // might obscure the original panic. - pr.pauseMu.Unlock() - nEvents, err := pr.poller.Wait(pr.epollEvents, pr.deadline) - pr.pauseMu.Lock() - if err != nil { - return err - } - - // Re-validate pr.paused since we dropped pauseMu. 
- if pr.overwritable && !pr.paused { - return errMustBePaused - } - - for _, event := range pr.epollEvents[:nEvents] { - ring := pr.rings[cpuForEvent(&event)] - pr.epollRings = append(pr.epollRings, ring) - - // Read the current head pointer now, not every time - // we read a record. This prevents a single fast producer - // from keeping the reader busy. - ring.loadHead() - } - } - - // Start at the last available event. The order in which we - // process them doesn't matter, and starting at the back allows - // resizing epollRings to keep track of processed rings. - err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1]) - if err == errEOR { - // We've emptied the current ring buffer, process - // the next one. - pr.epollRings = pr.epollRings[:len(pr.epollRings)-1] - continue - } - - return err - } -} - -// Pause stops all notifications from this Reader. -// -// While the Reader is paused, any attempts to write to the event buffer from -// BPF programs will return -ENOENT. -// -// Subsequent calls to Read will block until a call to Resume. -func (pr *Reader) Pause() error { - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.pauseFds == nil { - return fmt.Errorf("%w", ErrClosed) - } - - for i := range pr.pauseFds { - if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) { - return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err) - } - } - - pr.paused = true - - return nil -} - -// Resume allows this perf reader to emit notifications. -// -// Subsequent calls to Read will block until the next event notification. -func (pr *Reader) Resume() error { - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.pauseFds == nil { - return fmt.Errorf("%w", ErrClosed) - } - - for i, fd := range pr.pauseFds { - if fd == -1 { - continue - } - - if err := pr.array.Put(uint32(i), uint32(fd)); err != nil { - return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err) - } - } - - pr.paused = false - - return nil -} - -// BufferSize is the size in bytes of each per-CPU buffer -func (pr *Reader) BufferSize() int { - return pr.bufferSize -} - -// NB: Has to be preceded by a call to ring.loadHead. -func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { - defer ring.writeTail() - - rec.CPU = ring.cpu - err := readRecord(ring, rec, pr.eventHeader, pr.overwritable) - if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) { - return errEOR - } - rec.Remaining = ring.remaining() - return err -} - -type unknownEventError struct { - eventType uint32 -} - -func (uev *unknownEventError) Error() string { - return fmt.Sprintf("unknown event type: %d", uev.eventType) -} - -// IsUnknownEvent returns true if the error occurred -// because an unknown event was submitted to the perf event ring. -func IsUnknownEvent(err error) bool { - var uee *unknownEventError - return errors.As(err, &uee) -} diff --git a/vendor/github.com/cilium/ebpf/perf/ring.go b/vendor/github.com/cilium/ebpf/perf/ring.go deleted file mode 100644 index ddf3519f2c5..00000000000 --- a/vendor/github.com/cilium/ebpf/perf/ring.go +++ /dev/null @@ -1,285 +0,0 @@ -package perf - -import ( - "errors" - "fmt" - "io" - "math" - "os" - "runtime" - "sync/atomic" - "unsafe" - - "github.com/cilium/ebpf/internal/unix" -) - -// perfEventRing is a page of metadata followed by -// a variable number of pages which form a ring buffer. 
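The deleted ring.go below also shows why the old reader needed a size argument at all: a perf ring must be mmap'd as one metadata page plus a power-of-two number of data pages (1+2^n), so every requested size was rounded up per CPU. A sketch of that rounding without the float math, with a worked example: 65535 bytes and 4 KiB pages round to 16 data pages, so 69632 bytes are mapped per CPU:

```go
package main

import (
	"fmt"
	"os"
)

// perfBufferSize mirrors the helper removed in this diff: round the
// requested per-CPU size up to 2^n pages, then add one metadata page.
func perfBufferSize(perCPUBuffer int) int {
	pageSize := os.Getpagesize()
	nPages := (perCPUBuffer + pageSize - 1) / pageSize // ceil to whole pages
	pow := 1
	for pow < nPages { // next power of two
		pow *= 2
	}
	return (pow + 1) * pageSize
}

func main() {
	// With 4 KiB pages: 65535 -> 16 pages -> (16+1)*4096 = 69632.
	fmt.Println(perfBufferSize(65535))
}
```

The BPF ringbuf sizes differently: max_entries is the power-of-two data size in bytes, and the consumer/producer metadata pages are mapped separately (see ring.go later in this patch).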
-type perfEventRing struct { - fd int - cpu int - mmap []byte - ringReader -} - -func newPerfEventRing(cpu, perCPUBuffer, watermark int, overwritable bool) (*perfEventRing, error) { - if watermark >= perCPUBuffer { - return nil, errors.New("watermark must be smaller than perCPUBuffer") - } - - fd, err := createPerfEvent(cpu, watermark, overwritable) - if err != nil { - return nil, err - } - - if err := unix.SetNonblock(fd, true); err != nil { - unix.Close(fd) - return nil, err - } - - protections := unix.PROT_READ - if !overwritable { - protections |= unix.PROT_WRITE - } - - mmap, err := unix.Mmap(fd, 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED) - if err != nil { - unix.Close(fd) - return nil, fmt.Errorf("can't mmap: %v", err) - } - - // This relies on the fact that we allocate an extra metadata page, - // and that the struct is smaller than an OS page. - // This use of unsafe.Pointer isn't explicitly sanctioned by the - // documentation, since a byte is smaller than sampledPerfEvent. - meta := (*unix.PerfEventMmapPage)(unsafe.Pointer(&mmap[0])) - - var reader ringReader - if overwritable { - reader = newReverseReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size]) - } else { - reader = newForwardReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size]) - } - - ring := &perfEventRing{ - fd: fd, - cpu: cpu, - mmap: mmap, - ringReader: reader, - } - runtime.SetFinalizer(ring, (*perfEventRing).Close) - - return ring, nil -} - -// perfBufferSize returns a valid mmap buffer size for use with perf_event_open (1+2^n pages) -func perfBufferSize(perCPUBuffer int) int { - pageSize := os.Getpagesize() - - // Smallest whole number of pages - nPages := (perCPUBuffer + pageSize - 1) / pageSize - - // Round up to nearest power of two number of pages - nPages = int(math.Pow(2, math.Ceil(math.Log2(float64(nPages))))) - - // Add one for metadata - nPages += 1 - - return nPages * pageSize -} - -func (ring *perfEventRing) Close() { - runtime.SetFinalizer(ring, nil) - - _ = unix.Close(ring.fd) - _ = unix.Munmap(ring.mmap) - - ring.fd = -1 - ring.mmap = nil -} - -func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) { - if watermark == 0 { - watermark = 1 - } - - bits := unix.PerfBitWatermark - if overwritable { - bits |= unix.PerfBitWriteBackward - } - - attr := unix.PerfEventAttr{ - Type: unix.PERF_TYPE_SOFTWARE, - Config: unix.PERF_COUNT_SW_BPF_OUTPUT, - Bits: uint64(bits), - Sample_type: unix.PERF_SAMPLE_RAW, - Wakeup: uint32(watermark), - } - - attr.Size = uint32(unsafe.Sizeof(attr)) - fd, err := unix.PerfEventOpen(&attr, -1, cpu, -1, unix.PERF_FLAG_FD_CLOEXEC) - if err != nil { - return -1, fmt.Errorf("can't create perf event: %w", err) - } - return fd, nil -} - -type ringReader interface { - loadHead() - size() int - remaining() int - writeTail() - Read(p []byte) (int, error) -} - -type forwardReader struct { - meta *unix.PerfEventMmapPage - head, tail uint64 - mask uint64 - ring []byte -} - -func newForwardReader(meta *unix.PerfEventMmapPage, ring []byte) *forwardReader { - return &forwardReader{ - meta: meta, - head: atomic.LoadUint64(&meta.Data_head), - tail: atomic.LoadUint64(&meta.Data_tail), - // cap is always a power of two - mask: uint64(cap(ring) - 1), - ring: ring, - } -} - -func (rr *forwardReader) loadHead() { - rr.head = atomic.LoadUint64(&rr.meta.Data_head) -} - -func (rr *forwardReader) size() int { - return len(rr.ring) -} - -func (rr *forwardReader) remaining() int { - return int((rr.head - rr.tail) & rr.mask) -} -
-func (rr *forwardReader) writeTail() { - // Commit the new tail. This lets the kernel know that - // the ring buffer has been consumed. - atomic.StoreUint64(&rr.meta.Data_tail, rr.tail) -} - -func (rr *forwardReader) Read(p []byte) (int, error) { - start := int(rr.tail & rr.mask) - - n := len(p) - // Truncate if the read wraps in the ring buffer - if remainder := cap(rr.ring) - start; n > remainder { - n = remainder - } - - // Truncate if there isn't enough data - if remainder := int(rr.head - rr.tail); n > remainder { - n = remainder - } - - copy(p, rr.ring[start:start+n]) - rr.tail += uint64(n) - - if rr.tail == rr.head { - return n, io.EOF - } - - return n, nil -} - -type reverseReader struct { - meta *unix.PerfEventMmapPage - // head is the position where the kernel last wrote data. - head uint64 - // read is the position we read the next data from. Updated as reads are made. - read uint64 - // tail is the end of the ring buffer. No reads must be made past it. - tail uint64 - mask uint64 - ring []byte -} - -func newReverseReader(meta *unix.PerfEventMmapPage, ring []byte) *reverseReader { - rr := &reverseReader{ - meta: meta, - mask: uint64(cap(ring) - 1), - ring: ring, - } - rr.loadHead() - return rr -} - -func (rr *reverseReader) loadHead() { - // The diagram below represents an overwritable perf ring buffer: - // - // head read tail - // | | | - // V V V - // +---+--------+------------+---------+--------+ - // | |H-D....D|H-C........C|H-B.....B|H-A....A| - // +---+--------+------------+---------+--------+ - // <--Write from right to left - // Read from left to right--> - // (H means header) - // - // The buffer is read left to right beginning from head to tail. - // [head, read) is the read portion of the buffer, [read, tail) the unread one. - // read is adjusted as we progress through the buffer. - - // Avoid reading sample D multiple times by discarding unread samples C, B, A. - rr.tail = rr.head - - // Get the new head and starting reading from it. - rr.head = atomic.LoadUint64(&rr.meta.Data_head) - rr.read = rr.head - - if rr.tail-rr.head > uint64(cap(rr.ring)) { - // ring has been fully written, only permit at most cap(rr.ring) - // bytes to be read. - rr.tail = rr.head + uint64(cap(rr.ring)) - } -} - -func (rr *reverseReader) size() int { - return len(rr.ring) -} - -func (rr *reverseReader) remaining() int { - // remaining data is inaccurate for overwritable buffers - // once an overwrite happens, so return -1 here. - return -1 -} - -func (rr *reverseReader) writeTail() { - // We do not care about tail for over writable perf buffer. - // So, this function is noop. -} - -func (rr *reverseReader) Read(p []byte) (int, error) { - start := int(rr.read & rr.mask) - - n := len(p) - // Truncate if the read wraps in the ring buffer - if remainder := cap(rr.ring) - start; n > remainder { - n = remainder - } - - // Truncate if there isn't enough data - if remainder := int(rr.tail - rr.read); n > remainder { - n = remainder - } - - copy(p, rr.ring[start:start+n]) - rr.read += uint64(n) - - if rr.read == rr.tail { - return n, io.EOF - } - - return n, nil -} diff --git a/vendor/github.com/cilium/ebpf/ringbuf/doc.go b/vendor/github.com/cilium/ebpf/ringbuf/doc.go new file mode 100644 index 00000000000..9e450121878 --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/doc.go @@ -0,0 +1,6 @@ +// Package ringbuf allows interacting with Linux BPF ring buffer. +// +// BPF allows submitting custom events to a BPF ring buffer map set up +// by userspace. This is very useful to push things like packet samples +// from BPF to a daemon running in user space. +package ringbuf
diff --git a/vendor/github.com/cilium/ebpf/ringbuf/reader.go b/vendor/github.com/cilium/ebpf/ringbuf/reader.go new file mode 100644 index 00000000000..c6adaf2f969 --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/reader.go @@ -0,0 +1,244 @@ +package ringbuf + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "sync" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/internal" + "github.com/cilium/ebpf/internal/epoll" + "github.com/cilium/ebpf/internal/unix" +) + +var ( + ErrClosed = os.ErrClosed + errEOR = errors.New("end of ring") + errDiscard = errors.New("sample discarded") + errBusy = errors.New("sample not committed yet") +) + +var ringbufHeaderSize = binary.Size(ringbufHeader{}) + +// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c +type ringbufHeader struct { + Len uint32 + PgOff uint32 +} + +func (rh *ringbufHeader) isBusy() bool { + return rh.Len&unix.BPF_RINGBUF_BUSY_BIT != 0 +} + +func (rh *ringbufHeader) isDiscard() bool { + return rh.Len&unix.BPF_RINGBUF_DISCARD_BIT != 0 +} + +func (rh *ringbufHeader) dataLen() int { + return int(rh.Len & ^uint32(unix.BPF_RINGBUF_BUSY_BIT|unix.BPF_RINGBUF_DISCARD_BIT)) +} + +type Record struct { + RawSample []byte + + // The minimum number of bytes remaining in the ring buffer after this Record has been read. + Remaining int +} + +// Read a record from an event ring. +// +// buf must be at least ringbufHeaderSize bytes long. +func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { + rd.loadConsumer() + + buf = buf[:ringbufHeaderSize] + if _, err := io.ReadFull(rd, buf); err == io.EOF { + return errEOR + } else if err != nil { + return fmt.Errorf("read event header: %w", err) + } + + header := ringbufHeader{ + internal.NativeEndian.Uint32(buf[0:4]), + internal.NativeEndian.Uint32(buf[4:8]), + } + + if header.isBusy() { + // the next sample in the ring is not committed yet so we + // exit without storing the reader/consumer position + // and start again from the same position. + return errBusy + } + + /* read up to 8 byte alignment */ + dataLenAligned := uint64(internal.Align(header.dataLen(), 8)) + + if header.isDiscard() { + // when the record header indicates that the data should be + // discarded, we skip it by just updating the consumer position + // to the next record instead of normal Read() to avoid allocating data + // and reading/copying from the ring (which normally keeps track of the + // consumer position). + rd.skipRead(dataLenAligned) + rd.storeConsumer() + + return errDiscard + } + + if cap(rec.RawSample) < int(dataLenAligned) { + rec.RawSample = make([]byte, dataLenAligned) + } else { + rec.RawSample = rec.RawSample[:dataLenAligned] + } + + if _, err := io.ReadFull(rd, rec.RawSample); err != nil { + return fmt.Errorf("read sample: %w", err) + } + + rd.storeConsumer() + rec.RawSample = rec.RawSample[:header.dataLen()] + rec.Remaining = rd.remaining() + return nil +} + +// Reader allows reading bpf_ringbuf_output +// from user space. +type Reader struct { + poller *epoll.Poller + + // mu protects read/write access to the Reader structure + mu sync.Mutex + ring *ringbufEventRing + epollEvents []unix.EpollEvent + header []byte + haveData bool + deadline time.Time + bufferSize int +} + +// NewReader creates a new BPF ringbuf reader.
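Each ringbuf record is prefixed by the 8-byte header decoded above, and the length word doubles as a state flag: the busy bit marks a reserved-but-uncommitted record (stop and retry from the same position) and the discard bit marks a committed record to skip. Payloads are padded to 8-byte alignment in the ring, while dataLen() reports the unpadded size. A standalone sketch of that decoding, assuming the kernel's bit positions (BPF_RINGBUF_BUSY_BIT = 1<<31, BPF_RINGBUF_DISCARD_BIT = 1<<30):

```go
package main

import "fmt"

const (
	busyBit    = uint32(1) << 31 // BPF_RINGBUF_BUSY_BIT
	discardBit = uint32(1) << 30 // BPF_RINGBUF_DISCARD_BIT
)

// decodeLen splits the header length word into payload size and flags.
func decodeLen(lenWord uint32) (dataLen int, busy, discard bool) {
	return int(lenWord &^ (busyBit | discardBit)),
		lenWord&busyBit != 0,
		lenWord&discardBit != 0
}

// align rounds n up to a multiple of 8, matching the in-ring layout.
func align(n int) int { return (n + 7) &^ 7 }

func main() {
	n, busy, discard := decodeLen(discardBit | 13)
	fmt.Println(n, busy, discard, align(n)) // 13 false true 16
}
```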
+func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { + if ringbufMap.Type() != ebpf.RingBuf { + return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) + } + + maxEntries := int(ringbufMap.MaxEntries()) + if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { + return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) + } + + poller, err := epoll.New() + if err != nil { + return nil, err + } + + if err := poller.Add(ringbufMap.FD(), 0); err != nil { + poller.Close() + return nil, err + } + + ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) + if err != nil { + poller.Close() + return nil, fmt.Errorf("failed to create ringbuf ring: %w", err) + } + + return &Reader{ + poller: poller, + ring: ring, + epollEvents: make([]unix.EpollEvent, 1), + header: make([]byte, ringbufHeaderSize), + bufferSize: ring.size(), + }, nil +} + +// Close frees resources used by the reader. +// +// It interrupts calls to Read. +func (r *Reader) Close() error { + if err := r.poller.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return nil + } + return err + } + + // Acquire the lock. This ensures that Read isn't running. + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring != nil { + r.ring.Close() + r.ring = nil + } + + return nil +} + +// SetDeadline controls how long Read and ReadInto will block waiting for samples. +// +// Passing a zero time.Time will remove the deadline. +func (r *Reader) SetDeadline(t time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + + r.deadline = t +} + +// Read the next record from the BPF ringbuf. +// +// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded +// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP +// which may cause the deadline to expire but a valid entry will be present. +func (r *Reader) Read() (Record, error) { + var rec Record + return rec, r.ReadInto(&rec) +} + +// ReadInto is like Read except that it allows reusing Record and associated buffers. 
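One behavioural difference worth keeping in mind when consuming this reader: wakeups are driven by the producer, so if the BPF side submits with BPF_RB_NO_WAKEUP the deadline can expire while committed data is already sitting in the ring, which is why ReadInto (just below) swallows os.ErrDeadlineExceeded when the ring is non-empty. A sketch of deadline-based polling on top of this API:

```go
package sketch

import (
	"errors"
	"os"
	"time"

	"github.com/cilium/ebpf/ringbuf"
)

// pollOnce reads a single record, waiting at most timeout.
// ok is false if nothing was readable before the deadline.
func pollOnce(rd *ringbuf.Reader, timeout time.Duration) (rec ringbuf.Record, ok bool, err error) {
	rd.SetDeadline(time.Now().Add(timeout))
	rec, err = rd.Read()
	if errors.Is(err, os.ErrDeadlineExceeded) {
		return ringbuf.Record{}, false, nil
	}
	if err != nil {
		return ringbuf.Record{}, false, err
	}
	return rec, true, nil
}
```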
+func (r *Reader) ReadInto(rec *Record) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring == nil { + return fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + for { + if !r.haveData { + _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() { + // Ignoring this for reading a valid entry after timeout + // This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP + err = nil + } + if err != nil { + return err + } + r.haveData = true + } + + for { + err := readRecord(r.ring, rec, r.header) + // Not using errors.Is which is quite a bit slower + // For a tight loop it might make a difference + if err == errBusy || err == errDiscard { + continue + } + if err == errEOR { + r.haveData = false + break + } + return err + } + } +} + +// BufferSize returns the size in bytes of the ring buffer +func (r *Reader) BufferSize() int { + return r.bufferSize +} diff --git a/vendor/github.com/cilium/ebpf/ringbuf/ring.go b/vendor/github.com/cilium/ebpf/ringbuf/ring.go new file mode 100644 index 00000000000..6dd04a93eca --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/ring.go @@ -0,0 +1,127 @@ +package ringbuf + +import ( + "fmt" + "io" + "os" + "runtime" + "sync/atomic" + "unsafe" + + "github.com/cilium/ebpf/internal/unix" +) + +type ringbufEventRing struct { + prod []byte + cons []byte + *ringReader +} + +func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) { + cons, err := unix.Mmap(mapFD, 0, os.Getpagesize(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("can't mmap consumer page: %w", err) + } + + prod, err := unix.Mmap(mapFD, (int64)(os.Getpagesize()), os.Getpagesize()+2*size, unix.PROT_READ, unix.MAP_SHARED) + if err != nil { + _ = unix.Munmap(cons) + return nil, fmt.Errorf("can't mmap data pages: %w", err) + } + + cons_pos := (*uint64)(unsafe.Pointer(&cons[0])) + prod_pos := (*uint64)(unsafe.Pointer(&prod[0])) + + ring := &ringbufEventRing{ + prod: prod, + cons: cons, + ringReader: newRingReader(cons_pos, prod_pos, prod[os.Getpagesize():]), + } + runtime.SetFinalizer(ring, (*ringbufEventRing).Close) + + return ring, nil +} + +func (ring *ringbufEventRing) Close() { + runtime.SetFinalizer(ring, nil) + + _ = unix.Munmap(ring.prod) + _ = unix.Munmap(ring.cons) + + ring.prod = nil + ring.cons = nil +} + +type ringReader struct { + // These point into mmap'ed memory and must be accessed atomically. 
+ prod_pos, cons_pos *uint64 + cons uint64 + mask uint64 + ring []byte +} + +func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader { + return &ringReader{ + prod_pos: prod_ptr, + cons_pos: cons_ptr, + cons: atomic.LoadUint64(cons_ptr), + // cap is always a power of two + mask: uint64(cap(ring)/2 - 1), + ring: ring, + } +} + +func (rr *ringReader) loadConsumer() { + rr.cons = atomic.LoadUint64(rr.cons_pos) +} + +func (rr *ringReader) storeConsumer() { + atomic.StoreUint64(rr.cons_pos, rr.cons) +} + +// clamp delta to 'end' if 'start+delta' is beyond 'end' +func clamp(start, end, delta uint64) uint64 { + if remainder := end - start; delta > remainder { + return remainder + } + return delta +} + +func (rr *ringReader) skipRead(skipBytes uint64) { + rr.cons += clamp(rr.cons, atomic.LoadUint64(rr.prod_pos), skipBytes) +} + +func (rr *ringReader) isEmpty() bool { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return prod == cons +} + +func (rr *ringReader) size() int { + return cap(rr.ring) +} + +func (rr *ringReader) remaining() int { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return int((prod - cons) & rr.mask) +} + +func (rr *ringReader) Read(p []byte) (int, error) { + prod := atomic.LoadUint64(rr.prod_pos) + + n := clamp(rr.cons, prod, uint64(len(p))) + + start := rr.cons & rr.mask + + copy(p, rr.ring[start:start+n]) + rr.cons += n + + if prod == rr.cons { + return int(n), io.EOF + } + + return int(n), nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b83b9ee7f66..2099064faba 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -223,7 +223,7 @@ github.com/cilium/ebpf/internal/sysenc github.com/cilium/ebpf/internal/tracefs github.com/cilium/ebpf/internal/unix github.com/cilium/ebpf/link -github.com/cilium/ebpf/perf +github.com/cilium/ebpf/ringbuf github.com/cilium/ebpf/rlimit # github.com/cilium/little-vm-helper v0.0.16 ## explicit; go 1.21.0
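Two details of ring.go above are easy to miss. First, the data pages are mapped with length os.Getpagesize() + 2*size: the kernel exposes the ring's pages twice, back to back, so a record that straddles the end of the ring can still be read with a single contiguous copy; that is also why newRingReader masks with cap(ring)/2 - 1 instead of cap(ring) - 1. Second, all position arithmetic depends on the power-of-two size so that pos & mask equals pos % size. A small standalone sketch of that index math under those assumptions:

```go
package main

import "fmt"

func main() {
	const size = 4096            // power-of-two ring size (max_entries)
	ring := make([]byte, 2*size) // stands in for the double-mapped data pages
	mask := uint64(size - 1)

	cons := uint64(size - 8) // consumer position near the end of the ring
	start := cons & mask     // 4088, i.e. cons % size

	// A 16-byte record at offset 4088 crosses the ring boundary, but
	// ring[4088:4104] is still one contiguous slice because the second
	// mapping repeats the first size bytes.
	rec := ring[start : start+16]
	fmt.Println(start, len(rec)) // 4088 16
}
```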