Skip to content

Commit

Permalink
source-mysql: Handle inconsistent column name case
Browse files Browse the repository at this point in the history
Apparently column names in MySQL are always case-insensitive,
which means that it's possible to issue a DDL query naming a
column with different capitalization from what we've got in
our metadata, and that ought to work.

This is solved by using a case-sensitive lookup from the column
name in the DDL alteration query to an index in our metadata,
and then "canonicalizing" the name to match what's in our
metadata so that the rest of the metadata update logic works
reliably.

This fixes #2421
  • Loading branch information
willdonnelly committed Feb 19, 2025
1 parent 004ffff commit 639e679
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
Binding 0:
{
"recommended_name": "test/altertable_renamecolumncaseinsensitive_41432361",
"resource_config_json": {
"namespace": "test",
"stream": "AlterTable_RenameColumnCaseInsensitive_41432361"
},
"document_schema_json": {
"$defs": {
"TestAlterTable_RenameColumnCaseInsensitive_41432361": {
"type": "object",
"required": [
"id"
],
"$anchor": "TestAlterTable_RenameColumnCaseInsensitive_41432361",
"properties": {
"id": {
"type": "integer",
"description": "(source type: non-nullable int)"
},
"new_data": {
"description": "(source type: text)",
"type": [
"string",
"null"
]
}
}
}
},
"allOf": [
{
"if": {
"properties": {
"_meta": {
"properties": {
"op": {
"const": "d"
}
}
}
}
},
"then": {
"reduce": {
"delete": true,
"strategy": "merge"
}
},
"else": {
"reduce": {
"strategy": "merge"
}
},
"required": [
"_meta"
],
"properties": {
"_meta": {
"type": "object",
"required": [
"op",
"source"
],
"properties": {
"before": {
"$ref": "#TestAlterTable_RenameColumnCaseInsensitive_41432361",
"description": "Record state immediately before this change was applied.",
"reduce": {
"strategy": "firstWriteWins"
}
},
"op": {
"enum": [
"c",
"d",
"u"
],
"description": "Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete."
},
"source": {
"$id": "https://github.com/estuary/connectors/source-mysql/mysql-source-info",
"properties": {
"ts_ms": {
"type": "integer",
"description": "Unix timestamp (in millis) at which this event was recorded by the database."
},
"schema": {
"type": "string",
"description": "Database schema (namespace) of the event."
},
"snapshot": {
"type": "boolean",
"description": "Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log."
},
"table": {
"type": "string",
"description": "Database table of the event."
},
"cursor": {
"type": "string",
"description": "Cursor value representing the current position in the binlog."
},
"txid": {
"type": "string",
"description": "The global transaction identifier associated with a change by MySQL. Only set if GTIDs are enabled."
}
},
"type": "object",
"required": [
"schema",
"table",
"cursor"
]
}
},
"reduce": {
"strategy": "merge"
}
}
}
},
{
"$ref": "#TestAlterTable_RenameColumnCaseInsensitive_41432361"
}
]
},
"key": [
"/id"
]
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ================================
# Collection "acmeCo/test/test/altertable_renamecolumncaseinsensitive_41432361": 2 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"AlterTable_RenameColumnCaseInsensitive_41432361","cursor":"backfill:0"}},"dataColumn":"aaa","id":1}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"AlterTable_RenameColumnCaseInsensitive_41432361","cursor":"backfill:1"}},"dataColumn":"bbb","id":2}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test%2FAlterTable_RenameColumnCaseInsensitive_41432361":{"backfilled":2,"key_columns":["id"],"metadata":{"charset":"utf8mb4","schema":{"columns":["id","dataColumn"],"types":{"dataColumn":{"charset":"utf8mb4","type":"text"},"id":{"type":"int"}}}},"mode":"Active"}},"cursor":"binlog.000123:56789"}

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ================================
# Collection "acmeCo/test/test/altertable_renamecolumncaseinsensitive_41432361": 2 Documents
# ================================
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"AlterTable_RenameColumnCaseInsensitive_41432361","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"dataColumn":"ccc","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"AlterTable_RenameColumnCaseInsensitive_41432361","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"id":4,"new_data":"ddd"}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test%2FAlterTable_RenameColumnCaseInsensitive_41432361":{"backfilled":2,"key_columns":["id"],"metadata":{"charset":"utf8mb4","schema":{"columns":["id","new_data"],"types":{"id":{"type":"int"},"new_data":{"charset":"utf8mb4","type":"text"}}}},"mode":"Active"}},"cursor":"binlog.000123:56789"}

