diff --git a/source-postgres/replication.go b/source-postgres/replication.go index f319939da..eb9742980 100644 --- a/source-postgres/replication.go +++ b/source-postgres/replication.go @@ -716,6 +716,30 @@ func (s *replicationStream) decodeTuple( return nil, nil } + // Check if tuple columns match relation columns. They should never differ, so if + // they do something is very wrong and we need to log a bunch of debug info. + if len(tuple.Columns) > len(rel.Columns) { + var relColumnNames []string + var relColumnTypes []uint32 + for _, col := range rel.Columns { + relColumnNames = append(relColumnNames, col.Name) + relColumnTypes = append(relColumnTypes, col.DataType) + } + var tupleColumnTypes []uint8 + var tupleColumnData []string + for _, col := range tuple.Columns { + tupleColumnTypes = append(tupleColumnTypes, col.DataType) + tupleColumnData = append(tupleColumnData, string(col.Data)) + } + logrus.WithFields(logrus.Fields{ + "relNames": relColumnNames, + "relTypeOIDs": relColumnTypes, + "tupleTypes": tupleColumnTypes, + "tupleData": tupleColumnData, + }).Warn("tuple has more columns than relation") + return nil, fmt.Errorf("tuple has %d columns but relation only has %d columns", len(tuple.Columns), len(rel.Columns)) + } + var fields = make(map[string]interface{}) for idx, col := range tuple.Columns { if keyOnly && (rel.Columns[idx].Flags&1) == 0 {