Skip to content

Commit

Permalink
Move to csv reader/writer, expose timeout for async job, fix message …
Browse files Browse the repository at this point in the history
…for timeout
  • Loading branch information
BryanFauble committed Feb 20, 2025
1 parent 554680c commit d6421c2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
16 changes: 12 additions & 4 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def _post_exchange_async(
async def send_job_and_wait_async(
self,
post_exchange_args: Optional[Dict[str, Any]] = None,
timeout: int = 60,
*,
synapse_client: Optional[Synapse] = None,
) -> "Self":
Expand All @@ -65,6 +66,8 @@ async def send_job_and_wait_async(
Arguments:
post_exchange_args: Additional arguments to pass to the request.
timeout: The number of seconds to wait for the job to complete or progress
before raising a SynapseTimeoutError. Defaults to 60.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand Down Expand Up @@ -102,6 +105,7 @@ async def send_job_and_wait_async(
result = await send_job_and_wait_async(
request=self.to_synapse_request(),
request_type=self.concrete_type,
timeout=timeout,
synapse_client=synapse_client,
)
self.fill_from_dict(synapse_response=result)
Expand Down Expand Up @@ -263,6 +267,7 @@ async def send_job_and_wait_async(
request: Dict[str, Any],
request_type: str,
endpoint: str = None,
timeout: int = 60,
*,
synapse_client: Optional["Synapse"] = None,
) -> Dict[str, Any]:
Expand All @@ -273,6 +278,8 @@ async def send_job_and_wait_async(
Arguments:
request: A request matching <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/asynch/AsynchronousRequestBody.html>.
endpoint: The endpoint to use for the request. Defaults to None.
timeout: The number of seconds to wait for the job to complete or progress
before raising a SynapseTimeoutError. Defaults to 60.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand All @@ -293,6 +300,7 @@ async def send_job_and_wait_async(
request_type=request_type,
synapse_client=synapse_client,
endpoint=endpoint,
timeout=timeout,
),
}

Expand Down Expand Up @@ -370,14 +378,14 @@ async def get_job_async(
SynapseTimeoutError: If the job does not complete or progress within the timeout interval.
"""
client = Synapse.get_client(synapse_client=synapse_client)
start_time = asyncio.get_event_loop().time()
start_time = time.time()

last_message = ""
last_progress = 0
last_total = 1
progressed = False

while asyncio.get_event_loop().time() - start_time < timeout:
while time.time() - start_time < timeout:
result = await client.rest_get_async(
uri=f"{ASYNC_JOB_URIS[request_type]}/get/{job_id}",
endpoint=endpoint,
Expand Down Expand Up @@ -406,7 +414,7 @@ async def get_job_async(
prefix=last_message,
isBytes=False,
)
start_time = asyncio.get_event_loop().time()
start_time = time.time()
await asyncio.sleep(sleep)
elif job_status.state == AsynchronousJobState.FAILED:
raise SynapseError(
Expand All @@ -416,7 +424,7 @@ async def get_job_async(
break
else:
raise SynapseTimeoutError(
f"Timeout waiting for query results: {time.time() - start_time} seconds"
f"Timeout waiting for results: {time.time() - start_time} seconds"
)

return result
46 changes: 33 additions & 13 deletions synapseclient/models/mixins/table_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
or querying for data."""

import asyncio
import csv
import json
import logging
import math
import os
import tempfile
import uuid
Expand Down Expand Up @@ -2644,23 +2644,39 @@ async def _chunk_and_upload_csv(
if file_size > insert_size_byte:
applied_additional_changes = False
with open(file=path_to_csv, mode="r", encoding="utf-8") as f:
header = f.readline().encode()
loop_iteration = 0
csv_table_descriptor = csv_table_descriptor or CsvTableDescriptor()
reader = csv.reader(
f,
delimiter=csv_table_descriptor.separator,
escapechar=csv_table_descriptor.escape_character,
lineterminator=csv_table_descriptor.line_end,
quotechar=csv_table_descriptor.quote_character,
)
header = next(reader)
temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
with tempfile.NamedTemporaryFile(
prefix="chunked_csv_for_synapse_store_rows",
suffix=".csv",
dir=temp_dir,
mode="w+",
encoding="utf-8",
) as temp_file:
try:
temp_file.write(header)
while chunk := f.readlines(math.ceil(insert_size_byte / 10)):
if not chunk:
break
csv_writer = csv.writer(
temp_file,
delimiter=csv_table_descriptor.separator,
escapechar=csv_table_descriptor.escape_character,
lineterminator=csv_table_descriptor.line_end,
quotechar=csv_table_descriptor.quote_character,
)

loop_iteration += 1
temp_file.writelines([line.encode() for line in chunk])
if loop_iteration % 10 == 0:
csv_writer.writerow(header)
current_bytes_written = 0
for row in reader:
current_bytes_written += csv_writer.writerow(row)

if current_bytes_written >= insert_size_byte:
current_bytes_written = 0
temp_file.flush()
# TODO: This portion of the code should be updated to support uploading a file from memory using BytesIO (Ticket to be created)
file_handle_id = await multipart_upload_file_async(
Expand All @@ -2670,7 +2686,7 @@ async def _chunk_and_upload_csv(
)
temp_file.truncate(0)
temp_file.seek(0)
temp_file.write(header)
csv_writer.writerow(header)

upload_request = UploadToTableRequest(
table_id=self.id,
Expand All @@ -2692,7 +2708,9 @@ async def _chunk_and_upload_csv(

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)
).send_job_and_wait_async(
synapse_client=client, timeout=600
)

# If there is any data except the header, upload it
if temp_file.tell() > 1:
Expand Down Expand Up @@ -2724,7 +2742,9 @@ async def _chunk_and_upload_csv(

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)
).send_job_and_wait_async(
synapse_client=client, timeout=600
)
finally:
temp_file.close()
else:
Expand Down

0 comments on commit d6421c2

Please sign in to comment.