From 997b0368cd20e094c706763e1448be94755de6ff Mon Sep 17 00:00:00 2001 From: BryanFauble <17128019+BryanFauble@users.noreply.github.com> Date: Thu, 13 Feb 2025 14:54:44 -0700 Subject: [PATCH] Move over to asyncCommunicator mixin and execute partialUpdate of rows in same transaction as row inserts --- .../models/mixins/asynchronous_job.py | 24 +++- synapseclient/models/table.py | 129 +++++++++++------- .../models/async/test_table_async.py | 23 ++-- 3 files changed, 106 insertions(+), 70 deletions(-) diff --git a/synapseclient/models/mixins/asynchronous_job.py b/synapseclient/models/mixins/asynchronous_job.py index aac481663..03ce79666 100644 --- a/synapseclient/models/mixins/asynchronous_job.py +++ b/synapseclient/models/mixins/asynchronous_job.py @@ -3,14 +3,18 @@ import time from dataclasses import dataclass from enum import Enum -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Self from synapseclient import Synapse -from synapseclient.core.constants.concrete_types import AGENT_CHAT_REQUEST +from synapseclient.core.constants.concrete_types import ( + AGENT_CHAT_REQUEST, + TABLE_UPDATE_TRANSACTION_REQUEST, +) from synapseclient.core.exceptions import SynapseError, SynapseTimeoutError ASYNC_JOB_URIS = { AGENT_CHAT_REQUEST: "/agent/chat/async", + TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async", } @@ -21,9 +25,7 @@ def to_synapse_request(self) -> None: """Converts the request to a request expected of the Synapse REST API.""" raise NotImplementedError("to_synapse_request must be implemented.") - def fill_from_dict( - self, synapse_response: Dict[str, str] - ) -> "AsynchronousCommunicator": + def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self": """ Converts a response from the REST API into this dataclass. 
@@ -51,7 +53,7 @@ async def send_job_and_wait_async( post_exchange_args: Optional[Dict[str, Any]] = None, *, synapse_client: Optional[Synapse] = None, - ) -> "AsynchronousCommunicator": + ) -> "Self": """Send the job to the Asynchronous Job service and wait for it to complete. Intended to be called by a class inheriting from this mixin to start a job in the Synapse API and wait for it to complete. The inheriting class needs to @@ -101,6 +103,8 @@ async def send_job_and_wait_async( synapse_client=synapse_client, ) self.fill_from_dict(synapse_response=result) + if not post_exchange_args: + post_exchange_args = {} await self._post_exchange_async( **post_exchange_args, synapse_client=synapse_client ) @@ -320,8 +324,14 @@ async def send_job_async( raise ValueError(f"Unsupported request type: {request_type}") client = Synapse.get_client(synapse_client=synapse_client) + uri = ASYNC_JOB_URIS[request_type] + if "{entityId}" in uri: + if "entityId" not in request: + raise ValueError(f"Attempting to send job with missing id in uri: {uri}") + uri = uri.format(entityId=request["entityId"]) + response = await client.rest_post_async( - uri=f"{ASYNC_JOB_URIS[request_type]}/start", body=json.dumps(request) + uri=f"{uri}/start", body=json.dumps(request) ) return response["token"] diff --git a/synapseclient/models/table.py b/synapseclient/models/table.py index 4d30331d3..f333a1078 100644 --- a/synapseclient/models/table.py +++ b/synapseclient/models/table.py @@ -11,7 +11,7 @@ from datetime import date, datetime from enum import Enum from io import BytesIO -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Self, Union import pandas as pd from pandas.api.types import infer_dtype @@ -35,6 +35,7 @@ merge_dataclass_entities, ) from synapseclient.models import Activity, Annotations +from synapseclient.models.mixins import AsynchronousCommunicator from synapseclient.models.mixins.access_control import AccessControllable from 
synapseclient.models.protocols.table_protocol import ( ColumnSynchronousProtocol, @@ -1159,7 +1160,7 @@ async def main(): results = await Table.query_async( query=select_statement, synapse_client=synapse_client ) - partial_changes_objects_to_rename: List[PartialRow] = [] + rows_to_update_df: List[PartialRow] = [] indexs_of_original_df_with_changes = [] for _, row in results.iterrows(): row_id = row["ROW_ID"] @@ -1207,7 +1208,7 @@ async def main(): for partial_change_key, partial_change_value in partial_change_values.items() ], ) - partial_changes_objects_to_rename.append(partial_chage) + rows_to_update_df.append(partial_chage) indexs_of_original_df_with_changes.append(matching_row.index[0]) rows_to_insert_df = values.loc[ @@ -1216,13 +1217,13 @@ async def main(): client = Synapse.get_client(synapse_client=synapse_client) client.logger.info( - f"[{self.id}:{self.name}]: Found {len(partial_changes_objects_to_rename)}" + f"[{self.id}:{self.name}]: Found {len(rows_to_update_df)}" f" rows to update and {len(rows_to_insert_df)} rows to insert" ) if client.logger.isEnabledFor(logging.DEBUG): client.logger.debug( - f"[{self.id}:{self.name}]: Rows to update: {partial_changes_objects_to_rename}" + f"[{self.id}:{self.name}]: Rows to update: {rows_to_update_df}" ) client.logger.debug( f"[{self.id}:{self.name}]: Rows to insert: {rows_to_insert_df}" @@ -1231,27 +1232,28 @@ async def main(): if dry_run: return - if partial_changes_objects_to_rename: - partial_row_set = PartialRowSet( - table_id=self.id, rows=partial_changes_objects_to_rename - ) + appendable_rowset_request = None + if rows_to_update_df: + partial_row_set = PartialRowSet(table_id=self.id, rows=rows_to_update_df) appendable_rowset_request = AppendableRowSetRequest( entity_id=self.id, to_append=partial_row_set ) - # TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152) - # TODO: Look into making a change here that allows 
the `TableUpdateTransactionRequest` to also execute any inserts to the table within the same transaction. Currently data will be upserted first, and then inserted. - uri = f"/entity/{self.id}/table/transaction/async" - transaction_request = TableUpdateTransactionRequest( - entity_id=self.id, changes=[appendable_rowset_request] - ) - client._waitForAsync( - uri=uri, request=transaction_request.to_synapse_request() - ) if not rows_to_insert_df.empty: await self.store_rows_async( - values=rows_to_insert_df, synapse_client=synapse_client + values=rows_to_insert_df, + additional_changes=[appendable_rowset_request] + if appendable_rowset_request + else None, + synapse_client=synapse_client, ) + else: + await TableUpdateTransaction( + entity_id=self.id, + changes=[appendable_rowset_request] + if appendable_rowset_request + else None, + ).send_job_and_wait_async(synapse_client=client) async def store_rows_async( self, @@ -1259,6 +1261,13 @@ async def store_rows_async( schema_storage_strategy: SchemaStorageStrategy = None, column_expansion_strategy: ColumnExpansionStrategy = None, dry_run: bool = False, + additional_changes: List[ + Union[ + "TableSchemaChangeRequest", + "UploadToTableRequest", + "AppendableRowSetRequest", + ] + ] = None, *, synapse_client: Optional[Synapse] = None, **kwargs, @@ -1343,6 +1352,12 @@ async def store_rows_async( updating table metadata. This is useful for debugging and understanding what actions would be taken without actually performing them. + additional_changes: Additional changes to the table that should execute + within the same transaction as appending or updating rows. This is used + as a part of the `upsert_rows` method call to allow for the updating of + rows and the updating of the table schema in the same transaction. In + most cases you will not need to use this argument. 
+ synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor. @@ -1538,6 +1553,8 @@ async def main(): f"[{self.id}:{self.name}]: Dry run enabled. No changes will be made." ) + schema_change_request = None + if schema_storage_strategy == SchemaStorageStrategy.INFER_FROM_DATA: infered_columns = infer_column_type_from_data(values=values) @@ -1562,9 +1579,9 @@ async def main(): ): column_instance.maximum_size = infered_column.maximum_size - schema_change_request = await self._generate_schema_change_request( - dry_run=dry_run, synapse_client=synapse_client - ) + schema_change_request = await self._generate_schema_change_request( + dry_run=dry_run, synapse_client=synapse_client + ) if dry_run: return @@ -1582,41 +1599,39 @@ async def main(): upload_request = UploadToTableRequest( table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None ) - uri = f"/entity/{self.id}/table/transaction/async" changes = [] if schema_change_request: changes.append(schema_change_request) + if additional_changes: + changes.extend(additional_changes) changes.append(upload_request) - # TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152) - transaction_request = TableUpdateTransactionRequest( + + await TableUpdateTransaction( entity_id=self.id, changes=changes - ) - client._waitForAsync( - uri=uri, request=transaction_request.to_synapse_request() - ) + ).send_job_and_wait_async(synapse_client=client) elif isinstance(values, pd.DataFrame): - # TODO: Remove file after upload filepath = f"{tempfile.mkdtemp()}/{self.id}_upload_{uuid.uuid4()}.csv" - # TODO: Support everything from `from_data_frame` to_csv call - values.to_csv(filepath, index=False) - file_handle_id = await multipart_upload_file_async( - syn=client, file_path=filepath, content_type="text/csv" - ) + try: + # 
TODO: Support everything from `from_data_frame` to_csv call + values.to_csv(filepath, index=False) + file_handle_id = await multipart_upload_file_async( + syn=client, file_path=filepath, content_type="text/csv" + ) + finally: + os.remove(filepath) upload_request = UploadToTableRequest( table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None ) - uri = f"/entity/{self.id}/table/transaction/async" - # TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152) changes = [] if schema_change_request: changes.append(schema_change_request) + if additional_changes: + changes.extend(additional_changes) changes.append(upload_request) - transaction_request = TableUpdateTransactionRequest( + + await TableUpdateTransaction( entity_id=self.id, changes=changes - ) - client._waitForAsync( - uri=uri, request=transaction_request.to_synapse_request() - ) + ).send_job_and_wait_async(synapse_client=client) else: raise ValueError( "Don't know how to make tables from values of type %s." % type(values) @@ -1914,14 +1929,9 @@ async def store_async( return self if schema_change_request: - uri = f"/entity/{self.id}/table/transaction/async" - transaction_request = TableUpdateTransactionRequest( + await TableUpdateTransaction( entity_id=self.id, changes=[schema_change_request] - ) - # TODO: Convert this over to use the `AsynchronousCommunicator` mixin when available (https://github.com/Sage-Bionetworks/synapsePythonClient/pull/1152) - client._waitForAsync( - uri=uri, request=transaction_request.to_synapse_request() - ) + ).send_job_and_wait_async(synapse_client=client) # Replace the columns after a schema change in case any column names were updated updated_columns = OrderedDict() @@ -2862,14 +2872,20 @@ def to_synapse_request(self): @dataclass -class TableUpdateTransactionRequest: +class TableUpdateTransaction(AsynchronousCommunicator): """ A request to update a table. 
This is used to update a table with a set of changes. + + After calling the `send_job_and_wait_async` method the `results` attribute will be + filled in based off the `TableUpdateTransactionResponse` <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/TableUpdateTransactionResponse.html>. """ entity_id: str changes: List[Union[TableSchemaChangeRequest, UploadToTableRequest]] concrete_type: str = concrete_types.TABLE_UPDATE_TRANSACTION_REQUEST + results: Optional[List[Dict[str, Any]]] = None + """This will be an array of + `TableUpdateResponse` <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/TableUpdateResponse.html>.""" def to_synapse_request(self): """Converts the request to a request expected of the Synapse REST API.""" @@ -2879,6 +2895,19 @@ def to_synapse_request(self): "changes": [change.to_synapse_request() for change in self.changes], } + def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self": + """ + Converts a response from the REST API into this dataclass. + + Arguments: + synapse_response: The response from the REST API that matches the `TableUpdateTransactionResponse` model. + + Returns: + An instance of this class. + """ + self.results = synapse_response.get("results", None) + return self + def infer_column_type_from_data(values: pd.DataFrame) -> List[Column]: """ diff --git a/tests/integration/synapseclient/models/async/test_table_async.py b/tests/integration/synapseclient/models/async/test_table_async.py index 036c3bdbc..83c8a4804 100644 --- a/tests/integration/synapseclient/models/async/test_table_async.py +++ b/tests/integration/synapseclient/models/async/test_table_async.py @@ -7,6 +7,7 @@ import pytest from pytest_mock import MockerFixture +import synapseclient.models.mixins.asynchronous_job as asynchronous_job_module import synapseclient.models.table as table_module from synapseclient import Evaluation, Synapse from synapseclient.core import utils @@ -431,7 +432,7 @@ async def test_store_rows_from_csv_no_columns(self, project_model: Project) -> N # THEN the table data should fail to be inserted assert ( - "400 Client Error: \nThe first line is expected to be a header but the values do not match the names of of the columns of the table (column_string is not a valid column name or id).
Header row: column_string" + "400 Client Error: The first line is expected to be a header but the values do not match the names of of the columns of the table (column_string is not a valid column name or id). Header row: column_string" in str(e.value) ) @@ -500,7 +501,7 @@ async def test_store_rows_on_existing_table_with_schema_storage_strategy( # AND the table exists in Synapse table = await table.store_async(synapse_client=self.syn) self.schedule_for_cleanup(table.id) - spy_async_update = mocker.spy(self.syn, "_waitForAsync") + spy_send_job = mocker.spy(asynchronous_job_module, "send_job_async") # AND data for a column stored to CSV data_for_table = pd.DataFrame( @@ -522,9 +523,9 @@ async def test_store_rows_on_existing_table_with_schema_storage_strategy( spy_csv_file_conversion.assert_called_once() # AND the schema should not have been updated - assert len(spy_async_update.call_args.kwargs["request"]["changes"]) == 1 + assert len(spy_send_job.call_args.kwargs["request"]["changes"]) == 1 assert ( - spy_async_update.call_args.kwargs["request"]["changes"][0]["concreteType"] + spy_send_job.call_args.kwargs["request"]["changes"][0]["concreteType"] == concrete_types.UPLOAD_TO_TABLE_REQUEST ) @@ -555,7 +556,7 @@ async def test_store_rows_on_existing_table_adding_column( # AND the table exists in Synapse table = await table.store_async(synapse_client=self.syn) self.schedule_for_cleanup(table.id) - spy_async_update = mocker.spy(self.syn, "_waitForAsync") + spy_send_job = mocker.spy(asynchronous_job_module, "send_job_async") # AND data for a column stored to CSV data_for_table = pd.DataFrame( @@ -575,13 +576,13 @@ async def test_store_rows_on_existing_table_adding_column( spy_csv_file_conversion.assert_called_once() # AND the schema should not have been updated - assert len(spy_async_update.call_args.kwargs["request"]["changes"]) == 2 + assert len(spy_send_job.call_args.kwargs["request"]["changes"]) == 2 assert ( - 
spy_async_update.call_args.kwargs["request"]["changes"][0]["concreteType"] + spy_send_job.call_args.kwargs["request"]["changes"][0]["concreteType"] == concrete_types.TABLE_SCHEMA_CHANGE_REQUEST ) assert ( - spy_async_update.call_args.kwargs["request"]["changes"][1]["concreteType"] + spy_send_job.call_args.kwargs["request"]["changes"][1]["concreteType"] == concrete_types.UPLOAD_TO_TABLE_REQUEST ) @@ -628,7 +629,7 @@ async def test_store_rows_on_existing_table_no_schema_storage_strategy( # THEN the table data should fail to be inserted assert ( - "400 Client Error: \nThe first line is expected to be a header but the values do not match the names of of the columns of the table (column_key_2 is not a valid column name or id). Header row: column_string,column_key_2" + "400 Client Error: The first line is expected to be a header but the values do not match the names of of the columns of the table (column_key_2 is not a valid column name or id). Header row: column_string,column_key_2" in str(e.value) ) @@ -949,8 +950,6 @@ async def test_upsert_all_data_types_single_key( ) table = await table.store_async(synapse_client=self.syn) self.schedule_for_cleanup(table.id) - # TODO: Add a spy for the partial changeset to verify number of changes - spy_async_update = mocker.spy(self.syn, "_waitForAsync") # AND A bogus file path = utils.make_bogus_data_file() @@ -1260,8 +1259,6 @@ async def test_upsert_all_data_types_multi_key( ) table = await table.store_async(synapse_client=self.syn) self.schedule_for_cleanup(table.id) - # TODO: Add a spy for the partial changeset to verify number of changes - spy_async_update = mocker.spy(self.syn, "_waitForAsync") # AND A bogus file path = utils.make_bogus_data_file()