Skip to content

Commit

Permalink
add subscriptions logging and http endpoints
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Mason <mason@packet.com>
  • Loading branch information
mikemrm committed Aug 20, 2020
1 parent b74a58e commit 6fc8ff0
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
53 changes: 44 additions & 9 deletions grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"strconv"
"strings"
"time"
"unicode"

"github.com/itchyny/gojq"
Expand Down Expand Up @@ -258,6 +259,7 @@ func (s *server) Get(ctx context.Context, in *hegel.GetRequest) (*hegel.GetRespo
}

func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_SubscribeServer) error {
startedAt := time.Now().UTC()
metrics.TotalSubscriptions.Inc()
metrics.Subscriptions.WithLabelValues("initializing").Inc()

Expand All @@ -275,6 +277,7 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
return initError(errors.New("could not get peer info from client"))
}

var id string
ip := p.Addr.(*net.TCPAddr).IP.String()
logger = logger.With("ip", ip, "client", p.Addr)

Expand All @@ -294,9 +297,11 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
return initError(err)
}

id = hw.(*tink.Hardware).Id

ctx, cancel = context.WithCancel(stream.Context())
watch, err = s.hardwareClient.Watch(ctx, &tink.GetRequest{
Id: hw.(*tink.Hardware).Id,
Id: id,
})

if err != nil {
Expand All @@ -319,10 +324,11 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
}

hwID := hwJSON["id"]
id = hwID.(string)

ctx, cancel = context.WithCancel(stream.Context())
watch, err = s.hardwareClient.Watch(ctx, &cacher.GetRequest{
ID: hwID.(string),
ID: id,
})

if err != nil {
Expand All @@ -331,8 +337,38 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
}
}

sub := &subscription{
ID: id,
IP: ip,
StartedAt: startedAt,
InitDuration: time.Since(startedAt),
cancel: cancel,
updateChan: make(chan []byte, 1),
}

s.subLock.Lock()
old := s.subscriptions[id]
s.subscriptions[id] = sub
s.subLock.Unlock()

// Disconnect previous client if a client is already connected for this hardware id
if old != nil {
old.cancel()
}

defer func() {
s.subLock.Lock()
defer s.subLock.Unlock()
// Check if subscription for hardware id exists.
// If the subscriptions exists, make sure it has not been replaced by a new connection.
if cSub := s.subscriptions[id]; cSub == sub {
delete(s.subscriptions, id)
}
}()

metrics.Subscriptions.WithLabelValues("initializing").Dec()
metrics.Subscriptions.WithLabelValues("active").Inc()

activeError := func(err error) error {
if err == nil {
return err
Expand All @@ -344,7 +380,6 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
}

errs := make(chan error, 1)
ehws := make(chan []byte, 1)
go func() {
for {
var hw []byte
Expand All @@ -358,13 +393,13 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
err = status.Error(codes.OK, "stream ended")
}
errs <- err
close(ehws)
close(sub.updateChan)
return
}
hw, err = json.Marshal(util.HardwareWrapper{Hardware: resp})
if err != nil {
errs <- errors.New("could not marshal hardware")
close(ehws)
close(sub.updateChan)
return
}
default:
Expand All @@ -375,7 +410,7 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
err = status.Error(codes.OK, "stream ended")
}
errs <- err
close(ehws)
close(sub.updateChan)
return
}
hw = []byte(resp.JSON)
Expand All @@ -384,16 +419,16 @@ func (s *server) Subscribe(in *hegel.SubscribeRequest, stream hegel.Hegel_Subscr
ehw, err := exportHardware(hw)
if err != nil {
errs <- err
close(ehws)
close(sub.updateChan)
return
}

ehws <- ehw
sub.updateChan <- ehw
}
}()
go func() {
l := logger.With("op", "send")
for ehw := range ehws {
for ehw := range sub.updateChan {
l.Info()
err := stream.Send(&hegel.SubscribeResponse{
JSON: string(ehw),
Expand Down
55 changes: 55 additions & 0 deletions http_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -209,3 +210,57 @@ func getIPFromRequest(r *http.Request) string {
}
return IPAddress
}

func writeJSON(w http.ResponseWriter, status int, data interface{}) error {
var body []byte
body, err := json.Marshal(data)
if err != nil {
if status < 400 {
return jsonError(w, http.StatusInternalServerError, err, "marshalling response")
} else {
status = 500
logger.Error(err, "failed to marshal error")
body = []byte(`{"error", {"comment": "Failed to marshal error"}}`)
}
}
w.WriteHeader(status)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(body)
return err
}

func jsonError(w http.ResponseWriter, status int, err error, msg string) error {
logger.Error(err, msg)
resp := map[string]interface{}{
"error": map[string]interface{}{
"error": err.Error(),
"comment": msg,
},
}
return writeJSON(w, status, resp)
}

func handleSubscriptions(w http.ResponseWriter, r *http.Request) {
var getid string
if strings.HasPrefix(r.URL.Path, "/subscriptions/") {
getid = strings.TrimPrefix(r.URL.Path, "/subscriptions/")
}
hegelServer.subLock.RLock()
defer hegelServer.subLock.RUnlock()
var err error
if getid == "" {
err = writeJSON(w, http.StatusOK, hegelServer.subscriptions)
} else if sub, ok := hegelServer.subscriptions[getid]; ok {
err = writeJSON(w, http.StatusOK, sub)
} else {
err = jsonError(w, http.StatusNotFound, fmt.Errorf("%s not found", getid), "item not found")
}
if err != nil {
logger.Error(err)
}
}

func buildSubscriberHandlers(hegelServer *server) {
http.HandleFunc("/subscriptions", handleSubscriptions)
http.HandleFunc("/subscriptions/", handleSubscriptions)
}
15 changes: 15 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
type server struct {
log log.Logger
hardwareClient hardwareGetter

subLock sync.RWMutex
subscriptions map[string]*subscription
}

type hardwareGetter interface {
Expand All @@ -41,6 +44,15 @@ type hardwareGetter interface {
type getRequest interface{}
type hardware interface{}

type subscription struct {
ID string `json:"id"`
IP string `json:"ip"`
InitDuration time.Duration `json:"init_duration"`
StartedAt time.Time `json:"started_at"`
cancel func()
updateChan chan []byte
}

type hardwareGetterCacher struct {
client cacher.CacherClient
}
Expand Down Expand Up @@ -199,6 +211,7 @@ func main() {
hegelServer = &server{
log: logger,
hardwareClient: hg,
subscriptions: make(map[string]*subscription),
}

hegel.RegisterHegelServer(grpcServer, hegelServer)
Expand All @@ -218,6 +231,8 @@ func main() {
http.HandleFunc("/2009-04-04", ec2Handler) // workaround for making trailing slash optional
http.HandleFunc("/2009-04-04/", ec2Handler)

buildSubscriberHandlers(hegelServer)

err = registerCustomEndpoints()
if err != nil {
logger.Fatal(err, "could not register custom endpoints")
Expand Down

0 comments on commit 6fc8ff0

Please sign in to comment.