Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120150: kvserver: don't propose lease change if unknown leader r=pav-kv a=andrewbaptist

Previously we could propose a lease change if the leader was unknown. This request could stall as we waited for the lease request to fail. Not knowing the leader means we can't immediately submit this lease proposal.

With this commit, lease requests are rejected immediately if the leader is not known. We rely on the client to retry requests until a new leader has been established.

This behaviour is behind the `kv.lease.reject_on_leader_unknown.enabled` cluster setting, currently off by default.

Epic: none
Touches cockroachdb#120073
Touches cockroachdb#118435

121137: roachtest/mixedversion: wrap original errors in test failures r=herkolategan a=renatolabs

Previously, every test failure would be returned to the caller as a `testFailure` struct (that implements the `error` interface). That struct would display the string representation of the original error along with mixed-version state information to help with debugging.

That implementation, however, had a significant drawback: we lost all type information related to the original error that caused the test failure. Specifically, if the error that caused the test to fail is one that causes the GitHub issue to be redirected to a different team (via `errors.Mark` or `registry.ErrorWithOwner`), that information was lost.

In this commit, we make the error returned to the caller a proper wrapper of the original error. The mixed-version state information is added as error details, and it continues to be logged in the step's logger on failure.

Fixes: cockroachdb#120926

Release note: None

121160: opt: push Offset into IndexJoin r=mgartner a=mgartner

A new rule, `PushOffsetIntoIndexJoin`, has been added. It pushes Offset
expressions below IndexJoin expressions, and is very similar to
`PushLimitIntoIndexJoin`.

Fixes cockroachdb#121157

Release note (performance improvement): The optimizer now generates more
efficient query plans for some queries with `OFFSET` clauses.


Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
Co-authored-by: Renato Costa <renato@cockroachlabs.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
  • Loading branch information
5 people committed Mar 27, 2024
4 parents 085b9be + 282d953 + df530cf + 1e4be65 commit 502fa9b
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 135 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_pkg_errors//:errors",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_x_exp//maps",
],
)
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/errors"
)

// Helper is the struct passed to `stepFunc`s (user-provided or
Expand Down Expand Up @@ -111,8 +112,8 @@ func (h *Helper) Background(
return err
}

desc := fmt.Sprintf("error in background function %s: %s", name, err)
return h.runner.testFailure(desc, bgLogger, nil)
err := errors.Wrapf(err, "error in background function %s", name)
return h.runner.testFailure(err, bgLogger, nil)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/pkg/errors"
"github.com/cockroachdb/errors"
)

