Skip to content

Commit

Permalink
Merge pull request #43 from coroot/performance_optimizations
Browse files Browse the repository at this point in the history
Performance optimizations
  • Loading branch information
def authored Oct 30, 2023
2 parents bd5a59a + 61d76ce commit 420f426
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 74 deletions.
35 changes: 21 additions & 14 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type AddrPair struct {
}

type ActiveConnection struct {
Dest netaddr.IPPort
ActualDest netaddr.IPPort
Pid uint32
Fd uint64
Expand Down Expand Up @@ -98,6 +99,11 @@ func (p *Process) isHostNs() bool {
return p.NetNsId == hostNetNsId
}

type PidFd struct {
Pid uint32
Fd uint64
}

type Container struct {
id ContainerID
cgroup *cgroup.Cgroup
Expand All @@ -119,6 +125,7 @@ type Container struct {
connectsFailed map[netaddr.IPPort]int64 // dst -> count
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
connectionsActive map[AddrPair]*ActiveConnection
connectionsByPidFd map[PidFd]*ActiveConnection
retransmits map[AddrPair]int64 // dst:actual_dst -> count

l7Stats L7Stats
Expand Down Expand Up @@ -159,6 +166,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
connectsFailed: map[netaddr.IPPort]int64{},
connectLastAttempt: map[netaddr.IPPort]time.Time{},
connectionsActive: map[AddrPair]*ActiveConnection{},
connectionsByPidFd: map[PidFd]*ActiveConnection{},
retransmits: map[AddrPair]int64{},
l7Stats: L7Stats{},

Expand Down Expand Up @@ -300,11 +308,11 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}

connections := map[AddrPair]int{}
for c, conn := range c.connectionsActive {
for addrPair, conn := range c.connectionsActive {
if !conn.Closed.IsZero() {
continue
}
connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
}
for d, count := range connections {
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
Expand Down Expand Up @@ -498,12 +506,15 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
c.connectsFailed[dst]++
} else {
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
}
c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
}
c.connectLastAttempt[dst] = time.Now()
}
Expand Down Expand Up @@ -553,20 +564,14 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
c.lock.Lock()
defer c.lock.Unlock()

var dest AddrPair
var conn *ActiveConnection
var found bool
for dest, conn = range c.connectionsActive {
if conn.Pid == pid && conn.Fd == fd && (timestamp == 0 || conn.Timestamp == timestamp) {
found = true
break
}
conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
if conn == nil {
return
}
if !found {
if timestamp != 0 && conn.Timestamp != timestamp {
return
}

stats := c.l7Stats.get(r.Protocol, dest.dst, conn.ActualDest)
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
switch r.Protocol {
case l7.ProtocolHTTP:
Expand Down Expand Up @@ -889,10 +894,12 @@ func (c *Container) gc(now time.Time) {
for srcDst, conn := range c.connectionsActive {
if _, ok := established[srcDst]; !ok {
delete(c.connectionsActive, srcDst)
delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
continue
}
if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
delete(c.connectionsActive, srcDst)
delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
}
}
for dst, at := range c.connectLastAttempt {
Expand Down
134 changes: 77 additions & 57 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ type Event struct {
L7Request *l7.RequestData
}

type perfMapType uint8

const (
perfMapTypeProcEvents perfMapType = 1
perfMapTypeTCPEvents perfMapType = 2
perfMapTypeFileEvents perfMapType = 3
perfMapTypeL7Events perfMapType = 4
)

type Tracer struct {
kernelVersion string
disableL7Tracing bool
Expand Down Expand Up @@ -142,7 +151,7 @@ func (t *Tracer) init(ch chan<- Event) error {
type perfMap struct {
name string
perCPUBufferSizePages int
event rawEvent
typ perfMapType
}

func (t *Tracer) ebpf(ch chan<- Event) error {
Expand Down Expand Up @@ -183,15 +192,15 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
t.collection = c

perfMaps := []perfMap{
{name: "proc_events", event: &procEvent{}, perCPUBufferSizePages: 4},
{name: "tcp_listen_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
{name: "tcp_connect_events", event: &tcpEvent{}, perCPUBufferSizePages: 8},
{name: "tcp_retransmit_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
{name: "file_events", event: &fileEvent{}, perCPUBufferSizePages: 4},
{name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
{name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
}

if !t.disableL7Tracing {
perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 32})
perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
}

for _, pm := range perfMaps {
Expand All @@ -201,7 +210,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
return fmt.Errorf("failed to create ebpf reader: %w", err)
}
t.readers[pm.name] = r
go runEventsReader(pm.name, r, ch, pm.event)
go runEventsReader(pm.name, r, ch, pm.typ)
}

for _, programSpec := range collectionSpec.Programs {
Expand Down Expand Up @@ -274,52 +283,29 @@ func (t EventReason) String() string {
return "unknown: " + strconv.Itoa(int(t))
}

type rawEvent interface {
Event() Event
}

type procEvent struct {
Type uint32
Type EventType
Pid uint32
Reason uint32
}

func (e procEvent) Event() Event {
return Event{Type: EventType(e.Type), Reason: EventReason(e.Reason), Pid: e.Pid}
}

type tcpEvent struct {
Fd uint64
Timestamp uint64
Type uint32
Type EventType
Pid uint32
SPort uint16
DPort uint16
SAddr [16]byte
DAddr [16]byte
}

func (e tcpEvent) Event() Event {
return Event{
Type: EventType(e.Type),
Pid: e.Pid,
SrcAddr: ipPort(e.SAddr, e.SPort),
DstAddr: ipPort(e.DAddr, e.DPort),
Fd: e.Fd,
Timestamp: e.Timestamp,
}
}

type fileEvent struct {
Type uint32
Type EventType
Pid uint32
Fd uint64
}

func (e fileEvent) Event() Event {
return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
}

type l7Event struct {
Fd uint64
ConnectionTimestamp uint64
Expand All @@ -331,28 +317,9 @@ type l7Event struct {
Padding uint16
StatementId uint32
PayloadSize uint64
Payload [MaxPayloadSize]byte
}

func (e l7Event) Event() Event {
r := &l7.RequestData{
Protocol: l7.Protocol(e.Protocol),
Status: l7.Status(e.Status),
Duration: time.Duration(e.Duration),
Method: l7.Method(e.Method),
StatementId: e.StatementId,
}
switch {
case e.PayloadSize == 0:
case e.PayloadSize > MaxPayloadSize:
r.Payload = e.Payload[:MaxPayloadSize]
default:
r.Payload = e.Payload[:e.PayloadSize]
}
return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: r}
}

func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
for {
rec, err := r.Read()
if err != nil {
Expand All @@ -365,11 +332,64 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
klog.Errorln(name, "lost samples:", rec.LostSamples)
continue
}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, e); err != nil {
klog.Warningln("failed to read msg:", err)
var event Event

switch typ {
case perfMapTypeL7Events:
v := &l7Event{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
payload := rec.RawSample[len(rec.RawSample)-MaxPayloadSize:]
req := &l7.RequestData{
Protocol: l7.Protocol(v.Protocol),
Status: l7.Status(v.Status),
Duration: time.Duration(v.Duration),
Method: l7.Method(v.Method),
StatementId: v.StatementId,
}
switch {
case v.PayloadSize == 0:
case v.PayloadSize > MaxPayloadSize:
req.Payload = payload[:MaxPayloadSize]
default:
req.Payload = payload[:v.PayloadSize]
}
event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
case perfMapTypeFileEvents:
v := &fileEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
case perfMapTypeProcEvents:
v := &procEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
case perfMapTypeTCPEvents:
v := &tcpEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
event = Event{
Type: v.Type,
Pid: v.Pid,
SrcAddr: ipPort(v.SAddr, v.SPort),
DstAddr: ipPort(v.DAddr, v.DPort),
Fd: v.Fd,
Timestamp: v.Timestamp,
}
default:
continue
}
ch <- e.Event()

ch <- event
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/containerd/cgroups v1.0.3
github.com/containerd/containerd v1.5.17
github.com/coreos/go-systemd/v22 v22.5.0
github.com/coroot/logparser v1.0.12
github.com/coroot/logparser v1.0.13
github.com/docker/docker v20.10.21+incompatible
github.com/florianl/go-conntrack v0.3.0
github.com/mdlayher/taskstats v0.0.0-20230712191918-387b3d561d14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coroot/logparser v1.0.12 h1:S8g6+FUQVMR6TVNC9ts1wEm1EHqTsQRsmij2yWwBH+A=
github.com/coroot/logparser v1.0.12/go.mod h1:GHsVO1xE8pR5mmu9Eiop9IXHwN/QzNRx1s0fuzVxq7I=
github.com/coroot/logparser v1.0.13 h1:BAiyRplf5iLe3ZtQeV4dZbDKNMykyG/fwQ8JXvSKbiU=
github.com/coroot/logparser v1.0.13/go.mod h1:GHsVO1xE8pR5mmu9Eiop9IXHwN/QzNRx1s0fuzVxq7I=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand Down

0 comments on commit 420f426

Please sign in to comment.