Skip to content

Commit

Permalink
Move over to asyncCommunicator mixin and execute partialUpdate of row…
Browse files Browse the repository at this point in the history
…s in same transaction as row inserts
  • Loading branch information
BryanFauble committed Feb 13, 2025
1 parent ab8c194 commit 997b036
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 70 deletions.
24 changes: 17 additions & 7 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Self

from synapseclient import Synapse
from synapseclient.core.constants.concrete_types import AGENT_CHAT_REQUEST
from synapseclient.core.constants.concrete_types import (
AGENT_CHAT_REQUEST,
TABLE_UPDATE_TRANSACTION_REQUEST,
)
from synapseclient.core.exceptions import SynapseError, SynapseTimeoutError

ASYNC_JOB_URIS = {
AGENT_CHAT_REQUEST: "/agent/chat/async",
TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async",
}


Expand All @@ -21,9 +25,7 @@ def to_synapse_request(self) -> None:
"""Converts the request to a request expected of the Synapse REST API."""
raise NotImplementedError("to_synapse_request must be implemented.")

def fill_from_dict(
self, synapse_response: Dict[str, str]
) -> "AsynchronousCommunicator":
def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self":
"""
Converts a response from the REST API into this dataclass.
Expand Down Expand Up @@ -51,7 +53,7 @@ async def send_job_and_wait_async(
post_exchange_args: Optional[Dict[str, Any]] = None,
*,
synapse_client: Optional[Synapse] = None,
) -> "AsynchronousCommunicator":
) -> "Self":
"""Send the job to the Asynchronous Job service and wait for it to complete.
Intended to be called by a class inheriting from this mixin to start a job
in the Synapse API and wait for it to complete. The inheriting class needs to
Expand Down Expand Up @@ -101,6 +103,8 @@ async def send_job_and_wait_async(
synapse_client=synapse_client,
)
self.fill_from_dict(synapse_response=result)
if not post_exchange_args:
post_exchange_args = {}
await self._post_exchange_async(
**post_exchange_args, synapse_client=synapse_client
)
Expand Down Expand Up @@ -320,8 +324,14 @@ async def send_job_async(
raise ValueError(f"Unsupported request type: {request_type}")

client = Synapse.get_client(synapse_client=synapse_client)
uri = ASYNC_JOB_URIS[request_type]
if "{entityId}" in uri:
if "entityId" not in request:
raise ValueError(f"Attempting to send job with missing id in uri: {uri}")
uri = uri.format(entityId=request["entityId"])

response = await client.rest_post_async(
uri=f"{ASYNC_JOB_URIS[request_type]}/start", body=json.dumps(request)
uri=f"{uri}/start", body=json.dumps(request)
)
return response["token"]

Expand Down
129 changes: 79 additions & 50 deletions synapseclient/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import date, datetime
from enum import Enum
from io import BytesIO
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Self, Union

import pandas as pd
from pandas.api.types import infer_dtype
Expand All @@ -35,6 +35,7 @@
merge_dataclass_entities,
)
from synapseclient.models import Activity, Annotations
from synapseclient.models.mixins import AsynchronousCommunicator
from synapseclient.models.mixins.access_control import AccessControllable
from synapseclient.models.protocols.table_protocol import (
ColumnSynchronousProtocol,
Expand Down Expand Up @@ -1159,7 +1160,7 @@ async def main():
results = await Table.query_async(
query=select_statement, synapse_client=synapse_client
)
partial_changes_objects_to_rename: List[PartialRow] = []
rows_to_update_df: List[PartialRow] = []
indexs_of_original_df_with_changes = []
for _, row in results.iterrows():
row_id = row["ROW_ID"]
Expand Down Expand Up @@ -1207,7 +1208,7 @@ async def main():
for partial_change_key, partial_change_value in partial_change_values.items()
],
)
partial_changes_objects_to_rename.append(partial_chage)
rows_to_update_df.append(partial_chage)
indexs_of_original_df_with_changes.append(matching_row.index[0])