const (
Expand Down
63 changes: 31 additions & 32 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

type (
Expand All @@ -54,9 +55,7 @@ type (
stopFuncs []StopFunc
}

testFailure struct {
summarized bool
description string
testFailureDetails struct {
seed int64
testContext *Context
binaryVersions []roachpb.Version
Expand Down Expand Up @@ -171,7 +170,7 @@ func (tr *testRunner) run() (retErr error) {
return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err)

case err := <-tr.monitor.Err():
return tr.testFailure(err.Error(), tr.logger, nil)
return tr.testFailure(err, tr.logger, nil)
}
}
}
Expand Down Expand Up @@ -303,18 +302,20 @@ func (tr *testRunner) startBackgroundStep(ss *singleStep, l *logger.Logger, stop
// cluster version before and after the step (in case the failure
// happened *while* the cluster version was updating).
func (tr *testRunner) stepError(err error, step *singleStep, l *logger.Logger) error {
desc := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s",
step.ID, step.impl.Description(), err,
stepErr := errors.Wrapf(
err,
"mixed-version test failure while running step %d (%s)",
step.ID, step.impl.Description(),
)

return tr.testFailure(desc, l, &step.context)
return tr.testFailure(stepErr, l, &step.context)
}

// testFailure generates a `testFailure` with the given
// description. It logs the error to the logger passed, and renames
// the underlying file to include the "FAILED" prefix to help in
// debugging.
func (tr *testRunner) testFailure(desc string, l *logger.Logger, testContext *Context) error {
// testFailure wraps the given error, attaching mixed-version state
// information (random seed, binary versions, cluster versions) as
// error details. It logs the resulting error to the logger passed,
// and renames the underlying file to include the "FAILED" prefix to
// help in debugging.
func (tr *testRunner) testFailure(err error, l *logger.Logger, testContext *Context) error {
clusterVersionsBefore := tr.clusterVersions
var clusterVersionsAfter atomic.Value
if tr.connCacheInitialized() {
Expand All @@ -325,24 +326,27 @@ func (tr *testRunner) testFailure(desc string, l *logger.Logger, testContext *Co
}
}

tf := &testFailure{
description: desc,
tf := &testFailureDetails{
seed: tr.seed,
testContext: testContext,
binaryVersions: loadAtomicVersions(tr.binaryVersions),
clusterVersionsBefore: loadAtomicVersions(clusterVersionsBefore),
clusterVersionsAfter: loadAtomicVersions(clusterVersionsAfter),
}

// failureErr wraps the original error, adding mixed-version state
// information as error details.
failureErr := errors.WithDetailf(err, "%s", tf.Format())

// Print the test failure on the step's logger for convenience, and
// to reduce cross referencing of logs.
l.Printf("%v", tf)
l.Printf("%+v", failureErr)

if err := renameFailedLogger(l); err != nil {
tr.logger.Printf("could not rename failed step logger: %v", err)
}

return tf
return failureErr
}

// teardown groups together all tasks that happen once a test finishes.
Expand Down Expand Up @@ -635,30 +639,25 @@ func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent {
return br.events
}

func (tf *testFailure) Error() string {
if tf.summarized {
return tf.description
}
tf.summarized = true

func (tfd *testFailureDetails) Format() string {
lines := []string{
tf.description,
fmt.Sprintf("test random seed: %d\n", tf.seed),
"test failed:",
fmt.Sprintf("test random seed: %d\n", tfd.seed),
}

tw := newTableWriter(len(tf.binaryVersions))
if tf.testContext != nil {
releasedVersions := make([]*clusterupgrade.Version, 0, len(tf.testContext.CockroachNodes))
for _, node := range tf.testContext.CockroachNodes {
releasedVersions = append(releasedVersions, tf.testContext.NodeVersion(node))
tw := newTableWriter(len(tfd.binaryVersions))
if tfd.testContext != nil {
releasedVersions := make([]*clusterupgrade.Version, 0, len(tfd.testContext.CockroachNodes))
for _, node := range tfd.testContext.CockroachNodes {
releasedVersions = append(releasedVersions, tfd.testContext.NodeVersion(node))
}
tw.AddRow("released versions", toString(releasedVersions)...)
}

tw.AddRow("logical binary versions", toString(tf.binaryVersions)...)
tw.AddRow("cluster versions before failure", toString(tf.clusterVersionsBefore)...)
tw.AddRow("logical binary versions", toString(tfd.binaryVersions)...)
tw.AddRow("cluster versions before failure", toString(tfd.clusterVersionsBefore)...)

if cv := tf.clusterVersionsAfter; cv != nil {
if cv := tfd.clusterVersionsAfter; cv != nil {
tw.AddRow("cluster versions after failure", toString(cv)...)
}

Expand Down
63 changes: 43 additions & 20 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ type propBuf struct {
}

type rangeLeaderInfo struct {
// leader is the Raft group's leader. Equals 0 [roachpb.ReplicaID(raft.None)]
// if the leader is not known/set, in which case other fields are unset too.
leader roachpb.ReplicaID
// iAmTheLeader is set if the local replica is the leader.
iAmTheLeader bool
// leaderKnown is set if the local Raft machinery knows who the leader is. If
// not set, all other fields are empty.
leaderKnown bool
// leader represents the Raft group's leader. Not set if leaderKnown is not
// set.
leader roachpb.ReplicaID
// leaderEligibleForLease is set if the leader is known and its type of
// replica allows it to acquire a lease.
leaderEligibleForLease bool
}

// leaderKnown reports whether the Raft leader is known, i.e. whether the
// leader field holds a replica ID other than raft.None.
func (r rangeLeaderInfo) leaderKnown() bool {
return r.leader != roachpb.ReplicaID(raft.None)
}

type admitEntHandle struct {
handle *kvflowcontrolpb.RaftAdmissionMeta
pCtx context.Context
Expand Down Expand Up @@ -682,11 +683,16 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
// Thus, we do one of two things:
// - if the leader is known, we reject this proposal and make sure the
// request that needed the lease is redirected to the leaseholder;
// - if the leader is not known, we don't do anything special here to
// - if the leader is not known [^1], we don't do anything special here to
// terminate the proposal, but we know that Raft will reject it with a
// ErrProposalDropped. We'll eventually re-propose it once a leader is
// known, at which point it will either go through or be rejected based on
// whether or not it is this replica that became the leader.
// whether it is this replica that became the leader.
//
// [^1]: however, if the leader is not known and the RejectLeaseOnLeaderUnknown
// cluster setting is enabled, we reject the proposal.
// TODO(pav-kv): make this behaviour default. Right now, it is hidden behind
// the experimental cluster setting. See #120073 and #118435.
//
// A special case is when the leader is known, but is ineligible to get the
// lease. In that case, we have no choice but to continue with the proposal.
Expand All @@ -698,11 +704,25 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
if li.iAmTheLeader {
return false
}
leaderKnownAndEligible := li.leaderKnown && li.leaderEligibleForLease
ownsCurrentLease := b.p.ownsValidLease(ctx, b.clock.NowAsClockTimestamp())
if leaderKnownAndEligible && !ownsCurrentLease && !b.testing.allowLeaseProposalWhenNotLeader {
if b.p.ownsValidLease(ctx, b.clock.NowAsClockTimestamp()) {
log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
return false
}

reject := false
if !li.leaderKnown() && RejectLeaseOnLeaderUnknown.Get(&b.settings.SV) {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; the leader is unknown")
reject = true
}
// TODO(pav-kv): the testing knob logic below doesn't exactly correspond to
// its name. Clean it up, potentially replace by the cluster setting above.
if li.leaderEligibleForLease && !b.testing.allowLeaseProposalWhenNotLeader {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
li.leader)
reject = true
}
if reject {
// NB: li.leader can be None.
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
const format = "campaigning because Raft leader (id=%d) not live in node liveness map"
Expand All @@ -715,12 +735,9 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
}
return true
}
// If the leader is not known, or if it is known but it's ineligible
// for the lease, continue with the proposal as explained above. We
// also send lease extensions for an existing leaseholder.
if ownsCurrentLease {
log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
} else if !li.leaderKnown {
// If the leader is not known, or if it is known but is ineligible for the
// lease, continue with the proposal as explained above.
if !li.leaderKnown() {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
} else {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
Expand Down Expand Up @@ -810,8 +827,7 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo {
leaderInfo := b.p.leaderStatus(ctx, raftGroup)
// Sanity check.
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
!leaderInfo.iAmTheLeader {
if leaderInfo.leader == b.p.getReplicaID() && !leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
Expand Down Expand Up @@ -1402,7 +1418,6 @@ func (rp *replicaProposer) leaderStatus(
}
return rangeLeaderInfo{
iAmTheLeader: iAmTheLeader,
leaderKnown: leaderKnown,
leader: roachpb.ReplicaID(leader),
leaderEligibleForLease: leaderEligibleForLease,
}
Expand Down Expand Up @@ -1444,6 +1459,14 @@ func (rp *replicaProposer) rejectProposalWithRedirectLocked(
rangeDesc := r.descRLocked()
storeID := r.store.StoreID()
r.store.metrics.LeaseRequestErrorCount.Inc(1)
if redirectTo == roachpb.ReplicaID(raft.None) {
// We don't know the leader, so pass Lease{} to give no hint.
rp.rejectProposalWithErrLocked(ctx, prop, kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{}, storeID, rangeDesc,
"refusing to acquire lease on follower")))
return
}

redirectRep, _ /* ok */ := rangeDesc.GetReplicaDescriptorByID(redirectTo)
log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", redirectRep.NodeID, prop.Request)
rp.rejectProposalWithErrLocked(ctx, prop, kvpb.NewError(
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ func (t *testProposer) leaderStatus(ctx context.Context, raftGroup proposerRaft)
}
return rangeLeaderInfo{
iAmTheLeader: iAmTheLeader,
leaderKnown: leaderKnown,
leader: leaderRep,
leaderEligibleForLease: leaderEligibleForLease,
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ var LeaseCheckPreferencesOnAcquisitionEnabled = settings.RegisterBoolSetting(
true,
)

// RejectLeaseOnLeaderUnknown controls whether a replica that does not know the
// current raft leader rejects a lease request.
//
// TODO(pav-kv): flip the default to true, and remove this setting when this
// becomes the only behaviour.
var RejectLeaseOnLeaderUnknown = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.lease.reject_on_leader_unknown.enabled",
"reject lease requests on a replica that does not know the raft leader",
false,
)

var leaseStatusLogLimiter = func() *log.EveryN {
e := log.Every(15 * time.Second)
e.ShouldLog() // waste the first shot
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/opt/exec/execbuilder/testdata/select_index
Original file line number Diff line number Diff line change
Expand Up @@ -1466,16 +1466,16 @@ EXPLAIN (VERBOSE) SELECT * FROM noncover ORDER BY c LIMIT 5 OFFSET 5
distribution: local
vectorized: true
·
limit
index join
│ columns: (a, b, c, d)
│ offset: 5
│ ordering: +c
│ estimated row count: 5 (missing stats)
│ table: noncover@noncover_pkey
│ key columns: a
└── • index join
│ columns: (a, b, c, d)
│ ordering: +c
│ estimated row count: 10 (missing stats)
│ table: noncover@noncover_pkey
│ key columns: a
└── • limit
│ columns: (a, c)
│ offset: 5
└── • scan
columns: (a, c)
Expand Down
21 changes: 19 additions & 2 deletions pkg/sql/opt/xform/rules/limit.opt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

# PushLimitIntoIndexJoin pushes a limit through an index join. Since index
# lookup can be expensive, it's always better to discard rows beforehand.
#
# TODO(radu): we can similarly push Offset too.
[PushLimitIntoIndexJoin, Explore]
(Limit
(IndexJoin $input:* $indexJoinPrivate:*) &
Expand All @@ -56,6 +54,25 @@
$indexJoinPrivate
)

# PushOffsetIntoIndexJoin pushes an offset through an index join. Since an index
# lookup can be expensive, it's always better to discard rows beforehand.
[PushOffsetIntoIndexJoin, Explore]
(Offset
(IndexJoin $input:* $indexJoinPrivate:*) &
(IndexJoinPreservesRows $indexJoinPrivate)
$offsetExpr:(Const $offset:* & (IsPositiveInt $offset))
$ordering:* &
(OrderingCanProjectCols
$ordering
$cols:(OutputCols $input)
)
)
=>
(IndexJoin
(Offset $input $offsetExpr (PruneOrdering $ordering $cols))
$indexJoinPrivate
)

# SplitLimitedScanIntoUnionScans splits a non-inverted scan under a limit into a
# union-all of limited scans over disjoint intervals. Example:
#
Expand Down
Loading

0 comments on commit 502fa9b

Please sign in to comment.