Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 27, 2025
1 parent b05df12 commit 32b3e6e
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 2 deletions.
17 changes: 15 additions & 2 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,21 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
}
newvgtid := &binlogdatapb.VGtid{}
for _, sgtid := range vgtid.ShardGtids {
// validate table last pks, if present
for _, tablepk := range sgtid.TablePKs {
log.Infof("tablepk: %+v", tablepk)
if tablepk.TableName == "" {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"table name cannot be empty for keyspace/shard %s/%s: %v",
sgtid.GetKeyspace(), sgtid.GetShard(), tablepk)
}
lastPK := tablepk.GetLastpk()
if len(lastPK.GetFields()) == 0 || len(lastPK.GetRows()) == 0 || lastPK.GetFields()[0].Name == "" {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"invalid lastPK for keyspace/shard %s/%s, table %s: %v",
sgtid.GetKeyspace(), sgtid.GetShard(), tablepk.TableName, tablepk)
}
}
if sgtid.Shard == "" {
if sgtid.Gtid != "current" && sgtid.Gtid != "" {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %+v",
Expand All @@ -287,8 +302,6 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
}
}

// TODO add tablepk validations

return newvgtid, filter, flags, nil
}

Expand Down
120 changes: 120 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,126 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
}
}

func TestResolveParams(t *testing.T) {
ctx := context.Background()
cell := "test_cell"
ks := "test_keyspace"
shard := "0"
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{shard})
vsm := newTestVStreamManager(ctx, hc, st, cell)

type testcase struct {
name string
vgtid *binlogdatapb.VGtid
filter *binlogdatapb.Filter
flags *vtgatepb.VStreamFlags
wantErr string
}
testcases := []testcase{
{
name: "nil vgtid",
vgtid: nil,
wantErr: "vgtid must have at least one value with a starting position in ShardGtids",
},
{
name: "empty ShardGtids",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{},
},
wantErr: "vgtid must have at least one value with a starting position in ShardGtids",
},
{
name: "empty keyspace with non-current Gtid",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "",
Shard: shard,
Gtid: "invalid",
}},
},
wantErr: "for an empty keyspace, the Gtid value must be 'current'",
},
{
name: "unspecified shards with non-current Gtid",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "",
Gtid: "invalid",
}},
},
wantErr: "if shards are unspecified, the Gtid value must be 'current' or empty",
},
{
name: "valid vgtid",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "current",
}},
},
wantErr: "",
},
{
name: "invalid TableLastPK",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "current",
TablePKs: []*binlogdatapb.TableLastPK{{
TableName: "table1",
Lastpk: &querypb.QueryResult{
Fields: []*querypb.Field{},
Rows: []*querypb.Row{{
Lengths: []int64{1},
Values: []byte{1},
}},
},
}},
}},
},
wantErr: "invalid lastPK",
},
{
name: "invalid TableLastPK field",
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "current",
TablePKs: []*binlogdatapb.TableLastPK{{
TableName: "table1",
Lastpk: &querypb.QueryResult{
Fields: []*querypb.Field{{}},
Rows: []*querypb.Row{{
Lengths: []int64{1},
Values: []byte{1},
}},
},
}},
}},
},
wantErr: "invalid lastPK",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
_, _, _, err := vsm.resolveParams(ctx, topodatapb.TabletType_PRIMARY, tc.vgtid, tc.filter, tc.flags)
if tc.wantErr != "" {
require.Error(t, err)
log.Infof("err: %v", err)
require.Contains(t, err.Error(), tc.wantErr)
} else {
require.NoError(t, err)
}
})
}
}

func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(ctx, hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down

0 comments on commit 32b3e6e

Please sign in to comment.