diff --git a/bpf/include/api.h b/bpf/include/api.h index 48ded18dc89..c04571bc703 100644 --- a/bpf/include/api.h +++ b/bpf/include/api.h @@ -227,7 +227,7 @@ static __u64 BPF_FUNC(get_attach_cookie, void *ctx); static long BPF_FUNC(loop, __u32 nr_loops, void *callback_fn, void *callback_ctx, __u64 flags); -static long BPF_FUNC(ringbuf_output, void *data, uint64_t size, uint64_t flags); +static long BPF_FUNC(ringbuf_output, void *ringbuf, void *data, uint64_t size, uint64_t flags); static void BPF_FUNC(ringbuf_reserve, void *ringbuf, uint64_t size, uint64_t flags); static void BPF_FUNC(ringbuf_submit, void *data, uint64_t flags); static void BPF_FUNC(ringbuf_discard, void *data, uint64_t flags); diff --git a/bpf/lib/bpf_event.h b/bpf/lib/bpf_event.h index 52ade8e9104..0775aa214fa 100644 --- a/bpf/lib/bpf_event.h +++ b/bpf/lib/bpf_event.h @@ -11,9 +11,8 @@ struct event { }; struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __type(key, int); - __type(value, struct event); + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 8LU * 1024LU * 1024LU); // 8MB } tcpmon_map SEC(".maps"); #endif // __BPF_EVENT_H diff --git a/bpf/lib/process.h b/bpf/lib/process.h index bdadea67115..0a86b31edb0 100644 --- a/bpf/lib/process.h +++ b/bpf/lib/process.h @@ -582,7 +582,7 @@ static inline __attribute__((always_inline)) void perf_event_output_metric(void __u32 zero = 0; long err; - err = perf_event_output(ctx, map, flags, data, size); + err = ringbuf_output(map, data, size, 0); if (err < 0) { valp = map_lookup_elem(&tg_stats_map, &zero); if (valp) diff --git a/bpf/test/bpf_lseek.c b/bpf/test/bpf_lseek.c index dc71e48b107..a8b28dce183 100644 --- a/bpf/test/bpf_lseek.c +++ b/bpf/test/bpf_lseek.c @@ -49,8 +49,7 @@ test_lseek(struct sys_enter_lseek_args *ctx) msg.common.ktime = ktime_get_ns(); msg.common.size = size; msg.arg0 = get_smp_processor_id(); - perf_event_output(ctx, &tcpmon_map, BPF_F_CURRENT_CPU, &msg, - size); + ringbuf_output(&tcpmon_map, &msg, size, 0); } return 0; diff --git a/pkg/bpf/perf_linux.go b/pkg/bpf/perf_linux.go index 23a530336e9..f2035e6131c 100644 --- a/pkg/bpf/perf_linux.go +++ b/pkg/bpf/perf_linux.go @@ -8,10 +8,7 @@ import ( "io" "os" "path/filepath" - "runtime" "strings" - - "golang.org/x/sys/unix" ) const ( @@ -19,13 +16,7 @@ const ( ) type PerfEventConfig struct { - NumCpus int - NumPages int - MapName string - Type int - Config int - SampleType int - WakeupEvents int + MapName string } // GetNumPossibleCPUs returns a total number of possible CPUS, i.e. CPUs that @@ -74,30 +65,7 @@ func getNumPossibleCPUsFromReader(r io.Reader) int { // DefaultPerfEventConfig returns the default perf event configuration. It // relies on the map root to be set. 
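The BPF half of this change swaps the per-CPU perf event array for one shared ring buffer, so sizing moves out of userspace (the NumPages/NumCpus fields removed below) and into the map definition itself. For reference, a minimal sketch of the same map built from Go with cilium/ebpf — purely illustrative, since the real map is created from the compiled BPF object; the point to remember is that a RINGBUF map's max_entries is a byte size that must be a power-of-two multiple of the page size, and NewReader in the vendored package below checks the power-of-two part:

package main

import (
	"fmt"

	"github.com/cilium/ebpf"
)

func main() {
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Name:       "tcpmon_map",
		Type:       ebpf.RingBuf,
		MaxEntries: 8 * 1024 * 1024, // bytes, mirroring __uint(max_entries, ...) above
	})
	if err != nil {
		panic(err) // needs a v5.8+ kernel and CAP_BPF or root
	}
	defer m.Close()

	fmt.Println(m.Type(), m.MaxEntries()) // RingBuf 8388608
}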
func DefaultPerfEventConfig() *PerfEventConfig { - numCpus := GetNumPossibleCPUs() - if numCpus == 0 { - numCpus = runtime.NumCPU() - } return &PerfEventConfig{ - MapName: filepath.Join(MapPrefixPath(), eventsMapName), - Type: PERF_TYPE_SOFTWARE, - Config: PERF_COUNT_SW_BPF_OUTPUT, - SampleType: PERF_SAMPLE_RAW, - WakeupEvents: 1, - NumCpus: numCpus, - NumPages: 128, - } -} - -func UpdateElementFromPointers(fd int, structPtr, sizeOfStruct uintptr) error { - ret, _, err := unix.Syscall( - unix.SYS_BPF, - BPF_MAP_UPDATE_ELEM, - structPtr, - sizeOfStruct, - ) - if ret != 0 || err != 0 { - return fmt.Errorf("Unable to update element for map with file descriptor %d: %s", fd, err) + MapName: filepath.Join(MapPrefixPath(), eventsMapName), } - return nil } diff --git a/pkg/observer/observer.go b/pkg/observer/observer.go index b05dd2165f7..bfed93f8c72 100644 --- a/pkg/observer/observer.go +++ b/pkg/observer/observer.go @@ -16,7 +16,7 @@ import ( "time" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/cilium/tetragon/pkg/api/ops" "github.com/cilium/tetragon/pkg/api/readyapi" "github.com/cilium/tetragon/pkg/bpf" @@ -211,11 +211,9 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { } defer perfMap.Close() - rbSize := k.getRBSize(int(perfMap.MaxEntries())) - perfReader, err := perf.NewReader(perfMap, rbSize) - + rd, err := ringbuf.NewReader(perfMap) if err != nil { - return fmt.Errorf("creating perf array reader failed: %w", err) + return fmt.Errorf("opening ringbuf reader: %w", err) } // Inform caller that we're about to start processing events. @@ -224,7 +222,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { // We spawn go routine to read and process perf events, // connected with main app through eventsQueue channel. - eventsQueue := make(chan *perf.Record, k.getRBQueueSize()) + eventsQueue := make(chan *ringbuf.Record, k.getRBQueueSize()) // Listeners are ready and about to start reading from perf reader, tell // user everything is ready. @@ -237,7 +235,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { go func() { defer wg.Done() for stopCtx.Err() == nil { - record, err := perfReader.Read() + record, err := rd.Read() if err != nil { // NOTE(JM and Djalal): count and log errors while excluding the stopping context if stopCtx.Err() == nil { @@ -256,11 +254,6 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { k.recvCntr++ ringbufmetrics.PerfEventReceived.Inc() } - - if record.LostSamples > 0 { - atomic.AddUint64(&k.lostCntr, uint64(record.LostSamples)) - ringbufmetrics.PerfEventLost.Add(float64(record.LostSamples)) - } } } }() @@ -290,7 +283,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error { // Wait for context to be cancelled and then stop. <-stopCtx.Done() - return perfReader.Close() + return rd.Close() } // Observer represents the link between the BPF perf ring and the listeners. It diff --git a/vendor/github.com/cilium/ebpf/ringbuf/doc.go b/vendor/github.com/cilium/ebpf/ringbuf/doc.go new file mode 100644 index 00000000000..9e450121878 --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/doc.go @@ -0,0 +1,6 @@ +// Package ringbuf allows interacting with Linux BPF ring buffer. +// +// BPF allows submitting custom events to a BPF ring buffer map set up +// by userspace. This is very useful to push things like packet samples +// from BPF to a daemon running in user space. 
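On the Go side, perf.NewReader(perfMap, rbSize) collapses into ringbuf.NewReader(perfMap): the ring size is fixed by the map itself, and there is no per-record LostSamples field to account for — when the ring is full, ringbuf_output fails on the producer side, which the process.h hunk above already counts through tg_stats_map. A standalone consumer in the same style as RunEvents (a sketch; the pin path is hypothetical):

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

func main() {
	// Hypothetical pin path; Tetragon builds the real one from MapPrefixPath().
	m, err := ebpf.LoadPinnedMap("/sys/fs/bpf/tetragon/tcpmon_map", nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer m.Close()

	rd, err := ringbuf.NewReader(m)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer rd.Close()

	for {
		record, err := rd.Read()
		if errors.Is(err, os.ErrClosed) {
			return // Close() was called: normal shutdown
		}
		if err != nil {
			continue // a real consumer counts these, as RunEvents does
		}
		fmt.Printf("event: %d bytes, %d bytes still queued\n",
			len(record.RawSample), record.Remaining)
	}
}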
+package ringbuf diff --git a/vendor/github.com/cilium/ebpf/ringbuf/reader.go b/vendor/github.com/cilium/ebpf/ringbuf/reader.go new file mode 100644 index 00000000000..c6adaf2f969 --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/reader.go @@ -0,0 +1,244 @@ +package ringbuf + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "sync" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/internal" + "github.com/cilium/ebpf/internal/epoll" + "github.com/cilium/ebpf/internal/unix" +) + +var ( + ErrClosed = os.ErrClosed + errEOR = errors.New("end of ring") + errDiscard = errors.New("sample discarded") + errBusy = errors.New("sample not committed yet") +) + +var ringbufHeaderSize = binary.Size(ringbufHeader{}) + +// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c +type ringbufHeader struct { + Len uint32 + PgOff uint32 +} + +func (rh *ringbufHeader) isBusy() bool { + return rh.Len&unix.BPF_RINGBUF_BUSY_BIT != 0 +} + +func (rh *ringbufHeader) isDiscard() bool { + return rh.Len&unix.BPF_RINGBUF_DISCARD_BIT != 0 +} + +func (rh *ringbufHeader) dataLen() int { + return int(rh.Len & ^uint32(unix.BPF_RINGBUF_BUSY_BIT|unix.BPF_RINGBUF_DISCARD_BIT)) +} + +type Record struct { + RawSample []byte + + // The minimum number of bytes remaining in the ring buffer after this Record has been read. + Remaining int +} + +// Read a record from an event ring. +// +// buf must be at least ringbufHeaderSize bytes long. +func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { + rd.loadConsumer() + + buf = buf[:ringbufHeaderSize] + if _, err := io.ReadFull(rd, buf); err == io.EOF { + return errEOR + } else if err != nil { + return fmt.Errorf("read event header: %w", err) + } + + header := ringbufHeader{ + internal.NativeEndian.Uint32(buf[0:4]), + internal.NativeEndian.Uint32(buf[4:8]), + } + + if header.isBusy() { + // the next sample in the ring is not committed yet so we + // exit without storing the reader/consumer position + // and start again from the same position. + return errBusy + } + + /* read up to 8 byte alignment */ + dataLenAligned := uint64(internal.Align(header.dataLen(), 8)) + + if header.isDiscard() { + // when the record header indicates that the data should be + // discarded, we skip it by just updating the consumer position + // to the next record instead of normal Read() to avoid allocating data + // and reading/copying from the ring (which normally keeps track of the + // consumer position). + rd.skipRead(dataLenAligned) + rd.storeConsumer() + + return errDiscard + } + + if cap(rec.RawSample) < int(dataLenAligned) { + rec.RawSample = make([]byte, dataLenAligned) + } else { + rec.RawSample = rec.RawSample[:dataLenAligned] + } + + if _, err := io.ReadFull(rd, rec.RawSample); err != nil { + return fmt.Errorf("read sample: %w", err) + } + + rd.storeConsumer() + rec.RawSample = rec.RawSample[:header.dataLen()] + rec.Remaining = rd.remaining() + return nil +} + +// Reader allows reading bpf_ringbuf_output +// from user space. +type Reader struct { + poller *epoll.Poller + + // mu protects read/write access to the Reader structure + mu sync.Mutex + ring *ringbufEventRing + epollEvents []unix.EpollEvent + header []byte + haveData bool + deadline time.Time + bufferSize int +} + +// NewReader creates a new BPF ringbuf reader. 
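readRecord above is the core of the consumer: every record starts with an 8-byte header whose Len field carries two flag bits alongside the length. A self-contained sketch of the same bit layout, with constants mirroring BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the kernel's include/uapi/linux/bpf.h:

package main

import "fmt"

const (
	busyBit    = uint32(1) << 31 // BPF_RINGBUF_BUSY_BIT: record not committed yet
	discardBit = uint32(1) << 30 // BPF_RINGBUF_DISCARD_BIT: producer abandoned the record
)

func main() {
	hdrLen := busyBit | 24 // header of a 24-byte sample that is still being written

	fmt.Println(hdrLen&busyBit != 0)                  // true: reader must back off
	fmt.Println(int(hdrLen &^ (busyBit | discardBit))) // 24: payload length in bytes

	// Records advance the ring in 8-byte steps, which is what
	// internal.Align(header.dataLen(), 8) computes above:
	fmt.Println((13 + 7) &^ 7) // 16: a 13-byte sample occupies 16 bytes
}

A busy record returns without storing the consumer position, so the next call retries the same offset; a discarded record only advances the position, with no copy.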
+func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { + if ringbufMap.Type() != ebpf.RingBuf { + return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) + } + + maxEntries := int(ringbufMap.MaxEntries()) + if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { + return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) + } + + poller, err := epoll.New() + if err != nil { + return nil, err + } + + if err := poller.Add(ringbufMap.FD(), 0); err != nil { + poller.Close() + return nil, err + } + + ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) + if err != nil { + poller.Close() + return nil, fmt.Errorf("failed to create ringbuf ring: %w", err) + } + + return &Reader{ + poller: poller, + ring: ring, + epollEvents: make([]unix.EpollEvent, 1), + header: make([]byte, ringbufHeaderSize), + bufferSize: ring.size(), + }, nil +} + +// Close frees resources used by the reader. +// +// It interrupts calls to Read. +func (r *Reader) Close() error { + if err := r.poller.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return nil + } + return err + } + + // Acquire the lock. This ensures that Read isn't running. + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring != nil { + r.ring.Close() + r.ring = nil + } + + return nil +} + +// SetDeadline controls how long Read and ReadInto will block waiting for samples. +// +// Passing a zero time.Time will remove the deadline. +func (r *Reader) SetDeadline(t time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + + r.deadline = t +} + +// Read the next record from the BPF ringbuf. +// +// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded +// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP +// which may cause the deadline to expire but a valid entry will be present. +func (r *Reader) Read() (Record, error) { + var rec Record + return rec, r.ReadInto(&rec) +} + +// ReadInto is like Read except that it allows reusing Record and associated buffers. 
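One usage note on the API above: Read reallocates RawSample whenever its capacity is too small, while ReadInto can recycle a single Record across a hot loop, and SetDeadline keeps shutdown checks from blocking forever on an idle ring. A sketch combining the two — the consume helper and its stop channel are assumptions for illustration, not Tetragon code:

package consumer

import (
	"errors"
	"log"
	"os"
	"time"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

func consume(m *ebpf.Map, stop <-chan struct{}) error {
	rd, err := ringbuf.NewReader(m)
	if err != nil {
		return err
	}
	defer rd.Close()

	var rec ringbuf.Record // reused: RawSample's backing array is recycled
	for {
		select {
		case <-stop:
			return nil
		default:
		}

		rd.SetDeadline(time.Now().Add(time.Second))
		err := rd.ReadInto(&rec)
		if errors.Is(err, os.ErrDeadlineExceeded) {
			continue // idle ring: go around and re-check stop
		}
		if err != nil {
			return err
		}
		log.Printf("sample: %d bytes", len(rec.RawSample))
	}
}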
+func (r *Reader) ReadInto(rec *Record) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring == nil { + return fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + for { + if !r.haveData { + _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() { + // Ignoring this for reading a valid entry after timeout + // This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP + err = nil + } + if err != nil { + return err + } + r.haveData = true + } + + for { + err := readRecord(r.ring, rec, r.header) + // Not using errors.Is which is quite a bit slower + // For a tight loop it might make a difference + if err == errBusy || err == errDiscard { + continue + } + if err == errEOR { + r.haveData = false + break + } + return err + } + } +} + +// BufferSize returns the size in bytes of the ring buffer +func (r *Reader) BufferSize() int { + return r.bufferSize +} diff --git a/vendor/github.com/cilium/ebpf/ringbuf/ring.go b/vendor/github.com/cilium/ebpf/ringbuf/ring.go new file mode 100644 index 00000000000..6dd04a93eca --- /dev/null +++ b/vendor/github.com/cilium/ebpf/ringbuf/ring.go @@ -0,0 +1,127 @@ +package ringbuf + +import ( + "fmt" + "io" + "os" + "runtime" + "sync/atomic" + "unsafe" + + "github.com/cilium/ebpf/internal/unix" +) + +type ringbufEventRing struct { + prod []byte + cons []byte + *ringReader +} + +func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) { + cons, err := unix.Mmap(mapFD, 0, os.Getpagesize(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("can't mmap consumer page: %w", err) + } + + prod, err := unix.Mmap(mapFD, (int64)(os.Getpagesize()), os.Getpagesize()+2*size, unix.PROT_READ, unix.MAP_SHARED) + if err != nil { + _ = unix.Munmap(cons) + return nil, fmt.Errorf("can't mmap data pages: %w", err) + } + + cons_pos := (*uint64)(unsafe.Pointer(&cons[0])) + prod_pos := (*uint64)(unsafe.Pointer(&prod[0])) + + ring := &ringbufEventRing{ + prod: prod, + cons: cons, + ringReader: newRingReader(cons_pos, prod_pos, prod[os.Getpagesize():]), + } + runtime.SetFinalizer(ring, (*ringbufEventRing).Close) + + return ring, nil +} + +func (ring *ringbufEventRing) Close() { + runtime.SetFinalizer(ring, nil) + + _ = unix.Munmap(ring.prod) + _ = unix.Munmap(ring.cons) + + ring.prod = nil + ring.cons = nil +} + +type ringReader struct { + // These point into mmap'ed memory and must be accessed atomically. 
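The mmap layout in newRingBufEventRing above is the one subtle part: the consumer page is mapped read-write, while the producer page and the data area are mapped read-only at a page offset — with the data pages mapped twice, back to back (os.Getpagesize() + 2*size). That mirror is what lets a record wrapping past the end of the ring be read with a single contiguous copy in ringReader.Read below. Simulated in plain Go:

package main

import "fmt"

func main() {
	const size = 8 // stands in for the real power-of-two ring size
	mask := uint64(size - 1)

	// Splice two copies of the ring together, as the double mmap does.
	ring := []byte("ABCDEFGH")
	mirrored := append(append([]byte{}, ring...), ring...)

	cons := uint64(14) // consumer position, already past one full wrap
	n := uint64(4)     // a 4-byte record that straddles the wrap point

	start := cons & mask                        // 14 & 7 = 6: offset inside the ring
	fmt.Printf("%s\n", mirrored[start:start+n]) // GHAB: no split copy needed
}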
+ prod_pos, cons_pos *uint64 + cons uint64 + mask uint64 + ring []byte +} + +func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader { + return &ringReader{ + prod_pos: prod_ptr, + cons_pos: cons_ptr, + cons: atomic.LoadUint64(cons_ptr), + // cap is always a power of two + mask: uint64(cap(ring)/2 - 1), + ring: ring, + } +} + +func (rr *ringReader) loadConsumer() { + rr.cons = atomic.LoadUint64(rr.cons_pos) +} + +func (rr *ringReader) storeConsumer() { + atomic.StoreUint64(rr.cons_pos, rr.cons) +} + +// clamp delta to 'end' if 'start+delta' is beyond 'end' +func clamp(start, end, delta uint64) uint64 { + if remainder := end - start; delta > remainder { + return remainder + } + return delta +} + +func (rr *ringReader) skipRead(skipBytes uint64) { + rr.cons += clamp(rr.cons, atomic.LoadUint64(rr.prod_pos), skipBytes) +} + +func (rr *ringReader) isEmpty() bool { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return prod == cons +} + +func (rr *ringReader) size() int { + return cap(rr.ring) +} + +func (rr *ringReader) remaining() int { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return int((prod - cons) & rr.mask) +} + +func (rr *ringReader) Read(p []byte) (int, error) { + prod := atomic.LoadUint64(rr.prod_pos) + + n := clamp(rr.cons, prod, uint64(len(p))) + + start := rr.cons & rr.mask + + copy(p, rr.ring[start:start+n]) + rr.cons += n + + if prod == rr.cons { + return int(n), io.EOF + } + + return int(n), nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b83b9ee7f66..386f5d803aa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -224,6 +224,7 @@ github.com/cilium/ebpf/internal/tracefs github.com/cilium/ebpf/internal/unix github.com/cilium/ebpf/link github.com/cilium/ebpf/perf +github.com/cilium/ebpf/ringbuf github.com/cilium/ebpf/rlimit # github.com/cilium/little-vm-helper v0.0.16 ## explicit; go 1.21.0
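Note how the reader never wraps prod_pos or cons_pos themselves: both positions grow monotonically and are reduced with mask only when indexing into the ring, so remaining() and clamp() stay plain unsigned arithmetic. Worked through with small numbers (clamp is copied verbatim from ring.go above):

package main

import "fmt"

// clamp caps delta so that start+delta never passes end.
func clamp(start, end, delta uint64) uint64 {
	if remainder := end - start; delta > remainder {
		return remainder
	}
	return delta
}

func main() {
	const size = 4096
	mask := uint64(size - 1)

	prod := uint64(5000) // producer has written 5000 bytes since boot
	cons := uint64(4600) // consumer has read 4600 of them

	fmt.Println((prod - cons) & mask)   // 400: what remaining() reports
	fmt.Println(cons & mask)            // 504: where the next read starts
	fmt.Println(clamp(cons, prod, 512)) // 400: a 512-byte read is capped
}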