Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add label to staleness metrics to distinguish published vs duplicate entries #93

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

buildGoModule {
pname = "oplogtoredis";
version = "3.8.2";
version = "3.8.3";
src = builtins.path { path = ./.; };

postInstall = ''
Expand Down
25 changes: 17 additions & 8 deletions lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ type PublishOpts struct {

// This script checks whether KEYS[1] is set. If it is, it does nothing. If not,
// it sets the key, using ARGV[1] as the expiration, and then publishes the
// message ARGV[2] to channels ARGV[3] and ARGV[4].
// message ARGV[2] to each of the "$"-separated channels listed in ARGV[3].
// Returns 2 if the message was published, or 1 if KEYS[1] was already set.
var publishDedupe = redis.NewScript(`
local res = 1
if redis.call("GET", KEYS[1]) == false then
redis.call("SETEX", KEYS[1], ARGV[1], 1)
for w in string.gmatch(ARGV[3], "([^$]+)") do
redis.call("PUBLISH", w, ARGV[2])
end
res = 2
end

return true
return res
`)

var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -75,15 +77,15 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "redispub",
Name: "last_entry_staleness_seconds",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.",
}, []string{"ordinal"})
}, []string{"ordinal", "status"})

var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "otr",
Subsystem: "redispub",
Name: "entry_staleness_seconds",
Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.",
Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100},
}, []string{"ordinal"})
}, []string{"ordinal", "status"})

// PublishStream reads Publications from the given channel and publishes them
// to Redis.
Expand Down Expand Up @@ -181,10 +183,8 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s
start := time.Now()
ordinalStr := strconv.Itoa(ordinal)
staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds())
metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness)
metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness)

_, err := publishDedupe.Run(
res, err := publishDedupe.Run(
context.Background(),
client,
[]string{
Expand All @@ -201,7 +201,16 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s
dedupeExpirationSeconds, // ARGV[1], expiration time
p.Msg, // ARGV[2], message
strings.Join(p.Channels, "$"), // ARGV[3], channels
).Result()
).Int()

var status string
if res == 2 {
status = "published"
} else {
status = "duplicate"
}
metricLastOplogEntryStaleness.WithLabelValues(ordinalStr, status).Set(staleness)
metricOplogEntryStaleness.WithLabelValues(ordinalStr, status).Observe(staleness)

redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds())
return err
Expand Down
Loading