Skip to content

Commit

Permalink
Use sessions
Browse files Browse the repository at this point in the history
Fixes #557
  • Loading branch information
hadley committed Nov 14, 2023
1 parent 338d66e commit 38f3196
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 61 deletions.
1 change: 1 addition & 0 deletions R/bq-perform.R
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ bq_perform_upload <- function(x, values,
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
...,
session = NULL,
billing = x$project
) {

Expand Down
18 changes: 18 additions & 0 deletions R/bq-session.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
bq_session_create <- function(project) {
check_string(project)

url <- bq_path(project, jobs = "")
body <- list(
configuration = list(
query = list(
query = "SELECT 1;",
createSession = list(
value = unbox(TRUE)
)
)
)
)

res <- bq_post(url, body = bq_body(body))
res$statistics$sessionInfo$sessionId
}
92 changes: 35 additions & 57 deletions R/dbi-connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ NULL
BigQueryConnection <- function(project,
dataset,
billing,
session = NULL,
page_size = 1e4,
quiet = NA,
use_legacy_sql = FALSE,
Expand All @@ -15,6 +16,7 @@ BigQueryConnection <- function(project,
project = project,
dataset = dataset,
billing = billing,
session = session,
page_size = as.integer(page_size),
quiet = quiet,
use_legacy_sql = use_legacy_sql,
Expand All @@ -31,6 +33,7 @@ setClass(
project = "character",
dataset = "ANY",
billing = "character",
session = "ANY",
use_legacy_sql = "logical",
page_size = "integer",
quiet = "logical",
Expand Down Expand Up @@ -91,6 +94,7 @@ setMethod("dbExecute", c("BigQueryConnection", "character"), function(conn, stat

job <- bq_perform_query(statement,
billing = conn@billing,
session = conn@session,
default_dataset = ds,
quiet = conn@quiet,
...
Expand Down Expand Up @@ -191,36 +195,16 @@ dbWriteTable_bq <- function(conn,
check_bool(overwrite)
check_bool(append)

if (!is.null(field.types)) {
cli::cli_abort(
"{.arg field.types} not supported by bigrquery.",
call = quote(DBI::dbWriteTable())
)
}
if (!identical(temporary, FALSE)) {
cli::cli_abort(
"{.code temporary = FALSE} not supported by bigrquery.",
call = quote(DBI::dbWriteTable())
)
}

if (append) {
create_disposition <- "CREATE_NEVER"
write_disposition <- "WRITE_APPEND"
} else {
create_disposition <- "CREATE_IF_NEEDED"
write_disposition <- if (overwrite) "WRITE_TRUNCATE" else "WRITE_EMPTY"
}
tb <- as_bq_table(conn, name)

bq_table_upload(
tb,
value,
create_disposition = create_disposition,
write_disposition = write_disposition,
billing = conn@billing,
...
browser()
sql <- DBI::sqlCreateTable(
con,
name,
fields = field.types,
temporary = temporary,
row.names = row.names
)
DBI::dbExecute(sql)

invisible(TRUE)
}

Expand All @@ -238,26 +222,27 @@ dbWriteTable_bq <- function(conn,
#' @param field.types,temporary Ignored. Included for compatibility with
#' generic.
#' @export
setMethod(
"dbWriteTable",
c("BigQueryConnection", "character", "data.frame"),
dbWriteTable_bq
)

#' @rdname DBI
#' @export
setMethod(
"dbWriteTable",
c("BigQueryConnection", "Id", "data.frame"),
dbWriteTable_bq
)
#' setMethod(
#' "dbWriteTable",
#' c("BigQueryConnection", "character", "data.frame"),
#' dbWriteTable_bq
#' )
#'
#' #' @rdname DBI
#' #' @export
#' setMethod(
#' "dbWriteTable",
#' c("BigQueryConnection", "Id", "data.frame"),
#' dbWriteTable_bq
#' )

dbAppendTable_bq <- function(conn, name, value, ..., row.names = NULL) {
tb <- as_bq_table(conn, name)

bq_table_upload(tb, value,
create_disposition = "CREATE_NEVER",
write_disposition = "WRITE_APPEND",
session = conn@session,
...
)
on_connection_updated(conn)
Expand All @@ -280,15 +265,9 @@ dbCreateTable_bq <- function(conn,
...,
row.names = NULL,
temporary = FALSE) {
if (!identical(temporary, FALSE)) {
cli::cli_abort(
"{.code temporary = FALSE} not supported by bigrquery.",
call = quote(DBI::dbCreateTable())
)
}

tb <- as_bq_table(conn, name)
bq_table_create(tb, fields)
bq_table_create(tb, fields, temporary = temporary, session = conn@session)
on_connection_updated(conn)

invisible(TRUE)
Expand All @@ -305,7 +284,7 @@ setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq)

dbReadTable_bq <- function(conn, name, ...) {
tb <- as_bq_table(conn, name)
bq_table_download(tb, ...)
bq_table_download(tb, ..., session = conn@session)
}

#' @rdname DBI
Expand All @@ -328,7 +307,7 @@ setMethod(
}
ds <- bq_dataset(conn@project, conn@dataset)

tbs <- bq_dataset_tables(ds, ...)
tbs <- bq_dataset_tables(ds, ..., session = conn@session)
map_chr(tbs, function(x) x$table)
})

Expand All @@ -347,7 +326,7 @@ setMethod("dbExistsTable", c("BigQueryConnection", "Id"), dbExistsTable_bq)

dbListFields_bq <- function(conn, name, ...) {
tb <- as_bq_table(conn, name)
flds <- bq_table_fields(tb)
flds <- bq_table_fields(tb, session = conn@session)
map_chr(flds, function(x) x$name)
}

Expand All @@ -362,7 +341,7 @@ setMethod("dbListFields", c("BigQueryConnection", "Id"), dbListFields_bq)

dbRemoveTable_bq <- function(conn, name, ...) {
tb <- as_bq_table(conn, name)
bq_table_delete(tb)
bq_table_delete(tb, session = conn@session)
on_connection_updated(conn)
invisible(TRUE)
}
Expand Down Expand Up @@ -398,7 +377,7 @@ setMethod(
setMethod(
"dbBegin", "BigQueryConnection",
function(conn, ...) {
testthat::skip("Not yet implemented: dbBegin(Connection)")
dbExecute(conn, "BEGIN TRANSACTION")
})

#' @rdname DBI
Expand All @@ -407,7 +386,7 @@ setMethod(
setMethod(
"dbCommit", "BigQueryConnection",
function(conn, ...) {
testthat::skip("Not yet implemented: dbCommit(Connection)")
dbExecute(conn, "COMMIT TRANSACTION")
})

#' @rdname DBI
Expand All @@ -416,7 +395,7 @@ setMethod(
setMethod(
"dbRollback", "BigQueryConnection",
function(conn, ...) {
testthat::skip("Not yet implemented: dbRollback(Connection)")
dbExecute(conn, "ROLLBACK TRANSACTION")
})
# nocov end

Expand All @@ -428,7 +407,6 @@ as_bq_dataset.BigQueryConnection <- function(x, ..., error_arg, error_call) {
bq_dataset(x@project, x@dataset)
}


#' @export
as_bq_table.BigQueryConnection <- function(x, name, ...) {
if (inherits(name, "dbplyr_table_ident")) {
Expand Down
3 changes: 3 additions & 0 deletions R/dbi-driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ setMethod(
check_bool(use_legacy_sql)
bigint <- arg_match(bigint)

session <- bq_session_create(billing)

BigQueryConnection(
project = project,
dataset = dataset,
billing = billing,
session = session,
page_size = page_size,
quiet = quiet,
use_legacy_sql = use_legacy_sql,
Expand Down
4 changes: 0 additions & 4 deletions R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ db_copy_to.BigQueryConnection <- function(con,
analyze = TRUE,
in_transaction = TRUE) {

if (temporary) {
cli::cli_abort("BigQuery does not support temporary tables")
}

tb <- as_bq_table(con, table)
write <- if (overwrite) "WRITE_TRUNCATE" else "WRITE_EMPTY"
bq_table_upload(tb, values, fields = types, write_disposition = write)
Expand Down

0 comments on commit 38f3196

Please sign in to comment.