Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Support for BETWEEN/NOT BETWEEN filter in VStream #17721

Merged
merged 13 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ func TestVStreamPushdownFilters(t *testing.T) {
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

Expand Down Expand Up @@ -1173,19 +1173,31 @@ func TestVStreamPushdownFilters(t *testing.T) {
Filter: "select * from customer where name = 'påul'",
}},
}
flags := &vtgatepb.VStreamFlags{}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// So we should have at least one paul row event in the copy phase.
copyPhaseRowEvents := 0
// And we should have many paul row events in the running phase.
runningPhaseRowEvents := 0
copyPhase := true
// So we should have at least one paul row event in the copy phase, and
// we should have many paul row events in the running phase.
copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}

// runVStreamAndGetNumOfRowEvents runs VStream with the specified filter and
// returns the number of copy phase and running phase row events.
func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamConn *vtgateconn.VTGateConn,
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -1214,13 +1226,5 @@ func TestVStreamPushdownFilters(t *testing.T) {
}
}
}()

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
return
}
107 changes: 94 additions & 13 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const (
IsNotNull
// In is used to filter a comparable column if equals any of the values from a specific tuple
In
// Note that we do not implement filtering for BETWEEN because
// in the plan we rewrite `x BETWEEN a AND b` to `x >= a AND x <= b`.
// NotBetween is used to filter a comparable column if it doesn't lie within a specific range.
NotBetween
)

// Filter contains opcodes for filtering.
Expand Down Expand Up @@ -273,6 +277,26 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if !found {
return false, nil
}
case NotBetween:
// Note that we do not implement filtering for BETWEEN because
// in the plan we rewrite `x BETWEEN a AND b` to `x >= a AND x <= b`.
// NOT BETWEEN (`x < a OR x > b`) is filtered here with its own opcode
// since we don't have support for OR yet.
if filter.Values == nil || len(filter.Values) != 2 {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected 2 filter values when performing NOT BETWEEN")
}
leftFilterValue, rightFilterValue := filter.Values[0], filter.Values[1]
isValueLessThanLeftFilter, err := compare(LessThan, values[filter.ColNum], leftFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
}
if isValueLessThanLeftFilter {
continue
}
isValueGreaterThanRightFilter, err := compare(GreaterThan, values[filter.ColNum], rightFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil || !isValueGreaterThanRightFilter {
return false, err
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -570,6 +594,23 @@ func (plan *Plan) appendTupleFilter(values sqlparser.ValTuple, opcode Opcode, co
return nil
}

// getEvalResultForLiteral evaluates expr, which must be a SQL literal, and
// returns the evaluated result.
//
// The literal is translated using the environment's default connection
// charset as its collation and then evaluated in an empty expression
// environment.
//
// It returns an error if expr is not a *sqlparser.Literal, or if translating
// or evaluating the literal fails. The returned result is nil whenever the
// returned error is non-nil.
func (plan *Plan) getEvalResultForLiteral(expr sqlparser.Expr) (*evalengine.EvalResult, error) {
	literalExpr, ok := expr.(*sqlparser.Literal)
	if !ok {
		return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
	}
	pv, err := evalengine.Translate(literalExpr, &evalengine.Config{
		Collation:   plan.env.CollationEnv().DefaultConnectionCharset(),
		Environment: plan.env,
	})
	if err != nil {
		return nil, err
	}
	env := evalengine.EmptyExpressionEnv(plan.env)
	resolved, err := env.Evaluate(pv)
	if err != nil {
		// Do not hand back a pointer to a result that failed to evaluate;
		// callers must only ever see a nil result alongside an error.
		return nil, err
	}
	return &resolved, nil
}

func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
if where == nil {
return nil
Expand Down Expand Up @@ -606,21 +647,11 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err != nil {
return err
}
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
continue
}
val, ok := expr.Right.(*sqlparser.Literal)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
pv, err := evalengine.Translate(val, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Environment: plan.env,
})
if err != nil {
return err
}
env := evalengine.EmptyExpressionEnv(plan.env)
resolved, err := env.Evaluate(pv)
resolved, err := plan.getEvalResultForLiteral(expr.Right)
if err != nil {
return err
}
Expand Down Expand Up @@ -661,6 +692,56 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.BetweenExpr:
qualifiedName, ok := expr.Left.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
if !qualifiedName.Qualifier.IsEmpty() {
return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName))
}
colnum, err := findColumn(plan.Table, qualifiedName.Name)
if err != nil {
return err
}
fromResolved, err := plan.getEvalResultForLiteral(expr.From)
if err != nil {
return err
}
toResolved, err := plan.getEvalResultForLiteral(expr.To)
if err != nil {
return err
}

if !expr.IsBetween {
// `x NOT BETWEEN a AND b` means: `x < a OR x > b`
// Also, since we do not have OR implemented yet,
// NOT BETWEEN needs to be handled separately.
plan.Filters = append(plan.Filters, Filter{
Opcode: NotBetween,
ColNum: colnum,
Values: []sqltypes.Value{
fromResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
toResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
},
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
continue
}

// `x BETWEEN a AND b` means: `x >= a AND x <= b`
plan.Filters = append(plan.Filters, Filter{
Opcode: GreaterThanEqual,
ColNum: colnum,
Value: fromResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
}, Filter{
Opcode: LessThanEqual,
ColNum: colnum,
Value: toResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
100 changes: 99 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,89 @@ func TestPlanBuilder(t *testing.T) {
}},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id between 2 and 5"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarChar,
Charset: unicodeCollationID,
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
Filters: []Filter{{
Opcode: GreaterThanEqual,
ColNum: 0,
Value: sqltypes.NewInt64(2),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}, {
Opcode: LessThanEqual,
ColNum: 0,
Value: sqltypes.NewInt64(5),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}},
whereExprsToPushDown: []sqlparser.Expr{
&sqlparser.BetweenExpr{
IsBetween: true,
Left: sqlparser.NewColName("id"),
From: sqlparser.NewIntLiteral("2"),
To: sqlparser.NewIntLiteral("5"),
},
},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id not between 2 and 5"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarChar,
Charset: unicodeCollationID,
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
Filters: []Filter{{
Opcode: NotBetween,
ColNum: 0,
Values: []sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewInt64(5)},
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}},
whereExprsToPushDown: []sqlparser.Expr{
&sqlparser.BetweenExpr{
IsBetween: false,
Left: sqlparser.NewColName("id"),
From: sqlparser.NewIntLiteral("2"),
To: sqlparser.NewIntLiteral("5"),
},
},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/*/"},
Expand Down Expand Up @@ -752,9 +835,22 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
outFilters: []Filter{
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}},
},
}, {
name: "between-operator",
inFilter: "select * from t1 where id between 1 and 5",
outFilters: []Filter{
{Opcode: GreaterThanEqual, ColNum: 0, Value: sqltypes.NewInt64(1)},
{Opcode: LessThanEqual, ColNum: 0, Value: sqltypes.NewInt64(5)},
},
}, {
name: "not-between-operator",
inFilter: "select * from t1 where id not between 1 and 5",
outFilters: []Filter{
{Opcode: NotBetween, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(5)}},
},
}, {
name: "vindex-and-operators",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30)",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30) and id between 20 and 60",
outFilters: []Filter{
{
Opcode: VindexMatch,
Expand All @@ -770,6 +866,8 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
{Opcode: Equal, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}},
{Opcode: GreaterThanEqual, ColNum: 0, Value: sqltypes.NewInt64(20)},
{Opcode: LessThanEqual, ColNum: 0, Value: sqltypes.NewInt64(60)},
},
}}

Expand Down
Loading
Loading