Skip to content

Commit

Permalink
Merge pull request #116 from coroot/traffic_metrics
Browse files Browse the repository at this point in the history
add `container_net_tcp_bytes_sent_total` and `container_net_tcp_bytes_received_total` metrics
  • Loading branch information
def authored Jul 29, 2024
2 parents c40380d + 9f6909a commit 83c3c1c
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 157 deletions.
80 changes: 69 additions & 11 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type ActiveConnection struct {
Timestamp uint64
Closed time.Time

BytesSent uint64
BytesReceived uint64

http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
mysqlParser *l7.MysqlParser
Expand All @@ -93,9 +96,11 @@ type PidFd struct {
}

type ConnectionStats struct {
Count int64
Count uint64
TotalTime time.Duration
Retransmissions int64
Retransmissions uint64
BytesSent uint64
BytesReceived uint64
}

type Container struct {
Expand Down Expand Up @@ -136,12 +141,14 @@ type Container struct {
nsConntrack *Conntrack
lbConntracks []*Conntrack

registry *Registry

lock sync.RWMutex

done chan struct{}
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,6 +180,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

hostConntrack: hostConntrack,

registry: registry,

done: make(chan struct{}),
}

Expand Down Expand Up @@ -228,6 +237,8 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
}

func (c *Container) Collect(ch chan<- prometheus.Metric) {
c.registry.updateTrafficStatsIfNecessary()

c.lock.RLock()
defer c.lock.RUnlock()

Expand Down Expand Up @@ -300,6 +311,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
if stats.Retransmissions > 0 {
ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
}
ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
}
for dst, count := range c.connectsFailed {
ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
Expand Down Expand Up @@ -372,15 +385,15 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}
}

