Skip to content

Commit

Permalink
Merge pull request #36 from coroot/ebpf_l7_http2
Browse files Browse the repository at this point in the history
HTTP/2 protocol support
  • Loading branch information
apetruhin authored Sep 8, 2023
2 parents f118458 + 739dc94 commit 172da08
Show file tree
Hide file tree
Showing 37 changed files with 1,216 additions and 760 deletions.
158 changes: 75 additions & 83 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/coroot/coroot-node-agent/cgroup"
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/logs"
"github.com/coroot/coroot-node-agent/node"
Expand Down Expand Up @@ -73,12 +74,9 @@ type ActiveConnection struct {
Timestamp uint64
Closed time.Time

PreparedStatements map[string]string
}

type L7Stats struct {
Requests *prometheus.CounterVec
Latency prometheus.Histogram
http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
mysqlParser *l7.MysqlParser
}

type ListenDetails struct {
Expand Down Expand Up @@ -123,7 +121,7 @@ type Container struct {
connectionsActive map[AddrPair]*ActiveConnection
retransmits map[AddrPair]int64 // dst:actual_dst -> count

l7Stats map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats // protocol -> dst:actual_dst -> stats
l7Stats L7Stats

oomKills int

Expand Down Expand Up @@ -162,7 +160,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
connectLastAttempt: map[netaddr.IPPort]time.Time{},
connectionsActive: map[AddrPair]*ActiveConnection{},
retransmits: map[AddrPair]int64{},
l7Stats: map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats{},
l7Stats: L7Stats{},

mounts: map[string]proc.MountInfo{},

Expand Down Expand Up @@ -343,16 +341,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
ch <- gauge(metrics.ApplicationType, 1, appType)
}

for _, protoStats := range c.l7Stats {
for _, s := range protoStats {
if s.Requests != nil {
s.Requests.Collect(ch)
}
if s.Latency != nil {
s.Latency.Collect(ch)
}
}
}
c.l7Stats.collect(ch)

if !*flags.DisablePinger {
for ip, rtt := range c.ping() {
Expand Down Expand Up @@ -510,11 +499,10 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
} else {
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
PreparedStatements: map[string]string{},
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
}
}
c.connectLastAttempt[dst] = time.Now()
Expand Down Expand Up @@ -561,64 +549,74 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
return true
}

func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpftracer.L7Request) {
func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
c.lock.Lock()
defer c.lock.Unlock()
for dest, conn := range c.connectionsActive {

var dest AddrPair
var conn *ActiveConnection
var found bool
for dest, conn = range c.connectionsActive {
if conn.Pid == pid && conn.Fd == fd && (timestamp == 0 || conn.Timestamp == timestamp) {
key := AddrPair{src: dest.dst, dst: conn.ActualDest}
stats := c.l7Stats[r.Protocol]
if stats == nil {
stats = map[AddrPair]*L7Stats{}
c.l7Stats[r.Protocol] = stats
}
tracing.HandleL7Request(string(c.id), conn.ActualDest, r, conn.PreparedStatements)
if r.Method == ebpftracer.L7MethodStatementClose {
return
}
s := stats[key]
if s == nil {
constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}
s = &L7Stats{}
cOpts, ok := L7Requests[r.Protocol]
if !ok {
klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
return
}
if cOpts.Name != "" {
labels := []string{"status"}
if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
labels = append(labels, "method")
}
s.Requests = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels}, labels,
)
}
hOpts, ok := L7Latency[r.Protocol]
if !ok {
klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
return
}
if hOpts.Name != "" {
s.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
)
}
stats[key] = s
}
if s.Requests != nil {
if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
s.Requests.WithLabelValues(r.StatusString(), r.Method.String()).Inc()
} else {
s.Requests.WithLabelValues(r.StatusString()).Inc()
}
}
if s.Latency != nil {
s.Latency.Observe(r.Duration.Seconds())
}
return
found = true
break
}
}
if !found {
return
}

stats := c.l7Stats.get(r.Protocol, dest.dst, conn.ActualDest)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
switch r.Protocol {
case l7.ProtocolHTTP:
stats.observe(r.Status.Http(), "", r.Duration)
method, path := l7.ParseHttp(r.Payload)
trace.HttpRequest(method, path, r.Status, r.Duration)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
}
requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
for _, req := range requests {
stats.observe(req.Status.Http(), "", req.Duration)
trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
}
case l7.ProtocolPostgres:
if r.Method != l7.MethodStatementClose {
stats.observe(r.Status.String(), "", r.Duration)
}
if conn.postgresParser == nil {
conn.postgresParser = l7.NewPostgresParser()
}
query := conn.postgresParser.Parse(r.Payload)
trace.PostgresQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolMysql:
if r.Method != l7.MethodStatementClose {
stats.observe(r.Status.String(), "", r.Duration)
}
if conn.mysqlParser == nil {
conn.mysqlParser = l7.NewMysqlParser()
}
query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
trace.MysqlQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolMemcached:
stats.observe(r.Status.String(), "", r.Duration)
cmd, items := l7.ParseMemcached(r.Payload)
trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
case l7.ProtocolRedis:
stats.observe(r.Status.String(), "", r.Duration)
cmd, args := l7.ParseRedis(r.Payload)
trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
case l7.ProtocolMongo:
stats.observe(r.Status.String(), "", r.Duration)
query := l7.ParseMongo(r.Payload)
trace.MongoQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolKafka, l7.ProtocolCassandra:
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolRabbitmq, l7.ProtocolNats:
stats.observe(r.Status.String(), r.Method.String(), 0)
}
}

