diff --git a/src/ingestor.py b/src/ingestor.py
index 1fb57bc..f8c856a 100644
--- a/src/ingestor.py
+++ b/src/ingestor.py
@@ -129,9 +129,6 @@ def _create_datasource_table(entry: Entry, table_name: str, use_spatial: bool =
 def _create_insert_sql(entry: Entry, table_name: str, source_name: str = 'df', use_spatial: bool = False) -> str:
     if use_spatial:
         raise NotImplementedError('There is still an error with the spatial type.')
-
-    # get the parameters
-    params = load_params()
 
     # get the dimension names
     spatial_dims = entry.datasource.spatial_scale.dimension_names
@@ -186,8 +183,9 @@ def load_files(file_mapping: List[FileMapping]) -> str:
         try:
             table_name = _switch_source_loader(entry, fname)
             table_names.append(table_name)
-        except:
+        except Exception as e:
             logger.exception(f"ERRORED on loading file <{fname}>")
+            logger.error(str(e))
             continue
 
     # get a set of all involved table names
@@ -200,6 +198,7 @@ def load_files(file_mapping: List[FileMapping]) -> str:
             add_temporal_integration(entry=entry, table_name=table_name, funcs=None)
         except Exception as e:
             logger.exception(f"ERRORED on adding temporal integration for table <{table_name}>")
+            logger.error(str(e))
 
         # spatial aggregation
         try:
@@ -207,6 +206,7 @@ def load_files(file_mapping: List[FileMapping]) -> str:
             add_spatial_integration(entry=entry, table_name=table_name, funcs=None, target_epsg=3857, algin_cell='center')
         except Exception as e:
             logger.exception(f"ERRORED on adding spatial integration for table <{table_name}>")
+            logger.error(str(e))
 
         # spatio-temporal aggregation
         try:
@@ -214,6 +214,7 @@ def load_files(file_mapping: List[FileMapping]) -> str:
             add_spatial_integration(entry=entry, table_name=table_name, spatio_temporal=True, funcs=None, target_epsg=3857, algin_cell='center')
         except Exception as e:
             logger.exception(f"ERRORED on adding spatio-temporal integration for table <{table_name}>")
+            logger.error(str(e))
 
     # now load all metadata that we can find on the dataset folder level
     load_metadata_to_duckdb()
@@ -247,12 +248,25 @@ def load_xarray_to_duckdb(entry: Entry, data: xr.Dataset) -> str:
     # get the dimension names and spatial dimensions
     dimension_names = entry.datasource.dimension_names
 
+    # log out the dimension names
+    logger.debug(f"Dimension names for : {dimension_names}")
+
     # we assume that the source uses chunking, hence convert to dask dataframe
     logger.info(f"Loading preprocessed source to duckdb database <{params.database_path}> for data integration...")
     t1 = time.time()
 
     # get a delayed dask dataframe
-    ddf = data.to_dask_dataframe()[dimension_names]
+    try:
+        ddf = data.to_dask_dataframe()[dimension_names]
+    except ValueError as e:
+        # check this is the chunking error
+        if 'Object has inconsistent chunks' in str(e):
+            logger.warning(f"Xarray had problems reading chunks from the clip of . Trying to rechunk the data...")
+            unified = data.unify_chunks()
+            ddf = unified.to_dask_dataframe()[dimension_names]
+        else:
+            # in any other case re-raise the error
+            raise e
 
     # create the table name
     table_name = f"{entry.variable.name.replace(' ', '_')}_{entry.id}"
diff --git a/src/loader.py b/src/loader.py
index e3c3e86..65392aa 100644
--- a/src/loader.py
+++ b/src/loader.py
@@ -87,19 +87,20 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
     name = entry.datasource.path
 
     # check if there is a wildcard in the name
+    msg = f"Entry supplied the raw entry.datasource.path={name}."
     if '*' in name:
         fnames = glob.glob(name)
+        msg += f" Has a wildcard, resolved path to {len(fnames)} files: [{fnames}]."
     else:
         fnames = [name]
+        msg += " Resource is a single file."
 
     # check the amount of files to be processed
-    if len(fnames) > 1:
-        logger.debug(f"For {name} found {len(fnames)} files.")
-    elif len(fnames) == 0:
-        logger.warning(f"Could not find any files for {name}.")
+    if len(fnames) == 0:
+        logger.warning(msg + f" Could not find any files for {name}.")
         return None
     else:
-        logger.debug(f"Resource {name} is single file.")
+        logger.debug(msg)
 
     # get the time axis
     temporal_dims = entry.datasource.temporal_scale.dimension_names if entry.datasource.temporal_scale is not None else []
@@ -163,7 +164,6 @@ def load_netcdf_file(entry: Entry, executor: Executor) -> str:
         entry_metadata_saver(entry, metafile_name)
         logger.info(f"Saved metadata for dataset to {metafile_name}.")
 
-    # return the out_path
     return str(dataset_base_path)
 
 
@@ -195,6 +195,7 @@
 
     return str(out_name)
 
+
 def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params: Params):
     if data.rio.crs is None:
         logger.error(f"Could not clip {data} as it has no CRS.")
@@ -257,7 +258,6 @@ def _clip_netcdf_xarray(entry: Entry, file_name: str, data: xr.Dataset, params:
     return region
 
-
 
 def load_raster_file(entry: Entry, name: str, reference_area: dict, base_path: str = '/out') -> rio.DatasetReader:
     #DAS hier passt noch nicht zum workflow
     #Eher alle load Funktionen dispatchen? not sure
diff --git a/src/run.py b/src/run.py
index 1cafcb0..78b9b0b 100644
--- a/src/run.py
+++ b/src/run.py
@@ -122,12 +122,12 @@
 
     # wait until all results are finished
     executor.shutdown(wait=True)
-    logger.debug(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")
+    logger.info(f"STOP {type(executor).__name__} - Pool finished all tasks and shutdown.")
 
     # here to the stuff for creating a consistent dataset
     # check if the user disabled integration
     if params.integration == Integrations.NONE:
-        logger.debug("Integration is disabled. No further processing will be done.")
+        logger.info("Integration is disabled. No further processing will be done.")
 
     # check if we have any files to process
     elif len(file_mapping) > 0:
diff --git a/src/writer.py b/src/writer.py
index e6e4470..08a0da8 100644
--- a/src/writer.py
+++ b/src/writer.py
@@ -39,6 +39,9 @@ def dispatch_save_file(entry: Entry, data: DataFrame | xr.Dataset, executor: Exe
     else:
         target_path = Path(base_path) / target_name
 
+    # log out the target path
+    logger.debug(f"target_path derived for is: {target_path}")
+
     # define the exception handler
     # TODO exception handler is not the right name anymore
     def exception_handler(future: Future):
@@ -110,6 +113,7 @@ def xarray_to_netcdf_saver(data: xr.Dataset, target_name: str) -> str:
     if Path(target_name).exists():
         logger.debug(f"writer.xarray_to_netcdf_saver: {target_name} already exists. Skipping.")
         return target_name
+
     t1 = time.time()
     data.to_netcdf(target_name)
     t2 = time.time()