Skip to content

Commit

Permalink
feat: implement option 'delete_rows' of argument 'if_exists' in 'Data…
Browse files Browse the repository at this point in the history
…Frame.to_sql' API.
  • Loading branch information
gmcrocetti committed Feb 17, 2025
1 parent ee06e71 commit e67c0a2
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 17 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Other enhancements
- :meth:`Series.str.get_dummies` now accepts a ``dtype`` parameter to specify the dtype of the resulting DataFrame (:issue:`47872`)
- :meth:`pandas.concat` will raise a ``ValueError`` when ``ignore_index=True`` and ``keys`` is not ``None`` (:issue:`59274`)
- :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
- Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
- Multiplying two :class:`DateOffset` objects will now raise a ``TypeError`` instead of a ``RecursionError`` (:issue:`59442`)
Expand Down
6 changes: 6 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2801,6 +2801,12 @@ def to_sql(
Databases supported by SQLAlchemy [1]_ are supported. Tables can be
newly created, appended to, or overwritten.
.. warning::
The pandas library does not attempt to sanitize inputs provided via a to_sql call.
Please refer to the documentation for the underlying database driver to see if it
will properly prevent injection, or alternatively be advised of a security risk when
executing arbitrary commands in a to_sql call.
Parameters
----------
name : str
Expand Down
61 changes: 47 additions & 14 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

from sqlalchemy import Table
from sqlalchemy.sql.expression import (
Delete,
Select,
TextClause,
)
Expand Down Expand Up @@ -738,7 +739,7 @@ def to_sql(
name: str,
con,
schema: str | None = None,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label: IndexLabel | None = None,
chunksize: int | None = None,
Expand All @@ -750,6 +751,12 @@ def to_sql(
"""
Write records stored in a DataFrame to a SQL database.
.. warning::
The pandas library does not attempt to sanitize inputs provided via a to_sql call.
Please refer to the documentation for the underlying database driver to see if it
will properly prevent injection, or alternatively be advised of a security risk when
executing arbitrary commands in a to_sql call.
Parameters
----------
frame : DataFrame, Series
Expand All @@ -764,10 +771,11 @@ def to_sql(
schema : str, optional
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column.
index_label : str or sequence, optional
Expand Down Expand Up @@ -818,7 +826,7 @@ def to_sql(
`sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
`SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
""" # noqa: E501
if if_exists not in ("fail", "replace", "append"):
if if_exists not in ("fail", "replace", "append", "delete_rows"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")

if isinstance(frame, Series):
Expand Down Expand Up @@ -926,7 +934,7 @@ def __init__(
pandas_sql_engine,
frame=None,
index: bool | str | list[str] | None = True,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
prefix: str = "pandas",
index_label=None,
schema=None,
Expand Down Expand Up @@ -974,11 +982,13 @@ def create(self) -> None:
if self.exists():
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
elif self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
elif self.if_exists == "delete_rows":
self.pd_sql.delete_rows(self.name, self.schema)
else:
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
else:
Expand All @@ -997,7 +1007,7 @@ def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
Each item contains a list of values to be inserted
"""
data = [dict(zip(keys, row)) for row in data_iter]
result = conn.execute(self.table.insert(), data)
result = self.pd_sql.execute(self.table.insert(), data)
return result.rowcount

def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
Expand All @@ -1014,7 +1024,7 @@ def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:

data = [dict(zip(keys, row)) for row in data_iter]
stmt = insert(self.table).values(data)
result = conn.execute(stmt)
result = self.pd_sql.execute(stmt)
return result.rowcount

def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
Expand Down Expand Up @@ -1480,7 +1490,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1649,7 +1659,7 @@ def run_transaction(self):
else:
yield self.con

def execute(self, sql: str | Select | TextClause, params=None):
def execute(self, sql: str | Select | TextClause | Delete, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
from sqlalchemy.exc import SQLAlchemyError

Expand Down Expand Up @@ -1874,7 +1884,7 @@ def prep_table(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool | str | list[str] | None = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1951,7 +1961,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -1969,10 +1979,11 @@ def to_sql(
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2069,6 +2080,16 @@ def drop_table(self, table_name: str, schema: str | None = None) -> None:
self.get_table(table_name, schema).drop(bind=self.con)
self.meta.clear()

def delete_rows(self, table_name: str, schema: str | None = None) -> None:
schema = schema or self.meta.schema
if self.has_table(table_name, schema):
self.meta.reflect(
bind=self.con, only=[table_name], schema=schema, views=True
)
table = self.get_table(table_name, schema)
self.execute(table.delete())
self.meta.clear()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2304,7 +2325,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -2326,6 +2347,7 @@ def to_sql(
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2416,6 +2438,11 @@ def has_table(self, name: str, schema: str | None = None) -> bool:

return False

def delete_rows(self, name: str, schema: str | None = None) -> None:
table_name = f"{schema}.{name}" if schema else name
if self.has_table(name, schema):
self.execute(f"DELETE FROM {table_name}").close()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2790,10 +2817,11 @@ def to_sql(
frame: DataFrame
name: string
Name of SQL table.
if_exists: {'fail', 'replace', 'append'}, default 'fail'
if_exists: {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if it does not exist.
delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Expand Down Expand Up @@ -2869,6 +2897,11 @@ def drop_table(self, name: str, schema: str | None = None) -> None:
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
self.execute(drop_sql)

def delete_rows(self, name: str, schema: str | None = None) -> None:
delete_sql = f"DELETE FROM {_get_valid_sqlite_name(name)}"
if self.has_table(name, schema):
self.execute(delete_sql)

def _create_sql_schema(
self,
frame,
Expand Down
60 changes: 57 additions & 3 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,9 @@ def test_to_sql(conn, method, test_frame1, request):


@pytest.mark.parametrize("conn", all_connectable)
@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)])
@pytest.mark.parametrize(
"mode, num_row_coef", [("replace", 1), ("append", 2), ("delete_rows", 1)]
)
def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request):
conn = request.getfixturevalue(conn)
with pandasSQL_builder(conn, need_transaction=True) as pandasSQL:
Expand Down Expand Up @@ -2698,6 +2700,58 @@ def test_drop_table(conn, request):
assert not insp.has_table("temp_frame")


@pytest.mark.parametrize("conn_name", all_connectable)
def test_delete_rows_success(conn_name, test_frame1, request):
table_name = "temp_frame"
conn = request.getfixturevalue(conn_name)

with pandasSQL_builder(conn) as pandasSQL:
with pandasSQL.run_transaction():
assert pandasSQL.to_sql(test_frame1, table_name) == test_frame1.shape[0]

with pandasSQL.run_transaction():
assert pandasSQL.delete_rows(table_name) is None

assert count_rows(conn, table_name) == 0
assert pandasSQL.has_table("temp_frame")


@pytest.mark.parametrize("conn_name", all_connectable)
def test_delete_rows_is_atomic(conn_name, request):
sqlalchemy = pytest.importorskip("sqlalchemy")

table_name = "temp_frame"
table_stmt = f"CREATE TABLE {table_name} (a INTEGER, b INTEGER UNIQUE NOT NULL)"

if conn_name != "sqlite_buildin" and "adbc" not in conn_name:
table_stmt = sqlalchemy.text(table_stmt)

# setting dtype is mandatory for adbc related tests
original_df = DataFrame({"a": [1, 2], "b": [3, 4]}, dtype="int32")
replacing_df = DataFrame({"a": [5, 6, 7], "b": [8, 8, 8]}, dtype="int32")

conn = request.getfixturevalue(conn_name)
pandasSQL = pandasSQL_builder(conn)

with pandasSQL.run_transaction() as cur:
cur.execute(table_stmt)

with pandasSQL.run_transaction():
pandasSQL.to_sql(original_df, table_name, if_exists="append", index=False)

# inserting duplicated values in a UNIQUE constraint column
with pytest.raises(pd.errors.DatabaseError):
with pandasSQL.run_transaction():
pandasSQL.to_sql(
replacing_df, table_name, if_exists="delete_rows", index=False
)

# failed "delete_rows" is rolled back preserving original data
with pandasSQL.run_transaction():
result_df = pandasSQL.read_query(f"SELECT * FROM {table_name}", dtype="int32")
tm.assert_frame_equal(result_df, original_df)


@pytest.mark.parametrize("conn", all_connectable)
def test_roundtrip(conn, request, test_frame1):
if conn == "sqlite_str":
Expand Down Expand Up @@ -3409,8 +3463,8 @@ def test_to_sql_with_negative_npinf(conn, request, input):
mark = pytest.mark.xfail(reason="GH 36465")
request.applymarker(mark)

msg = "inf cannot be used with MySQL"
with pytest.raises(ValueError, match=msg):
msg = "Execution failed on sql"
with pytest.raises(pd.errors.DatabaseError, match=msg):
df.to_sql(name="foobar", con=conn, index=False)
else:
assert df.to_sql(name="foobar", con=conn, index=False) == 1
Expand Down

0 comments on commit e67c0a2

Please sign in to comment.