Skip to content

Commit

Permalink
Exposing the timeout parameter for the async jobs with synapse
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanFauble committed Feb 20, 2025
1 parent c79c7dd commit 2fcd3bd
Showing 1 changed file with 52 additions and 8 deletions.
60 changes: 52 additions & 8 deletions synapseclient/models/mixins/table_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ async def _generate_schema_change_request(
method_to_trace_name=lambda self, **kwargs: f"{self.__class__}_Store: {self.name}"
)
async def store_async(
self, dry_run: bool = False, *, synapse_client: Optional[Synapse] = None
self,
dry_run: bool = False,
*,
job_timeout: int = 600,
synapse_client: Optional[Synapse] = None,
) -> "Self":
"""Store non-row information about a table including the columns and annotations.
Expand All @@ -321,6 +325,12 @@ async def store_async(
Arguments:
dry_run: If True, will not actually store the table but will log to
the console what would have been stored.
job_timeout: The maximum amount of time, in seconds, to wait for a job to
complete. This is used when updating the table schema. If the timeout
is reached a `SynapseTimeoutError` will be raised.
The default is 600 seconds.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand Down Expand Up @@ -401,7 +411,7 @@ async def store_async(
if schema_change_request:
await TableUpdateTransaction(
entity_id=self.id, changes=[schema_change_request]
).send_job_and_wait_async(synapse_client=client)
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)

# Replace the columns after a schema change in case any column names were updated
updated_columns = OrderedDict()
Expand Down Expand Up @@ -1933,6 +1943,7 @@ async def _push_row_updates_to_synapse(
rows_to_update: List[PartialRow],
update_size_byte: int,
progress_bar: tqdm,
job_timeout: int,
client: Synapse,
) -> None:
current_chunk_size = 0
Expand All @@ -1954,7 +1965,7 @@ async def _push_row_updates_to_synapse(
)

await request.send_job_and_wait_async(
synapse_client=client, timeout=600
synapse_client=client, timeout=job_timeout
)
progress_bar.update(len(chunk))
chunk = []
Expand All @@ -1974,7 +1985,7 @@ async def _push_row_updates_to_synapse(
await TableUpdateTransaction(
entity_id=self.id,
changes=[change],
).send_job_and_wait_async(synapse_client=client)
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)
progress_bar.update(len(chunk))

async def upsert_rows_async(
Expand All @@ -1986,6 +1997,7 @@ async def upsert_rows_async(
rows_per_query: int = 50000,
update_size_byte: int = 1.9 * MB,
insert_size_byte: int = 900 * MB,
job_timeout: int = 600,
synapse_client: Optional[Synapse] = None,
**kwargs,
) -> None:
Expand Down Expand Up @@ -2061,6 +2073,12 @@ async def upsert_rows_async(
insert_size_byte: The maximum size of the request that will be sent to Synapse
when inserting rows of data. The default is 900MB.
job_timeout: The maximum amount of time, in seconds, to wait for a job to
complete. This is used when inserting and updating rows of data. Each
individual request to Synapse will be sent as an independent job. If the
timeout is reached a `SynapseTimeoutError` will be raised.
The default is 600 seconds.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor
Expand Down Expand Up @@ -2221,6 +2239,7 @@ async def main():
update_size_byte=update_size_byte,
progress_bar=progress_bar,
client=client,
job_timeout=job_timeout,
)
elif dry_run:
progress_bar.update(len(rows_to_update))
Expand Down Expand Up @@ -2312,6 +2331,7 @@ async def store_rows_async(
csv_table_descriptor: Optional[CsvTableDescriptor] = None,
read_csv_kwargs: Optional[Dict[str, Any]] = None,
to_csv_kwargs: Optional[Dict[str, Any]] = None,
job_timeout: int = 600,
synapse_client: Optional[Synapse] = None,
) -> None:
"""
Expand Down Expand Up @@ -2448,6 +2468,12 @@ async def store_rows_async(
<https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html>
for complete list of supported arguments.
job_timeout: The maximum amount of time, in seconds, to wait for a job to
complete. This is used when inserting and updating rows of data. Each
individual request to Synapse will be sent as an independent job. If the
timeout is reached a `SynapseTimeoutError` will be raised.
The default is 600 seconds.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand Down Expand Up @@ -2682,6 +2708,7 @@ async def main():
schema_change_request=schema_change_request,
client=client,
additional_changes=additional_changes,
job_timeout=job_timeout,
)
elif isinstance(values, DataFrame):
# When creating this temporary file we are using the cache directory
Expand Down Expand Up @@ -2724,6 +2751,7 @@ async def main():
schema_change_request=schema_change_request,
client=client,
additional_changes=additional_changes,
job_timeout=job_timeout,
)
finally:
temp_file.close()
Expand All @@ -2740,6 +2768,7 @@ async def _chunk_and_upload_csv(
csv_table_descriptor: CsvTableDescriptor,
schema_change_request: TableSchemaChangeRequest,
client: Synapse,
job_timeout: int,
additional_changes: List[
Union[
"TableSchemaChangeRequest",
Expand All @@ -2750,6 +2779,7 @@ async def _chunk_and_upload_csv(
) -> None:
async def _send_update(
table_descriptor: CsvTableDescriptor,
job_timeout: int,
file_handle_id: str = None,
changes: List[
Union[
Expand All @@ -2768,6 +2798,7 @@ async def _send_update(
Arguments:
table_descriptor: The descriptor for the CSV file that is being uploaded.
job_timeout: The maximum amount of time to wait for a job to complete.
file_handle_id: The file handle ID that is being uploaded to Synapse.
changes: Additional changes to the table that should
execute within the same transaction as appending or updating rows.
Expand All @@ -2789,7 +2820,7 @@ async def _send_update(
if all_changes:
await TableUpdateTransaction(
entity_id=self.id, changes=all_changes
).send_job_and_wait_async(synapse_client=client, timeout=600)
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)

# TODO: Add integration test around this portion of the code
file_size = os.path.getsize(path_to_csv)
Expand All @@ -2801,7 +2832,11 @@ async def _send_update(
if additional_changes:
changes.extend(additional_changes)

await _send_update(table_descriptor=csv_table_descriptor, changes=changes)
await _send_update(
table_descriptor=csv_table_descriptor,
changes=changes,
job_timeout=job_timeout,
)

progress_bar = tqdm(
total=file_size,
Expand Down Expand Up @@ -2857,6 +2892,7 @@ async def _send_update(
await _send_update(
table_descriptor=csv_table_descriptor,
file_handle_id=file_handle_id,
job_timeout=job_timeout,
)
progress_bar.update(current_bytes_written)
current_bytes_written = 0
Expand All @@ -2874,6 +2910,7 @@ async def _send_update(
await _send_update(
table_descriptor=csv_table_descriptor,
file_handle_id=file_handle_id,
job_timeout=job_timeout,
)
progress_bar.update(current_bytes_written)
finally:
Expand All @@ -2896,11 +2933,16 @@ async def _send_update(
table_descriptor=csv_table_descriptor,
file_handle_id=file_handle_id,
changes=changes,
job_timeout=job_timeout,
)

# TODO: Determine if it is possible to delete rows from a `Dataset` entity, or if it's only possible to delete the rows by setting the `items` attribute and storing the entity
async def delete_rows_async(
self, query: str, *, synapse_client: Optional[Synapse] = None
self,
query: str,
*,
job_timeout: int = 600,
synapse_client: Optional[Synapse] = None,
) -> DATA_FRAME_TYPE:
"""
Delete rows from a table given a query to select rows. The query at a
Expand All @@ -2914,6 +2956,8 @@ async def delete_rows_async(
must select the `ROW_ID` and `ROW_VERSION` columns. See this document
that describes the expected syntax of the query:
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/web/controller/TableExamples.html>
job_timeout: The amount of time to wait for table updates to complete
before a `SynapseTimeoutError` is thrown. The default is 600 seconds.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand Down Expand Up @@ -2984,7 +3028,7 @@ async def main():

await TableUpdateTransaction(
entity_id=self.id, changes=[upload_request]
).send_job_and_wait_async(synapse_client=client)
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)

return results_from_query

Expand Down

0 comments on commit 2fcd3bd

Please sign in to comment.