Skip to content

Commit

Permalink
Merge pull request #22 from w-h-a/prep-for-otelp-trace-exporter
Browse files Browse the repository at this point in the history
refactor: prep for otelp trace exporter
  • Loading branch information
w-h-a authored Oct 21, 2024
2 parents f7eaac9 + 4f11efe commit abc62c0
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 33 deletions.
1 change: 1 addition & 0 deletions cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
Secret = os.Getenv("SECRET")
SecretAddress = os.Getenv("SECRET_ADDRESS")
SecretPrefix = os.Getenv("SECRET_PREFIX")
TraceExporter = os.Getenv("TRACE_EXPORTER")
AwsAccessKeyId = os.Getenv("AWS_ACCESS_KEY_ID")
AwsSecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
)
31 changes: 21 additions & 10 deletions cmd/grpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

pbHealth "github.com/w-h-a/pkg/proto/health"
pbTrace "github.com/w-h-a/pkg/proto/trace"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
"github.com/w-h-a/pkg/telemetry/traceexporter"
"github.com/w-h-a/pkg/utils/memoryutils"
)

type HealthHandler interface {
Expand All @@ -19,7 +19,7 @@ type Health struct {
}

type healthHandler struct {
tracer tracev2.Trace
buffer *memoryutils.Buffer
}

func (h *healthHandler) Check(ctx context.Context, req *pbHealth.HealthRequest, rsp *pbHealth.HealthResponse) error {
Expand All @@ -28,11 +28,22 @@ func (h *healthHandler) Check(ctx context.Context, req *pbHealth.HealthRequest,
}

func (h *healthHandler) Trace(ctx context.Context, req *pbTrace.TraceRequest, rsp *pbTrace.TraceResponse) error {
spans, err := h.tracer.Read(
tracev2.ReadWithCount(int(req.Count)),
)
if err != nil {
return errorutils.InternalServerError("trace", "failed to retrieve traces: %v", err)
count := req.Count

var entries []*memoryutils.Entry

if count > 0 {
entries = h.buffer.Get(int(req.Count))
} else {
entries = h.buffer.Get(h.buffer.Options().Size)
}

spans := []*traceexporter.SpanData{}

for _, entry := range entries {
span := entry.Value.(*traceexporter.SpanData)

spans = append(spans, span)
}

for _, span := range spans {
Expand All @@ -42,6 +53,6 @@ func (h *healthHandler) Trace(ctx context.Context, req *pbTrace.TraceRequest, rs
return nil
}

func NewHealthHandler(t tracev2.Trace) HealthHandler {
return &Health{&healthHandler{t}}
func NewHealthHandler(b *memoryutils.Buffer) HealthHandler {
return &Health{&healthHandler{b}}
}
4 changes: 2 additions & 2 deletions cmd/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
pbTrace "github.com/w-h-a/pkg/proto/trace"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/telemetry/traceexporter"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -43,7 +43,7 @@ func SerializeSecret(secret *sidecar.Secret) *pb.Secret {
}
}

func SerializeSpan(s *tracev2.SpanData) *pbTrace.Span {
func SerializeSpan(s *traceexporter.SpanData) *pbTrace.Span {
return &pbTrace.Span{
Name: s.Name,
Id: s.Id,
Expand Down
29 changes: 19 additions & 10 deletions cmd/http/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"net/http"
"strconv"

"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/telemetry/traceexporter"
"github.com/w-h-a/pkg/utils/errorutils"
"github.com/w-h-a/pkg/utils/httputils"
"github.com/w-h-a/pkg/utils/memoryutils"
)

type HealthHandler interface {
Expand All @@ -15,7 +16,7 @@ type HealthHandler interface {
}

type healthHandler struct {
tracer tracev2.Trace
buffer *memoryutils.Buffer
}

func (h *healthHandler) Check(w http.ResponseWriter, r *http.Request) {
Expand All @@ -37,17 +38,25 @@ func (h *healthHandler) Trace(w http.ResponseWriter, r *http.Request) {
}
}

spans, err := h.tracer.Read(
tracev2.ReadWithCount(count),
)
if err != nil {
httputils.ErrResponse(w, errorutils.InternalServerError("trace", "failed to retrieve traces: %v", err))
return
var entries []*memoryutils.Entry

if count > 0 {
entries = h.buffer.Get(count)
} else {
entries = h.buffer.Get(h.buffer.Options().Size)
}

spans := []*traceexporter.SpanData{}

for _, entry := range entries {
span := entry.Value.(*traceexporter.SpanData)

spans = append(spans, span)
}

httputils.OkResponse(w, spans)
}

func NewHealthHandler(tracer tracev2.Trace) HealthHandler {
return &healthHandler{tracer}
func NewHealthHandler(b *memoryutils.Buffer) HealthHandler {
return &healthHandler{b}
}
22 changes: 14 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/w-h-a/pkg/telemetry/log"
memorylog "github.com/w-h-a/pkg/telemetry/log/memory"
"github.com/w-h-a/pkg/telemetry/tracev2"
memorytrace "github.com/w-h-a/pkg/telemetry/tracev2/memory"
otelwrapper "github.com/w-h-a/pkg/telemetry/tracev2/otel"
"github.com/w-h-a/pkg/utils/memoryutils"
"github.com/w-h-a/sidecar/cmd/config"
"github.com/w-h-a/sidecar/cmd/grpc"
Expand All @@ -35,17 +35,24 @@ func run(ctx *cli.Context) {
prefix := fmt.Sprintf("%s.%s:%s", config.Namespace, config.Name, config.Version)

// logger
logBuffer := memoryutils.NewBuffer()

logger := memorylog.NewLog(
log.LogWithPrefix(prefix),
memorylog.LogWithBuffer(memoryutils.NewBuffer()),
memorylog.LogWithBuffer(logBuffer),
)

log.SetLogger(logger)

// otel tracer
buffer := memoryutils.NewBuffer()
te, err := GetTraceExporterBuilder(config.TraceExporter)
if err != nil {
log.Fatal(err)
}

traceBuffer := memoryutils.NewBuffer()

exporter := memorytrace.NewExporter(buffer)
exporter := MakeTraceExporter(te, traceBuffer)

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
Expand All @@ -54,9 +61,8 @@ func run(ctx *cli.Context) {

otel.SetTracerProvider(tp)

tracer := memorytrace.NewTrace(
tracer := otelwrapper.NewTrace(
tracev2.TraceWithName(prefix),
memorytrace.TraceWithBuffer(buffer),
)

// get clients
Expand Down Expand Up @@ -159,7 +165,7 @@ func run(ctx *cli.Context) {
// create http server
router := mux.NewRouter()

httpHealth := http.NewHealthHandler(tracer)
httpHealth := http.NewHealthHandler(traceBuffer)
httpPublish := http.NewPublishHandler(service, tracer)
httpState := http.NewStateHandler(service, tracer)
httpSecret := http.NewSecretHandler(service, tracer)
Expand Down Expand Up @@ -192,7 +198,7 @@ func run(ctx *cli.Context) {

grpcServer := grpcserver.NewServer(grpcOpts...)

grpcHealth := grpc.NewHealthHandler(tracer)
grpcHealth := grpc.NewHealthHandler(traceBuffer)
grpcPublish := grpc.NewPublishHandler(service, tracer)
grpcState := grpc.NewStateHandler(service, tracer)
grpcSecret := grpc.NewSecretHandler(service, tracer)
Expand Down
25 changes: 25 additions & 0 deletions cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/store/cockroach"
memorystore "github.com/w-h-a/pkg/store/memory"
"github.com/w-h-a/pkg/telemetry/traceexporter"
memorytraceexporter "github.com/w-h-a/pkg/telemetry/traceexporter/memory"
"github.com/w-h-a/pkg/utils/memoryutils"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

var (
Expand All @@ -29,6 +33,11 @@ var (
"ssm": ssm.NewSecret,
"env": env.NewSecret,
}

defaultTraceExporters = map[string]func(...traceexporter.ExporterOption) sdktrace.SpanExporter{
// "otelp": otelp.NewExporter,
"memory": memorytraceexporter.NewExporter,
}
)

func GetStoreBuilder(s string) (func(...store.StoreOption) store.Store, error) {
Expand Down Expand Up @@ -66,6 +75,22 @@ func MakeSecret(secretBuilder func(...secret.SecretOption) secret.Secret, nodes
)
}

func GetTraceExporterBuilder(s string) (func(...traceexporter.ExporterOption) sdktrace.SpanExporter, error) {
traceExporterBuilder, exists := defaultTraceExporters[s]
if !exists && len(s) > 0 {
return nil, fmt.Errorf("trace exporter %s is not supported", s)
} else if !exists {
return memorytraceexporter.NewExporter, nil
}
return traceExporterBuilder, nil
}

func MakeTraceExporter(tracerExporterBuilder func(...traceexporter.ExporterOption) sdktrace.SpanExporter, buffer *memoryutils.Buffer) sdktrace.SpanExporter {
return tracerExporterBuilder(
traceexporter.ExporterWithBuffer(buffer),
)
}

func GetBrokerBuilder(s string) (func(...broker.BrokerOption) broker.Broker, error) {
brokerBuilder, exists := defaultBrokers[s]
if !exists && len(s) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/stretchr/testify v1.9.0
github.com/urfave/cli v1.22.15
github.com/w-h-a/pkg v0.35.0
github.com/w-h-a/pkg v0.36.0
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/sdk v1.31.0
google.golang.org/grpc v1.65.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli v1.22.15 h1:nuqt+pdC/KqswQKhETJjo7pvn/k4xMUxgW6liI7XpnM=
github.com/urfave/cli v1.22.15/go.mod h1:wSan1hmo5zeyLGBjRJbzRTNk8gwoYa2B9n4q9dmRIc0=
github.com/w-h-a/pkg v0.35.0 h1:crNgl2xLW1UTS7uRzK9idMSjD2oYNn8gtz4ClPAs6+o=
github.com/w-h-a/pkg v0.35.0/go.mod h1:B00oIoJw4uW57x48f6OT/+H1IsgZ1RKQW32YOtjSC10=
github.com/w-h-a/pkg v0.36.0 h1:QbpuaGCxwWTzSRdKzgXol5+TSz7BrcNdl+nVRlpp0jI=
github.com/w-h-a/pkg v0.36.0/go.mod h1:B00oIoJw4uW57x48f6OT/+H1IsgZ1RKQW32YOtjSC10=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
Expand Down

0 comments on commit abc62c0

Please sign in to comment.