
Commit

Move the update of rows to the same loop as query, to allow for less total memory usage
BryanFauble committed Feb 19, 2025
1 parent 63890ca commit a105e37
Showing 2 changed files with 48 additions and 52 deletions.
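For orientation, the core of this change is that partial-row update requests are now sent inside the same loop that queries each chunk of rows, instead of being accumulated across all chunks and sent afterwards. Below is a minimal sketch of that interleaved shape, with illustrative callables rather than the actual synapseclient internals:

```python
from typing import Any, Awaitable, Callable, Iterable, List


# Simplified sketch (not the library's code) of the restructuring in this commit:
# build and send the updates for one query chunk before moving on to the next, so
# the full set of PartialRow objects is never held in memory at once.
async def upsert_interleaved(
    chunks: Iterable[List[Any]],
    build_updates: Callable[[List[Any]], List[Any]],
    send_update: Callable[[List[Any]], Awaitable[None]],
) -> None:
    for chunk in chunks:
        rows_to_update = build_updates(chunk)  # diff this chunk against the query result
        if rows_to_update:
            await send_update(rows_to_update)  # one transaction (or a few) per chunk
        # rows_to_update is dropped here, so memory stays bounded by one chunk
```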
90 changes: 39 additions & 51 deletions synapseclient/models/mixins/table_operator.py
@@ -83,7 +83,7 @@
SERIES_TYPE = TypeVar("pd.Series")


def test_import_pandas():
def test_import_pandas() -> None:
try:
import pandas as pd # noqa F401
# used to catch when pandas isn't installed
@@ -1742,7 +1742,7 @@ async def upsert_rows_async(
primary_keys: List[str],
dry_run: bool = False,
*,
rows_per_query: int = 10000,
rows_per_query: int = 50000,
update_size_mb: int = 1.5 * MB,
insert_size_mb: int = 900 * MB,
synapse_client: Optional[Synapse] = None,
@@ -1769,7 +1769,7 @@
data is chunked up into multiple requests you may find that a portion of
your data is updated, but another portion is not.
- The number of rows that may be upserted in a single call should be
kept to a minimum (< 10,000). There is significant overhead in the request
kept to a minimum (< 50,000). There is significant overhead in the request
to Synapse for each row that is upserted. If you are upserting a large
number of rows a better approach may be to query for the data you want
to update, update the data, then use the [store_rows_async][synapseclient.models.mixins.table_operator.TableRowOperator.store_rows_async] method to
Expand Down Expand Up @@ -1812,7 +1812,7 @@ async def upsert_rows_async(
rows_per_query: The number of rows that will be queried from Synapse per
request. Since we need to query for the data that is being updated
this will determine the number of rows that are queried at a time.
The default is 10,000 rows.
The default is 50,000 rows.
update_size_mb: The maximum size of the request that will be sent to Synapse
when updating rows of data. The default is 1.5MB.
Expand Down Expand Up @@ -1925,8 +1925,6 @@ async def main():
values = DataFrame(values)
elif isinstance(values, str):
values = csv_to_pandas_df(filepath=values, **kwargs)
elif isinstance(values, DataFrame) or isinstance(values, str):
values = values.copy()
else:
raise ValueError(
"Don't know how to make tables from values of type %s." % type(values)
@@ -1947,10 +1945,10 @@ async def main():
with logging_redirect_tqdm(loggers=[client.logger]):
progress_bar = tqdm(
total=len(values),
desc="Querying rows",
desc="Querying & Updating rows",
unit_scale=True,
smoothing=0,
postfix=f"Over {len(chunk_list)} requests",
postfix=f"Over {len(chunk_list)} chunks",
)
for individual_chunk in chunk_list:
select_statement = "SELECT ROW_ID, "
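For context, `chunk_list` above groups the input rows so that each group can be queried (and, with this change, also updated) on its own. A hedged sketch of that grouping, assuming a pandas DataFrame input; only `rows_per_query` and its default come from this diff, the helper name is illustrative:

```python
from typing import List

import pandas as pd


def chunk_dataframe(values: pd.DataFrame, rows_per_query: int = 50_000) -> List[pd.DataFrame]:
    """Split the input rows into query-sized chunks processed one at a time."""
    return [
        values.iloc[start : start + rows_per_query]
        for start in range(0, len(values), rows_per_query)
    ]
```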
@@ -2085,6 +2083,38 @@ async def main():
)
rows_to_update.append(partial_change)
indexs_of_original_df_with_changes.append(matching_row.index[0])

if not dry_run and rows_to_update:
chunked_rows_to_update = []
current_chunk_size = 0
chunk = []
for row in rows_to_update:
row_size = sys.getsizeof(row.to_synapse_request())
if current_chunk_size + row_size > update_size_mb:
chunked_rows_to_update.append(chunk)
chunk = []
current_chunk_size = 0
chunk.append(row)
current_chunk_size += row_size

if chunk:
chunked_rows_to_update.append(chunk)

for chunked_row_to_update in chunked_rows_to_update:
change = AppendableRowSetRequest(
entity_id=self.id,
to_append=PartialRowSet(
table_id=self.id,
rows=chunked_row_to_update,
),
)

await TableUpdateTransaction(
entity_id=self.id,
changes=[change],
).send_job_and_wait_async(synapse_client=client)

rows_to_update: List[PartialRow] = []
progress_bar.update(len(individual_chunk))
progress_bar.close()
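The size-based batching in the added block follows a common pattern: accumulate rows until the serialized request would exceed the size limit, then start a new batch. A standalone sketch of that pattern (not the library's code; `serialized_size` stands in for `sys.getsizeof(row.to_synapse_request())`):

```python
import sys
from typing import Any, Callable, List


def chunk_by_request_size(
    rows: List[Any],
    max_bytes: float,
    serialized_size: Callable[[Any], int] = sys.getsizeof,
) -> List[List[Any]]:
    """Group rows so each group's estimated serialized size stays under max_bytes."""
    chunks: List[List[Any]] = []
    current: List[Any] = []
    current_size = 0
    for row in rows:
        size = serialized_size(row)
        if current and current_size + size > max_bytes:
            chunks.append(current)  # close the batch before it exceeds the limit
            current = []
            current_size = 0
        current.append(row)
        current_size += size
    if current:
        chunks.append(current)
    return chunks
```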

@@ -2097,49 +2127,7 @@ async def main():
f" rows to update and {len(rows_to_insert_df)} rows to insert"
)

if not dry_run and rows_to_update:
chunked_rows_to_update = []
current_chunk_size = 0
chunk = []
for row in rows_to_update:
row_size = sys.getsizeof(row.to_synapse_request())
if current_chunk_size + row_size > update_size_mb:
chunked_rows_to_update.append(chunk)
chunk = []
current_chunk_size = 0
chunk.append(row)
current_chunk_size += row_size

if chunk:
chunked_rows_to_update.append(chunk)

with logging_redirect_tqdm(loggers=[client.logger]):
progress_bar = tqdm(
total=len(rows_to_update),
desc="Updating rows",
unit_scale=True,
smoothing=0,
postfix=f"Over {len(chunked_rows_to_update)} requests",
)

for chunked_row_to_update in chunked_rows_to_update:
change = AppendableRowSetRequest(
entity_id=self.id,
to_append=PartialRowSet(
table_id=self.id,
rows=chunked_row_to_update,
),
)

await TableUpdateTransaction(
entity_id=self.id,
changes=[change],
).send_job_and_wait_async(synapse_client=client)
progress_bar.update(len(chunked_row_to_update))

progress_bar.close()

if not rows_to_insert_df.empty:
if not dry_run and not rows_to_insert_df.empty:
await self.store_rows_async(
values=rows_to_insert_df,
dry_run=dry_run,
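Taken together, a hedged example of calling the async method with the new keyword-only tuning parameters (the `Table` import path, table id, and login style are illustrative assumptions; only the parameter names and defaults come from this diff):

```python
import asyncio

import pandas as pd
from synapseclient import Synapse
from synapseclient.models import Table  # assumed import path for the models API


async def main() -> None:
    syn = Synapse()
    syn.login()  # assumes cached credentials

    table = Table(id="syn1234")  # hypothetical table id
    updates = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

    await table.upsert_rows_async(
        values=updates,
        primary_keys=["id"],           # columns used to match existing rows
        rows_per_query=50_000,         # rows fetched per query chunk (new default)
        update_size_mb=1.5 * 1024**2,  # max bytes per update request, assuming MB = 1024**2
        synapse_client=syn,
    )


asyncio.run(main())
```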
10 changes: 9 additions & 1 deletion synapseclient/models/protocols/table_operator_protocol.py
@@ -277,6 +277,9 @@ def upsert_rows(
primary_keys: List[str],
dry_run: bool = False,
*,
rows_per_query: int = 50000,
update_size_mb: int = 1.5 * MB,
insert_size_mb: int = 900 * MB,
synapse_client: Optional[Synapse] = None,
**kwargs,
) -> None:
@@ -300,7 +303,7 @@ def upsert_rows(
data is chunked up into multiple requests you may find that a portion of
your data is updated, but another portion is not.
- The number of rows that may be upserted in a single call should be
kept to a minimum (< 10,000). There is significant overhead in the request
kept to a minimum (< 50,000). There is significant overhead in the request
to Synapse for each row that is upserted. If you are upserting a large
number of rows a better approach may be to query for the data you want
to update, update the data, then use the [store_rows_async][synapseclient.models.mixins.table_operator.TableRowOperator.store_rows_async] method to
@@ -340,6 +343,11 @@ def upsert_rows(
set the log level to DEBUG by setting the debug flag when creating
your Synapse class instance like: `syn = Synapse(debug=True)`.
rows_per_query: The number of rows that will be queried from Synapse per
request. Since we need to query for the data that is being updated
this will determine the number of rows that are queried at a time.
The default is 50,000 rows.
update_size_mb: The maximum size of the request that will be sent to Synapse
when updating rows of data. The default is 1.5MB.
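And a hedged sketch of the synchronous protocol method with `dry_run=True` to preview the changes before writing anything (import path and table id are again illustrative):

```python
import pandas as pd
from synapseclient import Synapse
from synapseclient.models import Table  # assumed import path for the models API

syn = Synapse()
syn.login()  # assumes cached credentials

table = Table(id="syn1234")  # hypothetical table id
updates = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# With dry_run=True the client logs what would be updated or inserted
# without sending any changes to Synapse.
table.upsert_rows(
    values=updates,
    primary_keys=["id"],
    dry_run=True,
    rows_per_query=50_000,
    synapse_client=syn,
)
```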