func (c *Container) onRetransmit(srcDst AddrPair) bool {
Expand Down Expand Up @@ -910,13 +908,7 @@ func (c *Container) gc(now time.Time) {
delete(c.retransmits, d)
}
}
for _, protoStats := range c.l7Stats {
for d := range protoStats {
if d.src == dst {
delete(protoStats, d)
}
}
}
c.l7Stats.delete(dst)
}
}
}
Expand Down
92 changes: 92 additions & 0 deletions containers/l7.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package containers

import (
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/prometheus/client_golang/prometheus"
"inet.af/netaddr"
"k8s.io/klog/v2"
"time"
)

// L7Metrics groups the Prometheus collectors kept for a single
// protocol/destination pair.
type L7Metrics struct {
	// Requests counts requests, partitioned by the label values passed to
	// observe. It may be nil, in which case observe and collect skip it.
	Requests *prometheus.CounterVec
	// Latency is the histogram of request durations in seconds. It may be
	// nil, in which case observe and collect skip it.
	Latency prometheus.Histogram
}

// observe records the outcome of one request.
//
// If the request counter is set, the child identified by status (plus method,
// when method is non-empty) is incremented; when the label lookup fails the
// error is logged and nothing is counted. If the latency histogram is set and
// duration is non-zero, the duration is observed in seconds.
func (m *L7Metrics) observe(status, method string, duration time.Duration) {
	if m.Requests != nil {
		// The method label is only supplied when the caller provides one.
		labelValues := []string{status}
		if method != "" {
			labelValues = append(labelValues, method)
		}
		if counter, err := m.Requests.GetMetricWithLabelValues(labelValues...); err != nil {
			klog.Warningln(err)
		} else {
			counter.Inc()
		}
	}
	// A zero duration means "no latency sample" and is not recorded.
	if duration == 0 || m.Latency == nil {
		return
	}
	m.Latency.Observe(duration.Seconds())
}

// L7Stats is a two-level index of L7 metrics: the outer key is the protocol,
// the inner key is an AddrPair whose src holds the destination and whose dst
// holds the actual destination.
type L7Stats map[l7.Protocol]map[AddrPair]*L7Metrics // protocol -> dst:actual_dst -> metrics

// get returns the metrics for the given protocol and destination pair,
// creating and storing them on first use.
//
// HTTP/2 is accounted under the HTTP protocol, so both share the same metrics.
// Every collector carries the constant labels "destination" and
// "actual_destination". The request counter has a "status" label, plus a
// "method" label for RabbitMQ and NATS; those two protocols get no latency
// histogram.
//
// If the protocol has no entry in L7Requests (or, for the histogram, in
// L7Latency) the corresponding collector is left nil instead of being created
// with an empty metric name: a collector with an invalid name would make the
// whole Prometheus gather fail. observe and collect both tolerate nil
// collectors. The empty entry is still stored so the warning is logged only
// once per destination.
func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort) *L7Metrics {
	if protocol == l7.ProtocolHTTP2 {
		protocol = l7.ProtocolHTTP
	}
	protoStats := s[protocol]
	if protoStats == nil {
		protoStats = map[AddrPair]*L7Metrics{}
		s[protocol] = protoStats
	}
	dest := AddrPair{src: destination, dst: actualDestination}
	if m := protoStats[dest]; m != nil {
		return m
	}

	m := &L7Metrics{}
	protoStats[dest] = m

	cOpts, ok := L7Requests[protocol]
	if !ok {
		klog.Warningf("cannot find metric description for L7 protocol: %v", protocol)
		return m
	}

	constLabels := map[string]string{"destination": destination.String(), "actual_destination": actualDestination.String()}
	labels := []string{"status"}
	switch protocol {
	case l7.ProtocolRabbitmq, l7.ProtocolNats:
		labels = append(labels, "method")
	default:
		if hOpts, ok := L7Latency[protocol]; ok {
			m.Latency = prometheus.NewHistogram(
				prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
			)
		} else {
			klog.Warningf("cannot find latency metric description for L7 protocol: %v", protocol)
		}
	}
	m.Requests = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels}, labels,
	)
	return m
}

// collect sends every stored request counter and latency histogram to ch,
// skipping collectors that are nil.
func (s L7Stats) collect(ch chan<- prometheus.Metric) {
	for protocol := range s {
		for _, entry := range s[protocol] {
			if requests := entry.Requests; requests != nil {
				requests.Collect(ch)
			}
			if latency := entry.Latency; latency != nil {
				latency.Collect(ch)
			}
		}
	}
}

// delete drops, for every protocol, all metrics whose key has dst as its
// destination (the src field of the AddrPair key).
func (s L7Stats) delete(dst netaddr.IPPort) {
	for protocol := range s {
		byDest := s[protocol]
		for pair := range byDest {
			if pair.src != dst {
				continue
			}
			delete(byDest, pair)
		}
	}
}
44 changes: 21 additions & 23 deletions containers/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package containers

import (
"github.com/coroot/coroot-node-agent/ebpftracer"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -89,29 +89,27 @@ var metrics = struct {
}

var (
L7Requests = map[ebpftracer.L7Protocol]prometheus.CounterOpts{
ebpftracer.L7ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
ebpftracer.L7ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
ebpftracer.L7ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
ebpftracer.L7ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
ebpftracer.L7ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
L7Requests = map[l7.Protocol]prometheus.CounterOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
}
L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
ebpftracer.L7ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
ebpftracer.L7ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
ebpftracer.L7ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
ebpftracer.L7ProtocolRabbitmq: {Name: "", Help: ""},
ebpftracer.L7ProtocolNats: {Name: "", Help: ""},
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
}
)

Expand Down
Loading

0 comments on commit 172da08

Please sign in to comment.