Skip to content

Commit

Permalink
consumer: bound the size of an error stored as Etcd shard status
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Mar 12, 2024
1 parent 188c28b commit 0323445
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
11 changes: 10 additions & 1 deletion consumer/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,14 @@ func servePrimary(s *shard) (err error) {
if err != nil && s.ctx.Err() == nil {
log.WithFields(log.Fields{"err": err, "shard": s.FQN()}).Error("servePrimary failed")

var statusErr = err.Error()
if len(statusErr) > MAX_ETCD_ERR_LEN {
statusErr = statusErr[:MAX_ETCD_ERR_LEN] + " ...[truncated]"
}

updateStatusWithRetry(s, pc.ReplicaStatus{
Code: pc.ReplicaStatus_FAILED,
Errors: []string{err.Error()},
Errors: []string{statusErr},
})
}
s.wg.Done()
Expand Down Expand Up @@ -422,3 +427,7 @@ func backoff(attempt int) time.Duration {
return 5 * time.Second
}
}

// Etcd values should generally be small.
// We don't want to push arbitrary length error strings.
const MAX_ETCD_ERR_LEN = 2048
17 changes: 17 additions & 0 deletions consumer/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package consumer
import (
"context"
"errors"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -222,3 +224,18 @@ func TestShardStoreStartCommitFails(t *testing.T) {

tf.allocateShard(spec) // Cleanup.
}

func TestShardErrorsAreTruncatedIfLarge(t *testing.T) {
var tf, cleanup = newTestFixture(t)
defer cleanup()

tf.app.newStoreErr = errors.New(strings.Repeat("e", MAX_ETCD_ERR_LEN+1))
tf.allocateShard(makeShard(shardA), localID)

var expect = fmt.Sprintf("completeRecovery: app.NewStore: %s", strings.Repeat("e", MAX_ETCD_ERR_LEN))
expect = fmt.Sprintf("%s ...[truncated]", expect[:MAX_ETCD_ERR_LEN])

require.Equal(t, expect, expectStatusCode(t, tf.state, pc.ReplicaStatus_FAILED).Errors[0])

tf.allocateShard(makeShard(shardA)) // Cleanup.
}

0 comments on commit 0323445

Please sign in to comment.