diff --git a/docs/changelog/next_release/329.bugfix.rst b/docs/changelog/next_release/329.bugfix.rst new file mode 100644 index 00000000..9666bc8a --- /dev/null +++ b/docs/changelog/next_release/329.bugfix.rst @@ -0,0 +1 @@ +Avoid suppressing Hive Metastore errors while using ``DBWriter``. Previously this could lead to overwriting an already existing table if Hive Metastore was overloaded and responded with an exception. diff --git a/onetl/connection/db_connection/hive/connection.py b/onetl/connection/db_connection/hive/connection.py index 21fd5f69..bd02f60b 100644 --- a/onetl/connection/db_connection/hive/connection.py +++ b/onetl/connection/db_connection/hive/connection.py @@ -296,17 +296,8 @@ def write_df_to_target( ) -> None: write_options = self.WriteOptions.parse(options) - try: - self.get_df_schema(target) - table_exists = True - - log.info("|%s| Table %r already exists", self.__class__.__name__, target) - except Exception as e: - log.debug("|%s| Cannot get schema of table %r: %s", self.__class__.__name__, target, e) - table_exists = False - # https://stackoverflow.com/a/72747050 - if table_exists and write_options.if_exists != HiveTableExistBehavior.REPLACE_ENTIRE_TABLE: + if self._target_exist(target) and write_options.if_exists != HiveTableExistBehavior.REPLACE_ENTIRE_TABLE: if write_options.if_exists == HiveTableExistBehavior.ERROR: raise ValueError("Operation stopped due to Hive.WriteOptions(if_exists='error')") elif write_options.if_exists == HiveTableExistBehavior.IGNORE: @@ -465,6 +456,28 @@ def _sort_df_columns_like_table(self, table: str, df_columns: list[str]) -> list return sorted(df_columns, key=lambda column: table_columns_normalized.index(column.casefold())) + def _target_exist(self, name: str) -> bool: + from pyspark.sql.functions import col + + log.info("|%s| Checking if table %r exists ...", self.__class__.__name__, name) + + # Do not use SELECT * FROM table, because it may fail if users have no permissions, + # or Hive Metastore is overloaded. 
+ # Also we ignore VIEWs as they are not insertable. + schema, table = name.split(".", maxsplit=1) + query = f"SHOW TABLES IN {schema} LIKE '{table}'" + + log.debug("|%s| Executing SQL query:", self.__class__.__name__) + log_lines(log, query, level=logging.DEBUG) + + df = self._execute_sql(query).where(col("tableName") == table) + result = df.count() > 0 + if result: + log.info("|%s| Table %r exists.", self.__class__.__name__, name) + else: + log.info("|%s| Table %r does not exist.", self.__class__.__name__, name) + return result + def _insert_into( self, df: DataFrame,