fix: avoid 403 from to_gbq when table has policyTags #356

Merged: 3 commits, merged Mar 30, 2021. The diff below shows changes from 1 commit.
5 changes: 5 additions & 0 deletions docs/source/changelog.rst
@@ -13,6 +13,11 @@ Features
   client project. Specify the target table ID as ``project.dataset.table`` to
   use this feature. (:issue:`321`, :issue:`347`)

+Bug fixes
+~~~~~~~~~
+
+- Avoid 403 error from ``to_gbq`` when table has ``policyTags``. (:issue:`354`)
+
 Dependencies
 ~~~~~~~~~~~~

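For context, a minimal sketch of the failure this entry fixes (the project and
table IDs are placeholders; see issue 354): when appending to an existing table
whose columns carry policy tags, to_gbq fetched the table schema and sent it,
policyTags included, back with the load job, which BigQuery rejected with a 403
unless the caller had permission to set policy tags.

import pandas

import pandas_gbq

df = pandas.DataFrame({"name": ["Ada"], "email": ["ada@example.com"]})

# Hypothetical destination table in which the "email" column has a
# policy tag attached. Before this fix, the append could fail with a
# 403 about setting policy tags; with the fix, policyTags are stripped
# from the schema sent with the load job.
pandas_gbq.to_gbq(
    df,
    "my_dataset.users",
    project_id="my-project",
    if_exists="append",
)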
95 changes: 95 additions & 0 deletions pandas_gbq/features.py
@@ -0,0 +1,95 @@
"""Module for checking dependency versions and supported features."""

Collaborator (author) commented:

I pulled this out of gbq.py, because load.py also needs some of this logic now.


# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
BIGQUERY_MINIMUM_VERSION = "1.11.1"
BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"


class Features:
    def __init__(self):
        self._bigquery_installed_version = None
        self._pandas_installed_version = None

    @property
    def bigquery_installed_version(self):
        import google.cloud.bigquery
        import pkg_resources

        if self._bigquery_installed_version is not None:
            return self._bigquery_installed_version

        self._bigquery_installed_version = pkg_resources.parse_version(
            google.cloud.bigquery.__version__
        )
        bigquery_minimum_version = pkg_resources.parse_version(
            BIGQUERY_MINIMUM_VERSION
        )

        if self._bigquery_installed_version < bigquery_minimum_version:
            raise ImportError(
                "pandas-gbq requires google-cloud-bigquery >= {0}, "
                "current version {1}".format(
                    bigquery_minimum_version, self._bigquery_installed_version
                )
            )

        return self._bigquery_installed_version

    @property
    def bigquery_has_client_info(self):
        import pkg_resources

        bigquery_client_info_version = pkg_resources.parse_version(
            BIGQUERY_CLIENT_INFO_VERSION
        )
        return self.bigquery_installed_version >= bigquery_client_info_version

    @property
    def bigquery_has_bqstorage(self):
        import pkg_resources

        bigquery_bqstorage_version = pkg_resources.parse_version(
            BIGQUERY_BQSTORAGE_VERSION
        )
        return self.bigquery_installed_version >= bigquery_bqstorage_version

    @property
    def bigquery_has_from_dataframe_with_csv(self):
        import pkg_resources

        bigquery_from_dataframe_version = pkg_resources.parse_version(
            BIGQUERY_FROM_DATAFRAME_CSV_VERSION
        )
        return (
            self.bigquery_installed_version >= bigquery_from_dataframe_version
        )

    @property
    def pandas_installed_version(self):
        import pandas
        import pkg_resources

        if self._pandas_installed_version is not None:
            return self._pandas_installed_version

        self._pandas_installed_version = pkg_resources.parse_version(
            pandas.__version__
        )
        return self._pandas_installed_version

    @property
    def pandas_has_deprecated_verbose(self):
        import pkg_resources

        # Add check for Pandas version before showing deprecation warning.
        # https://github.com/pydata/pandas-gbq/issues/157
        pandas_verbosity_deprecation = pkg_resources.parse_version(
            PANDAS_VERBOSITY_DEPRECATION_VERSION
        )
        return self.pandas_installed_version >= pandas_verbosity_deprecation


FEATURES = Features()
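As the comment above notes, this module centralizes version checks so that both
gbq.py and load.py can share them. A minimal usage sketch, mirroring the call
sites in the diffs below (FEATURES and its properties come from this file; the
helper function is illustrative only):

from pandas_gbq.features import FEATURES


def to_dataframe_kwargs(create_bqstorage_client):
    # Illustrative helper: only pass create_bqstorage_client through to
    # google-cloud-bigquery versions that accept it (>= 1.24.0).
    kwargs = {}
    if FEATURES.bigquery_has_bqstorage:
        kwargs["create_bqstorage_client"] = create_bqstorage_client
    return kwargs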
100 changes: 20 additions & 80 deletions pandas_gbq/gbq.py
@@ -16,100 +16,45 @@
 from pandas_gbq.exceptions import AccessDenied
 from pandas_gbq.exceptions import PerformanceWarning
+from pandas_gbq import features
+from pandas_gbq.features import FEATURES
 import pandas_gbq.schema
 import pandas_gbq.timestamp


 logger = logging.getLogger(__name__)

-BIGQUERY_INSTALLED_VERSION = None
-BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
-BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
-HAS_CLIENT_INFO = False
-HAS_BQSTORAGE_SUPPORT = False
-
 try:
     import tqdm  # noqa
 except ImportError:
     tqdm = None


-def _check_google_client_version():
-    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, HAS_BQSTORAGE_SUPPORT, SHOW_VERBOSE_DEPRECATION
-
-    try:
-        import pkg_resources
-
-    except ImportError:
-        raise ImportError("Could not import pkg_resources (setuptools).")
-
-    # https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
-    bigquery_minimum_version = pkg_resources.parse_version("1.11.0")
-    bigquery_client_info_version = pkg_resources.parse_version(
-        BIGQUERY_CLIENT_INFO_VERSION
-    )
-    bigquery_bqstorage_version = pkg_resources.parse_version(
-        BIGQUERY_BQSTORAGE_VERSION
-    )
-    BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
-        "google-cloud-bigquery"
-    ).parsed_version
-
-    HAS_CLIENT_INFO = (
-        BIGQUERY_INSTALLED_VERSION >= bigquery_client_info_version
-    )
-    HAS_BQSTORAGE_SUPPORT = (
-        BIGQUERY_INSTALLED_VERSION >= bigquery_bqstorage_version
-    )
-
-    if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
-        raise ImportError(
-            "pandas-gbq requires google-cloud-bigquery >= {0}, "
-            "current version {1}".format(
-                bigquery_minimum_version, BIGQUERY_INSTALLED_VERSION
-            )
-        )
-
-    # Add check for Pandas version before showing deprecation warning.
-    # https://github.com/pydata/pandas-gbq/issues/157
-    pandas_installed_version = pkg_resources.get_distribution(
-        "pandas"
-    ).parsed_version
-    pandas_version_wo_verbosity = pkg_resources.parse_version("0.23.0")
-    SHOW_VERBOSE_DEPRECATION = (
-        pandas_installed_version >= pandas_version_wo_verbosity
-    )
-
-
 def _test_google_api_imports():
+    try:
+        import pkg_resources  # noqa
+    except ImportError as ex:
+        raise ImportError("pandas-gbq requires setuptools") from ex

     try:
         import pydata_google_auth  # noqa
     except ImportError as ex:
-        raise ImportError(
-            "pandas-gbq requires pydata-google-auth: {0}".format(ex)
-        )
+        raise ImportError("pandas-gbq requires pydata-google-auth") from ex

     try:
         from google_auth_oauthlib.flow import InstalledAppFlow  # noqa
     except ImportError as ex:
-        raise ImportError(
-            "pandas-gbq requires google-auth-oauthlib: {0}".format(ex)
-        )
+        raise ImportError("pandas-gbq requires google-auth-oauthlib") from ex

     try:
         import google.auth  # noqa
     except ImportError as ex:
-        raise ImportError("pandas-gbq requires google-auth: {0}".format(ex))
+        raise ImportError("pandas-gbq requires google-auth") from ex

     try:
         from google.cloud import bigquery  # noqa
     except ImportError as ex:
-        raise ImportError(
-            "pandas-gbq requires google-cloud-bigquery: {0}".format(ex)
-        )
-
-    _check_google_client_version()
+        raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex


 class DatasetCreationError(ValueError):
@@ -416,7 +361,7 @@ def get_client(self):
         # In addition to new enough version of google-api-core, a new enough
         # version of google-cloud-bigquery is required to populate the
         # client_info.
-        if HAS_CLIENT_INFO:
+        if FEATURES.bigquery_has_client_info:
             return bigquery.Client(
                 project=self.project_id,
                 credentials=self.credentials,
@@ -550,14 +495,15 @@ def _download_results(
         if user_dtypes is None:
             user_dtypes = {}

-        if self.use_bqstorage_api and not HAS_BQSTORAGE_SUPPORT:
+        if self.use_bqstorage_api and not FEATURES.bigquery_has_bqstorage:
             warnings.warn(
                 (
                     "use_bqstorage_api was set, but have google-cloud-bigquery "
                     "version {}. Requires google-cloud-bigquery version "
                     "{} or later."
                 ).format(
-                    BIGQUERY_INSTALLED_VERSION, BIGQUERY_BQSTORAGE_VERSION
+                    FEATURES.bigquery_installed_version,
+                    features.BIGQUERY_BQSTORAGE_VERSION,
                 ),
                 PerformanceWarning,
                 stacklevel=4,
@@ -568,7 +514,7 @@ def _download_results(
             create_bqstorage_client = False

         to_dataframe_kwargs = {}
-        if HAS_BQSTORAGE_SUPPORT:
+        if FEATURES.bigquery_has_bqstorage:
             to_dataframe_kwargs[
                 "create_bqstorage_client"
             ] = create_bqstorage_client
@@ -880,7 +826,7 @@ def read_gbq(

     _test_google_api_imports()

-    if verbose is not None and SHOW_VERBOSE_DEPRECATION:
+    if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
         warnings.warn(
             "verbose is deprecated and will be removed in "
             "a future version. Set logging level in order to vary "
@@ -1054,7 +1000,7 @@ def to_gbq(

     _test_google_api_imports()

-    if verbose is not None and SHOW_VERBOSE_DEPRECATION:
+    if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
         warnings.warn(
             "verbose is deprecated and will be removed in "
             "a future version. Set logging level in order to vary "
@@ -1133,8 +1079,8 @@ def to_gbq(
             "schema of the destination table."
         )

-    # Update the local `table_schema` so mode matches.
-    # See: https://github.com/pydata/pandas-gbq/issues/315
+    # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
+    # matches. See: https://github.com/pydata/pandas-gbq/issues/315
     table_schema = pandas_gbq.schema.update_schema(
         table_schema, original_schema
     )
@@ -1252,7 +1198,6 @@ def create(self, table_id, schema):
         dataframe.
         """
         from google.cloud.bigquery import DatasetReference
-        from google.cloud.bigquery import SchemaField
         from google.cloud.bigquery import Table
         from google.cloud.bigquery import TableReference

@@ -1274,12 +1219,7 @@ def create(self, table_id, schema):
             DatasetReference(self.project_id, self.dataset_id), table_id
         )
         table = Table(table_ref)
-
-        schema = pandas_gbq.schema.add_default_nullable_mode(schema)
-
-        table.schema = [
-            SchemaField.from_api_repr(field) for field in schema["fields"]
-        ]
+        table.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

         try:
             self.client.create_table(table)
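The new pandas_gbq.schema.to_google_cloud_bigquery helper is not shown in this
commit. Judging from the inline code it replaces here, it presumably wraps the
same two steps; a minimal sketch, assuming the REST-style schema dict
({"fields": [...]}) used throughout pandas-gbq:

from google.cloud import bigquery

import pandas_gbq.schema


def to_google_cloud_bigquery(schema):
    # Sketch only: default any field without an explicit mode to NULLABLE,
    # then convert the REST-style dict into SchemaField objects.
    schema = pandas_gbq.schema.add_default_nullable_mode(schema)
    return [
        bigquery.SchemaField.from_api_repr(field)
        for field in schema["fields"]
    ]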
48 changes: 30 additions & 18 deletions pandas_gbq/load.py
@@ -4,6 +4,7 @@

 from google.cloud import bigquery

+from pandas_gbq.features import FEATURES
 import pandas_gbq.schema


@@ -30,21 +31,21 @@ def encode_chunk(dataframe):
     return io.BytesIO(body)


-def encode_chunks(dataframe, chunksize=None):
+def split_dataframe(dataframe, chunksize=None):
     dataframe = dataframe.reset_index(drop=True)
     if chunksize is None:
-        yield 0, encode_chunk(dataframe)
+        yield 0, dataframe
         return

     remaining_rows = len(dataframe)
     total_rows = remaining_rows
     start_index = 0
     while start_index < total_rows:
         end_index = start_index + chunksize
-        chunk_buffer = encode_chunk(dataframe[start_index:end_index])
+        chunk = dataframe[start_index:end_index]
         start_index += chunksize
         remaining_rows = max(0, remaining_rows - chunksize)
-        yield remaining_rows, chunk_buffer
+        yield remaining_rows, chunk


 def load_chunks(
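The rename from encode_chunks to split_dataframe reflects a behavior change:
the generator now yields raw DataFrame slices instead of encoded CSV buffers,
leaving serialization to the caller. A quick illustration of the new contract
(hypothetical data):

import pandas

from pandas_gbq.load import split_dataframe

df = pandas.DataFrame({"num": range(10)})
for remaining_rows, chunk in split_dataframe(df, chunksize=4):
    # chunk is a plain DataFrame slice; remaining_rows counts the rows
    # still to come after this chunk: prints "6 4", "2 4", then "0 2".
    print(remaining_rows, len(chunk))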
@@ -60,24 +61,35 @@ def load_chunks(
     job_config.source_format = "CSV"
     job_config.allow_quoted_newlines = True

-    if schema is None:
+    # Explicit schema? Use that!
+    if schema is not None:
+        schema = pandas_gbq.schema.remove_policy_tags(schema)
+        job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
+    # If not, let BigQuery determine schema unless we are encoding the CSV files ourselves.
+    elif not FEATURES.bigquery_has_from_dataframe_with_csv:
         schema = pandas_gbq.schema.generate_bq_schema(dataframe)
+        schema = pandas_gbq.schema.remove_policy_tags(schema)
+        job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

-    schema = pandas_gbq.schema.add_default_nullable_mode(schema)
-
-    job_config.schema = [
-        bigquery.SchemaField.from_api_repr(field) for field in schema["fields"]
-    ]
-
-    chunks = encode_chunks(dataframe, chunksize=chunksize)
-    for remaining_rows, chunk_buffer in chunks:
-        try:
-            yield remaining_rows
-            client.load_table_from_file(
-                chunk_buffer,
+    chunks = split_dataframe(dataframe, chunksize=chunksize)
+    for remaining_rows, chunk in chunks:
+        yield remaining_rows
+
+        if FEATURES.bigquery_has_from_dataframe_with_csv:

Collaborator (author) commented:

Not strictly necessary (just the "omit policyTags" logic was), but I thought this might be a good opportunity to use more logic from google-cloud-bigquery, per #339

The CSV encoding in google-cloud-bigquery is still relatively new, so I didn't want to bump our minimum google-cloud-bigquery versions yet. Discussion: #357

+            client.load_table_from_dataframe(
+                chunk,
                 destination_table_ref,
                 job_config=job_config,
                 location=location,
             ).result()
-        finally:
-            chunk_buffer.close()
+        else:
+            try:
+                chunk_buffer = encode_chunk(chunk)
+                client.load_table_from_file(
+                    chunk_buffer,
+                    destination_table_ref,
+                    job_config=job_config,
+                    location=location,
+                ).result()
+            finally:
+                chunk_buffer.close()
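The remove_policy_tags helper called above is also outside this commit's diff.
The idea, per the PR title and the author's comment, is to strip policyTags
before the schema is sent with a load job, so the job does not require
permission to set policy tags. A minimal sketch (hypothetical implementation;
the real helper lives in pandas_gbq/schema.py and may differ in detail):

def remove_policy_tags(schema):
    # Return a copy of a REST-style schema with "policyTags" dropped from
    # every field, recursing into RECORD fields' nested "fields" lists.
    fields = []
    for field in schema.get("fields", []):
        field = {k: v for k, v in field.items() if k != "policyTags"}
        if "fields" in field:
            field["fields"] = remove_policy_tags(
                {"fields": field["fields"]}
            )["fields"]
        fields.append(field)
    return {"fields": fields}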