Duckdb assign and copy improvements #74

Open
wants to merge 8 commits into base: develop
Conversation

@egillax (Contributor) commented Feb 11, 2025

Fixes #73

  • Use attach and copy operations instead of `batchApply` in `copyAndromeda` and assignment (`"[[<-"`).

    • On Windows, attach won't work because of a file lock (even when attaching from the same process), so write to a temporary parquet file instead.
    • In `"[[<-"`, attach/copy didn't work for lazy queries: materialize the query to a parquet file and then assign it into the database.

  • Use `identical` instead of `all.equal`; it's faster and more fit for purpose.

  • Set `wal_autocheckpoint = '0KB'` to commit changes to the database immediately instead of leaving them in the WAL file. I had an issue with this in deepPlp, and this seems to match the previous behaviour, which disabled `journal_mode` for SQLite.

  • Had to modify one test because `table` is a reserved word in DuckDB, so the test failed. Renamed the table to `iris`.
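The `wal_autocheckpoint` change isn't exercised in the reprexes below; here is a minimal sketch of the idea (the file name is hypothetical, and this is not the PR's exact code):

```r
# Sketch: set wal_autocheckpoint on a DuckDB connection so changes are
# checkpointed into the main database file immediately, rather than
# accumulating in the .wal file.
library(DBI)

dbFile <- file.path(tempdir(), "example.duckdb")  # hypothetical path
con <- dbConnect(duckdb::duckdb(), dbdir = dbFile)

# '0KB' means: checkpoint (flush the WAL into the database file) after every
# commit, mimicking the old behaviour of disabling journal_mode for SQLite.
dbExecute(con, "PRAGMA wal_autocheckpoint='0KB'")

dbWriteTable(con, "iris", iris)

dbDisconnect(con, shutdown = TRUE)
```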

Microbenchmark reprex for `identical` vs `all.equal`
library(Andromeda)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

obj <- andromeda(data = iris)

microbenchmark::microbenchmark(identical(obj, obj),
  all.equal(obj, obj),
  times = 100
)
#> Unit: microseconds
#>                 expr       min        lq        mean     median         uq
#>  identical(obj, obj)     1.251     1.567     2.80481     2.8225     3.9325
#>  all.equal(obj, obj) 21549.433 21927.425 22786.16796 22256.6920 23514.7060
#>        max neval
#>      7.853   100
#>  27373.431   100

Created on 2025-02-11 with reprex v2.1.1

Microbenchmark reprex for the attach method vs `batchApply` for `[[<-`
library(Andromeda)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
andromeda <- andromeda()
n <- 16e5
n_subjects <- 100e3
n_cols <- 5e3

covariates <- data.frame(
  row = sample(1:n_subjects, n, replace = TRUE),
  col = sample(1:n_cols, n, replace = TRUE),
  c = rnorm(n)
)
covariateRef <- data.frame(
  a = 1:3e3,
  b = letters[1:3e3],
  c = rnorm(3e3)
)
analysisRef <- data.frame(
  a = 1:3e1,
  b = letters[1:3e1],
  c = rnorm(3e1)
)
andromeda$covariates <- covariates
andromeda$covariateRef <- covariateRef
andromeda$analysisRef <- analysisRef 


newAndromeda <- Andromeda::andromeda()
value <- andromeda$covariates

oldWay <- function() {
  doBatchedAppend <- function(batch) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = batch, overwrite = FALSE, append = TRUE)
    return(TRUE)
  }
  
  dummy <- batchApply(value, doBatchedAppend)
  if (length(dummy) == 0) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = dplyr::collect(value), overwrite = FALSE, append = TRUE)
  }
}

newWay <- function() {
  valueFile <- dbplyr::remote_con(value)@dbname
  DBI::dbExecute(newAndromeda, paste0("ATTACH DATABASE '", valueFile, "' AS old"))
  DBI::dbExecute(newAndromeda, paste0("CREATE TABLE new AS SELECT * FROM old.covariates"))
  DBI::dbExecute(newAndromeda, "DETACH DATABASE old")
}

microbenchmark::microbenchmark(oldWay(),
                               newWay(), times = 10,
                               setup = {
                                 newAndromeda$old <- NULL
                                 newAndromeda$new <- NULL
                                 }
                               )
#> Unit: milliseconds
#>      expr       min        lq      mean   median        uq       max neval
#>  oldWay() 786.22978 804.82203 837.65396 850.4561 862.61454 875.71107    10
#>  newWay()  30.72729  32.49021  33.92148  34.3015  35.16244  35.81522    10

Created on 2025-02-11 with reprex v2.1.1

@msuchard (Member) commented:

@egillax -- since the behavior is different on Windows vs *nix machines, do we have a benchmark on both types?

@egillax (Contributor, Author) commented Feb 11, 2025

Here is the parquet fallback used on Windows (a tiny bit slower than attaching):

library(Andromeda)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
andromeda <- andromeda()
n <- 16e5
n_subjects <- 100e3
n_cols <- 5e3

covariates <- data.frame(
  row = sample(1:n_subjects, n, replace = TRUE),
  col = sample(1:n_cols, n, replace = TRUE),
  c = rnorm(n)
)
covariateRef <- data.frame(
  a = 1:3e3,
  b = letters[1:3e3],
  c = rnorm(3e3)
)
analysisRef <- data.frame(
  a = 1:3e1,
  b = letters[1:3e1],
  c = rnorm(3e1)
)
andromeda$covariates <- covariates
andromeda$covariateRef <- covariateRef
andromeda$analysisRef <- analysisRef 


newAndromeda <- Andromeda::andromeda()
value <- andromeda$covariates

oldWay <- function() {
  doBatchedAppend <- function(batch) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = batch, overwrite = FALSE, append = TRUE)
    return(TRUE)
  }
  
  dummy <- batchApply(value, doBatchedAppend)
  if (length(dummy) == 0) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = dplyr::collect(value), overwrite = FALSE, append = TRUE)
  }
}

newWay <- function() {
  valueFile <- dbplyr::remote_con(value)@dbname
  DBI::dbExecute(newAndromeda, paste0("ATTACH DATABASE '", valueFile, "' AS old"))
  DBI::dbExecute(newAndromeda, paste0("CREATE TABLE new AS SELECT * FROM old.covariates"))
  DBI::dbExecute(newAndromeda, "DETACH DATABASE old")
}

parquetWay <- function() {
        tempFile <- tempfile(fileext = ".parquet")
        DBI::dbExecute(
          dbplyr::remote_con(value),
          sprintf(
            "COPY (%s) TO '%s' (FORMAT 'parquet')",
            dbplyr::sql_render(value), tempFile
          )
        )

        DBI::dbExecute(
          newAndromeda,
          sprintf(
            "CREATE OR REPLACE TABLE %s AS SELECT * FROM read_parquet('%s')",
            "old",
            tempFile
          )
        )
        unlink(tempFile)
}

microbenchmark::microbenchmark(oldWay(),
                               newWay(),
                               parquetWay(),
                               times = 10,
                               setup = {
                                 newAndromeda$old <- NULL
                                 newAndromeda$new <- NULL
                                 }
                               )
#> Unit: milliseconds
#>          expr       min        lq      mean    median        uq       max neval
#>      oldWay() 794.35739 808.35575 842.31538 851.49410 865.55682 881.60433    10
#>      newWay()  31.91771  32.64865  35.34855  35.83560  36.58089  39.37339    10
#>  parquetWay()  59.34795  59.74585  61.57503  60.65007  63.11939  67.11480    10

Created on 2025-02-11 with reprex v2.1.1

For completeness, here is the benchmark with a lazy query (which uses a temporary parquet file, like on Windows):

Lazy query benchmark
library(Andromeda)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
andromeda <- andromeda()
n <- 16e5
n_subjects <- 100e3
n_cols <- 5e3

covariates <- data.frame(
  row = sample(1:n_subjects, n, replace = TRUE),
  col = sample(1:n_cols, n, replace = TRUE),
  c = rnorm(n)
)
covariateRef <- data.frame(
  a = 1:3e3,
  b = letters[1:3e3],
  c = rnorm(3e3)
)
analysisRef <- data.frame(
  a = 1:3e1,
  b = letters[1:3e1],
  c = rnorm(3e1)
)

population <- data.frame(
  row = 1:floor(n_subjects / 2)
)


andromeda$covariates <- covariates
andromeda$covariateRef <- covariateRef
andromeda$analysisRef <- analysisRef 
andromeda$population <- population

newAndromeda <- Andromeda::andromeda()

# create the query, join with population which only includes half of the subjects
value <- andromeda$covariates %>% dplyr::inner_join(andromeda$population, by = "row")

oldWay <- function() {
  doBatchedAppend <- function(batch) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = batch, overwrite = FALSE, append = TRUE)
    return(TRUE)
  }
  
  dummy <- batchApply(value, doBatchedAppend)
  if (length(dummy) == 0) {
    duckdb::dbWriteTable(conn = newAndromeda, name = "old", value = dplyr::collect(value), overwrite = FALSE, append = TRUE)
  }
}

newWay <- function() {
  valueFile <- dbplyr::remote_con(value)@dbname
  DBI::dbExecute(newAndromeda, paste0("ATTACH DATABASE '", valueFile, "' AS old"))
  DBI::dbExecute(newAndromeda, paste0("CREATE TABLE new AS SELECT * FROM old.covariates"))
  DBI::dbExecute(newAndromeda, "DETACH DATABASE old")
}

parquetWay <- function() {
        tempFile <- tempfile(fileext = ".parquet")
        DBI::dbExecute(
          dbplyr::remote_con(value),
          sprintf(
            "COPY (%s) TO '%s' (FORMAT 'parquet')",
            dbplyr::sql_render(value), tempFile
          )
        )

        DBI::dbExecute(
          newAndromeda,
          sprintf(
            "CREATE OR REPLACE TABLE %s AS SELECT * FROM read_parquet('%s')",
            "old",
            tempFile
          )
        )
        unlink(tempFile)
}

microbenchmark::microbenchmark(oldWay(),
                               parquetWay(),
                               times = 10,
                               setup = {
                                 newAndromeda$old <- NULL
                                 newAndromeda$new <- NULL
                                 }
                               )
#> Unit: milliseconds
#>          expr      min       lq     mean   median       uq      max neval
#>      oldWay() 428.4652 436.4600 452.3760 439.7648 454.9417 518.0835    10
#>  parquetWay() 122.6131 123.2617 124.5367 124.1885 126.1864 127.1723    10

Created on 2025-02-11 with reprex v2.1.1
