|
6 | 6 | that will be directly exposed to its users.
|
7 | 7 | """
|
8 | 8 | import pandas as pd
|
9 |
| -from sqlalchemy.engine.base import Connectable |
10 |
| -from typing import Optional, Union |
| 9 | +from sqlalchemy.engine import Connectable |
| 10 | +from typing import Union |
11 | 11 |
|
12 | 12 | # local imports
|
13 | 13 | from pangres.executor import Executor
|
14 | 14 | from pangres.helpers import validate_chunksize_param
|
| 15 | +from pangres.pangres_types import AsyncConnectable, AUpsertResult, UpsertResult |
15 | 16 |
|
16 | 17 |
|
17 | 18 | # -
|
18 | 19 |
|
19 | 20 | # # upsert
|
20 | 21 |
|
21 |
| -def upsert(con:Connectable, |
22 |
| - df:pd.DataFrame, |
23 |
| - table_name:str, |
24 |
| - if_row_exists:str, |
25 |
| - schema:Optional[str]=None, |
26 |
| - create_schema:bool=False, |
27 |
| - create_table:bool=True, |
28 |
| - add_new_columns:bool=False, |
29 |
| - adapt_dtype_of_empty_db_columns:bool=False, |
30 |
| - chunksize:Optional[int]=None, |
31 |
| - dtype:Union[dict,None]=None, |
32 |
| - yield_chunks:bool=False): |
| 22 | +def upsert(con: Connectable, |
| 23 | + df: pd.DataFrame, |
| 24 | + table_name: str, |
| 25 | + if_row_exists: str, |
| 26 | + schema: Union[str, None] = None, |
| 27 | + create_schema: bool = False, |
| 28 | + create_table: bool = True, |
| 29 | + add_new_columns: bool = False, |
| 30 | + adapt_dtype_of_empty_db_columns: bool = False, |
| 31 | + chunksize: Union[int, None] = None, |
| 32 | + dtype: Union[dict, None] = None, |
| 33 | + yield_chunks: bool = False) -> UpsertResult: |
33 | 34 | """
|
34 | 35 | Insert updates/ignores a pandas DataFrame into a SQL table (or
|
35 | 36 | creates a SQL table from the DataFrame if it does not exist).
|
@@ -210,7 +211,7 @@ def upsert(con:Connectable,
|
210 | 211 | ... if_row_exists='update',
|
211 | 212 | ... dtype=dtype,
|
212 | 213 | ... create_table=False)
|
213 |
| - >>> |
| 214 | + >>> |
214 | 215 | >>> # Now we read from the database to check what we got and as you can see
|
215 | 216 | >>> # John Travolta was updated and Arnold Schwarzenegger was added!
|
216 | 217 | >>> with engine.connect() as connection:
|
@@ -299,32 +300,33 @@ def upsert(con:Connectable,
|
299 | 300 | # execute SQL operations
|
300 | 301 | if not yield_chunks:
|
301 | 302 | executor.execute(connectable=con, if_row_exists=if_row_exists, chunksize=chunksize)
|
| 303 | + return None |
302 | 304 | else:
|
303 | 305 | return executor.execute_yield(connectable=con, if_row_exists=if_row_exists, chunksize=chunksize)
|
304 | 306 |
|
305 | 307 |
|
306 | 308 | # # Async upsert
|
307 | 309 |
|
308 |
| -async def aupsert(con, |
309 |
| - df:pd.DataFrame, |
310 |
| - table_name:str, |
311 |
| - if_row_exists:str, |
312 |
| - schema:Optional[str]=None, |
313 |
| - create_schema:bool=False, |
314 |
| - create_table:bool=True, |
315 |
| - add_new_columns:bool=False, |
316 |
| - adapt_dtype_of_empty_db_columns:bool=False, |
317 |
| - chunksize:Optional[int]=None, |
318 |
| - dtype:Union[dict,None]=None, |
319 |
| - yield_chunks:bool=False): |
| 310 | +async def aupsert(con: AsyncConnectable, |
| 311 | + df: pd.DataFrame, |
| 312 | + table_name: str, |
| 313 | + if_row_exists: str, |
| 314 | + schema: Union[str, None] = None, |
| 315 | + create_schema: bool = False, |
| 316 | + create_table: bool = True, |
| 317 | + add_new_columns: bool = False, |
| 318 | + adapt_dtype_of_empty_db_columns: bool = False, |
| 319 | + chunksize: Union[int, None] = None, |
| 320 | + dtype: Union[dict, None] = None, |
| 321 | + yield_chunks: bool = False) -> AUpsertResult: |
320 | 322 | """
|
321 | 323 | Asynchronous variant of `pangres.upsert`. Make sure to read its docstring
|
322 | 324 | before using this function!
|
323 | 325 |
|
324 | 326 | The parameters of `pangres.aupsert` are the same but parameter `con`
|
325 | 327 | will require an asynchronous connectable (asynchronous engine or asynchronous connection).
|
326 | 328 |
|
327 |
| - For example you can use PostgreSQL asynchronously with `sqlalchemy` thanks to |
| 329 | + For example, you can use PostgreSQL asynchronously with `sqlalchemy` thanks to |
328 | 330 | the library/driver `asyncpg`, or SQLite with `aiosqlite` or Mysql with `aiomysql`.
|
329 | 331 |
|
330 | 332 | **WARNING**
|
@@ -389,12 +391,12 @@ async def aupsert(con,
|
389 | 391 | >>> df = DocsExampleTable.df
|
390 | 392 | >>>
|
391 | 393 | >>> # Create table before inserting! This will avoid race conditions mentionned above
|
392 |
| - >>> # (here we are lazy so we'll use pangres to do that but we could also use a sqlalchemy ORM model) |
393 |
| - >>> # By using `df.head(0)` we get 0 rows but we have all the information about columns, index levels |
| 394 | + >>> # (here we are lazy, so we'll use pangres to do that, but we could also use a sqlalchemy ORM model) |
| 395 | + >>> # By using `df.head(0)` we get 0 rows, but we have all the information about columns, index levels |
394 | 396 | >>> # and data types that we need for creating the table.
|
395 | 397 | >>> # And in a second step (see coroutine `execute_upsert` that we define after)
|
396 | 398 | >>> # we will set all parameters that could cause structure changes
|
397 |
| - >>> # to False so we can run queries in parallel without worries! |
| 399 | + >>> # to False, so we can run queries in parallel without worries! |
398 | 400 | >>> async def setup():
|
399 | 401 | ... await aupsert(con=engine, df=df.head(0),
|
400 | 402 | ... table_name='example',
|
@@ -455,6 +457,7 @@ async def aupsert(con,
|
455 | 457 | # execute SQL operations
|
456 | 458 | if not yield_chunks:
|
457 | 459 | await executor.aexecute(async_connectable=con, if_row_exists=if_row_exists, chunksize=chunksize)
|
| 460 | + return None |
458 | 461 | else:
|
459 | 462 | # IMPORTANT! NO `await` because this returns an asynchronous generator
|
460 | 463 | return executor.aexecute_yield(async_connectable=con, if_row_exists=if_row_exists, chunksize=chunksize)
|
0 commit comments