Skip to content

Commit

Permalink
change upsert to build in code and enhance update command
Browse files Browse the repository at this point in the history
  • Loading branch information
gosow9 committed Aug 9, 2024
1 parent 798c33d commit 75b2496
Show file tree
Hide file tree
Showing 7 changed files with 11,534 additions and 2 deletions.
17 changes: 17 additions & 0 deletions fits2db/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Optional
from pandas import DataFrame
from ..config.config_model import ConfigType
from ..fits import FitsFile
from .mysql import MySQL
Expand Down Expand Up @@ -78,6 +79,22 @@ def clean_db(self) -> None:
except Exception as e:
log.error(f"Error during upsert operation: {e}")

def get_db_file_infos(self)-> DataFrame:
"""
Gets all file infos from FITS2DB_META Table
"""
log.debug("Starting db cleaning operation.")
try:
if self.loader:
df = self.loader.get_fits2db_meta()
log.info("FITS2DB_META loaded")
return df
else:
log.error("Loader is not initialized.")
except Exception as e:
log.error(f"Error during upsert operation: {e}")



def upsert(self) -> None:
"""
Expand Down
9 changes: 9 additions & 0 deletions fits2db/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ def clean_db(self):
log.info(f"Tables in DB:{self.db_table_names}")
self.drop_tables()
self.drop_meta_tables()

def get_fits2db_meta(self)->pd.DataFrame:
try:
df = pd.read_sql_table("FITS2DB_META", con=self.engine)
return df
except Exception as err:
log.error(err)
raise


def upsert_file(self):
self.session = self.db_session()
Expand Down
5 changes: 3 additions & 2 deletions fits2db/cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import click
from .helper_func import tables, files, upsert, init
from .helper_func import tables, files, build, init, update
from .utils import set_verbosity

@click.version_option("0.0.1b", "--version")
Expand Down Expand Up @@ -28,8 +28,9 @@ def cli(ctx):

cli.add_command(files)
cli.add_command(tables)
cli.add_command(upsert)
cli.add_command(build)
cli.add_command(init)
cli.add_command(update)

if __name__ == "__main__":
cli()
7 changes: 7 additions & 0 deletions fits2db/cli/helper_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ def build(config_path, reset):
fits = Fits2db(config_path)
fits.build(reset)

@click.command()
@click.argument("config_path", default=".", type=click.Path(exists=True))
def update(config_path):
"""Upsert all tables defnied in config.yml to databse"""
fits = Fits2db(config_path)
fits.update_db()


@click.command()
@click.argument("config_path", default=".", type=click.Path(exists=True))
Expand Down
46 changes: 46 additions & 0 deletions fits2db/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd
from ..adapters import DBWriter
import logging
from datetime import datetime


# Use the configured logger
Expand Down Expand Up @@ -69,6 +70,22 @@ def get_file_names(self) -> list:
log.debug(f"paths {paths}")
log.info("run function")
return list(dict.fromkeys(get_all_fits(paths)))

def get_file_infos(self) -> pd.DataFrame:

meta = []
for path in self.fits_file_paths:
path = Path(path)
absolute_path = path.resolve()
file_meta = {
"filename": path.name,
"filepath":absolute_path.as_posix(),
"last_file_mutation":datetime.fromtimestamp(os.path.getmtime(absolute_path))}
meta.append(file_meta)
df = pd.DataFrame(meta)
log.debug(df)
return df


def get_table_names(self):
self.all_table_names = []
Expand Down Expand Up @@ -123,6 +140,35 @@ def build(self, reset:bool=True):
except ValueError as err:
log.error(f"\n {err}")

def update_db(self):
file_infos = self.get_file_infos()
log.info(file_infos)
writer = DBWriter(self.configs)
db_file_infos = writer.get_db_file_infos()
log.info(db_file_infos)
merged_df = pd.merge(file_infos, db_file_infos, on=['filename', 'filepath'], how='left', suffixes=('_file', '_db'))


filtered_df = merged_df[
(merged_df['last_file_mutation_file'] > merged_df['last_file_mutation_db']) |
merged_df['last_file_mutation_db'].isna()
]

result_df = filtered_df[['filename', 'filepath', 'last_file_mutation_file']].rename(
columns={'last_file_mutation_file': 'last_file_mutation'}
)
log.info(result_df)
fits_file_paths = result_df["filepath"].to_list()
for path in tqdm(fits_file_paths):
path = Path(path)
try:
file = FitsFile(path)
writer = DBWriter(self.configs, file)
writer.upsert()

except ValueError as err:
log.error(f"\n {err}")


def upsert_to_db(self):
log.debug("Start upsert to db")
Expand Down
Loading

0 comments on commit 75b2496

Please sign in to comment.