From fe21e2d2b8971985106fae8d166c20072a9109cc Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Sat, 8 Feb 2025 13:17:35 +0530 Subject: [PATCH 1/2] feat(bento): Configurable retry backoff and input start seq num Also resolves: #10 --- s2-bentobox/input.go | 9 +++++++++ s2-bentobox/multi_stream_input.go | 32 ++++++++++++++++++++++++++++--- s2-bentobox/output.go | 22 +++++++++++++++------ s2-bentobox/stream_input.go | 21 ++++++++++++++++---- s2/client.go | 12 +++++++----- 5 files changed, 78 insertions(+), 18 deletions(-) diff --git a/s2-bentobox/input.go b/s2-bentobox/input.go index d79e9ca..e4108d5 100644 --- a/s2-bentobox/input.go +++ b/s2-bentobox/input.go @@ -101,6 +101,13 @@ func (p PrefixedInputStreams) list(ctx context.Context, client *s2.BasinClient) return streams, nil } +type InputStartSeqNum uint + +const ( + InputStartSeqNumEarliest InputStartSeqNum = iota + InputStartSeqNumLatest +) + type InputConfig struct { *Config Streams InputStreams @@ -108,6 +115,8 @@ type InputConfig struct { UpdateStreamsInterval time.Duration Cache SeqNumCache Logger Logger + BackoffDuration time.Duration + StartSeqNum InputStartSeqNum } type recvOutput struct { diff --git a/s2-bentobox/multi_stream_input.go b/s2-bentobox/multi_stream_input.go index ab578b9..e91d3de 100644 --- a/s2-bentobox/multi_stream_input.go +++ b/s2-bentobox/multi_stream_input.go @@ -3,6 +3,7 @@ package s2bentobox import ( "context" "errors" + "math/rand/v2" "time" "github.com/s2-streamstore/s2-sdk-go/s2" @@ -73,6 +74,11 @@ func streamsManager( updateStreamsInterval = config.UpdateStreamsInterval } + backoffDuration := 100 * time.Millisecond + if config.BackoffDuration != 0 { + backoffDuration = config.BackoffDuration + } + updateList := make(chan struct{}, 1) updateList <- struct{}{} @@ -89,7 +95,18 @@ func streamsManager( } existingWorkers[stream] = worker - go streamSource(workerCtx, client, config.Logger, cache, stream, config.MaxInFlight, inputStream, workerCloser) + go streamSource( + workerCtx, + client, + config.Logger, + cache, + stream, + config.MaxInFlight, + backoffDuration, + config.StartSeqNum, + inputStream, + workerCloser, + ) } managerLoop: @@ -157,22 +174,31 @@ func streamSource( cache *seqNumCache, stream string, maxInflight int, + backoffDuration time.Duration, + inputStartSeqNum InputStartSeqNum, inputStream chan<- recvOutput, closer chan<- struct{}, ) { defer close(closer) + backoff := make(<-chan time.Time) + for { select { case <-ctx.Done(): return + case <-backoff: default: } - input, err := connectStreamInput(ctx, client, cache, logger, stream, maxInflight) + // Never receive (reset) + backoff = make(<-chan time.Time) + + input, err := connectStreamInput(ctx, client, cache, logger, stream, maxInflight, inputStartSeqNum) if err != nil { logger.With("error", err, "stream", stream).Error("Failed to connect, retrying.") - // TODO: Backoff + jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond))) + backoff = time.After(backoffDuration + jitter) continue } diff --git a/s2-bentobox/output.go b/s2-bentobox/output.go index 9aa8c81..e89bd87 100644 --- a/s2-bentobox/output.go +++ b/s2-bentobox/output.go @@ -4,7 +4,7 @@ import ( "context" "errors" "io" - "sync/atomic" + "sync" "github.com/s2-streamstore/s2-sdk-go/s2" ) @@ -23,8 +23,9 @@ type OutputConfig struct { } type Output struct { - closed atomic.Bool - sendCh chan<- sendInput + closeMu sync.Mutex // Protect the channel senders + closed bool + sendCh chan<- sendInput ackStreamCloser <-chan struct{} appendWorkerCloser <-chan struct{} @@ -100,7 +101,14 @@ func appendWorker( _ = sender.CloseSend() }(sender) + attemptsLeft := 3 + for s := range sendCh { + if attemptsLeft <= 0 { + // Exit the stream since we're getting too many send errors. + return + } + input := s2.AppendInput{ Records: s.batch, FencingToken: config.FencingToken, @@ -113,8 +121,8 @@ func appendWorker( return } - // TODO: Add a retry limit here. This may keep on happening due to a connection error too. s.reply <- err + attemptsLeft-- continue } @@ -160,10 +168,12 @@ func (o *Output) WriteBatch(ctx context.Context, batch *s2.AppendRecordBatch) er } func (o *Output) Close(ctx context.Context) error { - if !o.closed.Load() { + o.closeMu.Lock() + if !o.closed { close(o.sendCh) - o.closed.Store(true) + o.closed = true } + o.closeMu.Unlock() // Cancel the session for abandoning the requests. o.cancelSession() diff --git a/s2-bentobox/stream_input.go b/s2-bentobox/stream_input.go index 457c296..c226737 100644 --- a/s2-bentobox/stream_input.go +++ b/s2-bentobox/stream_input.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "sync" "github.com/s2-streamstore/s2-sdk-go/s2" @@ -144,13 +145,25 @@ func connectStreamInput( logger Logger, stream string, maxInflight int, + inputStartSeqNum InputStartSeqNum, ) (*streamInput, error) { + streamClient := client.StreamClient(stream) + // Try getting the sequence number from cache. startSeqNum, err := cache.Get(ctx, stream) if err != nil { - // We'll try to get the earliest available data. - // TODO: Make this configurable - earliest or latest. - startSeqNum = 0 + if inputStartSeqNum == InputStartSeqNumLatest { + var tErr error + startSeqNum, tErr = streamClient.CheckTail(ctx) + if tErr != nil { + logger.With("stream", stream, "error", tErr).Warn("Cannot check tail") + // Set the start sequence number to max uint. + // Let the terminal message handle. + startSeqNum = math.MaxUint64 + } + } else { + startSeqNum = 0 + } } logger.With("stream", stream, "start_seq_num", startSeqNum).Debug("Starting to read") @@ -158,7 +171,7 @@ func connectStreamInput( // Open a read session. streamCtx, closeStream := context.WithCancel(ctx) - receiver, err := client.StreamClient(stream).ReadSession(streamCtx, &s2.ReadSessionRequest{ + receiver, err := streamClient.ReadSession(streamCtx, &s2.ReadSessionRequest{ StartSeqNum: startSeqNum, }) if err != nil { diff --git a/s2/client.go b/s2/client.go index f583d23..e4f847c 100644 --- a/s2/client.go +++ b/s2/client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "errors" "fmt" + "math/rand/v2" "os" "strings" "time" @@ -465,7 +466,6 @@ type clientInner struct { } func newClientInner(config *clientConfig, basin string) (*clientInner, error) { - // TODO: Configure dial options creds := credentials.NewTLS(&tls.Config{ MinVersion: tls.VersionTLS12, // Ensure HTTP/2 support }) @@ -562,9 +562,10 @@ func sendRetryableInner[T any]( finalErr = err + jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond))) + select { - // TODO: Add jitter - case <-time.After(retryBackoffDuration): + case <-time.After(retryBackoffDuration + jitter): case <-ctx.Done(): return v, ctx.Err() } @@ -619,9 +620,10 @@ func (r *readSessionReceiver) Recv() (ReadOutput, error) { finalErr = err + jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond))) + select { - // TODO: Add jitter - case <-time.After(r.Client.Config.RetryBackoffDuration): + case <-time.After(r.Client.Config.RetryBackoffDuration + jitter): newRecv, newErr := sendRetryable(r.ReqCtx, r.Client, r.ServiceReq) if newErr != nil { return nil, newErr From 5ee85a6ea9bd11a1e691b6387214735d709c823a Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Sat, 8 Feb 2025 13:19:35 +0530 Subject: [PATCH 2/2] fmt --- s2-bentobox/multi_stream_input.go | 2 ++ s2-bentobox/output.go | 1 + s2-bentobox/stream_input.go | 1 + 3 files changed, 4 insertions(+) diff --git a/s2-bentobox/multi_stream_input.go b/s2-bentobox/multi_stream_input.go index e91d3de..b086fb8 100644 --- a/s2-bentobox/multi_stream_input.go +++ b/s2-bentobox/multi_stream_input.go @@ -197,8 +197,10 @@ func streamSource( input, err := connectStreamInput(ctx, client, cache, logger, stream, maxInflight, inputStartSeqNum) if err != nil { logger.With("error", err, "stream", stream).Error("Failed to connect, retrying.") + jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond))) backoff = time.After(backoffDuration + jitter) + continue } diff --git a/s2-bentobox/output.go b/s2-bentobox/output.go index e89bd87..e4065b8 100644 --- a/s2-bentobox/output.go +++ b/s2-bentobox/output.go @@ -122,6 +122,7 @@ func appendWorker( } s.reply <- err + attemptsLeft-- continue diff --git a/s2-bentobox/stream_input.go b/s2-bentobox/stream_input.go index c226737..59877cd 100644 --- a/s2-bentobox/stream_input.go +++ b/s2-bentobox/stream_input.go @@ -154,6 +154,7 @@ func connectStreamInput( if err != nil { if inputStartSeqNum == InputStartSeqNumLatest { var tErr error + startSeqNum, tErr = streamClient.CheckTail(ctx) if tErr != nil { logger.With("stream", stream, "error", tErr).Warn("Cannot check tail")