diff --git a/default.nix b/default.nix index 03b02ae..7bb4634 100644 --- a/default.nix +++ b/default.nix @@ -2,7 +2,7 @@ buildGoModule { pname = "oplogtoredis"; - version = "3.8.2"; + version = "3.8.3"; src = builtins.path { path = ./.; }; postInstall = '' diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 57a595f..90342dd 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -29,16 +29,19 @@ type PublishOpts struct { // This script checks whether KEYS[1] is set. If it is, it does nothing. It not, // it sets the key, using ARGV[1] as the expiration, and then publishes the -// message ARGV[2] to channels ARGV[3] and ARGV[4]. +// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns 1 if published. +// Note that an int is returned here as a boolean false is interpreted as an error. var publishDedupe = redis.NewScript(` + local res = 0 if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end + res = 1 end - return true + return res `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -75,7 +78,7 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "redispub", Name: "last_entry_staleness_seconds", Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.", -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "otr", @@ -83,7 +86,7 @@ var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpt Name: "entry_staleness_seconds", Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.", Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100}, -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -181,10 +184,8 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s start := time.Now() ordinalStr := strconv.Itoa(ordinal) staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) - metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) - metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) - _, err := publishDedupe.Run( + res, err := publishDedupe.Run( context.Background(), client, []string{ @@ -201,7 +202,16 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s dedupeExpirationSeconds, // ARGV[1], expiration time p.Msg, // ARGV[2], message strings.Join(p.Channels, "$"), // ARGV[3], channels - ).Result() + ).Int() + + var status string + if res == 1 { + status = "published" + } else { + status = "duplicate" + } + metricLastOplogEntryStaleness.WithLabelValues(ordinalStr, status).Set(staleness) + metricOplogEntryStaleness.WithLabelValues(ordinalStr, status).Observe(staleness) redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds()) return err