Skip to content

Commit

Permalink
Merge pull request #68 from fschoell/fix/ch_allow_pk_duplicates
Browse files Browse the repository at this point in the history
allow primary key duplicates on Clickhouse
  • Loading branch information
maoueh authored Dec 21, 2024
2 parents a1361dc + 45441f8 commit 6eab99a
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

* Added support for the Clickhouse `Date` type.
* Removed the check for duplicate primary keys on the Clickhouse dialect. This allows inserting multiple rows with the same primary key.

## v4.3.0

Expand Down
1 change: 1 addition & 0 deletions db/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type dialect interface {
Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error)
Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error
OnlyInserts() bool
AllowPkDuplicates() bool
CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error
}

Expand Down
4 changes: 4 additions & 0 deletions db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (d clickhouseDialect) OnlyInserts() bool {
return true
}

func (d clickhouseDialect) AllowPkDuplicates() bool {
return true
}

func (d clickhouseDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, _database string, readOnly bool) error {
user, pass := EscapeIdentifier(username), escapeStringValue(password)

Expand Down
4 changes: 4 additions & 0 deletions db/dialect_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (d postgresDialect) OnlyInserts() bool {
return false
}

func (d postgresDialect) AllowPkDuplicates() bool {
return false
}

func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error {
user, pass, db := EscapeIdentifier(username), password, EscapeIdentifier(database)
var q string
Expand Down
2 changes: 1 addition & 1 deletion db/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map
l.entries.Set(tableName, entry)
}

if _, found := entry.Get(uniqueID); found {
if _, found := entry.Get(uniqueID); found && !l.getDialect().AllowPkDuplicates() {
return fmt.Errorf("attempting to insert in table %q a primary key %q, that is already scheduled for insertion, insert should only be called once for a given primary key", tableName, primaryKey)
}

Expand Down

0 comments on commit 6eab99a

Please sign in to comment.