rows_to_insert_df = values.loc[
Expand All @@ -1216,13 +1217,13 @@ async def main():

client = Synapse.get_client(synapse_client=synapse_client)
client.logger.info(
f"[{self.id}:{self.name}]: Found {len(partial_changes_objects_to_rename)}"
f"[{self.id}:{self.name}]: Found {len(rows_to_update_df)}"
f" rows to update and {len(rows_to_insert_df)} rows to insert"
)

if client.logger.isEnabledFor(logging.DEBUG):
client.logger.debug(
f"[{self.id}:{self.name}]: Rows to update: {partial_changes_objects_to_rename}"
f"[{self.id}:{self.name}]: Rows to update: {rows_to_update_df}"
)
client.logger.debug(
f"[{self.id}:{self.name}]: Rows to insert: {rows_to_insert_df}"
Expand All @@ -1231,34 +1232,42 @@ async def main():
if dry_run:
return

if partial_changes_objects_to_rename:
partial_row_set = PartialRowSet(
table_id=self.id, rows=partial_changes_objects_to_rename
)
appendable_rowset_request = None
if rows_to_update_df:
partial_row_set = PartialRowSet(table_id=self.id, rows=rows_to_update_df)
appendable_rowset_request = AppendableRowSetRequest(
entity_id=self.id, to_append=partial_row_set
)
# TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152)
# TODO: Look into making a change here that allows the `TableUpdateTransactionRequest` to also execute any inserts to the table within the same transaction. Currently data will be upserted first, and then inserted.
uri = f"/entity/{self.id}/table/transaction/async"
transaction_request = TableUpdateTransactionRequest(
entity_id=self.id, changes=[appendable_rowset_request]
)
client._waitForAsync(
uri=uri, request=transaction_request.to_synapse_request()
)

if not rows_to_insert_df.empty:
await self.store_rows_async(
values=rows_to_insert_df, synapse_client=synapse_client
values=rows_to_insert_df,
additional_changes=[appendable_rowset_request]
if appendable_rowset_request
else None,
synapse_client=synapse_client,
)
else:
await TableUpdateTransaction(
entity_id=self.id,
changes=[appendable_rowset_request]
if appendable_rowset_request
else None,
).send_job_and_wait_async(synapse_client=client)

async def store_rows_async(
self,
values: Union[str, Dict[str, Any], pd.DataFrame],
schema_storage_strategy: SchemaStorageStrategy = None,
column_expansion_strategy: ColumnExpansionStrategy = None,
dry_run: bool = False,
additional_changes: List[
Union[
"TableSchemaChangeRequest",
"UploadToTableRequest",
"AppendableRowSetRequest",
]
] = None,
*,
synapse_client: Optional[Synapse] = None,
**kwargs,
Expand Down Expand Up @@ -1343,6 +1352,12 @@ async def store_rows_async(
updating table metadata. This is useful for debugging and understanding
what actions would be taken without actually performing them.
additional_changes: Additional changes to the table that should execute
within the same transaction as appending or updating rows. This is used
as a part of the `upsert_rows` method call to allow for the updating of
rows and the updating of the table schema in the same transaction. In
most cases you will not need to use this argument.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand Down Expand Up @@ -1538,6 +1553,8 @@ async def main():
f"[{self.id}:{self.name}]: Dry run enabled. No changes will be made."
)

schema_change_request = None

if schema_storage_strategy == SchemaStorageStrategy.INFER_FROM_DATA:
infered_columns = infer_column_type_from_data(values=values)

Expand All @@ -1562,9 +1579,9 @@ async def main():
):
column_instance.maximum_size = infered_column.maximum_size

schema_change_request = await self._generate_schema_change_request(
dry_run=dry_run, synapse_client=synapse_client
)
schema_change_request = await self._generate_schema_change_request(
dry_run=dry_run, synapse_client=synapse_client
)

if dry_run:
return
Expand All @@ -1582,41 +1599,39 @@ async def main():
upload_request = UploadToTableRequest(
table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None
)
uri = f"/entity/{self.id}/table/transaction/async"
changes = []
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)
# TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152)
transaction_request = TableUpdateTransactionRequest(

await TableUpdateTransaction(
entity_id=self.id, changes=changes
)
client._waitForAsync(
uri=uri, request=transaction_request.to_synapse_request()
)
).send_job_and_wait_async(synapse_client=client)
elif isinstance(values, pd.DataFrame):
# TODO: Remove file after upload
filepath = f"{tempfile.mkdtemp()}/{self.id}_upload_{uuid.uuid4()}.csv"
# TODO: Support everything from `from_data_frame` to_csv call
values.to_csv(filepath, index=False)
file_handle_id = await multipart_upload_file_async(
syn=client, file_path=filepath, content_type="text/csv"
)
try:
# TODO: Support everything from `from_data_frame` to_csv call
values.to_csv(filepath, index=False)
file_handle_id = await multipart_upload_file_async(
syn=client, file_path=filepath, content_type="text/csv"
)
finally:
os.remove(filepath)
upload_request = UploadToTableRequest(
table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None
)
uri = f"/entity/{self.id}/table/transaction/async"
# TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152)
changes = []
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)
transaction_request = TableUpdateTransactionRequest(

await TableUpdateTransaction(
entity_id=self.id, changes=changes
)
client._waitForAsync(
uri=uri, request=transaction_request.to_synapse_request()
)
).send_job_and_wait_async(synapse_client=client)
else:
raise ValueError(
"Don't know how to make tables from values of type %s." % type(values)
Expand Down Expand Up @@ -1914,14 +1929,9 @@ async def store_async(
return self

if schema_change_request:
uri = f"/entity/{self.id}/table/transaction/async"
transaction_request = TableUpdateTransactionRequest(
await TableUpdateTransaction(
entity_id=self.id, changes=[schema_change_request]
)
# TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152)
client._waitForAsync(
uri=uri, request=transaction_request.to_synapse_request()
)
).send_job_and_wait_async(synapse_client=client)

# Replace the columns after a schema change in case any column names were updated
updated_columns = OrderedDict()
Expand Down Expand Up @@ -2862,14 +2872,20 @@ def to_synapse_request(self):


@dataclass
class TableUpdateTransactionRequest:
class TableUpdateTransaction(AsynchronousCommunicator):
"""
A request to update a table. This is used to update a table with a set of changes.
After calling the `send_job_and_wait_async` method the `results` attribute will be
filled in based off <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/TableUpdateTransactionResponse.html>.
"""

entity_id: str
changes: List[Union[TableSchemaChangeRequest, UploadToTableRequest]]
concrete_type: str = concrete_types.TABLE_UPDATE_TRANSACTION_REQUEST
results: Optional[List[Dict[str, Any]]] = None
"""This will be an array of
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/TableUpdateResponse.html>."""

def to_synapse_request(self):
"""Converts the request to a request expected of the Synapse REST API."""
Expand All @@ -2879,6 +2895,19 @@ def to_synapse_request(self):
"changes": [change.to_synapse_request() for change in self.changes],
}

def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self":
"""
Converts a response from the REST API into this dataclass.
Arguments:
synapse_response: The response from the REST API that matches <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/TableUpdateTransactionResponse.html>
Returns:
An instance of this class.
"""
self.results = synapse_response.get("results", None)
return self


def infer_column_type_from_data(values: pd.DataFrame) -> List[Column]:
"""
Expand Down
Loading

0 comments on commit 997b036

Please sign in to comment.