126475: raft: introduce logMark and snapshot type r=nvanbenschoten a=pav-kv

This PR introduces the type-safe `logMark` and `snapshot` types, which carry the leader term in whose context / coordinate system the log indices are interpreted. The snapshot `restore` and `commitTo` methods are converted to use the new types.
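
A minimal sketch of what such term-qualified types might look like (the field names and the comparison helper here are assumptions drawn from this description, not the PR's actual code):

```go
package raft

// logMark is a position in the log, qualified by the term of the leader in
// whose coordinate system the index is interpreted. Marks taken under
// different terms may refer to different entries at the same index.
type logMark struct {
	term  uint64 // leader term defining the coordinate system
	index uint64 // log index within that leader's log
}

// snapshot pairs a snapshot with the log mark at which it was taken; the
// actual snapshot payload is elided in this sketch.
type snapshot struct {
	mark logMark
}

// after reports whether m is strictly ahead of other, preferring the newer
// term and falling back to the index within the same term.
func (m logMark) after(other logMark) bool {
	return m.term > other.term || (m.term == other.term && m.index > other.index)
}
```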

Follow-up work will use the `term` fields to track the "[last accepted term](https://github.com/cockroachdb/cockroach/blob/4ee0b103cd3c3adeedf3a478ead5db78ec52e68e/pkg/raft/raft.go#L348-L363)" in the `unstable` structure, which will then feed into the log write protocol and support less conservative commit index advancement decisions.

Part of cockroachdb#124440

126757: crosscluster/logical: add synthetic failure injection r=dt a=dt

First three commits are cockroachdb#126755.
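
The failure injection added here gates each row write on a random percentage draw (see the `lww_row_processor.go` hunk below). A self-contained sketch of the same gating, using the standard library in place of cockroach's `randutil` helper:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand/v2"
)

var errInjected = errors.New("injected synthetic error")

// processor applies rows, optionally failing a configured percentage of
// calls to exercise retry and dead-letter paths in tests.
type processor struct {
	failPercent uint32 // 0 disables injection
}

func (p *processor) processRow() error {
	// Mirrors the diff's check: fail when a draw in [0,100) lands under the
	// configured percentage. (The real code uses randutil.FastUint32()%100.)
	if p.failPercent != 0 && rand.Uint32N(100) < p.failPercent {
		return errInjected
	}
	return nil
}

func main() {
	p := &processor{failPercent: 50}
	failures := 0
	for i := 0; i < 1000; i++ {
		if err := p.processRow(); err != nil {
			failures++
		}
	}
	fmt.Printf("~%d%% of rows failed\n", failures/10) // roughly 50%
}
```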

Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
3 people committed Jul 5, 2024
3 parents 180bf2b + 34a98ca + 9889bab commit 327ad7b
Showing 20 changed files with 542 additions and 172 deletions.
4 changes: 3 additions & 1 deletion docs/generated/metrics/metrics.html
@@ -1346,9 +1346,11 @@
<tr><td>APPLICATION</td><td>logical_replication.flush_bytes</td><td>Number of bytes in a given flush</td><td>Logical bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_hist_nanos</td><td>Time spent flushing messages across all replication streams</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_row_count</td><td>Number of rows in a given flush</td><td>Rows</td><td>HISTOGRAM</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
-<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) ingested by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) received by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.optimistic_insert_conflict_count</td><td>Total number of times the optimistic insert encountered a conflict</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_seconds</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>logical_replication.retry_queue_bytes</td><td>The number of bytes of events in the logical replication stream's retry queue</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>logical_replication.retry_queue_events</td><td>The number of events in the logical replication stream's retry queue</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.cutover_progress</td><td>The number of ranges left to revert in order to complete an inflight cutover</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
"//pkg/util/metamorphic",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/timeutil",
@@ -119,6 +120,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
66 changes: 66 additions & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
@@ -18,10 +18,12 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -492,3 +494,67 @@ func WaitUntilReplicatedTime(
return nil
})
}

type mockBatchHandler bool

var _ BatchHandler = mockBatchHandler(true)

func (m mockBatchHandler) HandleBatch(
_ context.Context, _ []streampb.StreamEvent_KV,
) (batchStats, error) {
if m {
return batchStats{}, errors.New("batch processing failure")
}
return batchStats{}, nil
}
func (m mockBatchHandler) GetLastRow() cdcevent.Row { return cdcevent.Row{} }
func (m mockBatchHandler) SetSyntheticFailurePercent(_ uint32) {}

type mockDLQ int

func (m *mockDLQ) Create() error { return nil }

func (m *mockDLQ) Log(
_ context.Context, _ int64, _ streampb.StreamEvent_KV, _ cdcevent.Row, _ error,
) error {
*m++
return nil
}

// TestFlushErrorHandling exercises the flush path in cases where writes fail.
func TestFlushErrorHandling(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
dlq := mockDLQ(0)
lrw := &logicalReplicationWriterProcessor{
metrics: MakeMetrics(0).(*Metrics),
getBatchSize: func() int { return 1 },
dlqClient: &dlq,
}
lrw.purgatory.flush = lrw.flushBuffer
lrw.purgatory.bytesGauge = lrw.metrics.RetryQueueBytes
lrw.purgatory.eventsGauge = lrw.metrics.RetryQueueEvents

lrw.bh = []BatchHandler{(mockBatchHandler(true))}

// One failure immediately means a zero-byte purgatory is full.
require.NoError(t, lrw.handleStreamBuffer(ctx, []streampb.StreamEvent_KV{skv("a")}))
require.Equal(t, int64(1), lrw.metrics.RetryQueueEvents.Value())
require.True(t, lrw.purgatory.full())
require.Equal(t, 0, int(dlq))

// Another failure causes a forced drain of purgatory, incrementing DLQ count.
require.NoError(t, lrw.handleStreamBuffer(ctx, []streampb.StreamEvent_KV{skv("b")}))
require.Equal(t, int64(1), lrw.metrics.RetryQueueEvents.Value())
require.Equal(t, 1, int(dlq))

// Bump up the purgatory size limit and observe no more DLQ'ed items.
lrw.purgatory.byteLimit = 1 << 20
require.False(t, lrw.purgatory.full())
require.NoError(t, lrw.handleStreamBuffer(ctx, []streampb.StreamEvent_KV{skv("c")}))
require.NoError(t, lrw.handleStreamBuffer(ctx, []streampb.StreamEvent_KV{skv("d")}))
require.Equal(t, 1, int(dlq))
require.Equal(t, int64(3), lrw.metrics.RetryQueueEvents.Value())

}
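
A rough model of the purgatory behavior the test above exercises: failed events are buffered against a byte budget, and once the budget is exhausted the oldest buffered level is force-drained, with still-failing events sent to the DLQ. The field and method names follow the diff, but the bodies are a simplified sketch under those assumptions, not the actual implementation:

```go
package logical

import "context"

// event stands in for streampb.StreamEvent_KV in this sketch.
type event struct{ key string }

type level struct {
	evs   []event
	bytes int64
}

type purgatory struct {
	byteLimit int64
	bytes     int64
	levels    []level
	flush     func(ctx context.Context, evs []event, isRetry, mustProcess bool) ([]event, int64, error)
	dlq       func([]event) error // stand-in for DLQ logging
}

// full reports whether the retry backlog has reached its byte budget; with a
// zero byteLimit, a single stored event makes purgatory full.
func (p *purgatory) full() bool { return len(p.levels) > 0 && p.bytes >= p.byteLimit }

// Store parks events that failed to apply. If purgatory is already full, it
// first force-drains the oldest level, DLQ'ing whatever still fails.
func (p *purgatory) Store(ctx context.Context, evs []event, byteSize int64) error {
	if p.full() {
		oldest := p.levels[0]
		unapplied, _, err := p.flush(ctx, oldest.evs, true /* isRetry */, true /* mustProcess */)
		if err != nil {
			return err
		}
		if err := p.dlq(unapplied); err != nil {
			return err
		}
		p.levels, p.bytes = p.levels[1:], p.bytes-oldest.bytes
	}
	p.levels = append(p.levels, level{evs: evs, bytes: byteSize})
	p.bytes += byteSize
	return nil
}
```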
pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
@@ -64,6 +64,8 @@ type logicalReplicationWriterProcessor struct {

bh []BatchHandler

+getBatchSize func() int

streamPartitionClient streamclient.Client

// frontier keeps track of the progress for the spans tracked by this processor
@@ -140,7 +142,13 @@ func newLogicalReplicationWriterProcessor(
}

lrw := &logicalReplicationWriterProcessor{
-spec: spec,
+spec: spec,
+getBatchSize: func() int {
+if useImplicitTxns.Get(&flowCtx.Cfg.Settings.SV) {
+return 1
+}
+return int(flushBatchSize.Get(&flowCtx.Cfg.Settings.SV))
+},
bh: bhPool,
frontier: frontier,
stopCh: make(chan struct{}),
@@ -152,13 +160,16 @@
ProcessorID: processorID,
},
dlqClient: InitDeadLetterQueueClient(),
+metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics),
}
lrw.purgatory = purgatory{
-deadline: time.Minute,
-delay: time.Second * 5,
-levelLimit: 10,
-flush: lrw.flushBuffer,
-checkpoint: lrw.checkpoint,
+deadline: time.Minute,
+delay: time.Second * 5,
+byteLimit: 8 << 20,
+flush: lrw.flushBuffer,
+checkpoint: lrw.checkpoint,
+bytesGauge: lrw.metrics.RetryQueueBytes,
+eventsGauge: lrw.metrics.RetryQueueEvents,
}

if err := lrw.Init(ctx, lrw, post, logicalReplicationWriterResultType, flowCtx, processorID, nil, /* memMonitor */
@@ -452,12 +463,12 @@ func (lrw *logicalReplicationWriterProcessor) handleStreamBuffer(
ctx context.Context, kvs []streampb.StreamEvent_KV,
) error {
const notRetry = false
-unapplied, err := lrw.flushBuffer(ctx, kvs, notRetry, false)
+unapplied, unappliedBytes, err := lrw.flushBuffer(ctx, kvs, notRetry, false)
if err != nil {
return err
}
// Put any events that failed to apply into purgatory (flushing if needed).
-if err := lrw.purgatory.Store(ctx, unapplied); err != nil {
+if err := lrw.purgatory.Store(ctx, unapplied, unappliedBytes); err != nil {
return err
}

@@ -492,17 +503,29 @@ const maxWriterWorkers = 32
// retry.
func (lrw *logicalReplicationWriterProcessor) flushBuffer(
ctx context.Context, kvs []streampb.StreamEvent_KV, isRetry, mustProcess bool,
-) (notProcessed []streampb.StreamEvent_KV, _ error) {
+) (notProcessed []streampb.StreamEvent_KV, notProcessedByteSize int64, _ error) {
ctx, sp := tracing.ChildSpan(ctx, "logical-replication-writer-flush")
defer sp.Finish()

if len(kvs) == 0 {
-return nil, nil
+return nil, 0, nil
}

preFlushTime := timeutil.Now()
-lrw.debug.RecordFlushStart(preFlushTime, int64(len(kvs)))

+// Inform the debugging helper that a flush is starting and configure failure
+// injection if one is requested.
+testingFailPercent := lrw.debug.RecordFlushStart(preFlushTime, int64(len(kvs)))
+if testingFailPercent > 0 {
+for i := range lrw.bh {
+lrw.bh[i].SetSyntheticFailurePercent(testingFailPercent)
+}
+defer func() {
+for i := range lrw.bh {
+lrw.bh[i].SetSyntheticFailurePercent(0)
+}
+}()
+}

// k returns the row key for some KV event, truncated if needed to the row key
// prefix.
@@ -527,19 +550,19 @@

perChunkStats := make([]flushStats, len(lrw.bh))

-total := int64(len(kvs))
+todo := kvs
g := ctxgroup.WithContext(ctx)
for worker := range lrw.bh {
-if len(kvs) == 0 {
+if len(todo) == 0 {
break
}
// The chunk should end after the first new key after chunk size.
-chunkEnd := min(chunkSize, len(kvs))
-for chunkEnd < len(kvs) && k(kvs[chunkEnd-1]).Equal(k(kvs[chunkEnd])) {
+chunkEnd := min(chunkSize, len(todo))
+for chunkEnd < len(todo) && k(todo[chunkEnd-1]).Equal(k(todo[chunkEnd])) {
chunkEnd++
}
-chunk := kvs[0:chunkEnd]
-kvs = kvs[len(chunk):]
+chunk := todo[0:chunkEnd]
+todo = todo[len(chunk):]
bh := lrw.bh[worker]

g.GoCtx(func(ctx context.Context) error {
@@ -554,7 +577,7 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
}

if err := g.Wait(); err != nil {
-return nil, err
+return nil, 0, err
}

var stats flushStats
@@ -567,11 +590,10 @@
}

flushTime := timeutil.Since(preFlushTime).Nanoseconds()
-lrw.debug.RecordFlushComplete(flushTime, total, stats.processed.bytes)
+lrw.debug.RecordFlushComplete(flushTime, int64(len(kvs)), stats.processed.bytes)

lrw.metrics.AppliedRowUpdates.Inc(stats.processed.success)
lrw.metrics.DLQedRowUpdates.Inc(stats.processed.dlq)
-lrw.metrics.AppliedLogicalBytes.Inc(stats.processed.bytes)
lrw.metrics.CommitToCommitLatency.RecordValue(timeutil.Since(firstKeyTS).Nanoseconds())

if isRetry {
@@ -581,23 +603,21 @@
lrw.metrics.InitialApplySuccesses.Inc(stats.processed.success)
lrw.metrics.InitialApplyFailures.Inc(stats.notProcessed.count + stats.processed.dlq)
lrw.metrics.StreamBatchNanosHist.RecordValue(flushTime)
-lrw.metrics.StreamBatchRowsHist.RecordValue(total)
+lrw.metrics.StreamBatchRowsHist.RecordValue(int64(len(kvs)))
lrw.metrics.StreamBatchBytesHist.RecordValue(stats.processed.bytes + stats.notProcessed.bytes)
+lrw.metrics.ReceivedLogicalBytes.Inc(stats.processed.bytes + stats.notProcessed.bytes)
}
-return notProcessed, nil
+return notProcessed, stats.notProcessed.bytes, nil
}

// flushChunk is the per-thread body of flushBuffer; see flushBuffer's contract.
func (lrw *logicalReplicationWriterProcessor) flushChunk(
ctx context.Context, bh BatchHandler, chunk []streampb.StreamEvent_KV, mustProcess bool,
) (flushStats, error) {
-batchSize := int(flushBatchSize.Get(&lrw.FlowCtx.Cfg.Settings.SV))
+batchSize := lrw.getBatchSize()
// TODO(yuzefovich): we should have a better heuristic for when to use the
// implicit vs explicit txns (for example, depending on the presence of the
// secondary indexes).
-if useImplicitTxns.Get(&lrw.FlowCtx.Cfg.Settings.SV) {
-batchSize = 1
-}

var stats flushStats
// TODO: The batching here in production would need to be much
@@ -721,6 +741,7 @@ type BatchHandler interface {
// implicit txn.
HandleBatch(context.Context, []streampb.StreamEvent_KV) (batchStats, error)
GetLastRow() cdcevent.Row
+SetSyntheticFailurePercent(uint32)
}

// RowProcessor knows how to process a single row from an event stream.
@@ -730,6 +751,7 @@ type RowProcessor interface {
// before the change was applied on the source.
ProcessRow(context.Context, isql.Txn, roachpb.KeyValue, roachpb.Value) (batchStats, error)
GetLastRow() cdcevent.Row
+SetSyntheticFailurePercent(uint32)
}

type txnBatch struct {
@@ -779,6 +801,10 @@ func (t *txnBatch) GetLastRow() cdcevent.Row {
return t.rp.GetLastRow()
}

+func (t *txnBatch) SetSyntheticFailurePercent(rate uint32) {
+t.rp.SetSyntheticFailurePercent(rate)
+}

func init() {
rowexec.NewLogicalReplicationWriterProcessor = newLogicalReplicationWriterProcessor
}
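
The chunk-boundary rule in `flushBuffer` above extends each worker's chunk past the nominal size until the row key changes, so every version of a given row is applied by a single worker in order. A standalone sketch of that rule, with plain strings standing in for KV events:

```go
package main

import "fmt"

// chunkByKey splits kvs into chunks of roughly chunkSize, but never splits a
// run of equal keys across two chunks: a chunk is extended until the key after
// its last element differs from the key of its last element.
func chunkByKey(kvs []string, chunkSize int) [][]string {
	var chunks [][]string
	for len(kvs) > 0 {
		end := min(chunkSize, len(kvs))
		for end < len(kvs) && kvs[end-1] == kvs[end] {
			end++
		}
		chunks = append(chunks, kvs[:end])
		kvs = kvs[end:]
	}
	return chunks
}

func main() {
	// "b" straddles the nominal boundary, so the first chunk grows to keep
	// both "b" events together.
	fmt.Println(chunkByKey([]string{"a", "b", "b", "c"}, 2)) // [[a b b] [c]]
}
```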
16 changes: 16 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
)

@@ -63,6 +64,9 @@ type sqlLastWriteWinsRowProcessor struct {
settings *cluster.Settings
ie isql.Executor
lastRow cdcevent.Row

+// testing knobs.
+testingInjectFailurePercent uint32
}

type queryBuilder struct {
@@ -177,9 +181,17 @@ func makeSQLLastWriteWinsHandler(
}, nil
}

+var errInjected = errors.New("injected synthetic error")

func (lww *sqlLastWriteWinsRowProcessor) ProcessRow(
ctx context.Context, txn isql.Txn, kv roachpb.KeyValue, prevValue roachpb.Value,
) (batchStats, error) {
+if lww.testingInjectFailurePercent != 0 {
+if randutil.FastUint32()%100 < lww.testingInjectFailurePercent {
+return batchStats{}, errInjected
+}
+}

var err error
kv.Key, err = keys.StripTenantPrefix(kv.Key)
if err != nil {
@@ -205,6 +217,10 @@ func (lww *sqlLastWriteWinsRowProcessor) GetLastRow() cdcevent.Row {
return lww.lastRow
}

+func (lww *sqlLastWriteWinsRowProcessor) SetSyntheticFailurePercent(rate uint32) {
+lww.testingInjectFailurePercent = rate
+}

var (
implicitTxnOverrides = sessiondata.InternalExecutorOverride{
AttributeToUser: true,