func (c *Container) onProcessStart(pid uint32, trace *ebpftracer.Tracer) *Process {
func (c *Container) onProcessStart(pid uint32) *Process {
c.lock.Lock()
defer c.lock.Unlock()
stats, err := TaskstatsPID(pid)
if err != nil {
return nil
}
c.zombieAt = time.Time{}
p := NewProcess(pid, stats, trace)
p := NewProcess(pid, stats, c.registry.tracer)

if p == nil {
return nil
Expand Down Expand Up @@ -583,15 +596,60 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
return nil, nil
}

func (c *Container) onConnectionClose(srcDst AddrPair) bool {
func (c *Container) onConnectionClose(e ebpftracer.Event) bool {
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
c.lock.Lock()
conn, ok := c.connectionsActive[srcDst]
c.lock.Unlock()
if conn != nil {
if conn.Closed.IsZero() {
if e.Pid == 0 && e.Fd == 0 {
stats, err := c.registry.tracer.GetAndDeleteTCPConnection(conn.Pid, conn.Fd)
if err != nil {
klog.Warningln(c.id, conn.Pid, conn.Fd, conn.ActualDest, err)
} else {
c.lock.Lock()
c.updateConnectionTrafficStats(conn, stats.BytesSent, stats.BytesReceived)
c.lock.Unlock()
}
} else if e.TrafficStats != nil {
c.lock.Lock()
c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
c.lock.Unlock()
}
conn.Closed = time.Now()
}
}
return ok
}

func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
if u == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
conn := c.connectionsActive[srcDst]
if conn == nil {
return false
c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived)
}

func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received uint64) {
if ac == nil {
return
}
conn.Closed = time.Now()
return true
key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
if sent > ac.BytesSent {
stats.BytesSent += sent - ac.BytesSent
}
if received > ac.BytesReceived {
stats.BytesReceived += received - ac.BytesReceived
}
ac.BytesSent = sent
ac.BytesReceived = received
}

func (c *Container) onDNSRequest(r *l7.RequestData) map[netaddr.IP]string {
Expand Down
4 changes: 4 additions & 0 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var metrics = struct {
NetConnectionsActive *prometheus.Desc
NetRetransmits *prometheus.Desc
NetLatency *prometheus.Desc
NetBytesSent *prometheus.Desc
NetBytesReceived *prometheus.Desc

LogMessages *prometheus.Desc

Expand Down Expand Up @@ -81,6 +83,8 @@ var metrics = struct {
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
NetBytesSent: metric("container_net_tcp_bytes_sent_total", "Total number of bytes sent to the peer", "destination", "actual_destination"),
NetBytesReceived: metric("container_net_tcp_bytes_received_total", "Total number of bytes received from the peer", "destination", "actual_destination"),

LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),

Expand Down
64 changes: 58 additions & 6 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"k8s.io/klog/v2"
)

const MinTrafficStatsUpdateInterval = 5 * time.Second

var (
selfNetNs = netns.None()
hostNetNsId = netns.None().UniqueId()
Expand Down Expand Up @@ -48,6 +50,10 @@ type Registry struct {
ip2fqdnLock sync.Mutex

processInfoCh chan<- ProcessInfo

trafficStatsLastUpdated time.Time
trafficStatsLock sync.Mutex
trafficStatsUpdateCh chan *TrafficStatsUpdate
}

func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo) (*Registry, error) {
Expand Down Expand Up @@ -106,6 +112,8 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
processInfoCh: processInfoCh,

tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),

trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),
}
if err = reg.Register(r); err != nil {
return nil, err
Expand Down Expand Up @@ -188,6 +196,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
}
r.ip2fqdnLock.Unlock()
case u := <-r.trafficStatsUpdateCh:
if u == nil {
continue
}
if c := r.containersByPid[u.Pid]; c != nil {
c.updateTrafficStats(u)
}
case e, more := <-ch:
if !more {
return
Expand All @@ -206,7 +221,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
}
if c := r.getOrCreateContainer(e.Pid); c != nil {
p := c.onProcessStart(e.Pid, r.tracer)
p := c.onProcessStart(e.Pid)
if r.processInfoCh != nil && p != nil {
r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt}
}
Expand Down Expand Up @@ -247,10 +262,15 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
klog.Infoln("TCP connection error from unknown container", e)
}
case ebpftracer.EventTypeConnectionClose:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
for _, c := range r.containersById {
if c.onConnectionClose(srcDst) {
break
if e.Pid != 0 && e.Fd != 0 {
if c := r.containersByPid[e.Pid]; c != nil {
c.onConnectionClose(e)
}
} else {
for _, c := range r.containersById {
if c.onConnectionClose(e) {
break
}
}
}
case ebpftracer.EventTypeTCPRetransmit:
Expand Down Expand Up @@ -340,7 +360,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
r.containersByCgroupId[cg.Id] = c
return c
}
c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
if err != nil {
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
return nil
Expand All @@ -357,6 +377,31 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
return c
}

func (r *Registry) updateTrafficStatsIfNecessary() {
r.trafficStatsLock.Lock()
defer r.trafficStatsLock.Unlock()

if time.Now().Sub(r.trafficStatsLastUpdated) < MinTrafficStatsUpdateInterval {
return
}
iter := r.tracer.ActiveConnectionsIterator()
cid := ebpftracer.ConnectionId{}
stats := ebpftracer.Connection{}
for iter.Next(&cid, &stats) {
r.trafficStatsUpdateCh <- &TrafficStatsUpdate{
Pid: cid.PID,
FD: cid.FD,
BytesSent: stats.BytesSent,
BytesReceived: stats.BytesReceived,
}
}
if err := iter.Err(); err != nil {
klog.Warningln(err)
}
r.trafficStatsUpdateCh <- nil
r.trafficStatsLastUpdated = time.Now()
}

func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
if cg.ContainerType == cgroup.ContainerTypeSystemdService {
if strings.HasPrefix(cg.ContainerId, "/system.slice/crio-conmon-") {
Expand Down Expand Up @@ -446,3 +491,10 @@ func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
}
return nil, fmt.Errorf("failed to interact with dockerd (%s) or with containerd (%s)", dockerdErr, containerdErr)
}

type TrafficStatsUpdate struct {
Pid uint32
FD uint64
BytesSent uint64
BytesReceived uint64
}
10 changes: 5 additions & 5 deletions containers/systemd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ import (
)

var (
conn *dbus.Conn
dbusConn *dbus.Conn
dbusTimeout = time.Second
)

func init() {
var err error
conn, err = dbus.NewConnection(func() (*gdbus.Conn, error) {
dbusConn, err = dbus.NewConnection(func() (*gdbus.Conn, error) {
c, err := gdbus.Dial("unix:path=" + proc.HostPath("/run/systemd/private"))
if err != nil {
return nil, err
}
methods := []gdbus.Auth{gdbus.AuthExternal(strconv.Itoa(os.Getuid()))}
if err = c.Auth(methods); err != nil {
conn.Close()
dbusConn.Close()
return nil, err
}
return c, nil
Expand All @@ -40,14 +40,14 @@ func init() {
}

func SystemdTriggeredBy(id string) string {
if conn == nil {
if dbusConn == nil {
return ""
}
ctx, cancel := context.WithTimeout(context.Background(), dbusTimeout)
defer cancel()
parts := strings.Split(id, "/")
unit := parts[len(parts)-1]
if prop, _ := conn.GetUnitPropertyContext(ctx, unit, "TriggeredBy"); prop != nil {
if prop, _ := dbusConn.GetUnitPropertyContext(ctx, unit, "TriggeredBy"); prop != nil {
if values, _ := prop.Value.Value().([]string); len(values) > 0 {
return values[0]
}
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ebpftracer/ebpf/l7/gotls.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int go_crypto_tls_read_enter(struct pt_regs *ctx) {
__u64 goroutine_id = GOROUTINE(ctx);
__u64 pid = pid_tgid >> 32;
__u64 id = pid << 32 | goroutine_id | IS_TLS_READ_ID;
return trace_enter_read(id, fd, buf_ptr, 0, 0);
return trace_enter_read(id, pid, fd, buf_ptr, 0, 0);
}

SEC("uprobe/go_crypto_tls_read_exit")
Expand Down
Loading

0 comments on commit 83c3c1c

Please sign in to comment.