From 7257430bb6e85a1b49b924c21be3f53f99f9a08d Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 21 Jan 2025 14:47:41 -0600 Subject: [PATCH 01/13] added label to identify staleness for published vs duplicate entries --- lib/redispub/publisher.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 57a595f..22d1524 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -29,16 +29,17 @@ type PublishOpts struct { // This script checks whether KEYS[1] is set. If it is, it does nothing. It not, // it sets the key, using ARGV[1] as the expiration, and then publishes the -// message ARGV[2] to channels ARGV[3] and ARGV[4]. +// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns true if published. var publishDedupe = redis.NewScript(` if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end + return true end - return true + return false `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -75,7 +76,7 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "redispub", Name: "last_entry_staleness_seconds", Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.", -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "otr", @@ -83,7 +84,7 @@ var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpt Name: "entry_staleness_seconds", Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.", Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100}, -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -181,10 +182,8 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s start := time.Now() ordinalStr := strconv.Itoa(ordinal) staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) - metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) - metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) - _, err := publishDedupe.Run( + written, err := publishDedupe.Run( context.Background(), client, []string{ @@ -203,6 +202,15 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s strings.Join(p.Channels, "$"), // ARGV[3], channels ).Result() + var status string + if written.(bool) { + status = "published" + } else { + status = "duplicate" + } + metricLastOplogEntryStaleness.WithLabelValues(ordinalStr, status).Set(staleness) + metricOplogEntryStaleness.WithLabelValues(ordinalStr, status).Observe(staleness) + redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds()) return err } From 08112d65c2986bafac4bfcaa9676635b265b2436 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 21 Jan 2025 15:57:52 -0600 Subject: [PATCH 02/13] type fix for lua script result --- default.nix | 2 +- lib/redispub/publisher.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/default.nix b/default.nix index 03b02ae..7bb4634 100644 --- a/default.nix +++ b/default.nix @@ -2,7 +2,7 @@ buildGoModule { pname = "oplogtoredis"; - version = "3.8.2"; + version = "3.8.3"; src = builtins.path { path = ./.; }; postInstall = '' diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 22d1524..48f2ad7 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -200,10 +200,10 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s dedupeExpirationSeconds, // ARGV[1], expiration time p.Msg, // ARGV[2], message strings.Join(p.Channels, "$"), // ARGV[3], channels - ).Result() + ).Bool() var status string - if written.(bool) { + if written { status = "published" } else { status = "duplicate" From 06aa80dbfe7abaac254bcc8240b000e9eb42198f Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 10:29:24 -0600 Subject: [PATCH 03/13] checking tests --- lib/redispub/publisher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 48f2ad7..2ddf2b9 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -182,8 +182,8 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s start := time.Now() ordinalStr := strconv.Itoa(ordinal) staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) - - written, err := publishDedupe.Run( + written := true + _, err := publishDedupe.Run( context.Background(), client, []string{ @@ -200,7 +200,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s dedupeExpirationSeconds, // ARGV[1], expiration time p.Msg, // ARGV[2], message strings.Join(p.Channels, "$"), // ARGV[3], channels - ).Bool() + ).Result() var status string if written { From 7619d8ca2ebdf8b0e2c461fd5af5dbf85382db45 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 10:42:28 -0600 Subject: [PATCH 04/13] checking tests --- lib/redispub/publisher.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 2ddf2b9..1d30f42 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -36,10 +36,9 @@ var publishDedupe = redis.NewScript(` for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end - return true end - return false + return true `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -76,7 +75,7 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "redispub", Name: "last_entry_staleness_seconds", Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.", -}, []string{"ordinal", "status"}) +}, []string{"ordinal"}) var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "otr", @@ -84,7 +83,7 @@ var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpt Name: "entry_staleness_seconds", Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.", Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100}, -}, []string{"ordinal", "status"}) +}, []string{"ordinal"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -182,7 +181,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s start := time.Now() ordinalStr := strconv.Itoa(ordinal) staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) - written := true + _, err := publishDedupe.Run( context.Background(), client, @@ -202,14 +201,8 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s strings.Join(p.Channels, "$"), // ARGV[3], channels ).Result() - var status string - if written { - status = "published" - } else { - status = "duplicate" - } - metricLastOplogEntryStaleness.WithLabelValues(ordinalStr, status).Set(staleness) - metricOplogEntryStaleness.WithLabelValues(ordinalStr, status).Observe(staleness) + metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) + metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds()) return err From 673058c502b998ad3096530932838015f1fe6c52 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 11:22:19 -0600 Subject: [PATCH 05/13] adding back lua change --- lib/redispub/publisher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 1d30f42..47f3cd7 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -36,9 +36,10 @@ var publishDedupe = redis.NewScript(` for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end + return true end - return true + return false `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ From 054673df99666b905368cd437ac0d1e4d500c07f Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 11:37:47 -0600 Subject: [PATCH 06/13] testing lua change --- lib/redispub/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 47f3cd7..468f917 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -39,7 +39,7 @@ var publishDedupe = redis.NewScript(` return true end - return false + return true `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ From 85e9590aa4175694730bac9c9254b7353fe78f5f Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 11:44:57 -0600 Subject: [PATCH 07/13] testing lua change --- lib/redispub/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 468f917..4cf1950 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -200,7 +200,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s dedupeExpirationSeconds, // ARGV[1], expiration time p.Msg, // ARGV[2], message strings.Join(p.Channels, "$"), // ARGV[3], channels - ).Result() + ).Bool() metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) From 8f4767814d01e9e78621159aab39f3fdea47b928 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 11:52:31 -0600 Subject: [PATCH 08/13] testing lua change --- lib/redispub/publisher.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 4cf1950..093ad24 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -31,15 +31,16 @@ type PublishOpts struct { // it sets the key, using ARGV[1] as the expiration, and then publishes the // message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns true if published. var publishDedupe = redis.NewScript(` + res = false if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end - return true + res = true end - return true + return res `) var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ From 64c7180fa66d8c1791d23ec5403e7d69dae1831a Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 11:57:00 -0600 Subject: [PATCH 09/13] testing lua change --- lib/redispub/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 093ad24..db9ed94 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -31,7 +31,7 @@ type PublishOpts struct { // it sets the key, using ARGV[1] as the expiration, and then publishes the // message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns true if published. var publishDedupe = redis.NewScript(` - res = false + local res = false if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do From 9ed1268a147c1a7650c40248d5ba894e4d821812 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 12:04:46 -0600 Subject: [PATCH 10/13] testing lua change --- lib/redispub/publisher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index db9ed94..6ca277b 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -31,13 +31,13 @@ type PublishOpts struct { // it sets the key, using ARGV[1] as the expiration, and then publishes the // message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns true if published. var publishDedupe = redis.NewScript(` - local res = false + local res = 1 if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end - res = true + res = 2 end return res @@ -201,7 +201,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s dedupeExpirationSeconds, // ARGV[1], expiration time p.Msg, // ARGV[2], message strings.Join(p.Channels, "$"), // ARGV[3], channels - ).Bool() + ).Int() metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) From 4bbb6888a4f0beb40c738b0c679c8ca6b9221367 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 12:14:27 -0600 Subject: [PATCH 11/13] testing lua change --- lib/redispub/publisher.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 6ca277b..de6a6d7 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -77,7 +77,7 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "redispub", Name: "last_entry_staleness_seconds", Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.", -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "otr", @@ -85,7 +85,7 @@ var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpt Name: "entry_staleness_seconds", Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.", Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100}, -}, []string{"ordinal"}) +}, []string{"ordinal", "status"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -184,7 +184,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s ordinalStr := strconv.Itoa(ordinal) staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) - _, err := publishDedupe.Run( + res, err := publishDedupe.Run( context.Background(), client, []string{ @@ -203,8 +203,14 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s strings.Join(p.Channels, "$"), // ARGV[3], channels ).Int() - metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) - metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) + var status string + if res == 2 { + status = "published" + } else { + status = "duplicate" + } + metricLastOplogEntryStaleness.WithLabelValues(ordinalStr, status).Set(staleness) + metricOplogEntryStaleness.WithLabelValues(ordinalStr, status).Observe(staleness) redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds()) return err From c411cb3c5ed22b96423ece1d58e6631b957b210a Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 22 Jan 2025 13:13:04 -0600 Subject: [PATCH 12/13] update script comment --- lib/redispub/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index de6a6d7..b87cff6 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -29,7 +29,7 @@ type PublishOpts struct { // This script checks whether KEYS[1] is set. If it is, it does nothing. It not, // it sets the key, using ARGV[1] as the expiration, and then publishes the -// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns true if published. +// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns 2 if published. var publishDedupe = redis.NewScript(` local res = 1 if redis.call("GET", KEYS[1]) == false then From 24cccb898208a86089240bafef91780e9450ab86 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Mon, 3 Feb 2025 11:57:55 -0600 Subject: [PATCH 13/13] use 0 and 1 for dedup return values --- lib/redispub/publisher.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index b87cff6..90342dd 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -29,15 +29,16 @@ type PublishOpts struct { // This script checks whether KEYS[1] is set. If it is, it does nothing. It not, // it sets the key, using ARGV[1] as the expiration, and then publishes the -// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns 2 if published. +// message ARGV[2] to channels ARGV[3] and ARGV[4]. Returns 1 if published. +// Note that an int is returned here as a boolean false is interpreted as an error. var publishDedupe = redis.NewScript(` - local res = 1 + local res = 0 if redis.call("GET", KEYS[1]) == false then redis.call("SETEX", KEYS[1], ARGV[1], 1) for w in string.gmatch(ARGV[3], "([^$]+)") do redis.call("PUBLISH", w, ARGV[2]) end - res = 2 + res = 1 end return res @@ -204,7 +205,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s ).Int() var status string - if res == 2 { + if res == 1 { status = "published" } else { status = "duplicate"