From 44ade49c83134a63549d6010b2fb70e03600f8c5 Mon Sep 17 00:00:00 2001
From: Мартынов Максим Сергеевич
Date: Fri, 17 Jan 2025 09:53:19 +0000
Subject: [PATCH 1/2] [DOP-23318] Avoid suppressing Hive Metastore errors

---
 docs/changelog/next_release/329.bugfix.rst    |  1 +
 .../db_connection/hive/connection.py          | 33 +++++++++++++------
 2 files changed, 24 insertions(+), 10 deletions(-)
 create mode 100644 docs/changelog/next_release/329.bugfix.rst

diff --git a/docs/changelog/next_release/329.bugfix.rst b/docs/changelog/next_release/329.bugfix.rst
new file mode 100644
index 00000000..9666bc8a
--- /dev/null
+++ b/docs/changelog/next_release/329.bugfix.rst
@@ -0,0 +1 @@
+Avoid suppressing Hive Metastore errors while using ``DBWriter``. Previously this could lead to overwriting an already existing table if the Hive Metastore was overloaded and responded with an exception.
diff --git a/onetl/connection/db_connection/hive/connection.py b/onetl/connection/db_connection/hive/connection.py
index 21fd5f69..f8f63d1b 100644
--- a/onetl/connection/db_connection/hive/connection.py
+++ b/onetl/connection/db_connection/hive/connection.py
@@ -296,17 +296,8 @@ def write_df_to_target(
     ) -> None:
         write_options = self.WriteOptions.parse(options)
 
-        try:
-            self.get_df_schema(target)
-            table_exists = True
-
-            log.info("|%s| Table %r already exists", self.__class__.__name__, target)
-        except Exception as e:
-            log.debug("|%s| Cannot get schema of table %r: %s", self.__class__.__name__, target, e)
-            table_exists = False
-
         # https://stackoverflow.com/a/72747050
-        if table_exists and write_options.if_exists != HiveTableExistBehavior.REPLACE_ENTIRE_TABLE:
+        if self._target_exist(target) and write_options.if_exists != HiveTableExistBehavior.REPLACE_ENTIRE_TABLE:
             if write_options.if_exists == HiveTableExistBehavior.ERROR:
                 raise ValueError("Operation stopped due to Hive.WriteOptions(if_exists='error')")
             elif write_options.if_exists == HiveTableExistBehavior.IGNORE:
@@ -465,6 +456,28 @@ def _sort_df_columns_like_table(self, table: str, df_columns: list[str]) -> list
 
         return sorted(df_columns, key=lambda column: table_columns_normalized.index(column.casefold()))
 
+    def _target_exist(self, name: str) -> bool:
+        from pyspark.sql.functions import col
+
+        log.info("|%s| Checking if table %r exists ...", self.__class__.__name__, name)
+
+        # Do not use SELECT * FROM table, because it may fail if users have no parmissions,
+        # or Hive Metastore is overloaded.
+        # Also we ignore VIEW's as they are not insertable.
+        schema, table = name.split(".", maxsplit=1)
+        query = f"SHOW TABLES IN {schema} LIKE '{table}'"
+
+        log.debug("|%s| Executing SQL query:", self.__class__.__name__)
+        log_lines(log, query, level=logging.DEBUG)
+
+        df = self._execute_sql(query).where(col("tableName") == table)
+        result = df.count() > 0
+        if result:
+            log.info("|%s| Table %r exists.", self.__class__.__name__, name)
+        else:
+            log.info("|%s| Table %r does not exist.", self.__class__.__name__, name)
+        return result
+
     def _insert_into(
         self,
         df: DataFrame,

From a087a21cd3082e50c0c2a95e3d2c54dad760711c Mon Sep 17 00:00:00 2001
From: Maxim Martynov
Date: Tue, 21 Jan 2025 10:59:11 +0300
Subject: [PATCH 2/2] Update onetl/connection/db_connection/hive/connection.py

Co-authored-by: Aleksey Nikoalev <98986158+piece-of-iron@users.noreply.github.com>
---
 onetl/connection/db_connection/hive/connection.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/onetl/connection/db_connection/hive/connection.py b/onetl/connection/db_connection/hive/connection.py
index f8f63d1b..bd02f60b 100644
--- a/onetl/connection/db_connection/hive/connection.py
+++ b/onetl/connection/db_connection/hive/connection.py
@@ -461,7 +461,7 @@ def _target_exist(self, name: str) -> bool:
 
         log.info("|%s| Checking if table %r exists ...", self.__class__.__name__, name)
 
-        # Do not use SELECT * FROM table, because it may fail if users have no parmissions,
+        # Do not use SELECT * FROM table, because it may fail if users have no permissions,
         # or Hive Metastore is overloaded.
         # Also we ignore VIEW's as they are not insertable.
         schema, table = name.split(".", maxsplit=1)
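
For context on PATCH 1/2: the new _target_exist() helper has no try/except around the check, so a Hive Metastore failure now raises instead of being mistaken for a missing table, and it uses SHOW TABLES, which touches only the Metastore rather than reading table data. Below is a minimal standalone sketch of the same technique in plain PySpark; the SparkSession setup, the hive_table_exists() name, and the my_schema.my_table table are illustrative assumptions, not part of the patch:

    # Minimal sketch of the SHOW TABLES existence check, assuming a SparkSession
    # with Hive support; the function and table names here are hypothetical.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    def hive_table_exists(name: str) -> bool:
        # Split "schema.table" once; SHOW TABLES only asks the Metastore,
        # so it needs no SELECT permission and reads no table data.
        schema, table = name.split(".", maxsplit=1)
        df = spark.sql(f"SHOW TABLES IN {schema} LIKE '{table}'")
        # LIKE is a pattern match, so filter for the exact table name.
        return df.where(col("tableName") == table).count() > 0

    print(hive_table_exists("my_schema.my_table"))  # hypothetical table

Unlike this sketch, the patched method logs the query via log_lines() and reuses the connection's _execute_sql() helper, but the existence decision is the same tableName equality check, and any Metastore error propagates to DBWriter instead of being swallowed.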