Skip to content

Commit

Permalink
Merge pull request #20 from w-h-a/traces
Browse files Browse the repository at this point in the history
feat: memory-based traces
  • Loading branch information
w-h-a authored Oct 16, 2024
2 parents f5800a5 + d966e94 commit c7117b5
Show file tree
Hide file tree
Showing 21 changed files with 440 additions and 118 deletions.
30 changes: 25 additions & 5 deletions cmd/grpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,45 @@ package grpc
import (
"context"

pb "github.com/w-h-a/pkg/proto/health"
pbHealth "github.com/w-h-a/pkg/proto/health"
pbTrace "github.com/w-h-a/pkg/proto/trace"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
)

type HealthHandler interface {
Check(ctx context.Context, req *pb.HealthRequest, rsp *pb.HealthResponse) error
Check(ctx context.Context, req *pbHealth.HealthRequest, rsp *pbHealth.HealthResponse) error
Trace(ctx context.Context, req *pbTrace.TraceRequest, rsp *pbTrace.TraceResponse) error
}

type Health struct {
HealthHandler
}

type healthHandler struct {
tracer tracev2.Trace
}

func (h *healthHandler) Check(ctx context.Context, req *pb.HealthRequest, rsp *pb.HealthResponse) error {
func (h *healthHandler) Check(ctx context.Context, req *pbHealth.HealthRequest, rsp *pbHealth.HealthResponse) error {
rsp.Status = "ok"
return nil
}

func NewHealthHandler() HealthHandler {
return &Health{&healthHandler{}}
func (h *healthHandler) Trace(ctx context.Context, req *pbTrace.TraceRequest, rsp *pbTrace.TraceResponse) error {
spans, err := h.tracer.Read(
tracev2.ReadWithCount(int(req.Count)),
)
if err != nil {
return errorutils.InternalServerError("trace", "failed to retrieve traces: %v", err)
}

for _, span := range spans {
rsp.Spans = append(rsp.Spans, SerializeSpan(span))
}

return nil
}

func NewHealthHandler(t tracev2.Trace) HealthHandler {
return &Health{&healthHandler{t}}
}
34 changes: 30 additions & 4 deletions cmd/grpc/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package grpc

import (
"context"
"encoding/json"
"fmt"

pb "github.com/w-h-a/pkg/proto/sidecar"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
)

Expand All @@ -18,31 +21,54 @@ type Publish struct {

type publishHandler struct {
service sidecar.Sidecar
tracer tracev2.Trace
}

func (h *publishHandler) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.PublishResponse) error {
newCtx, spanId := h.tracer.Start(ctx, "grpc.PublishHandler")
defer h.tracer.Finish(spanId)

if req.Event == nil {
h.tracer.UpdateStatus(spanId, 1, "event is required")
return errorutils.BadRequest("sidecar", "event is required")
}

h.tracer.AddMetadata(spanId, map[string]string{
"eventName": req.Event.EventName,
"payload": string(req.Event.Payload),
})

if len(req.Event.EventName) == 0 {
h.tracer.UpdateStatus(spanId, 1, "an event name as topic is required")
return errorutils.BadRequest("sidecar", "an event name as topic is required")
}

payload := map[string]interface{}{}

err := json.Unmarshal(req.Event.Payload, &payload)
if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to unmarshal event payload: %v", err))
return errorutils.BadRequest("sidecar", "the payload could not be marshaled into map[string]interface{}")
}

event := &sidecar.Event{
EventName: req.Event.EventName,
Data: req.Event.Data.Value,
Payload: payload,
}

if err := h.service.WriteEventToBroker(event); err != nil && err == sidecar.ErrComponentNotFound {
if err := h.service.WriteEventToBroker(newCtx, event); err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.Event.EventName))
return errorutils.NotFound("sidecar", "%v: %s", err, req.Event.EventName)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to publish event: %v", err))
return errorutils.InternalServerError("sidecar", "failed to publish event: %v", err)
}

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func NewPublishHandler(s sidecar.Sidecar) PublishHandler {
return &Publish{&publishHandler{s}}
func NewPublishHandler(s sidecar.Sidecar, t tracev2.Trace) PublishHandler {
return &Publish{&publishHandler{s, t}}
}
21 changes: 18 additions & 3 deletions cmd/grpc/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package grpc

import (
"context"
"fmt"

pb "github.com/w-h-a/pkg/proto/sidecar"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
)

Expand All @@ -18,21 +20,34 @@ type Secret struct {

type secretHandler struct {
service sidecar.Sidecar
tracer tracev2.Trace
}

func (h *secretHandler) Get(ctx context.Context, req *pb.GetSecretRequest, rsp *pb.GetSecretResponse) error {
secret, err := h.service.ReadFromSecretStore(req.SecretId, req.Key)
newCtx, spanId := h.tracer.Start(ctx, "grpc.SecretHandler")
defer h.tracer.Finish(spanId)

h.tracer.AddMetadata(spanId, map[string]string{
"secretId": req.SecretId,
"key": req.Key,
})

secret, err := h.service.ReadFromSecretStore(newCtx, req.SecretId, req.Key)
if err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.SecretId))
return errorutils.NotFound("sidecar", "%v: %s", err, req.SecretId)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to retrieve secret from store %s and key %s: %v", req.SecretId, req.Key, err))
return errorutils.InternalServerError("sidecar", "failed to retrieve secret from store %s and key %s: %v", req.SecretId, req.Key, err)
}

