
[SYNPY-1551] Batch upsert and inserts #1167

Open · wants to merge 38 commits into base: synpy-1551-tables-refactor
Conversation

@BryanFauble (Contributor) commented Feb 18, 2025

Problem:

  1. When inserting or updating a large number of rows, we hit the limits imposed by the Synapse REST API as defined in this document.
  2. When inserting a large number of rows, the current Table API fails with `The provided CSV file exceeds the maximum size of 1 GB.`, forcing users to batch up the CSV themselves.

Solution:

  1. Batch up the requests when querying for data during the table upsert process (see the sketch after this list)
  2. Batch up the requests that modify rows or insert new rows via CSV to Synapse during the `store_rows` function
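
For illustration, a minimal sketch of the query-batching idea: split the upsert keys into fixed-size groups before building each `WHERE ... IN (...)` query. The table ID `syn123`, the batch size, and the helper name are hypothetical, not the PR's implementation.

```python
from typing import Iterator, List, Sequence

def batch_values(values: Sequence[str], batch_size: int = 10_000) -> Iterator[List[str]]:
    """Split a large set of key values into fixed-size batches."""
    for start in range(0, len(values), batch_size):
        yield list(values[start:start + batch_size])

keys = [f"key_{i}" for i in range(25_000)]
for batch in batch_values(keys):
    in_clause = ", ".join(f"'{value}'" for value in batch)
    query = f"SELECT * FROM syn123 WHERE primary_key IN ({in_clause})"
    # Each batch produces one bounded query instead of a single oversized one;
    # executing it against Synapse is omitted in this sketch.
```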

Testing:

  • Verify that upserting a large number of rows works (including a large number of updates and inserts in the same transaction)
  • Verify that updating a large number of rows works after querying for a large number of rows
  • Verify that inserting a large number of rows (> 1 GB CSV file) works

Showing what the console looks like during the upload of a large CSV that is split into multiple chunks:
[Screenshot: console output during the chunked CSV upload]

Comment on lines 2092 to 2106
chunked_rows_to_update = []
chunk_size_limit = 1.5 * 1024 * 1024  # 1.5 MB
current_chunk_size = 0
chunk = []
for row in rows_to_update:
    row_size = sys.getsizeof(row.to_synapse_request())
    if current_chunk_size + row_size > chunk_size_limit:
        chunked_rows_to_update.append(chunk)
        chunk = []
        current_chunk_size = 0
    chunk.append(row)
    current_chunk_size += row_size

if chunk:
    chunked_rows_to_update.append(chunk)
@BryanFauble (Contributor, Author):

This is one of the larger changes here. The interface we use to submit a `PartialRow` change is what limits requests to 2 MB or less:

https://rest-docs.synapse.org/rest/POST/entity/id/table/transaction/async/start.html

In this case I am using 1.5 MB to allow for some overhead room.
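
As a standalone illustration of the same size-based chunking idea, the sketch below measures the serialized JSON payload rather than the Python object. The function and constant names are hypothetical, and it assumes each row serializes to a JSON-compatible dict; it is not the PR's implementation.

```python
import json
from typing import Any, Dict, Iterator, List

MAX_REQUEST_BYTES = int(1.5 * 1024 * 1024)  # stay comfortably under the 2 MB request limit

def chunk_by_payload_size(
    rows: List[Dict[str, Any]],
    limit: int = MAX_REQUEST_BYTES,
) -> Iterator[List[Dict[str, Any]]]:
    """Group rows so each batch's serialized payload stays under `limit` bytes."""
    chunk: List[Dict[str, Any]] = []
    chunk_size = 0
    for row in rows:
        row_size = len(json.dumps(row).encode("utf-8"))
        if chunk and chunk_size + row_size > limit:
            yield chunk
            chunk, chunk_size = [], 0
        chunk.append(row)
        chunk_size += row_size
    if chunk:
        yield chunk
```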

Comment on lines +1019 to +1024
# Used for backwards compatibility if anyone happens to be using this function
if logger:
    logger.info(text)
else:
    sys.stdout.write(text)
    sys.stdout.flush()
@BryanFauble (Contributor, Author):

This is to make sure the tqdm bar plays nicely with the messages that get printed out for table operations. Internally, the "new" interface still uses some of the query functionality from the "old" client. Example of the console:

[Screenshot: console output with the tqdm bar and table operation messages]

(It was much worse before :D )
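
For reference, a minimal standalone sketch of how `logging_redirect_tqdm` keeps log messages from breaking an active progress bar. The logger name and the loop are illustrative only.

```python
import logging
import time

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

LOG = logging.getLogger("table_operations")
logging.basicConfig(level=logging.INFO)

with logging_redirect_tqdm(loggers=[LOG]):
    for chunk_number in tqdm(range(5), desc="Uploading chunks"):
        time.sleep(0.1)
        # Routed through tqdm, so the bar is redrawn below the message
        # instead of being interleaved with it.
        LOG.info("Finished chunk %s", chunk_number)
```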

Comment on lines -373 to +381
- start_time = asyncio.get_event_loop().time()
+ start_time = time.time()
@BryanFauble (Contributor, Author):

This kind of timer was causing the message to appear oddly. Moving to time.time() seems to have resolved it; the wait now lasts the expected amount of time.

[Screenshot: console output showing the corrected wait message]
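
As a side note, a small sketch of why the two clocks differ: the event loop clock is monotonic with an arbitrary starting point, so its values are only comparable to other loop-clock readings, never to wall-clock timestamps. This is illustrative only, not code from the PR.

```python
import asyncio
import time

async def main() -> None:
    loop = asyncio.get_running_loop()
    # loop.time() is a monotonic clock with an arbitrary epoch;
    # time.time() is seconds since the Unix epoch. Mixing the two
    # produces nonsensical elapsed-time calculations.
    print("loop.time():", loop.time())
    print("time.time():", time.time())

    start = time.time()
    await asyncio.sleep(1)
    print(f"elapsed: {time.time() - start:.2f}s")

asyncio.run(main())
```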

Comment on lines +688 to +697
with logging_redirect_tqdm(loggers=[syn.logger]):
    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
        storage_str=storage_str,
    )
@BryanFauble (Contributor, Author):

I was noticing some weirdness when I had multiple progress bars running (one for the chunked upload of row data into a Table, and one for the actual upload of the file itself).

This doesn't totally fix it, but the behavior seems better like this.
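
If the two bars keep fighting over the same terminal line, one alternative worth trying is tqdm's `position`/`leave` arguments to pin each bar to its own line. This is a standalone sketch, not code from this PR.

```python
import time

from tqdm import tqdm

# Outer bar tracks CSV chunks; inner bar tracks upload parts for each chunk.
with tqdm(total=4, desc="CSV chunks", position=0) as chunks_bar:
    for _ in range(4):
        with tqdm(total=10, desc="Upload parts", position=1, leave=False) as parts_bar:
            for _ in range(10):
                time.sleep(0.01)
                parts_bar.update(1)
        chunks_bar.update(1)
```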

Comment on lines +2764 to +2779
async def _chunk_and_upload_csv(
    self,
    path_to_csv: str,
    insert_size_byte: int,
    csv_table_descriptor: CsvTableDescriptor,
    schema_change_request: TableSchemaChangeRequest,
    client: Synapse,
    job_timeout: int,
    additional_changes: List[
        Union[
            "TableSchemaChangeRequest",
            "UploadToTableRequest",
            "AppendableRowSetRequest",
        ]
    ] = None,
) -> None:
@BryanFauble (Contributor, Author):

I'm not a big fan of the code I wrote for this. I looked at some alternative libraries, but was hesitant to add another dependency here, especially considering the performance concerns outlined in this comparison of approaches for splitting CSV files.

From their investigation, the plain Python filesystem API was much quicker (likely because it uses C under the hood). I didn't run a new analysis on the code I wrote here; I've chalked it up as "good enough".
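
For reference, a minimal standalone sketch of size-bounded CSV splitting that repeats the header row in every chunk. The function name, chunk file naming, and byte limit are illustrative, not the PR's implementation.

```python
import csv
import io
import os
from typing import Iterator, List

def split_csv(path: str, max_bytes: int, out_dir: str) -> Iterator[str]:
    """Yield paths of CSV chunks, each roughly at most `max_bytes`,
    with the header row repeated in every chunk."""

    def encode(row: List[str]) -> bytes:
        buffer = io.StringIO()
        csv.writer(buffer).writerow(row)
        return buffer.getvalue().encode("utf-8")

    with open(path, newline="", encoding="utf-8") as source:
        reader = csv.reader(source)
        header_bytes = encode(next(reader))
        out = None
        written = 0
        index = 0
        for row in reader:
            row_bytes = encode(row)
            if out is None or written + len(row_bytes) > max_bytes:
                if out is not None:
                    out.close()
                    yield out.name
                index += 1
                out = open(os.path.join(out_dir, f"chunk_{index}.csv"), "wb")
                out.write(header_bytes)
                written = len(header_bytes)
            out.write(row_bytes)
            written += len(row_bytes)
        if out is not None:
            out.close()
            yield out.name

# Example: split into ~1 GB chunks inside a working directory.
# for chunk_path in split_csv("rows.csv", max_bytes=1024**3, out_dir="."):
#     print(chunk_path)
```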

upload_request = UploadToTableRequest(
    table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None
)

header = next(reader)
temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
@BryanFauble (Contributor, Author):

I dislike having to do this, but until we can slice the CSV in memory and upload that, we need to write the CSV chunks to disk, since that is how the underlying `multipart_upload_file_async` function works. I chose to write to the Synapse cache directory for two reasons:

  1. I tried to use the /tmp directory but ran into issues with EC2 instances/the service catalog. The /tmp directory there is only around 100 MB, which breaks this script since we're chunking the file into 1 GB chunks.
  2. When the .synapseCache directory is purged, it will clean up any files that happen to be left over here.

Ideally, we would be able to create a BytesIO object and pass that into `multipart_upload_file_async`, which would read small portions of the CSV file and upload them.

Any thoughts on other alternatives to consider?
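
For illustration, a rough sketch of the in-memory direction, assuming an upload API that accepts a seekable file-like object instead of a path on disk. The function and parameter names below are hypothetical.

```python
import io

def read_chunk(path: str, offset: int, size: int) -> io.BytesIO:
    """Load one byte range of a large CSV into memory as a file-like object."""
    with open(path, "rb") as source:
        source.seek(offset)
        return io.BytesIO(source.read(size))

chunk = read_chunk("rows.csv", offset=0, size=1 * 1024 * 1024)
# `chunk` supports .read()/.seek(), so an upload function that accepts
# file-like objects could consume it without writing a temp file to disk.
```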

@BryanFauble BryanFauble requested a review from BWMac February 21, 2025 22:29
@BryanFauble BryanFauble marked this pull request as ready for review February 21, 2025 22:29
@BryanFauble BryanFauble requested a review from a team as a code owner February 21, 2025 22:29
@BryanFauble BryanFauble mentioned this pull request Feb 21, 2025