Include progress bar during upsert
BryanFauble committed Feb 18, 2025
1 parent 3994f36 commit 240f711
Showing 1 changed file with 171 additions and 141 deletions.
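The diff below adds tqdm progress bars around the chunked query and update work performed during an upsert. As a rough, standalone sketch of the pattern it applies (written for this note, not taken from synapseclient; upsert_with_progress and process_chunk are hypothetical helpers), a bar tracks rows across chunked requests while logging_redirect_tqdm routes logger output through tqdm so log lines do not break the bar:

import logging

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

logger = logging.getLogger("synapseclient")
logging.basicConfig(level=logging.INFO)


def process_chunk(chunk: list) -> None:
    # Placeholder for the per-chunk work (e.g. querying or updating rows).
    logger.info("processed %d rows", len(chunk))


def upsert_with_progress(rows: list, chunk_size: int = 100) -> None:
    chunks = [rows[i : i + chunk_size] for i in range(0, len(rows), chunk_size)]
    with logging_redirect_tqdm(loggers=[logger]):
        progress_bar = tqdm(
            total=len(rows),
            desc="Upserting rows",
            unit_scale=True,
            smoothing=0,
            postfix=f"Over {len(chunks)} requests",
        )
        for chunk in chunks:
            process_chunk(chunk)
            # Advance by the number of rows in the chunk, matching the total above.
            progress_bar.update(len(chunk))
        progress_bar.close()


if __name__ == "__main__":
    upsert_with_progress(list(range(250)), chunk_size=100)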
312 changes: 171 additions & 141 deletions synapseclient/models/mixins/table_operator.py
@@ -3,6 +3,7 @@

import asyncio
import json
import logging
import os
import sys
import tempfile
@@ -14,6 +15,8 @@
from io import BytesIO
from typing import Any, Dict, List, Optional, TypeVar, Union

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from typing_extensions import Self

from synapseclient import Column as Synapse_Column
@@ -1031,7 +1034,11 @@ async def main():
loop = asyncio.get_event_loop()

client = Synapse.get_client(synapse_client=synapse_client)
client.logger.info(f"Running query: {query}")

if client.logger.isEnabledFor(logging.DEBUG):
client.logger.debug(f"Running query: {query}")
else:
client.logger.info("Running query")

# TODO: Implementation should not download CSV to disk, instead the ideal
# solution will load the result into BytesIO and then pass that to
@@ -1929,137 +1936,159 @@ async def main():
contains_etag = self.__class__.__name__ in CLASSES_THAT_CONTAIN_ROW_ETAG
indexs_of_original_df_with_changes = []
total_row_count_updated = 0
for individual_chunk in chunk_list:
select_statement = "SELECT ROW_ID, "

if self.__class__.__name__ in CLASSES_THAT_CONTAIN_ROW_ETAG:
select_statement += "ROW_ETAG, "

select_statement += (
f"{', '.join(all_columns_from_df)} FROM {self.id} WHERE "
with logging_redirect_tqdm(loggers=[client.logger]):
progress_bar = tqdm(
total=len(values),
desc="Querying rows",
unit_scale=True,
smoothing=0,
postfix=f"Over {len(chunk_list)} requests",
)
where_statements = []
for upsert_column in primary_keys:
column_model = self.columns[upsert_column]
if (
column_model.column_type
in (
ColumnType.STRING_LIST,
ColumnType.INTEGER_LIST,
ColumnType.BOOLEAN_LIST,
ColumnType.ENTITYID_LIST,
ColumnType.USERID_LIST,
)
or column_model.column_type == ColumnType.JSON
):
raise ValueError(
f"Column type {column_model.column_type} is not supported for primary_keys"
)
elif column_model.column_type in (
ColumnType.STRING,
ColumnType.MEDIUMTEXT,
ColumnType.LARGETEXT,
ColumnType.LINK,
ColumnType.ENTITYID,
):
values_for_where_statement = [
f"'{value}'"
for value in individual_chunk[upsert_column]
if value is not None
]

elif column_model.column_type == ColumnType.BOOLEAN:
include_true = False
include_false = False
for value in individual_chunk[upsert_column]:
if value is None:
continue
if value:
include_true = True
else:
include_false = True
if include_true and include_false:
break
if include_true and include_false:
values_for_where_statement = ["'true'", "'false'"]
elif include_true:
values_for_where_statement = ["'true'"]
elif include_false:
values_for_where_statement = ["'false'"]
else:
values_for_where_statement = [
str(value)
for value in individual_chunk[upsert_column]
if value is not None
]
if not values_for_where_statement:
continue
where_statements.append(
f"\"{upsert_column}\" IN ({', '.join(values_for_where_statement)})"
)
for individual_chunk in chunk_list:
select_statement = "SELECT ROW_ID, "

where_statement = " AND ".join(where_statements)
select_statement += where_statement
if self.__class__.__name__ in CLASSES_THAT_CONTAIN_ROW_ETAG:
select_statement += "ROW_ETAG, "

results = await self.query_async(
query=select_statement, synapse_client=synapse_client
)
select_statement += (
f"{', '.join(all_columns_from_df)} FROM {self.id} WHERE "
)
where_statements = []
for upsert_column in primary_keys:
column_model = self.columns[upsert_column]
if (
column_model.column_type
in (
ColumnType.STRING_LIST,
ColumnType.INTEGER_LIST,
ColumnType.BOOLEAN_LIST,
ColumnType.ENTITYID_LIST,
ColumnType.USERID_LIST,
)
or column_model.column_type == ColumnType.JSON
):
raise ValueError(
f"Column type {column_model.column_type} is not supported for primary_keys"
)
elif column_model.column_type in (
ColumnType.STRING,
ColumnType.MEDIUMTEXT,
ColumnType.LARGETEXT,
ColumnType.LINK,
ColumnType.ENTITYID,
):
values_for_where_statement = [
f"'{value}'"
for value in individual_chunk[upsert_column]
if value is not None
]

elif column_model.column_type == ColumnType.BOOLEAN:
include_true = False
include_false = False
for value in individual_chunk[upsert_column]:
if value is None:
continue
if value:
include_true = True
else:
include_false = True
if include_true and include_false:
break
if include_true and include_false:
values_for_where_statement = ["'true'", "'false'"]
elif include_true:
values_for_where_statement = ["'true'"]
elif include_false:
values_for_where_statement = ["'false'"]
else:
values_for_where_statement = [
str(value)
for value in individual_chunk[upsert_column]
if value is not None
]
if not values_for_where_statement:
continue
where_statements.append(
f"\"{upsert_column}\" IN ({', '.join(values_for_where_statement)})"
)

for row in results.itertuples(index=False):
row_etag = None
where_statement = " AND ".join(where_statements)
select_statement += where_statement

if contains_etag:
row_etag = row.ROW_ETAG
results = await self.query_async(
query=select_statement, synapse_client=synapse_client
)

partial_change_values = {}
for row in results.itertuples(index=False):
row_etag = None

# Find the matching row in `values` that matches the row in `results` for the primary_keys
matching_conditions = individual_chunk[primary_keys[0]] == getattr(
row, primary_keys[0]
)
for col in primary_keys[1:]:
matching_conditions &= individual_chunk[col] == getattr(row, col)
matching_row = individual_chunk.loc[matching_conditions]
if contains_etag:
row_etag = row.ROW_ETAG

# Determines which cells need to be updated
for column in individual_chunk.columns:
if len(matching_row[column].values) > 1:
raise ValueError(
f"The values for the keys being upserted must be unique in the table: [{matching_row}]"
partial_change_values = {}

# Find the matching row in `values` that matches the row in `results` for the primary_keys
matching_conditions = individual_chunk[primary_keys[0]] == getattr(
row, primary_keys[0]
)
for col in primary_keys[1:]:
matching_conditions &= individual_chunk[col] == getattr(
row, col
)
matching_row = individual_chunk.loc[matching_conditions]

if len(matching_row[column].values) == 0:
continue
column_id = self.columns[column].id
column_type = self.columns[column].column_type
cell_value = matching_row[column].values[0]
if cell_value != getattr(row, column):
if (
isinstance(cell_value, list) and len(cell_value) > 0
) or not isna(cell_value):
partial_change_values[
column_id
] = _convert_pandas_row_to_python_types(
cell=cell_value, column_type=column_type
# Determines which cells need to be updated
for column in individual_chunk.columns:
if len(matching_row[column].values) > 1:
raise ValueError(
f"The values for the keys being upserted must be unique in the table: [{matching_row}]"
)
else:
partial_change_values[column_id] = None

if partial_change_values != {}:
total_row_count_updated += 1
partial_change = PartialRow(
row_id=row.ROW_ID,
etag=row_etag,
values=[
{
"key": partial_change_key,
"value": partial_change_value,
}
for partial_change_key, partial_change_value in partial_change_values.items()
],
)
rows_to_update.append(partial_change)
indexs_of_original_df_with_changes.append(matching_row.index[0])

if len(matching_row[column].values) == 0:
continue
column_id = self.columns[column].id
column_type = self.columns[column].column_type
cell_value = matching_row[column].values[0]
if cell_value != getattr(row, column):
if (
isinstance(cell_value, list) and len(cell_value) > 0
) or not isna(cell_value):
partial_change_values[
column_id
] = _convert_pandas_row_to_python_types(
cell=cell_value, column_type=column_type
)
else:
partial_change_values[column_id] = None

if partial_change_values != {}:
total_row_count_updated += 1
partial_change = PartialRow(
row_id=row.ROW_ID,
etag=row_etag,
values=[
{
"key": partial_change_key,
"value": partial_change_value,
}
for partial_change_key, partial_change_value in partial_change_values.items()
],
)
rows_to_update.append(partial_change)
indexs_of_original_df_with_changes.append(matching_row.index[0])
progress_bar.update(len(individual_chunk))
progress_bar.close()

rows_to_insert_df = values.loc[
~values.index.isin(indexs_of_original_df_with_changes)
]

client.logger.info(
f"[{self.id}:{self.name}]: Found {total_row_count_updated}"
f" rows to update and {len(rows_to_insert_df)} rows to insert"
)

if not dry_run and rows_to_update:
chunked_rows_to_update = []
@@ -2077,30 +2106,31 @@ async def main():
if chunk:
chunked_rows_to_update.append(chunk)

for chunked_row_to_update in chunked_rows_to_update:
change = AppendableRowSetRequest(
entity_id=self.id,
to_append=PartialRowSet(
table_id=self.id,
rows=chunked_row_to_update,
),
)
print(
    f"The size of the change is: {sys.getsizeof(change.to_synapse_request())}"
)
with logging_redirect_tqdm(loggers=[client.logger]):
progress_bar = tqdm(
total=len(rows_to_update),
desc="Updating rows",
unit_scale=True,
smoothing=0,
postfix=f"Over {len(chunked_rows_to_update)} requests",
)
await TableUpdateTransaction(
entity_id=self.id,
changes=[change],
).send_job_and_wait_async(synapse_client=client)

rows_to_insert_df = values.loc[
~values.index.isin(indexs_of_original_df_with_changes)
]
for chunked_row_to_update in chunked_rows_to_update:
change = AppendableRowSetRequest(
entity_id=self.id,
to_append=PartialRowSet(
table_id=self.id,
rows=chunked_row_to_update,
),
)

client.logger.info(
f"[{self.id}:{self.name}]: Found {total_row_count_updated}"
f" rows to update and {len(rows_to_insert_df)} rows to insert"
)
await TableUpdateTransaction(
entity_id=self.id,
changes=[change],
).send_job_and_wait_async(synapse_client=client)
progress_bar.update(len(chunked_row_to_update))

progress_bar.close()

if not rows_to_insert_df.empty:
await self.store_rows_async(

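Besides the progress bars, the diff also downgrades the full query text to DEBUG-level logging, so routine runs only report that a query is running. A minimal, self-contained illustration of that check (using an ordinary module logger rather than the Synapse client's logger) might look like:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_query(query: str) -> None:
    # Log the (potentially very long) query text only when DEBUG is enabled;
    # otherwise emit a short INFO message so normal runs stay readable.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"Running query: {query}")
    else:
        logger.info("Running query")


log_query('SELECT ROW_ID FROM syn123 WHERE "id" IN (1, 2, 3)')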