Skip to content

Commit

Permalink
read schema using limited n concurrent goroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Jun 21, 2023
1 parent abda17f commit 4a9afb1
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"regexp"
"sort"
"strings"
"sync"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -96,18 +97,18 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(20)

// Get per-table schema concurrently.
tableNames := make([]string, 0, len(tds))
for _, td := range tds {
tableNames = append(tableNames, td.Name)
td := td

wg.Add(1)
go func(td *tabletmanagerdatapb.TableDefinition) {
defer wg.Done()

eg.Go(func() error {
fields, columns, schema, err := mysqld.collectSchema(ctx, dbName, td.Name, td.Type, request.TableSchemaOnly)
if err != nil {
// There's a possible race condition: it could happen that a table was dropped in between reading
Expand All @@ -116,40 +117,41 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
// This is fine. We identify the situation and keep the table without any fields/columns/key information
sqlErr, isSQLErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
if isSQLErr && sqlErr != nil && sqlErr.Number() == mysql.ERNoSuchTable {
return
return nil
}

allErrors.RecordError(err)
cancel()
return
return err
}

td.Fields = fields
td.Columns = columns
td.Schema = schema
}(td)
return nil
})
}

// Get primary columns concurrently.
// The below runs a single query on `INFORMATION_SCHEMA` and does not interact with the actual tables.
// It is therefore safe to run even if some tables are dropped in the interim.
colMap := map[string][]string{}
if len(tableNames) > 0 {
wg.Add(1)
go func() {
defer wg.Done()

var err error
colMap, err = mysqld.getPrimaryKeyColumns(ctx, dbName, tableNames...)
if err != nil {
allErrors.RecordError(err)
cancel()
return
}
}()
if !request.TableSchemaOnly {
// Get primary columns concurrently.
// The below runs a single query on `INFORMATION_SCHEMA` and does not interact with the actual tables.
// It is therefore safe to run even if some tables are dropped in the interim.
if len(tableNames) > 0 {
eg.Go(func() error {
var err error
colMap, err = mysqld.getPrimaryKeyColumns(ctx, dbName, tableNames...)
if err != nil {
allErrors.RecordError(err)
cancel()
return err
}
return nil
})
}
}

wg.Wait()
eg.Wait()
if err := allErrors.AggrError(vterrors.Aggregate); err != nil {
return nil, err
}
Expand Down

0 comments on commit 4a9afb1

Please sign in to comment.