[SYNPY-1551] Batch upsert and inserts #1167
base: synpy-1551-tables-refactor
Conversation
```python
chunked_rows_to_update = []
chunk_size_limit = 1.5 * 1024 * 1024  # 1.5MB
current_chunk_size = 0
chunk = []
for row in rows_to_update:
    row_size = sys.getsizeof(row.to_synapse_request())
    if current_chunk_size + row_size > chunk_size_limit:
        chunked_rows_to_update.append(chunk)
        chunk = []
        current_chunk_size = 0
    chunk.append(row)
    current_chunk_size += row_size

if chunk:
    chunked_rows_to_update.append(chunk)
```
This is one of the larger changes here. The interface we use to submit a `PartialRow` change limits requests to 2MB or less:
https://rest-docs.synapse.org/rest/POST/entity/id/table/transaction/async/start.html
I am using 1.5MB here to leave some overhead room.
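For context, a minimal sketch of how the resulting chunks might then be consumed, one request per chunk so that each stays under the limit. `submit_partial_rowset` is a hypothetical placeholder here, not a real client function:

```python
from typing import Any, List


def submit_partial_rowset(chunk: List[Any]) -> None:
    """Hypothetical placeholder for the call that builds and submits a
    PartialRowSet request for a single chunk of row updates."""
    ...


def submit_in_chunks(chunked_rows_to_update: List[List[Any]]) -> None:
    # One request per chunk keeps each request body roughly at or below
    # 1.5MB, leaving headroom under the 2MB limit of the table transaction
    # endpoint referenced above.
    for chunk in chunked_rows_to_update:
        submit_partial_rowset(chunk)
```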
```python
# Used for backwards compatibility if anyone happens to be using this function
if logger:
    logger.info(text)
else:
    sys.stdout.write(text)
    sys.stdout.flush()
```
…total memory usage
…n entire 1GB chunk into memory
```diff
- start_time = asyncio.get_event_loop().time()
+ start_time = time.time()
```
```python
with logging_redirect_tqdm(loggers=[syn.logger]):
    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
        storage_str=storage_str,
    )
```
I was noticing some weirdness when I had multiple progress bars running (one for the chunked upload of row data into a Table, and one for the actual upload of the file itself).
This doesn't totally fix it, but the behavior seems better with it like this.
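For anyone unfamiliar with the pattern, here is a minimal, standalone illustration of what `logging_redirect_tqdm` does, independent of the Synapse client: log records emitted while a bar is active are routed through `tqdm.write()` so they don't break the bar rendering.

```python
import logging

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# While the context manager is active, records from the listed loggers are
# written via tqdm.write(), so log lines appear above the bar instead of
# splitting or duplicating it.
with logging_redirect_tqdm(loggers=[logger]):
    for i in tqdm(range(10), desc="upload"):
        if i == 5:
            logger.info("halfway through the upload")
```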
```python
async def _chunk_and_upload_csv(
    self,
    path_to_csv: str,
    insert_size_byte: int,
    csv_table_descriptor: CsvTableDescriptor,
    schema_change_request: TableSchemaChangeRequest,
    client: Synapse,
    job_timeout: int,
    additional_changes: List[
        Union[
            "TableSchemaChangeRequest",
            "UploadToTableRequest",
            "AppendableRowSetRequest",
        ]
    ] = None,
) -> None:
```
I'm not a big fan of the code I wrote for this. I looked at some alternative libraries, but was hesitant to add another dependency here, especially given the performance concerns outlined in this comparison of approaches for splitting CSV files.
From their investigation, the plain Python filesystem API was much quicker (likely since, under the hood, it uses C). I didn't do a new analysis of the code I wrote here; I've chalked it up as "good enough".
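For reference, a rough sketch of that line-based, filesystem-level approach. This is not the code in this PR; the function name, the 1GB default, and the assumption that rows contain no embedded newlines are all illustrative.

```python
import os
from typing import Iterator


def split_csv_by_size(
    path_to_csv: str, out_dir: str, max_bytes: int = 1024 * 1024 * 1024
) -> Iterator[str]:
    """Yield paths of size-limited CSV parts, repeating the header in each.

    Assumes rows contain no embedded newlines inside quoted fields; real code
    would need the csv module (or boundary checks) to handle that case.
    """
    with open(path_to_csv, newline="", encoding="utf-8") as src:
        header = src.readline()
        out = None
        out_path = ""
        written = 0
        part_index = 0
        for line in src:
            line_size = len(line.encode("utf-8"))
            # Start a new part when the current one would exceed the limit.
            if out is None or written + line_size > max_bytes:
                if out is not None:
                    out.close()
                    yield out_path
                part_index += 1
                out_path = os.path.join(out_dir, f"part_{part_index}.csv")
                out = open(out_path, "w", newline="", encoding="utf-8")
                out.write(header)
                written = len(header.encode("utf-8"))
            out.write(line)
            written += line_size
        if out is not None:
            out.close()
            yield out_path
```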
```python
upload_request = UploadToTableRequest(
    table_id=self.id, upload_file_handle_id=file_handle_id, update_etag=None
)

header = next(reader)
temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
```
I dislike having to do this, but until we can slice the CSV in memory and upload that, we need to write the CSV to disk, since that is how the underlying `multipart_upload_file_async` function works. I chose to write to the Synapse cache directory for two reasons:
- I tried to use the `/tmp` directory but ran into issues with EC2 instances/the service catalog. The `/tmp` directory is only around 100MB, which breaks this script since we're chunking the file into 1GB chunks.
- When the `.synapseCache` directory is purged, it would also clean up any files that happen to be left over here.

Ideally, we would be able to create a `BytesIO` object and pass that into `multipart_upload_file_async`, which would read small portions of the CSV file and upload them.
Any thoughts on other alternatives to consider?
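To make the `BytesIO` idea concrete, a hedged sketch of what slicing one chunk into memory might look like. The helper name and signature are hypothetical, and it assumes `multipart_upload_file_async` could be changed to accept a file-like object.

```python
import io


def read_chunk_as_bytesio(
    path_to_csv: str, header: bytes, start: int, max_bytes: int
) -> io.BytesIO:
    """Hypothetical sketch: read one byte range of the CSV (plus the header)
    into memory so it could be handed to an upload function that accepts
    file-like objects. A real implementation would need to align `start` and
    `max_bytes` to row boundaries so rows are not split mid-chunk.
    """
    buffer = io.BytesIO()
    buffer.write(header)
    with open(path_to_csv, "rb") as src:
        src.seek(start)
        buffer.write(src.read(max_bytes))
    buffer.seek(0)
    return buffer
```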
…atching the POC script to work again
Problem:
`The provided CSV file exceeds the maximum size of 1 GB.` is raised when inserting a large number of rows. This forces users to batch up the CSV themselves.

Solution:
Chunk the CSV and batch the row changes automatically within the `store_rows` function.

Testing:
Showing what the console looks like during the upload of a large CSV that is split into multiple chunks:
