Skip to content

Commit

Permalink
eBPF tracer: rabbitmq amqp protocol support
Browse files Browse the repository at this point in the history
  • Loading branch information
apetruhin committed Dec 9, 2022
1 parent 9690d58 commit 215ae7e
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 40 deletions.
48 changes: 26 additions & 22 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"inet.af/netaddr"
"k8s.io/klog/v2"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -300,8 +299,12 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {

for _, protoStats := range c.l7Stats {
for _, s := range protoStats {
s.Requests.Collect(ch)
s.Latency.Collect(ch)
if s.Requests != nil {
s.Requests.Collect(ch)
}
if s.Latency != nil {
s.Latency.Collect(ch)
}
}
}

Expand Down Expand Up @@ -479,42 +482,43 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
s := stats[key]
if s == nil {
constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}
s = &L7Stats{}
cOpts, ok := L7Requests[r.Protocol]
if !ok {
klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
return
}
if cOpts.Name != "" {
labels := []string{"status"}
if r.Protocol == ebpftracer.L7ProtocolRabbitmq {
labels = append(labels, "method")
}
s.Requests = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels}, labels,
)
}
hOpts, ok := L7Latency[r.Protocol]
if !ok {
klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
return
}
s = &L7Stats{
Requests: prometheus.NewCounterVec(
prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels},
[]string{"status"},
),
Latency: prometheus.NewHistogram(
if hOpts.Name != "" {
s.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
),
)
}
stats[key] = s
}
status := ""
switch r.Protocol {
case ebpftracer.L7ProtocolHTTP:
status = strconv.Itoa(r.Status)
case ebpftracer.L7ProtocolMongo, ebpftracer.L7ProtocolKafka:
status = "unknown"
default:
if r.Status == 500 {
status = "failed"
if s.Requests != nil {
if r.Protocol == ebpftracer.L7ProtocolRabbitmq {
s.Requests.WithLabelValues(r.StatusString(), r.Method.String()).Inc()
} else {
status = "ok"
s.Requests.WithLabelValues(r.StatusString()).Inc()
}
}
s.Requests.WithLabelValues(status).Inc()
s.Latency.Observe(r.Duration.Seconds())
if s.Latency != nil {
s.Latency.Observe(r.Duration.Seconds())
}
return
}
}
Expand Down
2 changes: 2 additions & 0 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var (
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
ebpftracer.L7ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
}
L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
ebpftracer.L7ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
Expand All @@ -105,6 +106,7 @@ var (
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
ebpftracer.L7ProtocolRabbitmq: {Name: "", Help: ""},
}
)

Expand Down
14 changes: 14 additions & 0 deletions ebpftracer/ebpf/ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@

#define EVENT_REASON_OOM_KILL 1

#define bpf_read(src, dst) \
({ \
if (bpf_probe_read(&dst, sizeof(dst), src) < 0) { \
return 0; \
} \
})

#define bpf_printk(fmt, ...) \
({ \
char ____fmt[] = fmt; \
bpf_trace_printk(____fmt, sizeof(____fmt), ##__VA_ARGS__); \
})


#include "proc.c"
#include "file.c"
#include "tcp/state.c"
Expand Down
74 changes: 56 additions & 18 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "mongo.c"
#include "kafka.c"
#include "cassandra.c"
#include "rabbitmq.c"


#define PROTOCOL_UNKNOWN 0
Expand All @@ -17,6 +18,11 @@
#define PROTOCOL_MONGO 6
#define PROTOCOL_KAFKA 7
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9

#define METHOD_UNKNOWN 0
#define METHOD_PRODUCE 1
#define METHOD_CONSUME 2

struct l7_event {
__u64 fd;
Expand All @@ -25,6 +31,7 @@ struct l7_event {
__u32 status;
__u64 duration;
__u8 protocol;
__u8 method;
};

struct {
Expand All @@ -36,6 +43,7 @@ struct {
struct rw_args_t {
__u64 fd;
char* buf;
__u64 size;
};

struct {
Expand Down Expand Up @@ -85,7 +93,19 @@ struct iov {
};

static inline __attribute__((__always_inline__))
int trace_enter_write(__u64 fd, char *buf, __u64 size) {
__u64 get_connection_timestamp(__u32 pid, __u64 fd) {
struct sk_info sk = {};
sk.pid = pid;
sk.fd = fd;
__u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
if (timestamp) {
return *timestamp;
}
return 0;
}

static inline __attribute__((__always_inline__))
int trace_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx, __u64 fd, char *buf, __u64 size) {
__u64 id = bpf_get_current_pid_tgid();
struct l7_request req = {};
req.protocol = PROTOCOL_UNKNOWN;
Expand All @@ -108,6 +128,16 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
req.protocol = PROTOCOL_MYSQL;
} else if (is_mongo_query(buf, size)) {
req.protocol = PROTOCOL_MONGO;
} else if (is_rabbitmq_produce(buf, size)) {
struct l7_event e = {};
e.protocol = PROTOCOL_RABBITMQ;
e.fd = k.fd;
e.pid = k.pid;
e.status = 200;
e.method = METHOD_PRODUCE;
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
return 0;
} else {
__s32 request_id = is_kafka_request(buf, size);
if (request_id > 0) {
Expand Down Expand Up @@ -141,6 +171,7 @@ int trace_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
struct rw_args_t args = {};
args.fd = ctx->fd;
args.buf = ctx->buf;
args.size = ctx->size;
bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
return 0;
}
Expand All @@ -153,20 +184,37 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
if (!args) {
return 0;
}
char *buf;
struct socket_key k = {};
k.pid = id >> 32;
k.fd = args->fd;
k.stream_id = -1;
buf = args->buf;
char *buf = args->buf;
__u64 size = args->size;

bpf_map_delete_elem(&active_reads, &id);

if (ctx->ret <= 0) {
return 0;
}

struct l7_event e = {};
e.fd = k.fd;
e.pid = k.pid;
e.connection_timestamp = 0;
e.status = 0;
e.method = METHOD_UNKNOWN;

if (is_rabbitmq_consume(buf, size)) {
e.protocol = PROTOCOL_RABBITMQ;
e.status = 200;
e.method = METHOD_CONSUME;
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
return 0;
}

struct cassandra_header cassandra_response = {};
cassandra_response.stream_id = -1;

struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (!req) {
if (bpf_probe_read(&cassandra_response, sizeof(cassandra_response), (void *)(buf)) < 0) {
Expand All @@ -179,11 +227,7 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
}
}
__s32 request_id = req->request_id;
struct l7_event e = {};
e.protocol = req->protocol;
e.fd = k.fd;
e.pid = k.pid;
e.connection_timestamp = 0;
__u64 ns = req->ns;
__u8 partial = req->partial;
bpf_map_delete_elem(&active_l7_requests, &k);
Expand Down Expand Up @@ -216,13 +260,7 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
return 0;
}
e.duration = bpf_ktime_get_ns() - ns;
struct sk_info sk = {};
sk.pid = k.pid;
sk.fd = k.fd;
__u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
if (timestamp) {
e.connection_timestamp = *timestamp;
}
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
return 0;
}
Expand All @@ -233,17 +271,17 @@ int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
if (bpf_probe_read(&iov0, sizeof(struct iov), (void *)ctx->buf) < 0) {
return 0;
}
return trace_enter_write(ctx->fd, iov0.buf, iov0.size);
return trace_enter_write(ctx, ctx->fd, iov0.buf, iov0.size);
}

SEC("tracepoint/syscalls/sys_enter_write")
int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
return trace_enter_write(ctx->fd, ctx->buf, ctx->size);
return trace_enter_write(ctx, ctx->fd, ctx->buf, ctx->size);
}

SEC("tracepoint/syscalls/sys_enter_sendto")
int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
return trace_enter_write(ctx->fd, ctx->buf, ctx->size);
return trace_enter_write(ctx, ctx->fd, ctx->buf, ctx->size);
}

SEC("tracepoint/syscalls/sys_enter_read")
Expand Down
57 changes: 57 additions & 0 deletions ebpftracer/ebpf/l7/rabbitmq.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// AMQP 0-9-1 Protocol Specification
// https://www.rabbitmq.com/protocol.html

#define RABBITMQ_FRAME_TYPE_METHOD 1
#define RABBITMQ_FRAME_END 0xCE

#define RABBITMQ_CLASS_BASIC 60
#define RABBITMQ_METHOD_PUBLISH 40
#define RABBITMQ_METHOD_DELIVER 60

static __always_inline
int rabbitmq_method_is(char *buf, __u64 buf_size, __u16 expected_method) {
if (buf_size < 12) {
return 0;
}
__u8 type = 0;
bpf_read(buf, type);
if (type != RABBITMQ_FRAME_TYPE_METHOD) {
return 0;
}

__u32 size = 0;
bpf_read(buf+3, size);
size = bpf_htonl(size);
if (7 + size + 1 > buf_size) {
return 0;
}
__u8 end = 0;
bpf_read(buf+7+size, end);
if (end != RABBITMQ_FRAME_END) {
return 0;
}

__u16 class = 0;
bpf_read(buf+7, class);
if (bpf_htons(class) != RABBITMQ_CLASS_BASIC) {
return 0;
}

__u16 method = 0;
bpf_read(buf+9, method);
if (bpf_htons(method) != expected_method) {
return 0;
}

return 1;
}

static __always_inline
int is_rabbitmq_produce(char *buf, __u64 buf_size) {
return rabbitmq_method_is(buf, buf_size, RABBITMQ_METHOD_PUBLISH);
}

static __always_inline
int is_rabbitmq_consume(char *buf, __u64 buf_size) {
return rabbitmq_method_is(buf, buf_size, RABBITMQ_METHOD_DELIVER);
}
Loading

0 comments on commit 215ae7e

Please sign in to comment.