Add more testing, docstrings, and patching messages for the async job
BryanFauble committed Feb 21, 2025
1 parent 2fcd3bd commit 305f8dc
Showing 3 changed files with 485 additions and 7 deletions.
13 changes: 12 additions & 1 deletion synapseclient/models/mixins/asynchronous_job.py
@@ -301,6 +301,7 @@ async def send_job_and_wait_async(
                 synapse_client=synapse_client,
                 endpoint=endpoint,
                 timeout=timeout,
+                request=request,
             ),
         }

@@ -352,6 +353,7 @@ async def get_job_async(
     endpoint: str = None,
     sleep: int = 1,
     timeout: int = 60,
+    request: Dict[str, Any] = None,
     *,
     synapse_client: Optional["Synapse"] = None,
 ) -> Dict[str, Any]:
@@ -365,6 +367,8 @@ async def get_job_async(
         sleep: The number of seconds to wait between requests. Defaults to 1.
         timeout: The number of seconds to wait for the job to complete or progress
             before raising a SynapseTimeoutError. Defaults to 60.
+        request: The original request sent to the server to create the job. Required
+            when the status URI for the request type includes values from it, such as `{entityId}`.
         synapse_client: If not passed in and caching was not disabled by
             `Synapse.allow_client_caching(False)` this will use the last created
             instance from the Synapse class constructor.
@@ -386,8 +390,15 @@
     progressed = False

     while time.time() - start_time < timeout:
+        uri = ASYNC_JOB_URIS[request_type]
+        if "{entityId}" in uri:
+            if not request:
+                raise ValueError("Attempting to get job with missing request.")
+            if "entityId" not in request:
+                raise ValueError(f"Attempting to get job with missing id in uri: {uri}")
+            uri = uri.format(entityId=request["entityId"])
         result = await client.rest_get_async(
-            uri=f"{ASYNC_JOB_URIS[request_type]}/get/{job_id}",
+            uri=f"{uri}/get/{job_id}",
             endpoint=endpoint,
         )
         job_status = AsynchronousJobStatus().fill_from_dict(async_job_status=result)
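For context on the polling change above, here is a self-contained sketch of the status-URI templating that the new `request` argument enables. The `ASYNC_JOB_URIS` entry and the `build_status_uri` helper are illustrative stand-ins, not the library's actual mapping or API:

```python
# Minimal sketch of the URI templating added to get_job_async. The mapping
# entry below is an assumed example; the real ASYNC_JOB_URIS lives in
# synapseclient/models/mixins/asynchronous_job.py.
from typing import Any, Dict, Optional

ASYNC_JOB_URIS: Dict[str, str] = {
    "TableCSVUpload": "/entity/{entityId}/table/upload/csv/async",  # assumed shape
}


def build_status_uri(
    request_type: str, job_id: str, request: Optional[Dict[str, Any]] = None
) -> str:
    """Fill the `{entityId}` placeholder from the original request, if present."""
    uri = ASYNC_JOB_URIS[request_type]
    if "{entityId}" in uri:
        if not request:
            raise ValueError("Attempting to get job with missing request.")
        if "entityId" not in request:
            raise ValueError(f"Attempting to get job with missing id in uri: {uri}")
        uri = uri.format(entityId=request["entityId"])
    return f"{uri}/get/{job_id}"


print(build_status_uri("TableCSVUpload", "12345", {"entityId": "syn999"}))
# -> /entity/syn999/table/upload/csv/async/get/12345
```

This also explains why `send_job_and_wait_async` now forwards `request` through to `get_job_async`: without the original request there is no way to fill an `{entityId}` placeholder in the status URI.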
50 changes: 47 additions & 3 deletions synapseclient/models/mixins/table_operator.py
@@ -134,7 +134,8 @@ class QueryResultBundle:
     last_updated_on: Optional[str] = None
     """The date-time when this table/view was last updated. Note: Since views are
     eventually consistent a view might still be out-of-date even if it was recently
-    updated. Use mask = 0x80 to include in the bundle."""
+    updated. Use mask = 0x80 to include in the bundle. This is returned in the
+    ISO8601 format like `2000-01-01T00:00:00.000Z`."""


 @async_to_sync
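Since `last_updated_on` is now documented as an ISO8601 string, a quick hedged example of consuming it; the sample value is the docstring's own, not a live response:

```python
# Parse the documented `2000-01-01T00:00:00.000Z` style timestamp.
from datetime import datetime, timezone

last_updated_on = "2000-01-01T00:00:00.000Z"
# datetime.fromisoformat only accepts a trailing "Z" on Python 3.11+,
# so normalize it to an explicit UTC offset for older interpreters.
parsed = datetime.fromisoformat(last_updated_on.replace("Z", "+00:00"))
assert parsed.tzinfo == timezone.utc
```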
@@ -1155,7 +1156,12 @@ async def main():
         return QueryResultBundle(
             result=as_df,
             count=results.count,
-            sum_file_sizes=results.sumFileSizes,
+            sum_file_sizes=SumFileSizes(
+                sum_file_size_bytes=results.sumFileSizes.get("sumFileSizesBytes", None),
+                greater_than=results.sumFileSizes.get("greaterThan", None),
+            )
+            if results.sumFileSizes
+            else None,
             last_updated_on=results.lastUpdatedOn,
         )

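The hunk above replaces the raw `sumFileSizes` dict with a typed `SumFileSizes` object. A self-contained sketch of that mapping, using an illustrative payload and a stand-in dataclass (the real model class lives in the synapseclient codebase):

```python
# Stand-in for synapseclient's SumFileSizes model; the field names follow the
# hunk above, and the payload is illustrative, not a real query response.
from dataclasses import dataclass
from typing import Optional


@dataclass
class SumFileSizes:
    sum_file_size_bytes: Optional[int] = None
    greater_than: Optional[bool] = None


raw_sum_file_sizes = {"sumFileSizesBytes": 1024, "greaterThan": False}

sum_file_sizes = (
    SumFileSizes(
        sum_file_size_bytes=raw_sum_file_sizes.get("sumFileSizesBytes", None),
        greater_than=raw_sum_file_sizes.get("greaterThan", None),
    )
    if raw_sum_file_sizes
    else None
)
print(sum_file_sizes)  # SumFileSizes(sum_file_size_bytes=1024, greater_than=False)
```

The `if/else` guard matters because `sumFileSizes` may be absent from the bundle, in which case the field stays `None`.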
@@ -2375,6 +2381,45 @@ async def store_rows_async(
         data may be chunked into smaller requests.

+        The following is a sequence diagram that describes the process noted in the
+        limitation above. It shows how the data is chunked into smaller requests when
+        the data exceeds the limit of 1GB, and how the data is written to a temporary
+        file that is cleaned up after upload.
+
+        ```mermaid
+        sequenceDiagram
+            participant User
+            participant Table
+            participant CSVHandler
+            participant FileSystem
+            participant Synapse
+
+            User->>Table: store_rows(values)
+            Table->>CSVHandler: Convert values to CSV (if Dict or DataFrame)
+            CSVHandler->>Table: Return CSV path
+
+            alt CSV size > 1GB
+                loop Split and Upload CSV
+                    Table->>CSVHandler: Split CSV into smaller chunks
+                    CSVHandler->>FileSystem: Write chunk to local file
+                    FileSystem->>Table: Return file path
+                    Table->>Synapse: Upload CSV chunk using file path
+                    Synapse-->>Table: Return `file_handle_id`
+                    Table->>FileSystem: Truncate file for next chunk
+                    Table->>Synapse: Send `TableUpdateTransaction` to append/update rows
+                    Synapse-->>Table: Transaction result
+                end
+                Table->>FileSystem: Delete temporary file
+            else
+                Table->>Synapse: Upload CSV without splitting
+                Synapse-->>Table: Return `file_handle_id`
+                Table->>Synapse: Send `TableUpdateTransaction` to append/update rows
+                Synapse-->>Table: Transaction result
+            end
+
+            Table-->>User: Upload complete
+        ```
+
         Arguments:
             values: Supports storing data from the following sources:
@@ -2822,7 +2867,6 @@ async def _send_update(
             entity_id=self.id, changes=all_changes
         ).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)

-        # TODO: Add integration test around this portion of the code
         file_size = os.path.getsize(path_to_csv)
         if file_size > insert_size_byte:
             # Apply schema changes before breaking apart and uploading the file
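To make the sequence diagram and the `_send_update` size check concrete, here is a hedged sketch of the split-upload-truncate loop. `upload_file_handle` and `send_table_update` are hypothetical stand-ins for the client's real upload and `TableUpdateTransaction` helpers, and the 1GB constant mirrors `insert_size_byte` from the hunk above:

```python
import os
import tempfile
from typing import Callable

INSERT_SIZE_BYTES = 1 * 1024**3  # assumed ~1GB threshold, mirroring insert_size_byte


def store_csv_rows(
    path_to_csv: str,
    upload_file_handle: Callable[[str], str],
    send_table_update: Callable[[str], None],
) -> None:
    """Sketch of the chunked-upload flow shown in the sequence diagram."""
    if os.path.getsize(path_to_csv) <= INSERT_SIZE_BYTES:
        # Small enough: upload the CSV in a single request, no splitting.
        send_table_update(upload_file_handle(path_to_csv))
        return

    # Too large: stream rows into a temporary file, uploading and truncating
    # it each time it crosses the threshold.
    fd, chunk_path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)
    try:
        with open(path_to_csv, encoding="utf-8") as source:
            header = source.readline()
            header_bytes = len(header.encode("utf-8"))
            chunk = open(chunk_path, "w", encoding="utf-8")
            chunk.write(header)
            bytes_written = header_bytes
            for row in source:
                chunk.write(row)
                bytes_written += len(row.encode("utf-8"))
                if bytes_written >= INSERT_SIZE_BYTES:
                    chunk.close()
                    # Upload this chunk, then append/update its rows.
                    send_table_update(upload_file_handle(chunk_path))
                    # Truncate the file so it is reused for the next chunk.
                    chunk = open(chunk_path, "w", encoding="utf-8")
                    chunk.write(header)
                    bytes_written = header_bytes
            chunk.close()
            if bytes_written > header_bytes:
                # Flush whatever rows remain after the last full chunk.
                send_table_update(upload_file_handle(chunk_path))
    finally:
        os.remove(chunk_path)  # delete the temporary file
```

Reusing one truncated temporary file, as the diagram shows, keeps peak disk usage near a single chunk instead of a full copy of the source CSV.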