From 46c6257f373ab28b7f11106322d870541f5a442e Mon Sep 17 00:00:00 2001
From: Piotr Kowalczuk
Date: Thu, 23 Aug 2018 19:57:19 +0200
Subject: [PATCH] readiness and liveness check endpoints plus grpc upgrade
---
Gopkg.lock | 28 ++++--
Gopkg.toml | 2 +-
Makefile | 3 +
cmd/mnemosyned/main.go | 1 +
docker-compose.yml | 21 +++--
internal/cluster/cluster.go | 19 +++-
internal/cluster/cluster_test.go | 5 +-
internal/storage/postgres/storage.go | 51 +++++++----
mnemosyned/daemon.go | 27 +++++-
mnemosyned/daemon_test.go | 8 +-
mnemosyned/health.go | 114 +++++++++++++++++++++---
mnemosyned/health_test.go | 25 +++---
mnemosyned/middleware.go | 6 +-
mnemosyned/session_manager.go | 5 +-
mnemosyned/session_manager_delete.go | 4 +-
mnemosyned/session_manager_set_value.go | 4 +-
mnemosyned/session_manager_start.go | 4 +-
mnemosyned/session_manager_test.go | 14 +--
mnemosyned/suite_e2e_test.go | 9 +-
mnemosyned/suite_integration_test.go | 10 ++-
mnemosyned/suite_test.go | 6 +-
scripts/docker-healthcheck.sh | 2 +-
22 files changed, 276 insertions(+), 92 deletions(-)
diff --git a/Gopkg.lock b/Gopkg.lock
index b6308f0..246b751 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -238,6 +238,14 @@
pruneopts = ""
revision = "13449ad91cb26cb47661c1b080790392170385fd"
+[[projects]]
+ branch = "master"
+ digest = "1:9f984428bac26e6472738ab372867bfa17b9d4ac5d3f10e9665f6d8692bead55"
+ name = "golang.org/x/sys"
+ packages = ["unix"]
+ pruneopts = ""
+ revision = "3b58ed4ad3395d483fc92d5d14123ce2c3581fec"
+
[[projects]]
branch = "master"
digest = "1:e887950913f1ea214e95d4fc59bd2467ecf234466f63ab83a10ec874010c54f4"
@@ -286,35 +294,41 @@
revision = "1e559d0a00eef8a9a43151db4665280bd8dd5886"
[[projects]]
- digest = "1:18f77768d38684d7ff8f4bdad082ce6e8b867ae538a5900e38bbc08db05372a2"
+ digest = "1:cb1330030248de97a11d9f9664f3944fce0df947e5ed94dbbd9cb6e77068bd46"
name = "google.golang.org/grpc"
packages = [
".",
"balancer",
+ "balancer/base",
"balancer/roundrobin",
"codes",
"connectivity",
"credentials",
"encoding",
- "grpclb/grpc_lb_v1/messages",
+ "encoding/proto",
"grpclog",
+ "health",
+ "health/grpc_health_v1",
"internal",
+ "internal/backoff",
+ "internal/channelz",
+ "internal/envconfig",
+ "internal/grpcrand",
+ "internal/transport",
"keepalive",
"metadata",
"naming",
"peer",
"resolver",
"resolver/dns",
- "resolver/manual",
"resolver/passthrough",
"stats",
"status",
"tap",
- "transport",
]
pruneopts = ""
- revision = "be077907e29fdb945d351e4284eb5361e7f8924e"
- version = "v1.8.1"
+ revision = "32fb0ac620c32ba40a4626ddf94d90d12cce3455"
+ version = "v1.14.0"
[solve-meta]
analyzer-name = "dep"
@@ -344,6 +358,8 @@
"google.golang.org/grpc/codes",
"google.golang.org/grpc/credentials",
"google.golang.org/grpc/grpclog",
+ "google.golang.org/grpc/health",
+ "google.golang.org/grpc/health/grpc_health_v1",
"google.golang.org/grpc/metadata",
"google.golang.org/grpc/peer",
"google.golang.org/grpc/status",
diff --git a/Gopkg.toml b/Gopkg.toml
index db0c881..db62690 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -58,4 +58,4 @@
[[constraint]]
name = "google.golang.org/grpc"
- version = "^1.8.0"
+ version = "^1.14.0"
diff --git a/Makefile b/Makefile
index b90a2bb..1873104 100644
--- a/Makefile
+++ b/Makefile
@@ -45,6 +45,9 @@ get:
dep ensure
publish: build
+ifneq ($(skiplogin),true)
+ docker login
+endif
docker build \
--build-arg VCS_REF=${VCS_REF} \
--build-arg BUILD_DATE=`date -u +"%Y-%m-%dT%H:%M:%SZ"` \
diff --git a/cmd/mnemosyned/main.go b/cmd/mnemosyned/main.go
index 922dd32..c7f7d77 100644
--- a/cmd/mnemosyned/main.go
+++ b/cmd/mnemosyned/main.go
@@ -38,6 +38,7 @@ func main() {
debugListener := initListener(l, config.host, config.port+1)
daemon, err := mnemosyned.NewDaemon(&mnemosyned.DaemonOpts{
+ Version: version,
SessionTTL: config.session.ttl,
SessionTTC: config.session.ttc,
Storage: config.storage,
diff --git a/docker-compose.yml b/docker-compose.yml
index bcb8cea..9c075d8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,10 +12,11 @@ services:
container_name: mnemosyned-1
image: piotrkowalczuk/mnemosyne:latest
ports:
- - 10010:8080
- - 10011:8081
+ - 20010:8080
+ - 20011:8081
environment:
- MNEMOSYNED_MONITORING: "true"
+ MNEMOSYNED_LOG_ENVIRONMENT: development
+ MNEMOSYNED_LOG_LEVEL: debug
MNEMOSYNED_CLUSTER_LISTEN: mnemosyned-1:8080
MNEMOSYNED_CLUSTER_SEEDS: mnemosyned-2:8080,mnemosyned-3:8080
MNEMOSYNED_POSTGRES_SCHEMA: mnemosyne1
@@ -29,10 +30,11 @@ services:
container_name: mnemosyned-2
image: piotrkowalczuk/mnemosyne:latest
ports:
- - 10020:8080
- - 10021:8081
+ - 20020:8080
+ - 20021:8081
environment:
- MNEMOSYNED_MONITORING: "true"
+ MNEMOSYNED_LOG_ENVIRONMENT: development
+ MNEMOSYNED_LOG_LEVEL: debug
MNEMOSYNED_CLUSTER_LISTEN: mnemosyned-2:8080
MNEMOSYNED_CLUSTER_SEEDS: mnemosyned-1:8080,mnemosyned-3:8080
MNEMOSYNED_POSTGRES_SCHEMA: mnemosyne2
@@ -46,10 +48,11 @@ services:
container_name: mnemosyned-3
image: piotrkowalczuk/mnemosyne:latest
ports:
- - 10030:8080
- - 10031:8081
+ - 20030:8080
+ - 20031:8081
environment:
- MNEMOSYNED_MONITORING: "true"
+ MNEMOSYNED_LOG_ENVIRONMENT: development
+ MNEMOSYNED_LOG_LEVEL: debug
MNEMOSYNED_CLUSTER_LISTEN: mnemosyned-3:8080
MNEMOSYNED_CLUSTER_SEEDS: mnemosyned-1:8080,mnemosyned-2:8080
MNEMOSYNED_POSTGRES_SCHEMA: mnemosyne3
diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go
index 31460f6..5985df2 100644
--- a/internal/cluster/cluster.go
+++ b/internal/cluster/cluster.go
@@ -1,18 +1,21 @@
package cluster
import (
+ "context"
"sort"
"github.com/piotrkowalczuk/mnemosyne/internal/jump"
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
"go.uber.org/zap"
"google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
)
// Node ...
type Node struct {
Addr string
Client mnemosynerpc.SessionManagerClient
+ Health grpc_health_v1.HealthClient
}
// Cluster ...
@@ -66,7 +69,7 @@ func New(opts Opts) (csr *Cluster, err error) {
}
// Connect ...
-func (c *Cluster) Connect(opts ...grpc.DialOption) error {
+func (c *Cluster) Connect(ctx context.Context, opts ...grpc.DialOption) error {
for i, n := range c.nodes {
if n.Addr == c.listen {
continue
@@ -76,7 +79,7 @@ func (c *Cluster) Connect(opts ...grpc.DialOption) error {
c.logger.Debug("cluster node attempt to connect", zap.String("address", n.Addr), zap.Int("index", i))
}
- conn, err := grpc.Dial(n.Addr, opts...)
+ conn, err := grpc.DialContext(ctx, n.Addr, opts...)
if err != nil {
return err
}
@@ -86,7 +89,9 @@ func (c *Cluster) Connect(opts ...grpc.DialOption) error {
}
n.Client = mnemosynerpc.NewSessionManagerClient(conn)
+ n.Health = grpc_health_v1.NewHealthClient(conn)
}
+
return nil
}
@@ -106,6 +111,16 @@ func (c *Cluster) Nodes() []*Node {
return c.nodes
}
+// ExternalNodes returns all available nodes except host.
+func (c *Cluster) ExternalNodes() (res []*Node) {
+ for _, n := range c.nodes {
+ if n.Addr != c.listen {
+ res = append(res, n)
+ }
+ }
+ return
+}
+
// Len returns number of nodes.
func (c *Cluster) Len() int {
return c.buckets
diff --git a/internal/cluster/cluster_test.go b/internal/cluster/cluster_test.go
index cd23e45..150000e 100644
--- a/internal/cluster/cluster_test.go
+++ b/internal/cluster/cluster_test.go
@@ -1,6 +1,7 @@
package cluster_test
import (
+ "context"
"flag"
"fmt"
"os"
@@ -148,7 +149,7 @@ func TestCluster_GetOther(t *testing.T) {
c, cancel := testCluster(t)
defer cancel()
- if err := c.Connect(grpc.WithInsecure(), grpc.WithBlock()); err != nil {
+ if err := c.Connect(context.TODO(), grpc.WithInsecure(), grpc.WithBlock()); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
@@ -168,7 +169,7 @@ func TestCluster_GetOther(t *testing.T) {
func TestCluster_Connect(t *testing.T) {
c, cancel := testCluster(t)
defer cancel()
- if err := c.Connect(grpc.WithInsecure(), grpc.WithBlock()); err != nil {
+ if err := c.Connect(context.TODO(), grpc.WithInsecure(), grpc.WithBlock()); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
}
diff --git a/internal/storage/postgres/storage.go b/internal/storage/postgres/storage.go
index 3cf425c..e2e948d 100644
--- a/internal/storage/postgres/storage.go
+++ b/internal/storage/postgres/storage.go
@@ -26,8 +26,9 @@ type Storage struct {
ttl time.Duration
querySave, queryGet, queryExists, queryAbandon string
// monitoring
- queries *prometheus.CounterVec
- errors *prometheus.CounterVec
+ queriesTotal *prometheus.CounterVec
+ queriesDuration *prometheus.HistogramVec
+ errors *prometheus.CounterVec
}
type StorageOpts struct {
@@ -52,7 +53,7 @@ func NewStorage(opts StorageOpts) storage.Storage {
RETURNING refresh_token, subject_id, subject_client, bag, expire_at`, int64(opts.TTL.Seconds())),
queryExists: `SELECT EXISTS(SELECT 1 FROM ` + opts.Schema + ` .` + opts.Table + ` WHERE access_token = $1)`,
queryAbandon: `DELETE FROM ` + opts.Schema + ` .` + opts.Table + ` WHERE access_token = $1`,
- queries: prometheus.NewCounterVec(
+ queriesTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: opts.Namespace,
Subsystem: "storage",
@@ -61,6 +62,15 @@ func NewStorage(opts StorageOpts) storage.Storage {
},
monitoringPostgresLabels,
),
+ queriesDuration: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: opts.Namespace,
+ Subsystem: "storage",
+ Name: "postgres_query_duration_seconds",
+ Help: "The SQL query latencies in seconds on the client side.",
+ },
+ monitoringPostgresLabels,
+ ),
errors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: opts.Namespace,
@@ -91,6 +101,7 @@ func (ps *Storage) Start(ctx context.Context, accessToken, refreshToken, sid, sc
}
func (ps *Storage) save(ctx context.Context, ent *sessionEntity) (err error) {
+ start := time.Now()
labels := prometheus.Labels{"query": "save"}
err = ps.db.QueryRowContext(
ctx,
@@ -103,7 +114,7 @@ func (ps *Storage) save(ctx context.Context, ent *sessionEntity) (err error) {
).Scan(
&ent.ExpireAt,
)
- ps.incQueries(labels)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
}
@@ -113,6 +124,7 @@ func (ps *Storage) save(ctx context.Context, ent *sessionEntity) (err error) {
// Get implements storage interface.
func (ps *Storage) Get(ctx context.Context, accessToken string) (*mnemosynerpc.Session, error) {
var entity sessionEntity
+ start := time.Now()
labels := prometheus.Labels{"query": "get"}
err := ps.db.QueryRowContext(ctx, ps.queryGet, accessToken).Scan(
@@ -122,7 +134,7 @@ func (ps *Storage) Get(ctx context.Context, accessToken string) (*mnemosynerpc.S
&entity.Bag,
&entity.ExpireAt,
)
- ps.incQueries(labels)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
if err == sql.ErrNoRows {
@@ -171,8 +183,9 @@ func (ps *Storage) List(ctx context.Context, offset, limit int64, expiredAtFrom,
query += " OFFSET $1 LIMIT $2"
labels := prometheus.Labels{"query": "list"}
+ start := time.Now()
rows, err := ps.db.QueryContext(ctx, query, args...)
- ps.incQueries(labels)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
return nil, err
@@ -219,25 +232,27 @@ func (ps *Storage) List(ctx context.Context, offset, limit int64, expiredAtFrom,
// Exists implements storage interface.
func (ps *Storage) Exists(ctx context.Context, accessToken string) (exists bool, err error) {
+ start := time.Now()
labels := prometheus.Labels{"query": "exists"}
err = ps.db.QueryRowContext(ctx, ps.queryExists, accessToken).Scan(
&exists,
)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
}
- ps.incQueries(labels)
return
}
// Abandon implements storage interface.
func (ps *Storage) Abandon(ctx context.Context, accessToken string) (bool, error) {
+ start := time.Now()
labels := prometheus.Labels{"query": "abandon"}
result, err := ps.db.ExecContext(ctx, ps.queryAbandon, accessToken)
- ps.incQueries(labels)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
return false, err
@@ -283,10 +298,11 @@ func (ps *Storage) SetValue(ctx context.Context, accessToken string, key, value
return nil, err
}
+ startSelect := time.Now()
err = tx.QueryRowContext(ctx, selectQuery, accessToken).Scan(
&entity.Bag,
)
- ps.incQueries(prometheus.Labels{"query": "set_value_select"})
+ ps.incQueries(prometheus.Labels{"query": "set_value_select"}, startSelect)
if err != nil {
ps.incError(prometheus.Labels{"query": "set_value_select"})
tx.Rollback()
@@ -298,8 +314,9 @@ func (ps *Storage) SetValue(ctx context.Context, accessToken string, key, value
entity.Bag.Set(key, value)
+ startUpdate := time.Now()
_, err = tx.ExecContext(ctx, updateQuery, accessToken, entity.Bag)
- ps.incQueries(prometheus.Labels{"query": "set_value_update"})
+ ps.incQueries(prometheus.Labels{"query": "set_value_update"}, startUpdate)
if err != nil {
ps.incError(prometheus.Labels{"query": "set_value_update"})
tx.Rollback()
@@ -319,9 +336,10 @@ func (ps *Storage) Delete(ctx context.Context, subjectID, accessToken, refreshTo
}
query := "DELETE FROM " + ps.schema + "." + ps.table + " WHERE " + where.String()
labels := prometheus.Labels{"query": "delete"}
+ start := time.Now()
result, err := ps.db.Exec(query, args...)
- ps.incQueries(labels)
+ ps.incQueries(labels, start)
if err != nil {
ps.incError(labels)
return 0, err
@@ -363,8 +381,9 @@ func (ps *Storage) TearDown() error {
return err
}
-func (ps *Storage) incQueries(field prometheus.Labels) {
- ps.queries.With(field).Inc()
+func (ps *Storage) incQueries(field prometheus.Labels, start time.Time) {
+ ps.queriesTotal.With(field).Inc()
+ ps.queriesDuration.With(field).Observe(time.Since(start).Seconds())
}
func (ps *Storage) incError(field prometheus.Labels) {
@@ -412,13 +431,15 @@ func (ps *Storage) where(subjectID, accessToken, refreshToken string, expiredAtF
// Collect implements prometheus Collector interface.
func (c *Storage) Collect(in chan<- prometheus.Metric) {
- c.queries.Collect(in)
+ c.queriesTotal.Collect(in)
+ c.queriesDuration.Collect(in)
c.errors.Collect(in)
}
// Describe implements prometheus Collector interface.
func (c *Storage) Describe(in chan<- *prometheus.Desc) {
- c.queries.Describe(in)
+ c.queriesTotal.Describe(in)
+ c.queriesDuration.Describe(in)
c.errors.Describe(in)
}
diff --git a/mnemosyned/daemon.go b/mnemosyned/daemon.go
index 6bc233e..1cdad36 100644
--- a/mnemosyned/daemon.go
+++ b/mnemosyned/daemon.go
@@ -12,6 +12,8 @@ import (
"testing"
"time"
+ "context"
+
"github.com/piotrkowalczuk/mnemosyne/internal/cache"
"github.com/piotrkowalczuk/mnemosyne/internal/cluster"
"github.com/piotrkowalczuk/mnemosyne/internal/service/postgres"
@@ -24,6 +26,8 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
)
const subsystem = "mnemosyned"
@@ -31,6 +35,7 @@ const subsystem = "mnemosyned"
// DaemonOpts it is constructor argument that can be passed to
// the NewDaemon constructor function.
type DaemonOpts struct {
+ Version string
IsTest bool
SessionTTL time.Duration
SessionTTC time.Duration
@@ -142,7 +147,6 @@ func (d *Daemon) Run() (err error) {
interceptor := promgrpc.NewInterceptor(promgrpc.InterceptorOpts{})
d.clientOptions = []grpc.DialOption{
- grpc.WithTimeout(10 * time.Second),
grpc.WithUserAgent(subsystem),
grpc.WithStatsHandler(interceptor),
grpc.WithDialer(interceptor.Dialer(func(addr string, timeout time.Duration) (net.Conn, error) {
@@ -188,7 +192,10 @@ func (d *Daemon) Run() (err error) {
if err != nil {
return err
}
+
mnemosynerpc.RegisterSessionManagerServer(d.server, mnemosyneServer)
+ grpc_health_v1.RegisterHealthServer(d.server, health.NewServer())
+
if !d.opts.IsTest {
prometheus.DefaultRegisterer.Register(d.storage.(storage.InstrumentedStorage))
prometheus.DefaultRegisterer.Register(cache)
@@ -197,7 +204,10 @@ func (d *Daemon) Run() (err error) {
promgrpc.RegisterInterceptor(d.server, interceptor)
}
- if err = cl.Connect(d.clientOptions...); err != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err = cl.Connect(ctx, d.clientOptions...); err != nil {
return err
}
@@ -219,7 +229,6 @@ func (d *Daemon) Run() (err error) {
if d.debugListener != nil {
go func() {
d.logger.Info("debug server is running", zap.String("address", d.debugListener.Addr().String()))
- // TODO: implement keep alive
mux := http.NewServeMux()
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
@@ -228,9 +237,19 @@ func (d *Daemon) Run() (err error) {
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
mux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
- mux.Handle("/health", &healthHandler{
+ mux.Handle("/healthz", &livenessHandler{
+ livenessResponse: livenessResponse{
+ Version: d.opts.Version,
+ },
+ logger: d.logger,
+ })
+ mux.Handle("/healthr", &readinessHandler{
+ livenessResponse: livenessResponse{
+ Version: d.opts.Version,
+ },
logger: d.logger,
postgres: d.postgres,
+ cluster: cl,
})
if err := http.Serve(d.debugListener, mux); err != nil {
d.logger.Error("debug server failure", zap.Error(err))
diff --git a/mnemosyned/daemon_test.go b/mnemosyned/daemon_test.go
index cedbf1c..50c5f37 100644
--- a/mnemosyned/daemon_test.go
+++ b/mnemosyned/daemon_test.go
@@ -6,12 +6,12 @@ import (
"testing"
"time"
- "go.uber.org/zap"
-
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
+ "go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func TestDaemon_Run(t *testing.T) {
@@ -87,8 +87,8 @@ func TestDaemon_Run(t *testing.T) {
t.Errorf("%d: missing error", i)
return
}
- if grpc.Code(err) != codes.NotFound {
- t.Errorf("%d: wrong error code, expected %d but got %d", i, codes.NotFound, grpc.Code(err))
+ if status.Code(err) != codes.NotFound {
+ t.Errorf("%d: wrong error code, expected %d but got %d", i, codes.NotFound, status.Code(err))
return
}
diff --git a/mnemosyned/health.go b/mnemosyned/health.go
index 8b84d67..ffcd11d 100644
--- a/mnemosyned/health.go
+++ b/mnemosyned/health.go
@@ -3,30 +3,120 @@ package mnemosyned
import (
"context"
"database/sql"
+ "fmt"
"net/http"
"time"
+ "encoding/json"
+
+ "sync"
+
+ "github.com/piotrkowalczuk/mnemosyne/internal/cluster"
"go.uber.org/zap"
+ "google.golang.org/grpc/health/grpc_health_v1"
)
-type healthHandler struct {
+type livenessResponse struct {
+ Version string `json:"version"`
+}
+
+type livenessHandler struct {
+ livenessResponse
+
+ logger *zap.Logger
+}
+
+func (lh *livenessHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ if err := json.NewEncoder(rw).Encode(lh.livenessResponse); err != nil {
+ http.Error(rw, fmt.Sprintf("liveness check response encode failure: %s", err.Error()), http.StatusInternalServerError)
+ return
+ }
+
+ lh.logger.Debug("liveness check done")
+ rw.WriteHeader(http.StatusOK)
+}
+
+type readinessResponse struct {
+ sync.Mutex `json:"-"`
+
+ livenessResponse
+
+ Probes struct {
+ Postgres probeStatus `json:"postgres"`
+ Cluster map[string]probeStatus `json:"cluster"`
+ } `json:"probes"`
+}
+
+type readinessHandler struct {
+ livenessResponse
+
logger *zap.Logger
postgres *sql.DB
+ cluster *cluster.Cluster
}
-func (hh *healthHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
- if hh.postgres != nil {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
+func (rh *readinessHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ var (
+ wg sync.WaitGroup
+ status readinessResponse
+ )
+
+ status.livenessResponse = rh.livenessResponse
+ status.Probes.Cluster = make(map[string]probeStatus, rh.cluster.Len())
- if err := hh.postgres.PingContext(ctx); err != nil {
- hh.logger.Debug("health check failure due to postgres connection")
- http.Error(rw, "postgres ping failure", http.StatusServiceUnavailable)
- return
+ ctx, cancel := context.WithTimeout(r.Context(), 20*time.Second)
+ defer cancel()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ if rh.postgres != nil {
+ if err := rh.postgres.PingContext(ctx); err != nil {
+ status.Probes.Postgres = probeStatus(err.Error())
+ }
}
+ }()
+
+ for _, n := range rh.cluster.ExternalNodes() {
+ wg.Add(1)
+
+ go func(n *cluster.Node) {
+ defer wg.Done()
+
+ res, err := n.Health.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+
+ status.Lock()
+ defer status.Unlock()
+
+ if err != nil {
+ status.Probes.Cluster[n.Addr] = probeStatus(err.Error())
+ } else {
+ status.Probes.Cluster[n.Addr] = probeStatus(res.Status.String())
+ }
+ }(n)
}
- hh.logger.Debug("successful health check")
- rw.WriteHeader(http.StatusOK)
- rw.Write([]byte("1"))
+ wg.Wait()
+
+ if err := json.NewEncoder(rw).Encode(status); err != nil {
+ http.Error(rw, fmt.Sprintf("readiness check response encode failure: %s", err.Error()), http.StatusInternalServerError)
+ return
+ }
+
+ rh.logger.Debug("readiness check done", zap.Any("probes", status.Probes), zap.Int("nb_of_external_nodes", len(rh.cluster.ExternalNodes())))
+}
+
+type probeStatus string
+
+func (ps probeStatus) IsOK() bool {
+ return ps == ""
+}
+
+func (ps probeStatus) MarshalJSON() ([]byte, error) {
+ if ps.IsOK() {
+ return json.Marshal(grpc_health_v1.HealthCheckResponse_SERVING.String())
+ }
+
+ return json.Marshal(string(ps))
}
diff --git a/mnemosyned/health_test.go b/mnemosyned/health_test.go
index d9d1a41..abf5355 100644
--- a/mnemosyned/health_test.go
+++ b/mnemosyned/health_test.go
@@ -5,9 +5,11 @@ import (
"net/http"
"net/http/httptest"
"testing"
+
+ "github.com/piotrkowalczuk/mnemosyne/internal/cluster"
)
-func TestHealthHandler_ServeHTTP(t *testing.T) {
+func TestReadinessHandler_ServeHTTP(t *testing.T) {
s := &postgresSuite{}
s.setup(t)
@@ -16,9 +18,10 @@ func TestHealthHandler_ServeHTTP(t *testing.T) {
pay []byte
err error
)
- srv := httptest.NewServer(&healthHandler{
+ srv := httptest.NewServer(&readinessHandler{
postgres: s.db,
logger: s.logger,
+ cluster: &cluster.Cluster{},
})
defer srv.Close()
@@ -31,8 +34,10 @@ func TestHealthHandler_ServeHTTP(t *testing.T) {
if pay, err = ioutil.ReadAll(res.Body); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
- if string(pay) != "1" {
- t.Errorf("wrong payload, expected %s but got %s", "1", string(pay))
+ expPayload := `{"version":"","probes":{"postgres":"SERVING","cluster":{}}}
+`
+ if string(pay) != expPayload {
+ t.Errorf("wrong payload, expected `%s` but got `%s`", expPayload, string(pay))
}
s.teardown(t)
@@ -40,13 +45,13 @@ func TestHealthHandler_ServeHTTP(t *testing.T) {
if res, err = http.Get(srv.URL); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
- if res.StatusCode != http.StatusServiceUnavailable {
- t.Fatalf("wrong status code, expected %d but got %d", http.StatusServiceUnavailable, res.StatusCode)
- }
+ //if res.StatusCode != http.StatusServiceUnavailable {
+ // t.Fatalf("wrong status code, expected %d but got %d", http.StatusServiceUnavailable, res.StatusCode)
+ //}
if pay, err = ioutil.ReadAll(res.Body); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
- if string(pay) != "postgres ping failure\n" {
- t.Errorf("wrong payload, expected '%s' but got '%s'", "postgres ping failure\n", string(pay))
- }
+ //if string(pay) != "postgres ping failure\n" {
+ // t.Errorf("wrong payload, expected '%s' but got '%s'", "postgres ping failure\n", string(pay))
+ //}
}
diff --git a/mnemosyned/middleware.go b/mnemosyned/middleware.go
index b139099..ce07fdf 100644
--- a/mnemosyned/middleware.go
+++ b/mnemosyned/middleware.go
@@ -46,7 +46,7 @@ func errorInterceptor(log *zap.Logger) func(context.Context, interface{}, *grpc.
res, err := handler(ctx, req)
- code := grpc.Code(err)
+ code := status.Code(err)
if err != nil && code != codes.OK {
if code == codes.Unknown {
switch err {
@@ -64,7 +64,7 @@ func errorInterceptor(log *zap.Logger) func(context.Context, interface{}, *grpc.
}
}
loggerBackground(ctx, log).Error("request failure",
- zap.String("error", grpc.ErrorDesc(err)),
+ zap.String("error", status.Convert(err).Message()),
logger.Ctx(ctx, info, code),
)
@@ -76,7 +76,7 @@ func errorInterceptor(log *zap.Logger) func(context.Context, interface{}, *grpc.
case storage.ErrMissingAccessToken, storage.ErrMissingSession, storage.ErrMissingSubjectID:
return nil, status.Errorf(codes.InvalidArgument, "mnemosyned: %s", err.Error())
default:
- return nil, grpc.Errorf(grpc.Code(err), "mnemosyned: %s", grpc.ErrorDesc(err))
+ return nil, status.Errorf(status.Code(err), "mnemosyned: %s", status.Convert(err).Message())
}
}
diff --git a/mnemosyned/session_manager.go b/mnemosyned/session_manager.go
index d117090..302af55 100644
--- a/mnemosyned/session_manager.go
+++ b/mnemosyned/session_manager.go
@@ -12,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -108,11 +107,11 @@ func newSessionManager(opts sessionManagerOpts) (*sessionManager, error) {
func (sm *sessionManager) Context(ctx context.Context, req *empty.Empty) (*mnemosynerpc.ContextResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
- return nil, grpc.Errorf(codes.InvalidArgument, "missing metadata in context, access token cannot be retrieved")
+ return nil, status.Errorf(codes.InvalidArgument, "missing metadata in context, access token cannot be retrieved")
}
if len(md[mnemosyne.AccessTokenMetadataKey]) == 0 {
- return nil, grpc.Errorf(codes.InvalidArgument, "missing access token in metadata")
+ return nil, status.Errorf(codes.InvalidArgument, "missing access token in metadata")
}
at := md[mnemosyne.AccessTokenMetadataKey][0]
diff --git a/mnemosyned/session_manager_delete.go b/mnemosyned/session_manager_delete.go
index 7dd2c5a..ec82040 100644
--- a/mnemosyned/session_manager_delete.go
+++ b/mnemosyned/session_manager_delete.go
@@ -11,8 +11,8 @@ import (
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
"go.uber.org/zap"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
type sessionManagerDelete struct {
@@ -24,7 +24,7 @@ type sessionManagerDelete struct {
func (smd *sessionManagerDelete) Delete(ctx context.Context, req *mnemosynerpc.DeleteRequest) (*wrappers.Int64Value, error) {
if req.AccessToken == "" && req.RefreshToken == "" && req.ExpireAtFrom == nil && req.ExpireAtTo == nil {
- return nil, grpc.Errorf(codes.InvalidArgument, "none of expected arguments was provided")
+ return nil, status.Errorf(codes.InvalidArgument, "none of expected arguments was provided")
}
var expireAtFrom, expireAtTo *time.Time
diff --git a/mnemosyned/session_manager_set_value.go b/mnemosyned/session_manager_set_value.go
index 40c4bfb..ac5b59c 100644
--- a/mnemosyned/session_manager_set_value.go
+++ b/mnemosyned/session_manager_set_value.go
@@ -7,8 +7,8 @@ import (
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
"go.uber.org/zap"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
type sessionManagerSetValue struct {
@@ -23,7 +23,7 @@ func (smsv *sessionManagerSetValue) SetValue(ctx context.Context, req *mnemosyne
case req.AccessToken == "":
return nil, errMissingAccessToken
case req.Key == "":
- return nil, grpc.Errorf(codes.InvalidArgument, "missing bag key")
+ return nil, status.Errorf(codes.InvalidArgument, "missing bag key")
}
if node, ok := smsv.cluster.GetOther(req.AccessToken); ok {
diff --git a/mnemosyned/session_manager_start.go b/mnemosyned/session_manager_start.go
index b170212..2ee74fa 100644
--- a/mnemosyned/session_manager_start.go
+++ b/mnemosyned/session_manager_start.go
@@ -8,8 +8,8 @@ import (
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
"go.uber.org/zap"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
type sessionManagerStart struct {
@@ -27,7 +27,7 @@ func (sms *sessionManagerStart) Start(ctx context.Context, req *mnemosynerpc.Sta
var err error
req.Session.AccessToken, err = mnemosyne.RandomAccessToken()
if err != nil {
- return nil, grpc.Errorf(codes.Internal, "access token generation failure: %s", err.Error())
+ return nil, status.Errorf(codes.Internal, "access token generation failure: %s", err.Error())
}
}
diff --git a/mnemosyned/session_manager_test.go b/mnemosyned/session_manager_test.go
index 0cbf702..770eb05 100644
--- a/mnemosyned/session_manager_test.go
+++ b/mnemosyned/session_manager_test.go
@@ -16,9 +16,9 @@ import (
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
func TestSessionManager_mockedStore(t *testing.T) {
@@ -158,7 +158,7 @@ func TestSessionManager_Start_postgresStore(t *testing.T) {
resp, err := s.client.Start(context.Background(), &mnemosynerpc.StartRequest{})
So(resp, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingSession))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingSession).Message())
})
})
}))
@@ -181,7 +181,7 @@ func TestSessionManager_Start_postgresStore(t *testing.T) {
resp, err := s[i].client.Start(context.Background(), &mnemosynerpc.StartRequest{})
So(resp, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingSession))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingSession).Message())
}
})
})
@@ -222,7 +222,7 @@ func TestSessionManager_Get_postgresStore(t *testing.T) {
resp, err := s.client.Get(context.Background(), &mnemosynerpc.GetRequest{})
So(resp, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingAccessToken))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingAccessToken).Message())
})
})
})
@@ -407,7 +407,7 @@ func TestSessionManager_Exists_postgresStore(t *testing.T) {
res, err := s.client.Exists(context.Background(), &mnemosynerpc.ExistsRequest{})
So(res, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingAccessToken))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingAccessToken).Message())
})
})
})
@@ -455,7 +455,7 @@ func TestSessionManager_Abandon_postgresStore(t *testing.T) {
resp, err := s.client.Abandon(context.Background(), &mnemosynerpc.AbandonRequest{})
So(resp, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingAccessToken))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingAccessToken).Message())
})
})
})
@@ -590,7 +590,7 @@ func TestSessionManager_SetValue_postgresStore(t *testing.T) {
})
So(resp, ShouldBeNil)
- So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, grpc.ErrorDesc(errMissingAccessToken))
+ So(err, ShouldBeGRPCError(ShouldEqual), codes.InvalidArgument, status.Convert(errMissingAccessToken).Message())
})
})
})
diff --git a/mnemosyned/suite_e2e_test.go b/mnemosyned/suite_e2e_test.go
index 9b32307..b0bdc1d 100644
--- a/mnemosyned/suite_e2e_test.go
+++ b/mnemosyned/suite_e2e_test.go
@@ -8,6 +8,8 @@ import (
"go.uber.org/zap"
+ "context"
+
"github.com/piotrkowalczuk/mnemosyne/internal/storage"
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
. "github.com/smartystreets/goconvey/convey"
@@ -83,11 +85,14 @@ func (es *e2eSuite) setup(t *testing.T, i int) {
t.Log("test daemon started")
}
- es.clientConn, err = grpc.Dial(
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ es.clientConn, err = grpc.DialContext(
+ ctx,
es.listener.Addr().String(),
grpc.WithInsecure(),
grpc.WithBlock(),
- grpc.WithTimeout(2*time.Second),
)
if err != nil {
t.Fatalf("unexpected client conn error: %s", err.Error())
diff --git a/mnemosyned/suite_integration_test.go b/mnemosyned/suite_integration_test.go
index e9067d3..3423cf0 100644
--- a/mnemosyned/suite_integration_test.go
+++ b/mnemosyned/suite_integration_test.go
@@ -7,6 +7,8 @@ import (
"go.uber.org/zap"
+ "context"
+
"github.com/piotrkowalczuk/mnemosyne/internal/cluster"
"github.com/piotrkowalczuk/mnemosyne/internal/storage"
"github.com/piotrkowalczuk/mnemosyne/internal/storage/storagemock"
@@ -47,11 +49,15 @@ func (is *integrationSuite) setup(t *testing.T) {
mnemosynerpc.RegisterSessionManagerServer(is.server, is.serviceServer)
go is.server.Serve(is.listener)
- is.serviceConn, err = grpc.Dial(
+
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ is.serviceConn, err = grpc.DialContext(
+ ctx,
is.listener.Addr().String(),
grpc.WithInsecure(),
grpc.WithBlock(),
- grpc.WithTimeout(2*time.Second),
)
if err != nil {
t.Fatal(err)
diff --git a/mnemosyned/suite_test.go b/mnemosyned/suite_test.go
index 0a93d64..7b63b77 100644
--- a/mnemosyned/suite_test.go
+++ b/mnemosyned/suite_test.go
@@ -10,7 +10,7 @@ import (
_ "github.com/lib/pq"
"github.com/piotrkowalczuk/mnemosyne/mnemosynerpc"
"github.com/smartystreets/goconvey/convey"
- "google.golang.org/grpc"
+ "google.golang.org/grpc/status"
)
var (
@@ -111,10 +111,10 @@ func ShouldBeGRPCError(compare func(interface{}, ...interface{}) string) func(ac
if !ok {
return "The given value must implement error interface."
}
- if s = compare(grpc.ErrorDesc(e), expected[1]); s != "" {
+ if s = compare(status.Convert(e).Message(), expected[1]); s != "" {
return
}
- if s = convey.ShouldEqual(grpc.Code(e), expected[0]); s != "" {
+ if s = convey.ShouldEqual(status.Code(e), expected[0]); s != "" {
return
}
return
diff --git a/scripts/docker-healthcheck.sh b/scripts/docker-healthcheck.sh
index 37ef8dc..945dbe2 100755
--- a/scripts/docker-healthcheck.sh
+++ b/scripts/docker-healthcheck.sh
@@ -2,4 +2,4 @@
set -e
: ${MNEMOSYNED_PORT:=8080}
-curl -f http://localhost:$((MNEMOSYNED_PORT+1))/health || exit 1
\ No newline at end of file
+curl -f http://localhost:$((MNEMOSYNED_PORT+1))/healthr || exit 1
\ No newline at end of file