
Commit

adding even more debug messages
mmaelicke committed Apr 23, 2024
1 parent e1c113b commit f75be43
Showing 4 changed files with 32 additions and 14 deletions.
24 changes: 19 additions & 5 deletions src/ingestor.py
@@ -129,9 +129,6 @@ def _create_datasource_table(entry: Entry, table_name: str, use_spatial: bool =
def _create_insert_sql(entry: Entry, table_name: str, source_name: str = 'df', use_spatial: bool = False) -> str:
if use_spatial:
raise NotImplementedError('There is still an error with the spatial type.')

# get the parameters
params = load_params()

# get the dimension names
spatial_dims = entry.datasource.spatial_scale.dimension_names
@@ -186,8 +183,9 @@ def load_files(file_mapping: List[FileMapping]) -> str:
try:
table_name = _switch_source_loader(entry, fname)
table_names.append(table_name)
except:
except Exception as e:
logger.exception(f"ERRORED on loading file <{fname}>")
logger.error(str(e))
continue

# get a set of all involved table names
@@ -200,20 +198,23 @@ def load_files(file_mapping: List[FileMapping]) -> str:
add_temporal_integration(entry=entry, table_name=table_name, funcs=None)
except Exception as e:
logger.exception(f"ERRORED on adding temporal integration for table <{table_name}>")
logger.error(str(e))

# spatial aggregation
try:
# TODO: the hard coded params should be changeable
add_spatial_integration(entry=entry, table_name=table_name, funcs=None, target_epsg=3857, algin_cell='center')
except Exception as e:
logger.exception(f"ERRORED on adding spatial integration for table <{table_name}>")
logger.error(str(e))

# spatio-temporal aggregation
try:
# TODO: the hard coded params should be changeable
add_spatial_integration(entry=entry, table_name=table_name, spatio_temporal=True, funcs=None, target_epsg=3857, algin_cell='center')
except Exception as e:
logger.exception(f"ERRORED on adding spatio-temporal integration for table <{table_name}>")
logger.error(str(e))

# now load all metadata that we can find on the dataset folder level
load_metadata_to_duckdb()
@@ -247,12 +248,25 @@ def load_xarray_to_duckdb(entry: Entry, data: xr.Dataset) -> str:
# get the dimension names and spatial dimensions
dimension_names = entry.datasource.dimension_names

# log out the dimension names
logger.debug(f"Dimension names for <ID={entry.id}>: {dimension_names}")

# we assume that the source uses chunking, hence convert to dask dataframe
logger.info(f"Loading preprocessed source <ID={entry.id}> to duckdb database <{params.database_path}> for data integration...")
t1 = time.time()

# get a delayed dask dataframe
ddf = data.to_dask_dataframe()[dimension_names]
try:
ddf = data.to_dask_dataframe()[dimension_names]
except ValueError as e:
# check if this is the chunking error
if 'Object has inconsistent chunks' in str(e):
logger.warning(f"Xarray had problems reading chunks from the clip of <ID={entry.id}>. Trying to rechunk the data...")
unified = data.unify_chunks()
ddf = unified.to_dask_dataframe()[dimension_names]
else:
# in any other case re-raise the error
raise e

# create the table name
table_name = f"{entry.variable.name.replace(' ', '_')}_{entry.id}"
14 changes: 7 additions & 7 deletions src/loader.py
@@ -87,19 +87,20 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
name = entry.datasource.path

# check if there is a wildcard in the name
msg = f"Entry <ID={entry.id}> supplied the raw entry.datasource.path={name}."
if '*' in name:
fnames = glob.glob(name)
msg += f" Has a wildcard, resolved path to {len(fnames)} files: [{fnames}]."
else:
fnames = [name]
msg += " Resource is a single file."

# check the amount of files to be processed
if len(fnames) > 1:
logger.debug(f"For {name} found {len(fnames)} files.")
elif len(fnames) == 0:
logger.warning(f"Could not find any files for {name}.")
if len(fnames) == 0:
logger.warning(msg + f" Could not find any files for {name}.")
return None
else:
logger.debug(f"Resource {name} is single file.")
logger.debug(msg)

# get the time axis
temporal_dims = entry.datasource.temporal_scale.dimension_names if entry.datasource.temporal_scale is not None else []
@@ -163,7 +164,6 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
entry_metadata_saver(entry, metafile_name)
logger.info(f"Saved metadata for dataset <ID={entry.id}> to {metafile_name}.")


# return the out_path
return str(dataset_base_path)

@@ -195,6 +195,7 @@ def _clip_netcdf_cdo(path: Path, params: Params):

return str(out_name)


def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params: Params):
if data.rio.crs is None:
logger.error(f"Could not clip {data} as it has no CRS.")
@@ -257,7 +258,6 @@ def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params:
return region



def load_raster_file(entry: Entry, name: str, reference_area: dict, base_path: str = '/out') -> rio.DatasetReader:
# THIS does not fit the workflow yet
# Maybe dispatch all load functions instead? not sure
4 changes: 2 additions & 2 deletions src/run.py
@@ -122,12 +122,12 @@

# wait until all results are finished
executor.shutdown(wait=True)
logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")
logger.info(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")

# here do the stuff for creating a consistent dataset
# check if the user disabled integration
if params.integration == Integrations.NONE:
logger.debug("Integration is disabled. No further processing will be done.")
logger.info("Integration is disabled. No further processing will be done.")

# check if we have any files to process
elif len(file_mapping) > 0:
4 changes: 4 additions & 0 deletions src/writer.py
@@ -39,6 +39,9 @@ def dispatch_save_file(entry: Entry, data: DataFrame | xr.Dataset, executor: Exe
else:
target_path = Path(base_path) / target_name

# log out the target path
logger.debug(f"target_path derived for <ID={entry.id}> is: {target_path}")

# define the exception handler
# TODO exception handler is not the right name anymore
def exception_handler(future: Future):
Expand Down Expand Up @@ -110,6 +113,7 @@ def xarray_to_netcdf_saver(data: xr.Dataset, target_name: str) -> str:
if Path(target_name).exists():
logger.debug(f"writer.xarray_to_netcdf_saver: {target_name} already exists. Skipping.")
return target_name

t1 = time.time()
data.to_netcdf(target_name)
t2 = time.time()
