From 6c4b0a3ee8c22266b62a4872f3b9160a963d8bb4 Mon Sep 17 00:00:00 2001
From: BinamB
Date: Mon, 14 Oct 2024 13:41:17 -0500
Subject: [PATCH] (Chore): Refine Single Table Migration

---
 bin/migrate_to_single_table.py        | 26 ++++++++++++++++++--------
 tests/test_migrate_to_single_table.py |  6 ++++--
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/bin/migrate_to_single_table.py b/bin/migrate_to_single_table.py
index 7a1d30d0..86b2ced5 100644
--- a/bin/migrate_to_single_table.py
+++ b/bin/migrate_to_single_table.py
@@ -36,7 +36,9 @@ def load_json(file_name):
 
 def main():
     args = parse_args()
-    migrator = IndexRecordMigrator(creds_file=args.creds_file)
+    migrator = IndexRecordMigrator(
+        creds_file=args.creds_file, batch_size=args.batch_size
+    )
     migrator.index_record_to_new_table(
         offset=args.start_offset, last_seen_guid=args.start_did
     )
@@ -61,14 +63,22 @@ def parse_args():
     parser.add_argument(
         "--start-offset",
         dest="start_offset",
+        type=int,
         help="offset to start at",
         default=None,
     )
+    parser.add_argument(
+        "--batch-size",
+        dest="batch_size",
+        help="number of records to batch select from source table (default: 1000)",
+        type=int,
+        default=1000,
+    )
     return parser.parse_args()
 
 
 class IndexRecordMigrator:
-    def __init__(self, creds_file=None):
+    def __init__(self, creds_file=None, batch_size=None):
         self.logger = get_logger("migrate_single_table", log_level="debug")
 
         conf_data = load_json(creds_file) if creds_file else load_json("creds.json")
@@ -78,6 +88,7 @@ def __init__(self, creds_file=None):
         psw = conf_data.get("db_password", "{{db_password}}")
         pghost = conf_data.get("db_host", "{{db_host}}")
         pgport = 5432
+        self.batch_size = batch_size
 
         try:
             engine = create_engine(
@@ -91,9 +102,7 @@ def __init__(self, creds_file=None):
 
         self.session = Session()
 
-    def index_record_to_new_table(
-        self, batch_size=1000, offset=None, last_seen_guid=None
-    ):
+    def index_record_to_new_table(self, offset=None, last_seen_guid=None):
         """
         Collect records from index_record table, collect additional info from multiple tables and bulk insert to new record table.
         """
@@ -106,22 +115,23 @@ def index_record_to_new_table(
             records = (
                 self.session.query(IndexRecord)
                 .order_by(IndexRecord.did)
-                .limit(batch_size)
+                .limit(self.batch_size)
                 .all()
             )
         elif offset is not None:
             records = (
                 self.session.query(IndexRecord)
                 .order_by(IndexRecord.did)
                 .offset(offset - 1)
-                .limit(batch_size)
+                .limit(self.batch_size)
                 .all()
             )
         else:
+            self.logger.info(f"Start guid set to: {last_seen_guid}")
             records = (
                 self.session.query(IndexRecord)
                 .order_by(IndexRecord.did)
                 .filter(IndexRecord.did > last_seen_guid)
-                .limit(batch_size)
+                .limit(self.batch_size)
                 .all()
             )

diff --git a/tests/test_migrate_to_single_table.py b/tests/test_migrate_to_single_table.py
index 37dbd1ff..6d35adf7 100644
--- a/tests/test_migrate_to_single_table.py
+++ b/tests/test_migrate_to_single_table.py
@@ -70,10 +70,12 @@ def test_index_record_to_new_table():
     """
     Test index_record_to_new_table copies records from old tables to new record table.
     """
-    index_record_migrator = IndexRecordMigrator(creds_file="tests/test_creds.json")
+    index_record_migrator = IndexRecordMigrator(
+        creds_file="tests/test_creds.json", batch_size=10
+    )
     n_records = 100
     create_record(n_records)
-    index_record_migrator.index_record_to_new_table(batch_size=10)
+    index_record_migrator.index_record_to_new_table()
     engine = create_engine(POSTGRES_CONNECTION)
     with engine.connect() as conn:
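For context, the batched selection this patch parameterizes follows a keyset-pagination pattern: order by `did`, then resume with `IndexRecord.did > last_seen_guid`. Below is a minimal, self-contained sketch of that loop under stated assumptions — the `IndexRecord` model, engine URL, and bulk-insert placeholder are stand-ins rather than the real indexd definitions; only the batching logic mirrors the patch.

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class IndexRecord(Base):
    """Stand-in for the real indexd model; only `did` matters here."""

    __tablename__ = "index_record"
    did = Column(String, primary_key=True)


# Placeholder engine; the real script builds a Postgres URL from creds.json.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

batch_size = 1000  # mirrors the new --batch-size default
last_seen_guid = None

while True:
    query = session.query(IndexRecord).order_by(IndexRecord.did)
    if last_seen_guid is not None:
        # Keyset pagination: resume strictly after the last GUID processed,
        # which stays fast on large tables where big OFFSETs degrade.
        query = query.filter(IndexRecord.did > last_seen_guid)
    records = query.limit(batch_size).all()
    if not records:
        break
    # ... transform `records` and bulk-insert into the new table here ...
    last_seen_guid = records[-1].did
```

With the patch applied, the batch size is exposed on the CLI (e.g. `--batch-size 500`) alongside the existing start-offset and start-GUID options, rather than being an argument to `index_record_to_new_table` itself.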