-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: avoid 403 from to_gbq when table has policyTags #356
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
"""Module for checking dependency versions and supported features.""" | ||
|
||
# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md | ||
BIGQUERY_MINIMUM_VERSION = "1.11.1" | ||
BIGQUERY_CLIENT_INFO_VERSION = "1.12.0" | ||
BIGQUERY_BQSTORAGE_VERSION = "1.24.0" | ||
BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0" | ||
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0" | ||
|
||
|
||
class Features:
    """Lazy, cached feature detection from installed dependency versions.

    All imports happen inside the properties, so constructing this object
    (or importing this module) does not pull in ``google.cloud.bigquery``
    or ``pandas``.
    """

    def __init__(self):
        # Parsed-version caches; filled in on first property access.
        self._bigquery_installed_version = None
        self._pandas_installed_version = None

    @property
    def bigquery_installed_version(self):
        """Parsed version of the installed ``google-cloud-bigquery``.

        Raises:
            ImportError: on first access, if the installed version is
                older than ``BIGQUERY_MINIMUM_VERSION``.
        """
        import google.cloud.bigquery
        import pkg_resources

        if self._bigquery_installed_version is None:
            # Cache before the minimum-version check so the parsed value
            # is retained even when the check below raises (matches the
            # original early-return-on-cached behavior).
            self._bigquery_installed_version = pkg_resources.parse_version(
                google.cloud.bigquery.__version__
            )
            minimum = pkg_resources.parse_version(BIGQUERY_MINIMUM_VERSION)
            if self._bigquery_installed_version < minimum:
                raise ImportError(
                    "pandas-gbq requires google-cloud-bigquery >= {0}, "
                    "current version {1}".format(
                        minimum, self._bigquery_installed_version
                    )
                )
        return self._bigquery_installed_version

    @property
    def bigquery_has_client_info(self):
        """True if google-cloud-bigquery >= ``BIGQUERY_CLIENT_INFO_VERSION``."""
        import pkg_resources

        threshold = pkg_resources.parse_version(BIGQUERY_CLIENT_INFO_VERSION)
        return self.bigquery_installed_version >= threshold

    @property
    def bigquery_has_bqstorage(self):
        """True if google-cloud-bigquery >= ``BIGQUERY_BQSTORAGE_VERSION``."""
        import pkg_resources

        threshold = pkg_resources.parse_version(BIGQUERY_BQSTORAGE_VERSION)
        return self.bigquery_installed_version >= threshold

    @property
    def bigquery_has_from_dataframe_with_csv(self):
        """True if google-cloud-bigquery >= ``BIGQUERY_FROM_DATAFRAME_CSV_VERSION``."""
        import pkg_resources

        threshold = pkg_resources.parse_version(
            BIGQUERY_FROM_DATAFRAME_CSV_VERSION
        )
        return self.bigquery_installed_version >= threshold

    @property
    def pandas_installed_version(self):
        """Parsed version of the installed ``pandas`` package (cached)."""
        import pandas
        import pkg_resources

        if self._pandas_installed_version is None:
            self._pandas_installed_version = pkg_resources.parse_version(
                pandas.__version__
            )
        return self._pandas_installed_version

    @property
    def pandas_has_deprecated_verbose(self):
        """True if pandas >= ``PANDAS_VERBOSITY_DEPRECATION_VERSION``.

        Used to decide whether to show a deprecation warning for the
        ``verbose`` argument. See
        https://github.com/pydata/pandas-gbq/issues/157.
        """
        import pkg_resources

        threshold = pkg_resources.parse_version(
            PANDAS_VERBOSITY_DEPRECATION_VERSION
        )
        return self.pandas_installed_version >= threshold
||
|
||
# Module-level singleton; other modules import this instead of constructing
# their own Features instance (e.g. ``from pandas_gbq.features import FEATURES``).
FEATURES = Features()
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
from google.cloud import bigquery | ||
|
||
from pandas_gbq.features import FEATURES | ||
import pandas_gbq.schema | ||
|
||
|
||
|
@@ -30,21 +31,21 @@ def encode_chunk(dataframe): | |
return io.BytesIO(body) | ||
|
||
|
||
def split_dataframe(dataframe, chunksize=None):
    """Split a DataFrame into row chunks for incremental upload.

    Args:
        dataframe: frame to split. Its index is reset (dropped) so chunk
            slicing is purely positional.
        chunksize: maximum number of rows per chunk. ``None`` yields the
            whole frame as a single chunk.

    Yields:
        Tuples of ``(remaining_rows, chunk)``, where ``remaining_rows``
        is the number of rows still to be yielded after ``chunk``.
    """
    dataframe = dataframe.reset_index(drop=True)
    if chunksize is None:
        # No chunking requested: the whole frame is one chunk, none left.
        yield 0, dataframe
        return

    total_rows = len(dataframe)
    # Walk the frame in fixed-size windows; the last window may be short.
    for start_index in range(0, total_rows, chunksize):
        chunk = dataframe[start_index : start_index + chunksize]
        remaining_rows = max(0, total_rows - (start_index + chunksize))
        yield remaining_rows, chunk
|
||
|
||
def load_chunks( | ||
|
@@ -60,24 +61,35 @@ def load_chunks( | |
job_config.source_format = "CSV" | ||
job_config.allow_quoted_newlines = True | ||
|
||
if schema is None: | ||
# Explicit schema? Use that! | ||
if schema is not None: | ||
schema = pandas_gbq.schema.remove_policy_tags(schema) | ||
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) | ||
# If not, let BigQuery determine schema unless we are encoding the CSV files ourselves. | ||
elif not FEATURES.bigquery_has_from_dataframe_with_csv: | ||
schema = pandas_gbq.schema.generate_bq_schema(dataframe) | ||
schema = pandas_gbq.schema.remove_policy_tags(schema) | ||
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) | ||
|
||
schema = pandas_gbq.schema.add_default_nullable_mode(schema) | ||
chunks = split_dataframe(dataframe, chunksize=chunksize) | ||
for remaining_rows, chunk in chunks: | ||
yield remaining_rows | ||
|
||
job_config.schema = [ | ||
bigquery.SchemaField.from_api_repr(field) for field in schema["fields"] | ||
] | ||
|
||
chunks = encode_chunks(dataframe, chunksize=chunksize) | ||
for remaining_rows, chunk_buffer in chunks: | ||
try: | ||
yield remaining_rows | ||
client.load_table_from_file( | ||
chunk_buffer, | ||
if FEATURES.bigquery_has_from_dataframe_with_csv: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not strictly necessary (just the "omit policyTags" logic was), but I thought this might be a good opportunity to use more logic from The CSV encoding in |
||
client.load_table_from_dataframe( | ||
chunk, | ||
destination_table_ref, | ||
job_config=job_config, | ||
location=location, | ||
).result() | ||
finally: | ||
chunk_buffer.close() | ||
else: | ||
try: | ||
chunk_buffer = encode_chunk(chunk) | ||
client.load_table_from_file( | ||
chunk_buffer, | ||
destination_table_ref, | ||
job_config=job_config, | ||
location=location, | ||
).result() | ||
finally: | ||
chunk_buffer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pulled this out of `gbq.py`, because `load.py` also needs some of this logic now.