Skip to content

Commit

Permalink
Unify replica client metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Jun 2, 2021
1 parent 88909e3 commit 1c0c69a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 113 deletions.
45 changes: 15 additions & 30 deletions abs/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -95,7 +93,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
var generations []string
var marker azblob.Marker
for marker.NotDone() {
operationTotalCounterVec.WithLabelValues("LIST").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()

resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
Prefix: litestream.GenerationsPath(c.Path) + "/",
Expand Down Expand Up @@ -130,7 +128,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)

var marker azblob.Marker
for marker.NotDone() {
operationTotalCounterVec.WithLabelValues("LIST").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()

resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
if err != nil {
Expand All @@ -139,7 +137,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
marker = resp.NextMarker

for _, item := range resp.Segment.BlobItems {
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()

blobURL := c.containerURL.NewBlobURL(item.Name)
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
Expand Down Expand Up @@ -185,8 +183,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
return info, err
}

operationTotalCounterVec.WithLabelValues("PUT").Inc()
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N()))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))

// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))

Expand Down Expand Up @@ -217,8 +215,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err)
}

operationTotalCounterVec.WithLabelValues("GET").Inc()
operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength()))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(resp.ContentLength()))

return resp.Body(azblob.RetryReaderOptions{}), nil
}
Expand All @@ -234,7 +232,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
return fmt.Errorf("cannot determine snapshot path: %w", err)
}

operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()

blobURL := c.containerURL.NewBlobURL(key)
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
Expand Down Expand Up @@ -275,8 +273,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
return info, err
}

operationTotalCounterVec.WithLabelValues("PUT").Inc()
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N()))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))

return litestream.WALSegmentInfo{
Generation: pos.Generation,
Expand Down Expand Up @@ -307,8 +305,8 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err)
}

operationTotalCounterVec.WithLabelValues("GET").Inc()
operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength()))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(resp.ContentLength()))

return resp.Body(azblob.RetryReaderOptions{}), nil
}
Expand All @@ -325,7 +323,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
return fmt.Errorf("cannot determine wal segment path: %w", err)
}

operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()

blobURL := c.containerURL.NewBlobURL(key)
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
Expand Down Expand Up @@ -375,7 +373,7 @@ func (itr *snapshotIterator) fetch() error {

var marker azblob.Marker
for marker.NotDone() {
operationTotalCounterVec.WithLabelValues("LIST").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()

resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
if err != nil {
Expand Down Expand Up @@ -481,7 +479,7 @@ func (itr *walSegmentIterator) fetch() error {

var marker azblob.Marker
for marker.NotDone() {
operationTotalCounterVec.WithLabelValues("LIST").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()

resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
if err != nil {
Expand Down Expand Up @@ -559,16 +557,3 @@ func isNotExists(err error) bool {
return false
}
}

// Azure blob storage metrics.
var (
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_abs_operation_total",
Help: "The number of ABS operations performed",
}, []string{"type"})

operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_abs_operation_bytes",
Help: "The number of bytes used by ABS operations",
}, []string{"type"})
)
40 changes: 13 additions & 27 deletions gcs/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (

"cloud.google.com/go/storage"
"github.com/benbjohnson/litestream"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/benbjohnson/litestream/internal"
"google.golang.org/api/iterator"
)

Expand Down Expand Up @@ -105,7 +104,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
}

// Iterate over every object in generation and delete it.
operationTotalCounterVec.WithLabelValues("LIST").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for it := c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"}); ; {
attrs, err := it.Next()
if err == iterator.Done {
Expand All @@ -119,7 +118,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
} else if err != nil {
return fmt.Errorf("cannot delete object %q: %w", attrs.Name, err)
}
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
}

// log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation)
Expand Down Expand Up @@ -161,8 +160,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
return info, err
}

operationTotalCounterVec.WithLabelValues("PUT").Inc()
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n))

// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))

Expand Down Expand Up @@ -192,8 +191,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err)
}

operationTotalCounterVec.WithLabelValues("GET").Inc()
operationBytesCounterVec.WithLabelValues("GET").Add(float64(r.Attrs.Size))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(r.Attrs.Size))

return r, nil
}
Expand All @@ -213,7 +212,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
return fmt.Errorf("cannot delete snapshot %q: %w", key, err)
}

operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
return nil
}

Expand Down Expand Up @@ -251,8 +250,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
return info, err
}

operationTotalCounterVec.WithLabelValues("PUT").Inc()
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n))

return litestream.WALSegmentInfo{
Generation: pos.Generation,
Expand Down Expand Up @@ -282,8 +281,8 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
return nil, err
}

operationTotalCounterVec.WithLabelValues("GET").Inc()
operationBytesCounterVec.WithLabelValues("GET").Add(float64(r.Attrs.Size))
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(r.Attrs.Size))

return r, nil
}
Expand All @@ -303,7 +302,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
return fmt.Errorf("cannot delete wal segment %q: %w", key, err)
}
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
}

return nil
Expand Down Expand Up @@ -427,16 +426,3 @@ func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
func isNotExists(err error) bool {
return err == storage.ErrObjectNotExist
}

// GCS metrics.
var (
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_gcs_operation_total",
Help: "The number of GCS operations performed",
}, []string{"type"})

operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_gcs_operation_bytes",
Help: "The number of bytes used by GCS operations",
}, []string{"type"})
)
16 changes: 16 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"io"
"os"
"syscall"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// ReadCloser wraps a reader to also attach a separate closer.
Expand Down Expand Up @@ -123,3 +126,16 @@ func MkdirAll(path string, fi os.FileInfo) error {
_ = os.Chown(path, uid, gid)
return nil
}

// Shared replica metrics.
var (
OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_replica_operation_total",
Help: "The number of replica operations performed",
}, []string{"replica_type", "operation"})

OperationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_replica_operation_bytes",
Help: "The number of bytes used by replica operations",
}, []string{"replica_type", "operation"})
)
Loading

0 comments on commit 1c0c69a

Please sign in to comment.