22 changes: 22 additions & 0 deletions source-mysql/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,28 @@ func TestAlterTable_AddUnsignedColumn(t *testing.T) {
t.Run("rebackfilled", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })
}

// TestAlterTable_RenameColumnCaseInsensitive verifies that column renames work correctly
// even when the ALTER TABLE query uses different capitalization than the original column
// definition. MySQL always treats column identifiers as case-insensitive.
func TestAlterTable_RenameColumnCaseInsensitive(t *testing.T) {
var tb, ctx = mysqlTestBackend(t), context.Background()
var uniqueID = "41432361"
var table = tb.CreateTable(ctx, t, uniqueID, "(id INTEGER PRIMARY KEY, dataColumn TEXT)")
tb.Insert(ctx, t, table, [][]interface{}{{1, "aaa"}, {2, "bbb"}})

var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
t.Run("init", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })

// Use different capitalization in the rename query
tb.Insert(ctx, t, table, [][]interface{}{{3, "ccc"}})
tb.Query(ctx, t, fmt.Sprintf("ALTER TABLE %s RENAME COLUMN `DATACOLUMN` TO `new_data`;", table))
tb.Insert(ctx, t, table, [][]interface{}{{4, "ddd"}})
t.Run("renamed", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })

// Verify schema discovery still works after the rename
t.Run("discover", func(t *testing.T) { tb.CaptureSpec(ctx, t).VerifyDiscover(ctx, t, regexp.MustCompile(uniqueID)) })
}

