Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bento): Configurable retry backoff and input start seq num #54

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions s2-bentobox/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,22 @@ func (p PrefixedInputStreams) list(ctx context.Context, client *s2.BasinClient)
return streams, nil
}

type InputStartSeqNum uint

const (
InputStartSeqNumEarliest InputStartSeqNum = iota
InputStartSeqNumLatest
)

type InputConfig struct {
*Config
Streams InputStreams
MaxInFlight int
UpdateStreamsInterval time.Duration
Cache SeqNumCache
Logger Logger
BackoffDuration time.Duration
StartSeqNum InputStartSeqNum
}

type recvOutput struct {
Expand Down
34 changes: 31 additions & 3 deletions s2-bentobox/multi_stream_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s2bentobox
import (
"context"
"errors"
"math/rand/v2"
"time"

"github.com/s2-streamstore/s2-sdk-go/s2"
Expand Down Expand Up @@ -73,6 +74,11 @@ func streamsManager(
updateStreamsInterval = config.UpdateStreamsInterval
}

backoffDuration := 100 * time.Millisecond
if config.BackoffDuration != 0 {
backoffDuration = config.BackoffDuration
}

updateList := make(chan struct{}, 1)
updateList <- struct{}{}

Expand All @@ -89,7 +95,18 @@ func streamsManager(
}
existingWorkers[stream] = worker

go streamSource(workerCtx, client, config.Logger, cache, stream, config.MaxInFlight, inputStream, workerCloser)
go streamSource(
workerCtx,
client,
config.Logger,
cache,
stream,
config.MaxInFlight,
backoffDuration,
config.StartSeqNum,
inputStream,
workerCloser,
)
}

managerLoop:
Expand Down Expand Up @@ -157,22 +174,33 @@ func streamSource(
cache *seqNumCache,
stream string,
maxInflight int,
backoffDuration time.Duration,
inputStartSeqNum InputStartSeqNum,
inputStream chan<- recvOutput,
closer chan<- struct{},
) {
defer close(closer)

backoff := make(<-chan time.Time)

for {
select {
case <-ctx.Done():
return
case <-backoff:
default:
}

input, err := connectStreamInput(ctx, client, cache, logger, stream, maxInflight)
// Never receive (reset)
backoff = make(<-chan time.Time)

input, err := connectStreamInput(ctx, client, cache, logger, stream, maxInflight, inputStartSeqNum)
if err != nil {
logger.With("error", err, "stream", stream).Error("Failed to connect, retrying.")
// TODO: Backoff

jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond)))
backoff = time.After(backoffDuration + jitter)

continue
}

Expand Down
23 changes: 17 additions & 6 deletions s2-bentobox/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"io"
"sync/atomic"
"sync"

"github.com/s2-streamstore/s2-sdk-go/s2"
)
Expand All @@ -23,8 +23,9 @@ type OutputConfig struct {
}

type Output struct {
closed atomic.Bool
sendCh chan<- sendInput
closeMu sync.Mutex // Protect the channel senders
closed bool
sendCh chan<- sendInput

ackStreamCloser <-chan struct{}
appendWorkerCloser <-chan struct{}
Expand Down Expand Up @@ -100,7 +101,14 @@ func appendWorker(
_ = sender.CloseSend()
}(sender)

attemptsLeft := 3

for s := range sendCh {
if attemptsLeft <= 0 {
// Exit the stream since we're getting too many send errors.
return
}

input := s2.AppendInput{
Records: s.batch,
FencingToken: config.FencingToken,
Expand All @@ -113,9 +121,10 @@ func appendWorker(
return
}

// TODO: Add a retry limit here. This may keep on happening due to a connection error too.
s.reply <- err

attemptsLeft--

continue
}

Expand Down Expand Up @@ -160,10 +169,12 @@ func (o *Output) WriteBatch(ctx context.Context, batch *s2.AppendRecordBatch) er
}

func (o *Output) Close(ctx context.Context) error {
if !o.closed.Load() {
o.closeMu.Lock()
if !o.closed {
close(o.sendCh)
o.closed.Store(true)
o.closed = true
}
o.closeMu.Unlock()

// Cancel the session for abandoning the requests.
o.cancelSession()
Expand Down
22 changes: 18 additions & 4 deletions s2-bentobox/stream_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"

"github.com/s2-streamstore/s2-sdk-go/s2"
Expand Down Expand Up @@ -144,21 +145,34 @@ func connectStreamInput(
logger Logger,
stream string,
maxInflight int,
inputStartSeqNum InputStartSeqNum,
) (*streamInput, error) {
streamClient := client.StreamClient(stream)

// Try getting the sequence number from cache.
startSeqNum, err := cache.Get(ctx, stream)
if err != nil {
// We'll try to get the earliest available data.
// TODO: Make this configurable - earliest or latest.
startSeqNum = 0
if inputStartSeqNum == InputStartSeqNumLatest {
var tErr error

startSeqNum, tErr = streamClient.CheckTail(ctx)
if tErr != nil {
logger.With("stream", stream, "error", tErr).Warn("Cannot check tail")
// Set the start sequence number to max uint.
// Let the terminal message handle.
startSeqNum = math.MaxUint64
}
} else {
startSeqNum = 0
}
}

logger.With("stream", stream, "start_seq_num", startSeqNum).Debug("Starting to read")

// Open a read session.
streamCtx, closeStream := context.WithCancel(ctx)

receiver, err := client.StreamClient(stream).ReadSession(streamCtx, &s2.ReadSessionRequest{
receiver, err := streamClient.ReadSession(streamCtx, &s2.ReadSessionRequest{
StartSeqNum: startSeqNum,
})
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions s2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math/rand/v2"
"os"
"strings"
"time"
Expand Down Expand Up @@ -482,7 +483,6 @@ type clientInner struct {
}

func newClientInner(config *clientConfig, basin string) (*clientInner, error) {
// TODO: Configure dial options
creds := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12, // Ensure HTTP/2 support
})
Expand Down Expand Up @@ -579,9 +579,10 @@ func sendRetryableInner[T any](

finalErr = err

jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond)))

select {
// TODO: Add jitter
case <-time.After(retryBackoffDuration):
case <-time.After(retryBackoffDuration + jitter):
case <-ctx.Done():
return v, ctx.Err()
}
Expand Down Expand Up @@ -636,9 +637,10 @@ func (r *readSessionReceiver) Recv() (ReadOutput, error) {

finalErr = err

jitter := time.Duration(rand.Int64N(int64(10 * time.Millisecond)))

select {
// TODO: Add jitter
case <-time.After(r.Client.Config.RetryBackoffDuration):
case <-time.After(r.Client.Config.RetryBackoffDuration + jitter):
newRecv, newErr := sendRetryable(r.ReqCtx, r.Client, r.ServiceReq)
if newErr != nil {
return nil, newErr
Expand Down