Skip to content

Commit

Permalink
(Chore): Refine Single Table Migration
Browse files Browse the repository at this point in the history
  • Loading branch information
BinamB committed Oct 14, 2024
1 parent 91f8974 commit 6c4b0a3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
26 changes: 18 additions & 8 deletions bin/migrate_to_single_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def load_json(file_name):

def main():
    """Entry point: parse CLI args and migrate index records to the new single table.

    Builds an IndexRecordMigrator from the supplied credentials file and batch
    size, then copies records starting from either a numeric offset or a
    last-seen GUID (whichever the caller provided; both may be None to start
    from the beginning).
    """
    args = parse_args()
    # batch_size controls how many rows are selected per query from the
    # source table; it is threaded through to every paged SELECT.
    migrator = IndexRecordMigrator(
        creds_file=args.creds_file, batch_size=args.batch_size
    )
    # offset and last_seen_guid are mutually alternative resume points;
    # the migrator decides which pagination strategy to use.
    migrator.index_record_to_new_table(
        offset=args.start_offset, last_seen_guid=args.start_did
    )
Expand All @@ -61,14 +63,22 @@ def parse_args():
parser.add_argument(
"--start-offset",
dest="start_offset",
type=int,
help="offset to start at",
default=None,
)
parser.add_argument(
"--batch-size",
dest="batch_size",
help="number of records to batch select from source table (default: 1000)",
type=int,
default=1000,
)
return parser.parse_args()


class IndexRecordMigrator:
def __init__(self, creds_file=None):
def __init__(self, creds_file=None, batch_size=None):
self.logger = get_logger("migrate_single_table", log_level="debug")

conf_data = load_json(creds_file) if creds_file else load_json("creds.json")
Expand All @@ -78,6 +88,7 @@ def __init__(self, creds_file=None):
psw = conf_data.get("db_password", "{{db_password}}")
pghost = conf_data.get("db_host", "{{db_host}}")
pgport = 5432
self.batch_size = batch_size

try:
engine = create_engine(
Expand All @@ -91,9 +102,7 @@ def __init__(self, creds_file=None):

self.session = Session()

def index_record_to_new_table(
self, batch_size=1000, offset=None, last_seen_guid=None
):
def index_record_to_new_table(self, offset=None, last_seen_guid=None):
"""
Collect records from index_record table, collect additional info from multiple tables and bulk insert to new record table.
"""
Expand All @@ -106,23 +115,24 @@ def index_record_to_new_table(
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.limit(batch_size)
.limit(self.batch_size)
.all()
)
elif offset is not None:
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.offset(offset - 1)
.limit(batch_size)
.limit(self.batch_size)
.all()
)
else:
self.logger.info(f"Start guid set to: {last_seen_guid}")
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.filter(IndexRecord.did > last_seen_guid)
.limit(batch_size)
.limit(self.batch_size)
.all()
)

Expand Down
6 changes: 4 additions & 2 deletions tests/test_migrate_to_single_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ def test_index_record_to_new_table():
"""
Test index_record_to_new_table copies records from old tables to new record table.
"""
index_record_migrator = IndexRecordMigrator(creds_file="tests/test_creds.json")
index_record_migrator = IndexRecordMigrator(
creds_file="tests/test_creds.json", batch_size=10
)
n_records = 100
create_record(n_records)
index_record_migrator.index_record_to_new_table(batch_size=10)
index_record_migrator.index_record_to_new_table()

engine = create_engine(POSTGRES_CONNECTION)
with engine.connect() as conn:
Expand Down

0 comments on commit 6c4b0a3

Please sign in to comment.