Modify storage of rows via df to go through csv process
BryanFauble committed Feb 19, 2025
1 parent 895296a commit b7b14fd
Showing 1 changed file with 137 additions and 155 deletions.
292 changes: 137 additions & 155 deletions synapseclient/models/mixins/table_operator.py
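At a high level, the commit routes DataFrame row storage through the same CSV path already used for file uploads: the frame is written to a temporary CSV inside the client's cache directory and then handed to the new _chunk_and_upload_csv helper added below. A simplified sketch of that flow follows; it is not the committed method (which is not fully shown in this hunk), so the wrapper name store_rows_from_df is a hypothetical stand-in, and the real branch also forwards to_csv_kwargs, the CSV table descriptor, and any schema change request.

import os
import tempfile

from pandas import DataFrame


async def store_rows_from_df(table, values: DataFrame, client, insert_size_byte: int) -> None:
    """Hypothetical condensation of the new DataFrame branch in this commit."""
    # Stage the CSV in the Synapse cache directory (not the system temp dir) so the
    # cache purge can reclaim it later and so large files have enough disk space.
    temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
    os.makedirs(temp_dir, exist_ok=True)
    with tempfile.NamedTemporaryFile(
        delete=False,
        prefix="chunked_csv_for_synapse_store_rows",
        suffix=".csv",
        dir=temp_dir,
    ) as temp_file:
        try:
            # '%.12g' keeps whole numbers integral so Synapse INTEGER columns accept them.
            values.to_csv(temp_file.name, index=False, float_format="%.12g")
            # Hand off to the shared CSV path: split into ~insert_size_byte pieces and
            # append each piece to the table in its own update transaction.
            await table._chunk_and_upload_csv(
                path_to_csv=temp_file.name,
                insert_size_byte=insert_size_byte,
                csv_table_descriptor=None,
                schema_change_request=None,
                client=client,
                additional_changes=None,
            )
        finally:
            os.remove(temp_file.name)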
@@ -2550,170 +2550,152 @@ async def main():
)

if isinstance(original_values, str):
file_size = os.path.getsize(original_values)
if file_size > insert_size_byte:
applied_additional_changes = False
with open(file=original_values, mode="r", encoding="utf-8") as f:
header = f.readline()
chunk = f.readlines(insert_size_byte)
while chunk:
try:
file_path = None
temp_dir = client.cache.get_cache_dir(
file_handle_id=111111111
)
os.makedirs(temp_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(
delete=False,
prefix="chunked_csv_for_synapse_store_rows",
suffix=".csv",
dir=temp_dir,
) as temp_file:
file_path = temp_file.name
temp_file.write(header)
temp_file.writelines(chunk)
temp_file.close()
# TODO: This portion of the code should be updated to support uploading a file from memory using BytesIO (Ticket to be created)
file_handle_id = await multipart_upload_file_async(
syn=client,
file_path=file_path,
content_type="text/csv",
)
finally:
os.remove(file_path)

upload_request = UploadToTableRequest(
table_id=self.id,
upload_file_handle_id=file_handle_id,
update_etag=None,
)
if csv_table_descriptor:
upload_request.csv_table_descriptor = csv_table_descriptor
changes = []
if not applied_additional_changes:
applied_additional_changes = True
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)
await self._chunk_and_upload_csv(
path_to_csv=original_values,
insert_size_byte=insert_size_byte,
csv_table_descriptor=csv_table_descriptor,
schema_change_request=schema_change_request,
client=client,
additional_changes=additional_changes,
)
elif isinstance(values, DataFrame):
# When creating this temporary file we use the cache directory as the
# staging location for the file upload. This allows the cache purge
# function to clean up any files that end up left behind here, and it
# accounts for the fact that the temp directory may not have enough
# disk space to hold a file we need to upload (as is the case on EC2 instances).
temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
os.makedirs(temp_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(
delete=False,
prefix="chunked_csv_for_synapse_store_rows",
suffix=".csv",
dir=temp_dir,
) as temp_file:
try:
# TODO: This portion of the code should be updated to support uploading a file from memory using BytesIO (Ticket to be created)
# TODO: A way to split the dataframe directly is also needed. Right now we write the CSV and then let the CSV
# TODO: upload process take over; if we upload from memory we will not be able to write a file and then upload
# TODO: that file, so we need to be able to split the dataframe into chunks and upload those chunks to Synapse.
values.to_csv(
temp_file.name,
index=False,
float_format="%.12g",
**(to_csv_kwargs or {}),
)
# NOTE: reason for float_format='%.12g':
# pandas automatically converts int columns into float64 columns when some cells in the column have no
# value. If we write the whole number back as a decimal (e.g. '3.0'), Synapse complains that we are writing
# a float into an INTEGER (Synapse table type) column. Using the 'g' format will strip the '.0' from whole
# number values. pandas by default (with no float_format parameter) keeps about 12 digits after the
# decimal point, so we use '%.12g'.
# See SYNPY-267.

await self._chunk_and_upload_csv(
path_to_csv=temp_file.name,
insert_size_byte=insert_size_byte,
csv_table_descriptor=csv_table_descriptor,
schema_change_request=schema_change_request,
client=client,
additional_changes=additional_changes,
)
finally:
os.remove(temp_file.name)

chunk = f.readlines(insert_size_byte)
else:
raise ValueError(
"Don't know how to make tables from values of type %s." % type(values)
)
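The NOTE above on float_format='%.12g' is easy to see in isolation: pandas upcasts an integer column to float64 as soon as it contains a missing value, and the 'g' format strips the resulting '.0' so Synapse INTEGER columns still accept the values. A minimal illustration, independent of the Synapse client:

import pandas as pd

# A missing value forces pandas to upcast the integer column to float64.
df = pd.DataFrame({"id": [1, 2, 3], "count": [3, None, 7]})
print(df["count"].dtype)  # float64

# Without a float_format the whole numbers would serialize as '3.0' and '7.0',
# which Synapse rejects for INTEGER columns. '%.12g' renders them as '3' and '7'
# while keeping up to 12 significant digits for genuine floats.
print(df.to_csv(index=False, float_format="%.12g"))
# id,count
# 1,3
# 2,
# 3,7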

else:
file_handle_id = await multipart_upload_file_async(
syn=client, file_path=original_values, content_type="text/csv"
)
upload_request = UploadToTableRequest(
table_id=self.id,
upload_file_handle_id=file_handle_id,
update_etag=None,
)
if csv_table_descriptor:
upload_request.csv_table_descriptor = csv_table_descriptor
changes = []
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)
elif isinstance(values, DataFrame):
async def _chunk_and_upload_csv(
self,
path_to_csv: str,
insert_size_byte: int,
csv_table_descriptor: CsvTableDescriptor,
schema_change_request: TableSchemaChangeRequest,
client: Synapse,
additional_changes: List[
Union[
"TableSchemaChangeRequest",
"UploadToTableRequest",
"AppendableRowSetRequest",
]
] = None,
) -> None:
file_size = os.path.getsize(path_to_csv)
if file_size > insert_size_byte:
applied_additional_changes = False

async def _upload_df_chunk(
chunk: DataFrame, apply_additional_changes: bool
) -> None:
# When creating this temporary file we use the cache directory as the
# staging location for the file upload. This allows the cache purge
# function to clean up any files that end up left behind here, and it
# accounts for the fact that the temp directory may not have enough
# disk space to hold a file we need to upload (as is the case on EC2 instances).
with open(file=path_to_csv, mode="r", encoding="utf-8") as f:
header = f.readline()
chunk = f.readlines(insert_size_byte)
file_path = None
temp_dir = client.cache.get_cache_dir(file_handle_id=111111111)
os.makedirs(temp_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(
delete=False,
prefix="chunked_csv_for_synapse_store_rows",
suffix=".csv",
dir=temp_dir,
) as temp_file:
try:
chunk.to_csv(
temp_file.name,
index=False,
float_format="%.12g",
**(to_csv_kwargs or {}),
)
# NOTE: reason for float_format='%.12g':
# pandas automatically converts int columns into float64 columns when some cells in the column have no
# value. If we write the whole number back as a decimal (e.g. '3.0'), Synapse complains that we are writing
# a float into an INTEGER (Synapse table type) column. Using the 'g' format will strip the '.0' from whole
# number values. pandas by default (with no float_format parameter) keeps about 12 digits after the
# decimal point, so we use '%.12g'.
# See SYNPY-267.

# TODO: This portion of the code should be updated to support uploading a file from memory using BytesIO (Ticket to be created)
file_handle_id = await multipart_upload_file_async(
syn=client,
file_path=temp_file.name,
content_type="text/csv",
)
finally:
os.remove(temp_file.name)
upload_request = UploadToTableRequest(
table_id=self.id,
upload_file_handle_id=file_handle_id,
update_etag=None,
)
changes = []
if apply_additional_changes:
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)

current_chunk_size = 0
last_chunk_index = 0
has_additional_rows = False
for index, row in values.iterrows():
# Applying a 10% buffer to the row size to account for the overhead of the request
row_size = row.memory_usage(deep=False, index=False) * 1.1
if current_chunk_size + row_size >= insert_size_byte:
await _upload_df_chunk(
chunk=values[index : index + last_chunk_index],
apply_additional_changes=not applied_additional_changes,
while chunk:
with tempfile.NamedTemporaryFile(
delete=False,
prefix="chunked_csv_for_synapse_store_rows",
suffix=".csv",
dir=temp_dir,
) as temp_file:
try:
file_path = temp_file.name
temp_file.write(header)
temp_file.writelines(chunk)
temp_file.close()
# TODO: This portion of the code should be updated to support uploading a file from memory using BytesIO (Ticket to be created)
file_handle_id = await multipart_upload_file_async(
syn=client,
file_path=file_path,
content_type="text/csv",
)
finally:
os.remove(file_path)

upload_request = UploadToTableRequest(
table_id=self.id,
upload_file_handle_id=file_handle_id,
update_etag=None,
)
applied_additional_changes = True
current_chunk_size = 0
last_chunk_index = index
has_additional_rows = False
else:
has_additional_rows = True
current_chunk_size += row_size
if has_additional_rows:
await _upload_df_chunk(
chunk=values[last_chunk_index:],
apply_additional_changes=not applied_additional_changes,
)
applied_additional_changes = True
if csv_table_descriptor:
upload_request.csv_table_descriptor = csv_table_descriptor
changes = []
if not applied_additional_changes:
applied_additional_changes = True
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)

chunk = f.readlines(insert_size_byte)

else:
raise ValueError(
"Don't know how to make tables from values of type %s." % type(values)
file_handle_id = await multipart_upload_file_async(
syn=client, file_path=path_to_csv, content_type="text/csv"
)
upload_request = UploadToTableRequest(
table_id=self.id,
upload_file_handle_id=file_handle_id,
update_etag=None,
)
if csv_table_descriptor:
upload_request.csv_table_descriptor = csv_table_descriptor
changes = []
if schema_change_request:
changes.append(schema_change_request)
if additional_changes:
changes.extend(additional_changes)
changes.append(upload_request)

await TableUpdateTransaction(
entity_id=self.id, changes=changes
).send_job_and_wait_async(synapse_client=client)
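The core of the new _chunk_and_upload_csv helper is the readlines(sizehint) loop: it reads complete lines totalling roughly insert_size_byte bytes at a time and writes each batch to a temporary CSV with the original header prepended, so every uploaded piece is a valid CSV on its own. A standalone sketch of just that slicing step; split_csv_by_size and staging_dir are hypothetical names, and the committed code uploads each piece with multipart_upload_file_async and appends it via an UploadToTableRequest inside a TableUpdateTransaction.

import os
import tempfile
from typing import Iterator


def split_csv_by_size(path_to_csv: str, insert_size_byte: int, staging_dir: str) -> Iterator[str]:
    """Yield paths to temporary CSV files of roughly `insert_size_byte` bytes each,
    every one starting with the header row from the source file."""
    os.makedirs(staging_dir, exist_ok=True)
    with open(path_to_csv, mode="r", encoding="utf-8") as f:
        header = f.readline()
        # readlines(sizehint) returns complete lines whose total size is about
        # `sizehint` bytes, so a row is never split across two chunks.
        chunk = f.readlines(insert_size_byte)
        while chunk:
            with tempfile.NamedTemporaryFile(
                mode="w", delete=False, suffix=".csv", dir=staging_dir, encoding="utf-8"
            ) as temp_file:
                temp_file.write(header)
                temp_file.writelines(chunk)
                chunk_path = temp_file.name
            yield chunk_path  # caller uploads this piece, then deletes it
            chunk = f.readlines(insert_size_byte)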

# TODO: Determine if it is possible to delete rows from a `Dataset` entity, or if it's only possible to delete the rows by setting the `items` attribute and storing the entity
async def delete_rows_async(