rsp.Secret = SerializeSecret(secret)

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func NewSecretHandler(s sidecar.Sidecar) SecretHandler {
return &Secret{&secretHandler{s}}
func NewSecretHandler(s sidecar.Sidecar, t tracev2.Trace) SecretHandler {
return &Secret{&secretHandler{s, t}}
}
66 changes: 60 additions & 6 deletions cmd/grpc/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package grpc

import (
"context"
"encoding/json"
"fmt"

pb "github.com/w-h-a/pkg/proto/sidecar"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
)

Expand All @@ -22,61 +25,112 @@ type State struct {

type stateHandler struct {
service sidecar.Sidecar
tracer tracev2.Trace
}

func (h *stateHandler) Post(ctx context.Context, req *pb.PostStateRequest, rsp *pb.PostStateResponse) error {
newCtx, spanId := h.tracer.Start(ctx, "grpc.PostStateHandler")
defer h.tracer.Finish(spanId)

records, _ := json.Marshal(req.Records)

h.tracer.AddMetadata(spanId, map[string]string{
"storeId": req.StoreId,
"records": string(records),
})

state := &sidecar.State{
StoreId: req.StoreId,
Records: DeserializeRecords(req.Records),
}

if err := h.service.SaveStateToStore(state); err != nil && err == sidecar.ErrComponentNotFound {
if err := h.service.SaveStateToStore(newCtx, state); err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.StoreId))
return errorutils.NotFound("sidecar", "%v: %s", err, req.StoreId)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to save state to store %s: %v", req.StoreId, err))
return errorutils.InternalServerError("sidecar", "failed to save state to store %s: %v", req.StoreId, err)
}

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func (h *stateHandler) List(ctx context.Context, req *pb.ListStateRequest, rsp *pb.ListStateResponse) error {
recs, err := h.service.ListStateFromStore(req.StoreId)
newCtx, spanId := h.tracer.Start(ctx, "grpc.ListStateHandler")
defer h.tracer.Finish(spanId)

h.tracer.AddMetadata(spanId, map[string]string{
"storeId": req.StoreId,
})

recs, err := h.service.ListStateFromStore(newCtx, req.StoreId)
if err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.StoreId))
return errorutils.NotFound("sidecar", "%v: %s", err, req.StoreId)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to retrieve state from store %s: %v", req.StoreId, err))
return errorutils.InternalServerError("sidecar", "failed to retrieve state from store %s: %v", req.StoreId, err)
}

rsp.Records = SerializeRecords(recs)

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func (h *stateHandler) Get(ctx context.Context, req *pb.GetStateRequest, rsp *pb.GetStateResponse) error {
recs, err := h.service.SingleStateFromStore(req.StoreId, req.Key)
newCtx, spanId := h.tracer.Start(ctx, "grpc.GetStateHandler")
defer h.tracer.Finish(spanId)

h.tracer.AddMetadata(spanId, map[string]string{
"storeId": req.StoreId,
"key": req.Key,
})

recs, err := h.service.SingleStateFromStore(newCtx, req.StoreId, req.Key)
if err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.StoreId))
return errorutils.NotFound("sidecar", "%v: %s", err, req.StoreId)
} else if err != nil && err == store.ErrRecordNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("there is no such record at store %s and key %s", req.StoreId, req.Key))
return errorutils.NotFound("sidecar", "there is no such record at store %s and key %s", req.StoreId, req.Key)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to retrieve state from store %s and key %s: %v", req.StoreId, req.Key, err))
return errorutils.InternalServerError("sidecar", "failed to retrieve state from store %s and key %s: %v", req.StoreId, req.Key, err)
}

rsp.Records = SerializeRecords(recs)

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func (h *stateHandler) Delete(ctx context.Context, req *pb.DeleteStateRequest, rsp *pb.DeleteStateResponse) error {
if err := h.service.RemoveStateFromStore(req.StoreId, req.Key); err != nil && err == sidecar.ErrComponentNotFound {
newCtx, spanId := h.tracer.Start(ctx, "grpc.DeleteStateHandler")
defer h.tracer.Finish(spanId)

h.tracer.AddMetadata(spanId, map[string]string{
"storeId": req.StoreId,
"key": req.Key,
})

if err := h.service.RemoveStateFromStore(newCtx, req.StoreId, req.Key); err != nil && err == sidecar.ErrComponentNotFound {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("%s: %s", err.Error(), req.StoreId))
return errorutils.NotFound("sidecar", "%v: %s", err, req.StoreId)
} else if err != nil {
h.tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to remove state from store %s and key %s: %v", req.StoreId, req.Key, err))
return errorutils.InternalServerError("sidecar", "failed to remove state from store %s and key %s: %v", req.StoreId, req.Key, err)
}

h.tracer.UpdateStatus(spanId, 2, "success")

return nil
}

func NewStateHandler(s sidecar.Sidecar) StateHandler {
return &State{&stateHandler{s}}
func NewStateHandler(s sidecar.Sidecar, t tracev2.Trace) StateHandler {
return &State{&stateHandler{s, t}}
}
14 changes: 14 additions & 0 deletions cmd/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package grpc

import (
pb "github.com/w-h-a/pkg/proto/sidecar"
pbTrace "github.com/w-h-a/pkg/proto/trace"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/telemetry/tracev2"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -40,3 +42,15 @@ func SerializeSecret(secret *sidecar.Secret) *pb.Secret {
Data: secret.Data,
}
}

func SerializeSpan(s *tracev2.SpanData) *pbTrace.Span {
return &pbTrace.Span{
Name: s.Name,
Id: s.Id,
Parent: s.Parent,
Trace: s.Trace,
Started: uint64(s.Started.UnixNano()),
Ended: uint64(s.Ended.UnixNano()),
Metadata: s.Metadata,
}
}
40 changes: 37 additions & 3 deletions cmd/http/health.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,53 @@
package http

import "net/http"
import (
"net/http"
"strconv"

"github.com/w-h-a/pkg/telemetry/tracev2"
"github.com/w-h-a/pkg/utils/errorutils"
"github.com/w-h-a/pkg/utils/httputils"
)

type HealthHandler interface {
Check(w http.ResponseWriter, r *http.Request)
Trace(w http.ResponseWriter, r *http.Request)
}

type healthHandler struct {
tracer tracev2.Trace
}

func (h *healthHandler) Check(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))
}

func NewHealthHandler() HealthHandler {
return &healthHandler{}
func (h *healthHandler) Trace(w http.ResponseWriter, r *http.Request) {
c := r.URL.Query().Get("count")

var count int
var err error

if len(c) > 0 {
count, err = strconv.Atoi(c)
if err != nil {
httputils.ErrResponse(w, errorutils.BadRequest("trace", "received bad count query param: %v", err))
return
}
}

spans, err := h.tracer.Read(
tracev2.ReadWithCount(count),
)
if err != nil {
httputils.ErrResponse(w, errorutils.InternalServerError("trace", "failed to retrieve traces: %v", err))
return
}

httputils.OkResponse(w, spans)
}

func NewHealthHandler(tracer tracev2.Trace) HealthHandler {
return &healthHandler{tracer}
}
Loading

0 comments on commit c7117b5

Please sign in to comment.