Skip to content

Commit

Permalink
VStream: Add IS NULL operator filter support
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <noblemittal@outlook.com>
  • Loading branch information
beingnoble03 committed Feb 24, 2025
1 parent ef84b32 commit 4f62a40
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 10 deletions.
31 changes: 22 additions & 9 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ const (
GreaterThanEqual
// NotEqual is used to filter a comparable column if != specific value
NotEqual
// IsNotNull is used to filter a column if it is NULL
// IsNotNull is used to filter a column if it is not NULL
IsNotNull
// IsNull is used to filter a column if it is NULL
IsNull
// In is used to filter a comparable column if equals any of the values from a specific tuple
In
// Note that we do not implement filtering for BETWEEN because
Expand Down Expand Up @@ -259,6 +261,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if values[filter.ColNum].IsNull() {
return false, nil
}
case IsNull:
if !values[filter.ColNum].IsNull() {
return false, nil
}
case In:
if filter.Values == nil {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator")
Expand Down Expand Up @@ -671,10 +677,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
}
case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls
if expr.Right != sqlparser.IsNotNullOp {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
case *sqlparser.IsExpr:
qualifiedName, ok := expr.Left.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
Expand All @@ -686,10 +689,20 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err != nil {
return err
}
plan.Filters = append(plan.Filters, Filter{
Opcode: IsNotNull,
ColNum: colnum,
})
switch expr.Right {
case sqlparser.IsNullOp:
plan.Filters = append(plan.Filters, Filter{
Opcode: IsNull,
ColNum: colnum,
})
case sqlparser.IsNotNullOp:
plan.Filters = append(plan.Filters, Filter{
Opcode: IsNotNull,
ColNum: colnum,
})
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.BetweenExpr:
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,9 +848,15 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
outFilters: []Filter{
{Opcode: NotBetween, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(5)}},
},
}, {
name: "is-null-operator",
inFilter: "select * from t1 where val is null",
outFilters: []Filter{
{Opcode: IsNull, ColNum: 1},
},
}, {
name: "vindex-and-operators",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30) and id between 20 and 60",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30) and val is null and id between 20 and 60",
outFilters: []Filter{
{
Opcode: VindexMatch,
Expand All @@ -867,6 +873,7 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
{Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}},
{Opcode: GreaterThanEqual, ColNum: 0, Value: sqltypes.NewInt64(20)},
{Opcode: IsNull, ColNum: 1},
{Opcode: LessThanEqual, ColNum: 0, Value: sqltypes.NewInt64(60)},
},
}}
Expand Down
71 changes: 71 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2236,3 +2236,74 @@ func TestFilteredBetweenOperator(t *testing.T) {
})
}
}

func TestFilteredIsNullOperator(t *testing.T) {
testCases := []struct {
name string
filter string
testQueries []*TestQuery
}{
{
name: "is-null",
filter: "select id1, val from t1 where val is null",
testQueries: []*TestQuery{
{"begin", nil},
{"insert into t1 values (1, 100, 'aaa')", noEvents},
{"insert into t1 values (2, 200, 'bbb')", noEvents},
{"insert into t1 values (3, 100, 'ccc')", noEvents},
{"insert into t1 values (4, 200, NULL)", nil},
{"insert into t1 values (5, 200, NULL)", nil},
{"commit", nil},
},
},
{
name: "is-null-and-is-not-null",
filter: "select id1, val from t1 where val is null and id2 is not null",
testQueries: []*TestQuery{
{"begin", nil},
{"insert into t1 values (1, 100, 'aaa')", noEvents},
{"insert into t1 values (2, 200, 'bbb')", noEvents},
{"insert into t1 values (3, 100, NULL)", nil},
{"insert into t1 values (4, NULL, NULL)", noEvents},
{"insert into t1 values (5, 200, NULL)", nil},
{"commit", nil},
},
},
{
name: "is-null-and-other-op",
filter: "select id1, val from t1 where val is null and id1 != 4 and id2 not between 100 and 150",
testQueries: []*TestQuery{
{"begin", nil},
{"insert into t1 values (1, 100, 'd')", noEvents},
{"insert into t1 values (2, 200, 'e')", noEvents},
{"insert into t1 values (3, 100, NULL)", noEvents},
{"insert into t1 values (4, 200, NULL)", noEvents},
{"insert into t1 values (5, 200, NULL)", nil},
{"commit", nil},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ts := &TestSpec{
t: t,
ddls: []string{
"create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))",
},
options: &TestSpecOptions{
filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: tc.filter,
}},
},
},
}
defer ts.Close()
ts.Init()
ts.fieldEvents["t1"].cols[1].skip = true
ts.tests = [][]*TestQuery{tc.testQueries}
ts.Run()
})
}
}

0 comments on commit 4f62a40

Please sign in to comment.