func TestSkipBackfills(t *testing.T) {
// Set up three tables with some data in them, a catalog which captures all three,
// but a configuration which specifies that tables A and C should skip backfilling
Expand Down
33 changes: 26 additions & 7 deletions source-mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,23 +776,26 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
var oldName = alter.OldName.Name.String()
var newName = alter.NewName.Name.String()

var colIndex = slices.Index(meta.Schema.Columns, oldName)
var colIndex = findColumnIndex(meta.Schema.Columns, oldName)
if colIndex == -1 {
return fmt.Errorf("unknown column %q", oldName)
}
oldName = meta.Schema.Columns[colIndex] // Use the actual column name from the metadata
meta.Schema.Columns[colIndex] = newName

var colType = meta.Schema.ColumnTypes[oldName]
meta.Schema.ColumnTypes[oldName] = nil
meta.Schema.ColumnTypes[newName] = colType
logrus.WithField("columns", meta.Schema.Columns).WithField("types", meta.Schema.ColumnTypes).Info("processed RENAME COLUMN alteration")
case *sqlparser.RenameTableName:
return fmt.Errorf("unsupported table alteration (go.estuary.dev/eVVwet): %s", query)
case *sqlparser.ChangeColumn:
var oldName = alter.OldColumn.Name.String()
var oldIndex = slices.Index(meta.Schema.Columns, oldName)
var oldIndex = findColumnIndex(meta.Schema.Columns, oldName)
if oldIndex == -1 {
return fmt.Errorf("unknown column %q", oldName)
}
oldName = meta.Schema.Columns[oldIndex] // Use the actual column name from the metadata
meta.Schema.Columns = slices.Delete(meta.Schema.Columns, oldIndex, oldIndex+1)

var newName = alter.NewColDefinition.Name.String()
Expand All @@ -802,7 +805,7 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
newIndex = 0
} else if alter.After != nil {
var afterName = alter.After.Name.String()
var afterIndex = slices.Index(meta.Schema.Columns, afterName)
var afterIndex = findColumnIndex(meta.Schema.Columns, afterName)
if afterIndex == -1 {
return fmt.Errorf("unknown column %q", afterName)
}
Expand All @@ -814,10 +817,11 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
logrus.WithField("columns", meta.Schema.Columns).WithField("types", meta.Schema.ColumnTypes).Info("processed CHANGE COLUMN alteration")
case *sqlparser.ModifyColumn:
var colName = alter.NewColDefinition.Name.String()
var oldIndex = slices.Index(meta.Schema.Columns, colName)
var oldIndex = findColumnIndex(meta.Schema.Columns, colName)
if oldIndex == -1 {
return fmt.Errorf("unknown column %q", colName)
}
colName = meta.Schema.Columns[oldIndex] // Use the actual column name from the metadata
meta.Schema.Columns = slices.Delete(meta.Schema.Columns, oldIndex, oldIndex+1)

var newType = translateDataType(meta, alter.NewColDefinition.Type)
Expand All @@ -826,7 +830,7 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
newIndex = 0
} else if alter.After != nil {
var afterName = alter.After.Name.String()
var afterIndex = slices.Index(meta.Schema.Columns, afterName)
var afterIndex = findColumnIndex(meta.Schema.Columns, afterName)
if afterIndex == -1 {
return fmt.Errorf("unknown column %q", afterName)
}
Expand All @@ -841,7 +845,7 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
if alter.First {
insertAt = 0
} else if after := alter.After; after != nil {
var afterIndex = slices.Index(meta.Schema.Columns, after.Name.String())
var afterIndex = findColumnIndex(meta.Schema.Columns, after.Name.String())
if afterIndex == -1 {
return fmt.Errorf("unknown column %q", after.Name.String())
}
Expand All @@ -859,10 +863,11 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
logrus.WithField("columns", meta.Schema.Columns).WithField("types", meta.Schema.ColumnTypes).Info("processed CHANGE COLUMN alteration")
case *sqlparser.DropColumn:
var colName = alter.Name.Name.String()
var oldIndex = slices.Index(meta.Schema.Columns, colName)
var oldIndex = findColumnIndex(meta.Schema.Columns, colName)
if oldIndex == -1 {
return fmt.Errorf("unknown column %q", colName)
}
colName = meta.Schema.Columns[oldIndex] // Use the actual column name from the metadata
meta.Schema.Columns = slices.Delete(meta.Schema.Columns, oldIndex, oldIndex+1)
meta.Schema.ColumnTypes[colName] = nil // Set to nil rather than delete so that JSON patch merging deletes it
logrus.WithField("columns", meta.Schema.Columns).WithField("types", meta.Schema.ColumnTypes).Info("processed CHANGE COLUMN alteration")
Expand All @@ -885,6 +890,20 @@ func (rs *mysqlReplicationStream) handleAlterTable(ctx context.Context, stmt *sq
return nil
}

// findColumnIndex performs a case-insensitive search for a column name in a slice of column names.
// It returns the index of the first matching column, or -1 if no match is found.
//
// According to https://dev.mysql.com/doc/refman/8.4/en/identifier-case-sensitivity.html:
// > [...] column [...] names are not case-sensitive on any platform, nor are column aliases.
func findColumnIndex(columns []string, name string) int {
for i, col := range columns {
if strings.EqualFold(col, name) {
return i
}
}
return -1
}

func translateDataType(meta *mysqlTableMetadata, t sqlparser.ColumnType) any {
switch typeName := strings.ToLower(t.Type); typeName {
case "enum":
Expand Down

0 comments on commit 639e679

Please sign in to comment.