diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ade6b4e..9f71ab51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,13 @@ All calls to species_info.json and site_info.json are now made through the openg ### Removed +## 2023-03-02 +### Added +- The hbmcmc code now has options to use the HiTRes processes in its setup + +### Changed +- Removed the fp_directory dictionary option from the HiTRes processes, as HiTRes and integrated footprints are no longer used in conjunction + ## [0.3.1] - 2022-08-25 ### Added diff --git a/acrg/BC/timevarying_BC.py b/acrg/BC/timevarying_BC.py index 2f4d2435..388c856a 100644 --- a/acrg/BC/timevarying_BC.py +++ b/acrg/BC/timevarying_BC.py @@ -12,7 +12,6 @@ vmr_var = 'vmr', gph_height_var = 'height', time_coord = 'time', - height_coord = 'height', species = 'ch4', domain = 'EUROPE', start_date = None) @@ -32,15 +31,23 @@ ------- ''' -if 'os' not in dir(): import os -if 'np' not in dir(): import numpy as np -if 'xr' not in dir(): import xarray as xr -if 'paths' not in dir(): from acrg_config.paths import paths +import os +import json +import numpy as np +import xarray as xr +from scipy import interpolate +from collections import OrderedDict + +from acrg.name.name import open_ds +from acrg.config.paths import Paths +from acrg.convert import concentration +from acrg.countrymask import domain_volume + class BoundaryConditions: def __init__(self, vmr_var, gph_height_var, dataset=None, filename=None, - file_dir=None, time_coord='time', height_coord='height', species=None, - domain='EUROPE', start_date=None): + file_dir=None, time_coord='time', species=None, + domain='EUROPE', start_date=None, adjust=None): ''' vmr_var : str The VMR variable name in the nesw dataset @@ -73,13 +80,12 @@ def __init__(self, vmr_var, gph_height_var, dataset=None, filename=None, if filename is None: print('Please provide either a dataset or filename containing vmr') elif os.path.isfile(filename): - from acrg_name.name import open_ds dataset = open_ds(filename) else: print(f'Cannot find file: {filename}') - self.data_path = paths.data - self.acrg_path = paths.acrg + self.data_path = Paths.data + self.acrg_path = Paths.acrg self.vmr_var = vmr_var self.height_var = gph_height_var @@ -91,7 +97,12 @@ def __init__(self, vmr_var, gph_height_var, dataset=None, filename=None, # rename latitude and longitude to short version if 'latitude' in dataset: dataset.rename({'latitude' : 'lat'}) if 'longitude' in dataset: dataset.rename({'longitude' : 'lon'}) - self.dataset = dataset + + if adjust is not None: + print(f'Adjusting vmr by {adjust}') + dataset[vmr_var] = dataset[vmr_var] + adjust + + self.dataset = dataset def make_bc_file(self, fp_directory=None, fp_height_coord='height', reverse=None, convert_units=True, datasource=None, out_path=None, glob_attrs={}, @@ -145,7 +156,7 @@ def make_bc_file(self, fp_directory=None, fp_height_coord='height', reverse=None if verbose: print('\nSaving to netcdf\n----------------') # save to a netcdf file with a standardised filename self.to_netcdf(datasource=datasource, out_path=out_path, glob_attrs=glob_attrs, - copy_glob_attrs=copy_glob_attrs, verbose=verbose) + copy_glob_attrs=copy_glob_attrs, verbose=verbose, overwrite=False) def get_unit_conversion(self, verbose=True): @@ -190,8 +201,6 @@ def cut_to_edge(self, fp_directory=None, fp_height_coord='height', convert_units for north, south, east, and west Can be accessed using BoundaryConditions_obj.edges ''' - from acrg_countrymask import domain_volume - fp_directory = os.path.join(self.data_path, 'LPDM', 'fp_NAME') if
fp_directory is None else fp_directory self.fp_lat, self.fp_lon, self.fp_height = domain_volume(self.domain, fp_directory=fp_directory) @@ -205,16 +214,16 @@ def cut_to_edge(self, fp_directory=None, fp_height_coord='height', convert_units lat_n -= 1 lat_s = (np.abs(self.dataset.coords['lat'].values - min(self.fp_lat))).argmin() - if self.dataset.coords['lat'].values[lat_s] > min(self.fp_lat) and lat_s != (len(dataset.coords['lat'].values)-1): + if self.dataset.coords['lat'].values[lat_s] > min(self.fp_lat) and lat_s != (len(self.dataset.coords['lat'].values)-1): lat_s += 1 lon_e = (np.abs(self.dataset.coords['lon'].values - max(self.fp_lon))).argmin() - if self.dataset.coords['lon'].values[lon_e] < max(self.fp_lon) and lon_e != (len(dataset.coords['lon'].values)-1): - lat_s += 1 + if self.dataset.coords['lon'].values[lon_e] < max(self.fp_lon) and lon_e != (len(self.dataset.coords['lon'].values)-1): - lon_e += 1 lon_w = (np.abs(self.dataset.coords['lon'].values - min(self.fp_lon))).argmin() if self.dataset.coords['lon'].values[lon_w] > min(self.fp_lon) and lon_w != 0: - lat_w -= 1 + lon_w -= 1 # Cut to these north = self.dataset.sel(lat = self.dataset.coords['lat'][lat_n], @@ -241,11 +250,7 @@ def interpolate(self, reverse=None, verbose=True): Whether height values within the dataset input are in reverse order (i.e. nesw["gph"] level 1 values > nesw["gph"] level 2 values). Default = None. If this is set to None this will be automatically determined. - height_coord (str, optional) - Used if reverse is not defined to extract appropriate - height values to compare. - nesw[height_coord] should be a 1D array of datetimes - verbose (bool, optional) + verbose (bool, optional) Whether to print any updates Output @@ -259,7 +264,7 @@ self.interp_height_single(direction=dd, reverse=reverse, verbose=verbose) self.interp_latlon_single(direction=dd, verbose=verbose) - def interp_height_single(self, direction, height_coord='height', reverse=None, new_vmr_var=None, verbose=True): + def interp_height_single(self, direction, reverse=None, new_vmr_var=None, verbose=True): ''' Interpolates the data to the NAME heights @@ -267,11 +272,7 @@ def interp_height_single(self, direction, height_coord='height', reverse=None, n direction (str, optional) The compass direction of nesw Used for naming the output array : {self.species}_{direction} - height_coord (str, optional) - Used if reverse is not defined to extract appropriate - height values to compare. - nesw[height_coord] should be a 1D array of datetimes - verbose (bool, optional) + verbose (bool, optional) reverse (bool/None, optional) Whether height values within the nesw input are in reverse order (i.e. nesw["gph"] level 1 values > nesw["gph"] level 2 values).
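A minimal usage sketch of the class above, assuming an illustrative input filename and the default data_path directories:

    from acrg.BC.timevarying_BC import BoundaryConditions

    # hypothetical global-model VMR file; vmr_var and gph_height_var name its variables
    bc = BoundaryConditions(vmr_var='vmr',
                            gph_height_var='height',
                            filename='/path/to/global_model_ch4.nc',
                            species='ch4',
                            domain='EUROPE')
    # cut to the domain edges, interpolate to the NAME grid and write a standardised bc file
    bc.make_bc_file()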
@@ -288,8 +289,6 @@ def interp_height_single(self, direction, height_coord='height', reverse=None, n with a 3D VMR array interpolated to the NAME heights Can be accessed using BoundaryConditions_obj.edges ''' - if 'interpolate' not in dir(): from scipy import interpolate - # get the axis along which to interpolate # either 'longitude' (or 'lon', N or S) or 'latitude' (or 'lat', E or W) latorlon = 'lon' if 'lon' in self.edges[direction] else 'lat' @@ -326,7 +325,7 @@ def interp_height_single(self, direction, height_coord='height', reverse=None, n 'height' : self.fp_height, latorlon : self.edges[direction][latorlon].values}) - def interp_latlon_single(self, direction=None, height_coord='height', reverse=None, new_vmr_var=None, verbose=True): + def interp_latlon_single(self, direction=None, reverse=None, new_vmr_var=None, verbose=True): ''' Interpolates the data to the NAME latitudes and longitudes @@ -334,11 +333,7 @@ def interp_latlon_single(self, direction=None, height_coord='height', reverse=No direction (str, optional) The compass direction of nesw Used for naming the output array : {self.species}_{direction} - height_coord (str, optional) - Used if reverse is not defined to extract appropriate - height values to compare. - nesw[height_coord] should be a 1D array of datetimes - verbose (bool, optional) + verbose (bool, optional) Whether to print any updates reverse : bool/None Whether height values within the nesw input are in reverse order @@ -358,8 +353,6 @@ def interp_latlon_single(self, direction=None, height_coord='height', reverse=No with a 3D VMR array interpolated to the NAME latitudes and longitudes Can be accessed using BoundaryConditions_obj.edges ''' - if 'interpolate' not in dir(): from scipy import interpolate - # get the axis along which to interpolate # either 'longitude' (or 'lon', N or S) or 'latitude' (or 'lat', E or W) latorlon = 'lon' if 'lon' in self.edges[direction] else \ @@ -376,11 +369,11 @@ def interp_latlon_single(self, direction=None, height_coord='height', reverse=No # 3D array to fill with interpolated heights fp_latlon = self.fp_lat if latorlon=='lat' else self.fp_lon interp = np.zeros((len(self.edges[direction][self.time_coord]), - len(self.edges[direction][height_coord]), + len(self.edges[direction][self.height_var]), len(fp_latlon))) # loop through height and time - # self.edges[direction][height_var] : time, height, latlon + # self.edges[direction][self.height_var] : time, height, latlon for j in range(len(self.edges[direction][self.vmr_var][0,:,0])): for i in range(len(self.edges[direction][self.vmr_var][:,0,0])): # get the vmr for all latorlon @@ -397,7 +390,7 @@ new_vmr_var = new_vmr_var if new_vmr_var is not None else self.vmr_var self.edges[direction] = xr.Dataset({new_vmr_var : (['time', 'height', latorlon], interp)}, coords = {'time' : self.edges[direction][self.time_coord].values, - 'height' : self.edges[direction][height_coord].values, + 'height' : self.edges[direction][self.height_var].values, latorlon : fp_latlon}) def bc_filename(self, from_climatology=False, verbose=True): @@ -418,7 +411,7 @@ if verbose: print(f'Output filename : {self.out_filename}') def to_netcdf(self, datasource=None, out_path=None, glob_attrs={}, from_climatology=False, - copy_glob_attrs=False, verbose=True): + copy_glob_attrs=False, verbose=True, overwrite=False): ''' Args datasource (str, optional) @@ -449,7 +442,7 @@ def
to_netcdf(self, datasource=None, out_path=None, glob_attrs={}, from_climatol Includes DataArrays for the n, s, e, and w boundaries ''' - edges = {dd : self.edges[dd].rename({self.vmr_var: f'vmr_{dd}'}) + edges = {dd : self.edges[dd].rename({self.vmr_var: f'vmr_{dd[0]}'}) for dd in ['north', 'south', 'east', 'west']} # merge the interpolated n, s, e, & w Datasets @@ -479,8 +472,22 @@ def to_netcdf(self, datasource=None, out_path=None, glob_attrs={}, from_climatol os.path.join(self.data_path, 'LPDM', 'bc', self.domain) self.bc_filename(from_climatology=from_climatology, verbose=verbose) - if verbose: print(f'Saving boundary conditions to : {os.path.join(out_path, self.out_filename)}') - BC_edges.to_netcdf(path = os.path.join(out_path, self.out_filename), mode = 'w') + save = True + out_file = os.path.join(out_path, self.out_filename) + if os.path.isfile(out_file): + if overwrite: + if verbose: print(f'Boundary condition file {out_file} already exists and is being overwritten.') + else: + print(f'Boundary condition file {out_file} already exists.') + answer = input("You are about to overwrite an existing file, do you want to continue? Y/N ") + save = answer.upper() == 'Y' + if save: + if verbose: print(f'Saving boundary conditions to : {out_file}') + BC_edges.to_netcdf(path = out_file, mode = 'w') \ No newline at end of file diff --git a/acrg/convert.py b/acrg/convert.py index 37f2bdee..57527d61 100644 --- a/acrg/convert.py +++ b/acrg/convert.py @@ -8,6 +8,8 @@ acrg_path = Paths.acrg +import numpy as np + def molar_mass(species): ''' This function extracts the molar mass of a species from the acrg_species_info.json file. @@ -69,5 +71,4 @@ def convert_lons_0360(lons): div = lons // 360 - return lons - div*360 - + return lons - div*360 \ No newline at end of file diff --git a/acrg/hbmcmc/hbmcmc.py b/acrg/hbmcmc/hbmcmc.py index f257757b..358788c3 100644 --- a/acrg/hbmcmc/hbmcmc.py +++ b/acrg/hbmcmc/hbmcmc.py @@ -36,6 +36,8 @@ def fixedbasisMCMC(species, sites, domain, meas_period, start_date, end_date, outputpath, outputname, met_model = None, + species_footprint = None, + HiTRes = False, xprior={"pdf":"lognormal", "mu":1, "sd":1}, bcprior={"pdf":"lognormal", "mu":0.004, "sd":0.02}, sigprior={"pdf":"uniform", "lower":0.5, "upper":3}, @@ -71,6 +73,11 @@ def fixedbasisMCMC(species, sites, domain, meas_period, start_date, End time of inversion "YYYY-mm-dd" outputname (str): Unique identifier for output/run name. + species_footprint (str, optional): + species of the footprint to be imported, if different to the species + of interest (e.g. import co2 footprints for HiTRes studies) + HiTRes (bool): + True if using HiTRes footprints outputpath (str): Path to where output should be saved. xprior (dict): @@ -174,15 +181,23 @@ def fixedbasisMCMC(species, sites, domain, meas_period, start_date, TO DO: Add a wishlist... """
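+    # With HiTRes=True the obs are averaged with keep_missing=True so that gaps stay
+    # aligned with the 2-hourly footprints; footprints_data_merge() is then called with
+    # calc_timeseries=False, the modelled timeseries is instead calculated in
+    # fp_sensitivity(), and NaNs are dropped from each site dataset afterwards.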
""" + keep_missing = True if HiTRes else False + if verbose and species_footprint is not None: + print(f'species_footprint: {species_footprint}') + data = getobs.get_obs(sites, species, start_date = start_date, end_date = end_date, average = meas_period, data_directory=obs_directory, - keep_missing=False,inlet=inlet, instrument=instrument, max_level=max_level) - fp_all = name.footprints_data_merge(data, domain=domain, met_model = met_model, calc_bc=True, - height=fpheight, + keep_missing=keep_missing,inlet=inlet, instrument=instrument, + max_level=max_level) + fp_all = name.footprints_data_merge(data, domain=domain, met_model = met_model, calc_bc=True, + HiTRes = HiTRes, + height = fpheight, + calc_timeseries = False, fp_directory = fp_directory, bc_directory = bc_directory, flux_directory = flux_directory, - emissions_name=emissions_name) + emissions_name = emissions_name, + species_footprint = species_footprint) for site in sites: for j in range(len(data[site])): @@ -221,10 +236,15 @@ def fixedbasisMCMC(species, sites, domain, meas_period, start_date, basis_directory = tempdir else: basis_directory = basis_directory - - fp_data = name.fp_sensitivity(fp_all, domain=domain, basis_case=fp_basis_case,basis_directory=basis_directory) - fp_data = name.bc_sensitivity(fp_data, domain=domain,basis_case=bc_basis_case) + fp_data = name.fp_sensitivity(fp_all, domain=domain, basis_case=fp_basis_case, basis_directory=basis_directory, + calc_timeseries = True) + fp_data = name.bc_sensitivity(fp_data, domain=domain, basis_case=bc_basis_case) + + if HiTRes: + for site in sites: + fp_data[site] = fp_data[site].dropna(dim='time') + #apply named filters to the data fp_data = name.filtering(fp_data, filters) diff --git a/acrg/hbmcmc/inversion_pymc3.py b/acrg/hbmcmc/inversion_pymc3.py index 14680ead..9130bd6c 100644 --- a/acrg/hbmcmc/inversion_pymc3.py +++ b/acrg/hbmcmc/inversion_pymc3.py @@ -331,10 +331,6 @@ def inferpymc3_postprocessouts(xouts,bcouts, sigouts, convergence, with {source_name: emissions_file_identifier} (e.g. {'anth':'co2-ff-mth'}). This way multiple sources can be read in simultaneously if they are added as separate entries to the emissions_name dictionary. - If using HiTRes footprints, both the high and low frequency emissions files must be specified - in a second dictionary like so: {'anth': {'high_freq':'co2-ff-2hr', 'low_freq':'co2-ff-mth'}}. - It is not a problem to have a mixture of sources, with some that use HiTRes footprints and some - that don't. 
basis_directory (str, optional): Directory containing basis function file country_file (str, optional): @@ -407,10 +403,8 @@ def inferpymc3_postprocessouts(xouts,bcouts, sigouts, convergence, # it varies over the inversion period emissions_flux = np.expand_dims(rerun_file.fluxapriori.values,2) else: - if emissions_name == None: - emds = name.flux(domain, species, start = start_date, end = end_date, flux_directory=flux_directory) - else: - emds = name.flux(domain, list(emissions_name.values())[0], start = start_date, end = end_date, flux_directory=flux_directory) + spec = species if emissions_name == None else list(emissions_name.values())[0] + emds = name.flux(domain=domain, species=spec, start=start_date, end=end_date, flux_directory=flux_directory) emissions_flux = emds.flux.values flux = scalemap*emissions_flux[:,:,0] diff --git a/acrg/name/flux.py b/acrg/name/flux.py index 056ea7e0..7478a031 100644 --- a/acrg/name/flux.py +++ b/acrg/name/flux.py @@ -26,7 +26,8 @@ def write(lat, lon, time, flux, species, domain, source, title, prior_info_dict, regridder_used = 'acrg_grid.regrid.regrid_3D', copy_from_year = None, climatology = False, flux_comments = None, - output_directory = output_directory): + uncertainty = None, uncertainty_desc=None, units=None, + output_directory = output_directory, overwrite=False): '''Write a flux file for emissions Args: @@ -63,8 +64,18 @@ def write(lat, lon, time, flux, species, domain, Default is False flux_comments (str, optional): Extra comments. Default is None. + uncertainty (array, optional): + Uncertainty in the flux + uncertainty_desc (str, optional): + Description to add to uncertainty attributes + e.g. describing method of calculation output_directory (str, optional): Output directory. Default is 'data_path + LPDM/emissions/'. + units (str, optional) + Units of data, defaults to flux units of mol/m2/s + overwrite (bool, optional) + If True, automatically overwrite files with the same species, source, and year + defaults to False - will ask for user input to overwrite file Returns: None @@ -88,25 +99,19 @@ def write(lat, lon, time, flux, species, domain, Todo: Add some error checking (e.g. check that domain is correct) ''' - - print("WARNING: Make sure time stamp is start of time period (i.e. 1st of month\ - for monthly data or 1st January for yearly data).") - print("WARNING: Make sure coordinates are centre of the gridbox.") - print("WARNING: Make sure fluxes are in mol/m2/s.") + units = 'mol/m2/s' if units is None else units - if climatology == True: - name_climatology = "-climatology" + print("WARNING: Make sure time stamp is start of time period (i.e. 
1st of month "+\ + "for monthly data or 1st January for yearly data).") + print("WARNING: Make sure coordinates are centre of the gridbox.") + if units =='mol/m2/s': + print("WARNING: Make sure fluxes are in mol/m2/s.") else: - name_climatology = "" + print(f"Warning: Saving data in units of {units}") - if source == None: - file_source = species + name_climatology+ '-total' - source_name = file_source - else: - file_source = species + name_climatology + '-' + source - source_name = file_source - - file_source = file_source.lower() + name_climatology = "-climatology" if climatology == True else "" + file_source = f'{species}{name_climatology}-total' if source==None else \ + f'{species}{name_climatology}-{source}' species = species.lower() # Check that the flux is in the correct shape @@ -117,47 +122,46 @@ def write(lat, lon, time, flux, species, domain, #Set climatology to year 1900 if climatology == True: - if len(time) == 1: - time = [np.datetime64('1900-01-01')] - elif len(time) == 12: - time = np.arange('1900-01', '1901-01', dtype='datetime64[M]') - else: + if len(time) not in [1, 12]: sys.exit('Expecting either yearly or monthly climatology. Make sure time dimension is of size 1 or 12.') + time = [np.datetime64('1900-01-01')] if len(time) == 1 else \ + np.arange('1900-01', '1901-01', dtype='datetime64[M]') if type(time[0]) == np.datetime64: - #time=time - if isinstance(time,np.ndarray): - time = time.astype(dtype="datetime64[ns]") - elif isinstance(time,list): - time = [t.astype("datetime64[ns]") for t in time] - else: - time = time + time = time.astype(dtype="datetime64[ns]") if isinstance(time, np.ndarray) else \ + [t.astype("datetime64[ns]") for t in time] if isinstance(time, list) else \ + time else: - sys.exit('Time format not correct, needs to be a list of type numpy.datetime64. A DatetimeIndex will not work.\ - To convert a DatetimeIndex to correct format: time = [np.datetime64(i) for i in DatetimeIndex]') + sys.exit('Time format not correct, needs to be a list of type numpy.datetime64. A DatetimeIndex will not work. ' + \ + 'To convert a DatetimeIndex to correct format: time = [np.datetime64(i) for i in DatetimeIndex]') if not os.path.exists(output_directory): raise IOError("Unable to write file to specified output directory. Does not exist: {}.".format(output_directory)) - #Open netCDF file + # Open netCDF file year = pd.DatetimeIndex([time[0]]).year[0] - if copy_from_year != None: - ncname = os.path.join(output_directory,domain,f"{file_source}_{domain}_{year}_copy-from-{copy_from_year}.nc") - elif climatology == True: - ncname = os.path.join(output_directory,domain,f"{file_source}_{domain}.nc") - else: - ncname = os.path.join(output_directory,domain,f"{file_source}_{domain}_{year}.nc") + ncdesc = f'_{year}_copy-from-{copy_from_year}' if copy_from_year is not None else '' if climatology else f'_{year}' + ncname = os.path.join(output_directory,domain,f"{file_source.lower()}_{domain}{ncdesc}.nc") - if os.path.isfile(ncname) == True: + if os.path.isfile(ncname) == True and not overwrite: answer = input("You are about to overwrite an existing file, do you want to continue? 
Y/N ") - if answer == 'N': + if answer.lower() == 'n': sys.exit() - elif answer == 'Y': + elif answer.lower() == 'y': pass + elif os.path.isfile(ncname) == True and overwrite: + print(f'Overwriting existing file: {ncname}') - flux_attrs = {"source" : source_name, - "units" : 'mol/m2/s', - "species" : species} + flux_attrs = {"source" : file_source, + "units" : units, + "species" : species} + + if uncertainty is not None: + uncertainty_desc = "" if uncertainty_desc is None else uncertainty_desc + unc_attrs = {"source" : file_source, + "units" : units, + "species" : species, + "description": uncertainty_desc} lat_attrs = {"long_name" : "latitude", "units" : "degrees_north", @@ -167,7 +171,6 @@ def write(lat, lon, time, flux, species, domain, "units" : "degrees_east", "notes" : "centre of cell"} - glob_attrs = c.OrderedDict([("title",title), ("author" , getpass.getuser()), ("date_created" , np.str(dt.datetime.today())), @@ -175,22 +178,22 @@ def write(lat, lon, time, flux, species, domain, for i, key in enumerate(prior_info_dict.keys()): prior_number = i+1 - glob_attrs['prior_file_' + str(prior_number)] = key - glob_attrs['prior_file_' + str(prior_number)+'_version'] = prior_info_dict[key][0] - glob_attrs['prior_file_' + str(prior_number)+'_raw_resolution']=prior_info_dict[key][1] - glob_attrs['prior_file_' + str(prior_number)+'_reference']=prior_info_dict[key][2] - - glob_attrs["regridder_used"]= regridder_used + glob_attrs[f'prior_file_{str(prior_number)}'] = key + glob_attrs[f'prior_file_{str(prior_number)}_version'] = prior_info_dict[key][0] + glob_attrs[f'prior_file_{str(prior_number)}_raw_resolution'] = prior_info_dict[key][1] + glob_attrs[f'prior_file_{str(prior_number)}_reference'] = prior_info_dict[key][2] - if flux_comments != None: - glob_attrs['comments'] = flux_comments - if copy_from_year != None: - glob_attrs['comments'] = f"Fluxes copied from year {copy_from_year}. {flux_comments}" + glob_attrs["regridder_used"] = regridder_used - elif copy_from_year != None: - glob_attrs['comments'] = f"Fluxes copied from year {copy_from_year}." + if flux_comments is not None or copy_from_year is not None: + flux_comments = '' if flux_comments is None else flux_comments + glob_attrs['comments'] = flux_comments if copy_from_year is None else \ + f"Fluxes copied from year {copy_from_year}. 
{flux_comments}" - flux_ds = xray.Dataset({'flux':(['lat','lon','time'], flux, flux_attrs)}, + flux_dict = {'flux':(['lat','lon','time'], flux, flux_attrs)} + if uncertainty is not None: + flux_dict['uncertainty'] = (['lat','lon','time'], uncertainty, unc_attrs) + flux_ds = xray.Dataset(flux_dict, coords = {'lat' : lat, 'lon' : lon, 'time' : time}, @@ -204,6 +207,7 @@ def write(lat, lon, time, flux, species, domain, print(f"Creating {domain} subdirectory in output directory: {output_directory}") os.makedirs(os.path.join(output_directory,domain)) + print(f'Saving to {ncname}') flux_ds.flux.encoding = {'zlib':True} flux_ds.to_netcdf(ncname, mode='w') diff --git a/acrg/name/name.py b/acrg/name/name.py index efc31409..69c7cbce 100644 --- a/acrg/name/name.py +++ b/acrg/name/name.py @@ -3,25 +3,25 @@ Created on Mon Nov 10 10:45:51 2014 """ -import numpy as np -import matplotlib as mpl -import matplotlib.pyplot as plt -from matplotlib.ticker import MaxNLocator -from matplotlib import ticker -import datetime as dt import os import sys import glob -import pandas as pd import bisect +import cartopy +import calendar import subprocess -from os.path import join +import numpy as np +import pandas as pd import xarray as xr -import calendar -import dateutil.relativedelta +import datetime as dt +from tqdm import tqdm +import dask.array as da +from os.path import join import cartopy.crs as ccrs -import cartopy -from collections import OrderedDict +from matplotlib import ticker +import matplotlib.pyplot as plt +from matplotlib.ticker import MaxNLocator +from dateutil.relativedelta import relativedelta from acrg.time import convert import acrg.obs as obs @@ -32,9 +32,6 @@ from openghg_defs import site_info_file from openghg_defs import species_info_file -from tqdm import tqdm -import dask.array as da - acrg_path = Paths.acrg data_path = Paths.data @@ -68,6 +65,37 @@ def open_ds(path, chunks=None, combine=None): ds.load() return ds +def filter_files_by_date(files, start, end): + ''' + Filter files for those in the start-end date range + Avoids loading in data that is not needed + + Args: + files (list) : + files which are to be filtered + start, end (str) : + start and end of the date range required + + Returns: + files (list) : + list of files which are within the start-end date range + if no files are found within this date range then the original file list is returned + ''' + # extract the year and date from the filename and convert to a pandas datetime, if it is a climatology set date string to 1900 + f_date_str = {ff: ff.split('_')[-1].split('.')[0] for ff in files} + f_date = {ff: pd.to_datetime('-'.join([f_d[:4], f_d[4:]])) if is_number(f_d) and len(f_d)==6 else + pd.to_datetime(f_d) if is_number(f_d) and len(f_d)==4 else 1900 + for ff, f_d in f_date_str.items()} + + # check for files for which the filename dates are within the start-end time period + files_lim = [ff for ff, f_d in f_date.items() if f_d in np.arange(start, end, dtype='datetime64[M]') and len(f_date_str[ff])==6 or + pd.to_datetime(str(f_d)) in np.arange(f'{str(pd.to_datetime(start).year)}-01-01', + f'{str(pd.to_datetime(end).year+1)}-01-01', dtype='datetime64[Y]') or f_d==1900] + + # if no files are found for the required time period then data will be sliced below + files = files_lim if len(files_lim)>0 else files + + return files def filenames(site, domain, start, end, height, fp_directory, met_model = None, network=None, species=None): """ @@ -90,8 +118,7 @@ def filenames(site, domain, start, end, height, fp_directory, met_model = 
None, height (str) : Height related to input data. fp_directory (str) : - fp_directory can be specified if files are not in the default directory must point to a directory - which contains subfolders organized by domain. + a directory containing subfolders organized by domain, if files are not in the default directory met_model (str): Met model used to run NAME Default is None and implies the standard global met model @@ -141,35 +168,21 @@ def filenames(site, domain, start, end, height, fp_directory, met_model = None, files = [] for ym in yearmonth: - - if species: - - if met_model: - f=glob.glob(os.path.join(fp_directory,domain,f"{site}-{height}_{met_model}_{species}_{domain}_{ym}*.nc")) - else: - f=glob.glob(os.path.join(fp_directory,domain,f"{site}-{height}_{species}_{domain}_{ym}*.nc")) - - - else: - #manually create empty list if no species specified - f = [] - - if len(f) == 0: - - if met_model: - glob_path = os.path.join(fp_directory,domain,f"{site}-{height}_{met_model}_{domain}_{ym}*.nc") - else: - glob_path = os.path.join(fp_directory,domain,f"{site}-{height}_{domain}_{ym}*.nc") - - if lifetime_hrs is None: - print("No lifetime defined in species_info.json or species not defined. WARNING: 30-day integrated footprint used without chemical loss.") - f=glob.glob(glob_path) - elif lifetime_hrs <= 1440: + met_str = f'_{met_model}' if met_model else '' + spec_str = f'_{species}' if species else '' + + glob_path = os.path.join(fp_directory,domain,f"{site}-{height}{met_str}{spec_str}_{domain}_{ym}*.nc") + + if species is None and lifetime_hrs is not None: + if lifetime_hrs <= 1440: print("This is a short-lived species. Footprints must be species specific. Re-process in process.py with lifetime") return [] else: print("Treating species as long-lived.") - f=glob.glob(glob_path) + elif species is None and lifetime_hrs is None: + print("No lifetime defined in species_info.json or species not defined. WARNING: 30-day integrated footprint used without chemical loss.") + + f = glob.glob(glob_path) if len(f) > 0: files += f @@ -250,11 +263,8 @@ def footprints(sitecode_or_filename, met_model = None, fp_directory = None, {"MHD":'UKV', "JFJ":None} or {"MHD":'UKV', "TAC":"UKV"} {"MHD":None, "TAC":None} is equivalent to met_model = None {"MHD":'UKV', "TAC":'UKV'} is equivalent to met_model = 'UKV' - fp_directory (str/dict) : - If the high time resolution footprints are used (HiTRes = True) fp_directory must be a dictionary - of the form : - fp_directory = {"integrated":PATH_TO_INTEGRATED_FP,"HiTRes":PATH_TO_HIGHTRES_FP} - otherwise can be a single string if only integrated FPs are used and non-default. + fp_directory (str) : + a directory containing subfolders organized by domain, if files are not in the default directory start (str) : Start date in format "YYYY-MM-DD" for range of files to find. 
end (str) : @@ -288,38 +298,31 @@ def footprints(sitecode_or_filename, met_model = None, fp_directory = None, # into an annual footprint file in (mol/mol) / (mol/m2/s) # using acrg_name_process - if fp_directory is None: - fp_integrated_directory = join(data_path, 'LPDM/fp_NAME/') - fp_HiTRes_directory = join(data_path,'LPDM/fp_NAME_high_time_res/') - fp_directory = {'integrated': fp_integrated_directory, - 'HiTRes': fp_HiTRes_directory} - - # Error message if looking for HiTRes files and fp_directory is not a dictionary - if HiTRes is True: - if type(fp_directory) is not dict: - print("fp_directory needs to be a dictionary containing paths \ - to integrated and HiTRes footprints \ - {integrated:path1, HiTRes:path2}") - return None + fp_directory = join(data_path, 'LPDM/fp_NAME/') if fp_directory is None else fp_directory + + if HiTRes and met_model=='UKV': + print('Finding high time resolution footprints, setting species = co2') + species = 'co2' if '.nc' in sitecode_or_filename: if not '/' in sitecode_or_filename: files = [os.path.join(fp_directory, sitecode_or_filename)] else: - files=[sitecode_or_filename] + files = [sitecode_or_filename] else: site=sitecode_or_filename[:] - # Finds integrated footprints if specified as a dictionary with multiple entries (HiTRes = True) - # or a string with one entry - if type(fp_directory) is dict: - files = filenames(site, domain, start, end, height, fp_directory["integrated"], met_model = met_model, network=network, species=species) - else: - files = filenames(site, domain, start, end, height, fp_directory, met_model = met_model, network=network, species=species) + # Find the footprint files for the given site, height and domain + files = filenames(site, domain, start, end, height, fp_directory, met_model = met_model, network=network, species=species) + + if len(files)==0 and species is not None: + print(f"\nWarning: Can't find {species} footprint files for {sitecode_or_filename}. " \ + + "Looking for footprints without a defined species.\n") + files = filenames(site, domain, start, end, height, fp_directory, met_model = met_model, network=network, species=None) - if len(files) == 0: - print(f"\nWarning: Can't find footprint files for {sitecode_or_filename}. \ - This site will not be included in the output dictionary.\n") + if len(files)==0: + print(f"\nWarning: Can't find footprint files for {sitecode_or_filename}. " \ + + "This site will not be included in the output dictionary.\n") return None else: @@ -328,7 +331,8 @@ return fp -def flux(domain, species, start = None, end = None, flux_directory=None, chunk=True, verbose=True): +def flux(domain, species, start = None, end = None, flux_directory=None, + HiTRes=False, H_back=None, chunks=None, verbose=True, test=False): """ The flux function reads in all flux files for the domain and species as an xarray Dataset. Note that at present ALL flux data is read in per species per domain or by emissions name. @@ -360,10 +364,14 @@ flux_directory (str, optional) : flux_directory can be specified if files are not in the default directory. Must point to a directory which contains subfolders organized by domain.
+ HiTRes (bool, optional) : + True if importing HiTRes fluxes + H_back (int, optional) : + hours back of data to collect if using the HiTRes processes + if HiTRes=True, by default H_back=24 Returns: xarray.Dataset : combined dataset of all matching flux files """ - if flux_directory is None: flux_directory = join(data_path, 'LPDM/emissions/') @@ -382,9 +390,14 @@ def flux(domain, species, start = None, end = None, flux_directory=None, chunk=T if len(files) > 0: break else: - raise IOError("\nError: Can't find flux files for domain '{0}' and species '{1}' (using search: {2})".format(domain,species,species_search_list)) + raise IOError(f"\nError: Can't find flux files for domain '{domain}' and species '{species}' (using search: {species_search_list})") + + if start and end and 'climatology' not in species: + # if getting fluxes for HiTRes data import data for the month prior to start + start_flux = start if not HiTRes else \ + (pd.to_datetime(start)-relativedelta(months=1)).strftime('%Y-%m-%d') + files = filter_files_by_date(files, start_flux, end) - chunks = None if start == None or end == None or not chunk else {'lat': 50, 'lon': 50} flux_ds = read_netcdfs(files, chunks=chunks) # Check that time coordinate is present if not "time" in list(flux_ds.coords.keys()): @@ -392,8 +405,8 @@ def flux(domain, species, start = None, end = None, flux_directory=None, chunk=T # Check for level coordinate. If one level, assume surface and drop if "lev" in list(flux_ds.coords.keys()): - print(f"WARNING: Can't support multi-level fluxes. Trying to remove 'lev' coordinate \ - from flux dataset for {domain}, {species}") + print(f"WARNING: Can't support multi-level fluxes. Trying to remove 'lev' coordinate" \ + + f" from flux dataset for {domain}, {species}") if len(flux_ds.lev) > 1: print("ERROR: More than one flux level") else: @@ -403,46 +416,50 @@ def flux(domain, species, start = None, end = None, flux_directory=None, chunk=T print("To get fluxes for a certain time period you must specify a start or end date.") return flux_ds else: - - #Change timeslice to be the beginning and end of months in the dates specified. + # Change timeslice to be the beginning and end of months in the dates specified. 
+ # Or H_back hours before that for HiTRes + H_back = 24 if H_back is None and HiTRes else H_back start = pd.to_datetime(start) - month_start = dt.datetime(start.year, start.month, 1, 0, 0) + month_start = dt.datetime(start.year, start.month, 1, 0, 0) if not test else \ + dt.datetime(start.year, start.month, start.day, 0, 0) + month_start = month_start if not HiTRes else month_start - dt.timedelta(hours = H_back) end = pd.to_datetime(end) - month_end = dt.datetime(end.year, end.month, 1, 0, 0) - \ - dt.timedelta(seconds = 1) + month_end = dt.datetime(end.year, end.month, 1, 0, 0) if not test else \ + dt.datetime(end.year, end.month, end.day, 0, 0) + month_end = month_end - dt.timedelta(seconds = 1) if 'climatology' in species: ndate = pd.to_datetime(flux_ds.time.values) - if len(ndate) == 1: #If it's a single climatology value - dateadj = ndate - month_start #Adjust climatology to start in same year as obs - else: #Else if a monthly climatology - dateadj = ndate[month_start.month-1] - month_start #Adjust climatology to start in same year as obs + # Adjust climatology to start in same year as obs + dateadj = ndate - month_start if len(ndate) == 1 else \ + ndate[ndate.month==month_start.month][0] - month_start ndate = ndate - dateadj flux_ds = flux_ds.update({'time' : ndate}) flux_tmp = flux_ds.copy() - while month_end > ndate[-1]: - ndate = ndate + pd.DateOffset(years=1) + while pd.to_datetime(month_end.strftime('%Y-%m')) > ndate[-1]: + ndate = ndate + pd.DateOffset(years=1) flux_ds = xr.merge([flux_ds, flux_tmp.update({'time' : ndate})]) flux_timeslice = flux_ds.sel(time=slice(month_start, month_end)) - if np.logical_and(month_start.year != month_end.year, len(flux_timeslice.time) != dateutil.relativedelta.relativedelta(end, start).months): + if np.logical_and(month_start.year != month_end.year, len(flux_timeslice.time) != relativedelta(end, start).months) and not HiTRes: month_start = dt.datetime(start.year, 1, 1, 0, 0) flux_timeslice = flux_ds.sel(time=slice(month_start, month_end)) if len(flux_timeslice.time)==0: + # if there is no data between start and end, ffill data from the last available date flux_timeslice = flux_ds.sel(time=start, method = 'ffill') flux_timeslice = flux_timeslice.expand_dims('time',axis=-1) - print("Warning: No fluxes available during the time period specified so outputting\ - flux from %s" %flux_timeslice.time.values[0]) + print("Warning: No fluxes available during the time period specified so outputting "\ + + f"flux from {flux_timeslice.time.values[0]}") else: if verbose: - print("Slicing time to range {} - {}".format(month_start,month_end)) + print(f"Slicing time to range {month_start} - {month_end}") - flux_timeslice = flux_timeslice.compute() return flux_timeslice
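A sketch of the new HiTRes flux import, with illustrative dates and the co2-ff-2hr emissions name used in the docstrings below:

    # with HiTRes=True the time slice starts H_back hours before the first month,
    # so the first footprints can still look back over a full H_back window
    em = flux('EUROPE', 'co2-ff-2hr', start='2014-02-01', end='2014-03-01',
              HiTRes=True, H_back=24)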
-def flux_for_HiTRes(domain, emissions_dict, start=None, end=None, flux_directory=None, verbose=True): +def flux_for_HiTRes(domain, emissions_dict, start=None, end=None, H_back=None, flux_directory=None, + chunks=None, verbose=True, test=False): """ Creates a dictionary of high and low frequency fluxes for use with HiTRes footprints. @@ -451,8 +468,10 @@ Domain name. The flux files should be sub-categorised by the domain. emissions_dict (dict) : This should be a dictionary of the form: - {'high_freq':high_freq_emissions_name, 'low_freq':low_freq_emissions_name} + {'high_freq': high_freq_emissions_name, 'low_freq': low_freq_emissions_name} e.g. {'high_freq':'co2-ff-2hr', 'low_freq':'co2-ff-mth'}. + Or it can be the emissions names for different sources + e.g. {'anth': {'high_freq':'co2-ff-2hr', 'low_freq':'co2-ff-mth'}} start (str, optional) : Start date in format "YYYY-MM-DD" to output only a time slice of all the flux files. The start date used will be the first of the input month. I.e. if "2014-01-06" is input, @@ -466,35 +485,46 @@ flux_directory (str, optional) : flux_directory can be specified if files are not in the default directory. Must point to a directory which contains subfolders organized by domain. + H_back (int, optional) : + hours back of data to collect, by default H_back=24 Returns: - dictionary {'high_freq': flux_xray_dataset, 'low_freq': flux_xray_dataset} : - Dictionary containing xray datasets for both high and low frequency fluxes. + dictionary: + Dictionary containing xarray Datasets for both high and low frequency fluxes for the + dates required and 1 month prior to start + {'high_freq': xarray.Dataset, 'low_freq': xarray.Dataset} """ - if 'low_freq' not in list(emissions_dict.keys()): - print("Warning: low_freq must be a key in the emissions_dict in order to combine with HiTRes footprints.") - if 'high_freq' not in list(emissions_dict.keys()): - print("Warning: high_freq must be a key in the emissions_dict in order to use HiTRes footprints.") - flux_dict = {} - if start: - #Get the month before the one one requested because this will be needed for the first few - #days in timeseries_HiTRes to calculate the modleed molefractions for times when the footprints - #are in the previous month. - start = str(pd.to_datetime(start) - dateutil.relativedelta.relativedelta(months=1)) - - if 'high_freq' in list(emissions_dict.keys()): - flux_dict['high_freq'] = flux(domain, emissions_dict['high_freq'], start = start, end = end, - flux_directory=flux_directory, verbose=verbose) - if 'low_freq' in list(emissions_dict.keys()): - flux_dict['low_freq'] = flux(domain, emissions_dict['low_freq'], start = start, end = end, - flux_directory=flux_directory, verbose=verbose)
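+    # both input shapes are accepted; the emissions names here are the illustrative ones
+    # from the docstring above:
+    #   emissions_dict = {'high_freq': 'co2-ff-2hr', 'low_freq': 'co2-ff-mth'}
+    #   emissions_dict = {'anth': {'high_freq': 'co2-ff-2hr', 'low_freq': 'co2-ff-mth'}}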
+ H_back = 24 if H_back is None else H_back + if verbose and all([start, end]): + print(f'Finding fluxes from {H_back} hours before {start} up to {end}') + + emissions_dict = {'no_source': emissions_dict} if any([freq in emissions_dict.keys() for freq in ['high_freq', 'low_freq']]) else \ + emissions_dict + + for source, em_source in emissions_dict.items(): + flux_dict[source] = {} + for freq in ['low_freq', 'high_freq']: + if freq not in list(em_source.keys()): + print(f"Warning: {freq} key not found in emissions_dict.") + else: + flux_dict[source][freq] = flux(domain, em_source[freq], start = start, end = end, + flux_directory = flux_directory, HiTRes=True, H_back=H_back, + chunks = chunks, verbose = verbose, test=test) + + flux_dict = flux_dict['no_source'] if list(flux_dict.keys())==['no_source'] else flux_dict return flux_dict -def boundary_conditions(domain, species, start = None, end = None, bc_directory=None): +def boundary_conditions(domain, species, start = None, end = None, bc_directory=None, chunks=None): """ The boundary_conditions function reads in the files with the global model vmrs at the domain edges to give the boundary conditions as an xarray Dataset. @@ -521,35 +551,41 @@ bc_directory (str, optional) : bc_directory can be specified if files are not in the default directory. Must point to a directory which contains subfolders organized by domain. + chunks (dict, optional): + size of chunks to load data using dask e.g. {'time': 50} Returns: xarray.Dataset : Combined dataset of matching boundary conditions files """ - if bc_directory is None: - bc_directory = join(data_path, 'LPDM/bc/') + bc_directory = join(data_path, 'LPDM/bc/') if bc_directory is None else bc_directory filenames = os.path.join(bc_directory,domain,f"{species.lower()}_*.nc") files = sorted(glob.glob(filenames)) file_no_acc = [ff for ff in files if not os.access(ff, os.R_OK)] files = [ff for ff in files if os.access(ff, os.R_OK)] + if len(file_no_acc)>0: print('Warning: unable to read all boundary conditions files which match this criteria:') [print(ff) for ff in file_no_acc] if len(files) == 0: - print("Cannot find boundary condition files in {}".format(filenames)) + print(f"Cannot find boundary condition files in {filenames}") raise IOError(f"\nError: Cannot find boundary condition files for domain '{domain}' and species '{species}' ") - bc_ds = read_netcdfs(files) - if start == None or end == None: print("To get boundary conditions for a certain time period you must specify an end date.") + bc_ds = read_netcdfs(files, chunks=chunks) return bc_ds else: - #Change timeslice to be the beginning and end of months in the dates specified. + if 'climatology' not in species: + files = filter_files_by_date(files, start, end) + + bc_ds = read_netcdfs(files, chunks=chunks) + + # Change timeslice to be the beginning and end of months in the dates specified.
start = pd.to_datetime(start) month_start = dt.datetime(start.year, start.month, 1, 0, 0) @@ -561,8 +597,8 @@ def boundary_conditions(domain, species, start = None, end = None, bc_directory= if len(bc_timeslice.time)==0: bc_timeslice = bc_ds.sel(time=start, method = 'ffill') bc_timeslice = bc_timeslice.expand_dims('time',axis=-1) - print(f"No boundary conditions available during the time period specified so outputting\ - boundary conditions from {bc_timeslice.time.values[0]}") + print(f"No boundary conditions available during the time period specified so outputting " + \ + f"boundary conditions from {bc_timeslice.time.values[0]}") return bc_timeslice @@ -597,14 +633,13 @@ def basis(domain, basis_case, basis_directory = None): files = sorted(glob.glob(file_path)) if len(files) == 0: - raise IOError(f"\nError: Can't find basis function files for domain '{domain}' \ - and basis_case '{basis_case}' ") + raise IOError(f"\nError: Can't find basis function files for domain '{domain}' and basis_case '{basis_case}' ") basis_ds = read_netcdfs(files) return basis_ds -def basis_boundary_conditions(domain, basis_case, bc_basis_directory = None): +def basis_boundary_conditions(domain, basis_case, bc_basis_directory=None, start=None, end=None): """ The basis_boundary_conditions function reads in all matching files for the boundary conditions basis case and domain as an xarray Dataset. @@ -633,7 +668,10 @@ def basis_boundary_conditions(domain, basis_case, bc_basis_directory = None): file_path = os.path.join(bc_basis_directory,domain,f"{basis_case}_{domain}*.nc") - files = sorted(glob.glob(file_path)) + files = sorted(glob.glob(file_path)) + if start is not None and end is not None: + files = filter_files_by_date(files, start, end) + file_no_acc = [ff for ff in files if not os.access(ff, os.R_OK)] files = [ff for ff in files if os.access(ff, os.R_OK)] if len(file_no_acc)>0: @@ -641,8 +679,7 @@ def basis_boundary_conditions(domain, basis_case, bc_basis_directory = None): [print(ff) for ff in file_no_acc] if len(files) == 0: - raise IOError("\nError: Can't find boundary condition basis function files for domain '{0}' " - "and basis_case '{1}' ".format(domain,basis_case)) + raise IOError(f"\nError: Can't find boundary condition basis function files for domain '{domain}' and basis_case '{basis_case}' ") basis_ds = read_netcdfs(files) @@ -791,7 +828,8 @@ def footprints_data_merge(data, domain, met_model = None, load_flux = True, load bc_directory = None, resample_to_data = False, species_footprint = None, - chunks = False, + H_back = None, + chunks = None, verbose = True): """ @@ -835,12 +873,7 @@ def footprints_data_merge(data, domain, met_model = None, load_flux = True, load in a second dictionary like so: {'anth': {'high_freq':'co2-ff-2hr', 'low_freq':'co2-ff-mth'}}. It is not a problem to have a mixture of sources, with some that use HiTRes footprints and some that don't. - fp_directory : fp_directory must be a dictionary of the form - fp_directory = {"integrated":PATH_TO_INTEGRATED_FP, - "HiTRes":PATH_TO_HIGHTRES_FP} - if the high time resolution footprints are used (HiTRes = True) - otherwise can be a single string if only integrated FPs are used and - non-default. + fp_directory : a directory containing subfolders organized by domain, if files are not in the default directory flux_directory (str) : flux_directory can be specified if files are not in the default directory. Must point to a directory which contains subfolders organized by domain. 
(optional) bc_directory (str) : Same sort of structure as flux_directory. (optional) resample_to_data (bool) : If set to True, the footprints are resampled to the data time series. If set to False, the data is resampled to the footprints. If set to None (default), then the footprints and data are sampled to the coarsest resolution of the two. (optional) + species_footprint (str) : species associated with the required footprint, if different to that given in data + (optional) + H_back (int) : hours of data required prior to start if using the HiTRes processes + (optional), defaults to 24 when HiTRes=True chunks (dict) size of chunks for each dimension of fp e.g. {'lat': 50, 'lon': 50} @@ -862,7 +899,6 @@ def footprints_data_merge(data, domain, met_model = None, load_flux = True, load Dictionary of the form {"MHD": MHD_xarray_dataset, "TAC": TAC_xarray_dataset, ".flux": dictionary_of_flux_datasets, ".bc": boundary_conditions_dataset}: combined dataset for each site """ - sites = [key for key in data] species_list = [] @@ -873,8 +909,8 @@ else: species = species_list[0] - species_footprint = species if species_footprint is None else species_footprint - if species_footprint!=species: + species_footprint = species_footprint.lower() if species_footprint is not None else species_footprint + if species_footprint not in [species, None]: print(f'Finding footprints files for {species_footprint}') if load_flux: @@ -884,24 +920,19 @@ Setting load_flux to False.") load_flux=False else: - emissions_name = {'all':species} - + emissions_name = {'all': species} # Output array fp_and_data = {} #empty variables to fill with earliest start and latest end dates flux_bc_start = None - flux_bc_end = None - - + flux_bc_end = None for i, site in enumerate(sites): - site_ds_list = [] for site_ds in data[site]: - if network is None: network_site = list(site_info[site].keys())[0] elif not isinstance(network,str): @@ -925,7 +956,6 @@ if flux_bc_end == None or flux_bc_end < end: flux_bc_end = end - ## Convert to dataset #site_ds = xr.Dataset.from_dataframe(site_df) if site in list(site_modifier.keys()): @@ -961,7 +991,7 @@ site_fp = footprints(site_modifier_fp, met_model = met_model_site, fp_directory = fp_directory, start = start, end = end, domain = domain, - species = species_footprint.lower(), + species = species_footprint, height = height_site, network = network_site, HiTRes = HiTRes, @@ -1016,20 +1046,22 @@ # If units are specified, multiply by scaling factor if units: - site_ds.update({'fp' : (site_ds.fp.dims, site_ds.fp.data/units)}) + if 'fp' in site_ds.data_vars: + site_ds.update({'fp' : (site_ds.fp.dims, site_ds.fp.data/units)}) if HiTRes: site_ds.update({'fp_HiTRes' : (site_ds.fp_HiTRes.dims, - site_ds.fp_HiTRes/units)}) + site_ds.fp_HiTRes.data/units)}) site_ds_list += [site_ds] fp_and_data[site] = xr.merge(site_ds_list) if load_flux: - flux_dict = {} basestring = (str, bytes) for source, emiss_source in emissions_name.items(): + if isinstance(emissions_name[source], basestring) and HiTRes: + emissions_name[source] = {'high_freq': emissions_name[source]} if isinstance(emissions_name[source], basestring): flux_dict[source] = flux(domain, emiss_source, start=flux_bc_start, end=flux_bc_end, @@ -1037,24 +1069,25 @@ def
footprints_data_merge(data, domain, met_model = None, load_flux = True, load elif isinstance(emissions_name[source], dict): if HiTRes == False: - print("HiTRes is set to False and a dictionary has been found as the emissions_name dictionary value\ - for source %s. Either enter your emissions names as separate entries in the emissions_name\ - dictionary or turn HiTRes to True to use the two emissions files together with HiTRes footprints." %source) - #return None + print("HiTRes is set to False and a dictionary has been found as the emissions_name dictionary value " + + f"for source {source}. Either enter your emissions names as separate entries in the emissions_name " + + "dictionary or turn HiTRes to True to use the two emissions files together with HiTRes footprints.\n" + + "Not calculating timeseries") else: - flux_dict[source] = flux_for_HiTRes(domain, emiss_source, start=flux_bc_start, - end=flux_bc_end, flux_directory=flux_directory, verbose=verbose) + H_back = 24 if H_back is None else H_back + flux_dict[source] = flux_for_HiTRes(domain, emiss_source, start=flux_bc_start, end=flux_bc_end, + H_back=H_back, flux_directory=flux_directory, verbose=verbose, + chunks=chunks) fp_and_data['.flux'] = flux_dict - if load_bc: - bc = boundary_conditions(domain, species, start=flux_bc_start, end=flux_bc_end, bc_directory=bc_directory) + bc = boundary_conditions(domain, species, start=flux_bc_start, end=flux_bc_end, bc_directory=bc_directory, chunks=chunks) if units: fp_and_data['.bc'] = bc/units else: fp_and_data['.bc'] = bc - + # Calculate model time series, if required if calc_timeseries: fp_and_data = add_timeseries(fp_and_data, load_flux, verbose=verbose) @@ -1104,28 +1137,26 @@ def add_timeseries(fp_and_data, load_flux, verbose=True): sources = list(fp_and_data['.flux'].keys()) for site in sites: for source in sources: + mf_name = 'mf_mod' if source=='all' else f'mf_mod_{source}' if type(fp_and_data['.flux'][source]) == dict: # work out the resolution of the fp, to use to estimate the timeseries - fp_time = (fp_and_data[site].time[1] - fp_and_data[site].time[0]).values.astype('timedelta64[h]').astype(int) + fp_time = fp_and_data[site]["time"].diff(dim="time").values.mean().astype('timedelta64[h]').astype('int') # estimate the timeseries - fp_and_data[site]['mf_mod_'+source] = timeseries_HiTRes(fp_HiTRes_ds = fp_and_data[site], - flux_dict = fp_and_data['.flux'][source], - output_fpXflux = False, - verbose = verbose, - time_resolution = f'{fp_time}H', - output_type = 'DataArray') + fp_and_data[site][mf_name] = timeseries_HiTRes(fp_HiTRes_ds = fp_and_data[site], + flux_dict = fp_and_data['.flux'][source], + output_fpXflux = False, + verbose = verbose, + time_resolution = f'{fp_time}H', + output_type = 'DataArray') # use forward fill to replace nans - fp_and_data[site]['mf_mod_'+source] = fp_and_data[site]['mf_mod_'+source].ffill(dim='time') + fp_and_data[site][mf_name] = fp_and_data[site][mf_name].ffill(dim='time') else: flux_reindex = fp_and_data['.flux'][source].reindex_like(fp_and_data[site], 'ffill') - if source == 'all': - fp_and_data[site]['mf_mod'] = xr.DataArray((fp_and_data[site].fp*flux_reindex.flux).sum(["lat", "lon"]), coords = {'time':fp_and_data[site].time}) - else: - fp_and_data[site]['mf_mod_'+source] = xr.DataArray((fp_and_data[site].fp*flux_reindex.flux).sum(["lat", "lon"]), coords = {'time':fp_and_data[site].time}) + fp_and_data[site][mf_name] = xr.DataArray((fp_and_data[site].fp*flux_reindex.flux).sum(["lat", "lon"]), coords = {'time':fp_and_data[site].time}) return 
fp_and_data -def add_bc(fp_and_data, load_bc, species): +def add_bc(fp_and_data, load_bc, species, bc=None): """ Add boundary condition mole fraction values in footprint_data_merge Boundary conditions are multiplied by any loss (exp(-t/lifetime)) for the species @@ -1148,7 +1179,8 @@ sites = [key for key in list(fp_and_data.keys()) if key[0] != '.'] for site in sites: - bc_reindex = fp_and_data['.bc'].reindex_like(fp_and_data[site], 'ffill') + bc_reindex = bc.reindex_like(fp_and_data[site], 'ffill') if bc is not None else \ + fp_and_data['.bc'].reindex_like(fp_and_data[site], 'ffill') if 'lifetime' in species_info[species_obs].keys(): lifetime = species_info[species_obs]["lifetime"] @@ -1181,7 +1213,7 @@ def fp_sensitivity(fp_and_data, domain, basis_case, - basis_directory = None, verbose=True): + basis_directory = None, calc_timeseries=False, verbose=True): """ The fp_sensitivity function adds a sensitivity matrix, H, to each site xarray dataframe in fp_and_data. @@ -1191,71 +1223,88 @@ Region numbering must start from 1 Args: - fp_and_data (dict) : Output from footprints_data_merge() function. Dictionary of datasets. - domain (str) : Domain name. The footprint files should be sub-categorised by the domain. - basis_case : Basis case to read in. Examples of basis cases are "NESW","stratgrad". - String if only one basis case is required. Dict if there are multiple - sources that require separate basis cases. In which case, keys in dict should - reflect keys in emissions_name dict used in fp_data_merge. - basis_directory (str) : basis_directory can be specified if files are not in the default - directory. Must point to a directory which contains subfolders organized - by domain. (optional) + fp_and_data (dict): + Output from footprints_data_merge() function. Dictionary of datasets. + domain (str): + Domain name. The footprint files should be sub-categorised by the domain. + basis_case: + Basis case to read in. Examples of basis cases are "NESW","stratgrad". + String if only one basis case is required. Dict if there are multiple + sources that require separate basis cases. In which case, keys in dict should + reflect keys in emissions_name dict used in fp_data_merge. + basis_directory (str): + basis_directory can be specified if files are not in the default + directory. Must point to a directory which contains subfolders organized + by domain. (optional) + calc_timeseries (bool): + if True, calculate the mf timeseries and add to the fpdm object, + default is False Returns: - dict (xarray.Dataset) : Same format as fp_and_data with sensitivity matrix and basis function grid added. + dict (xarray.Dataset): + Same format as fp_and_data with sensitivity matrix and basis function grid added.
""" sites = [key for key in list(fp_and_data.keys()) if key[0] != '.'] - flux_sources = list(fp_and_data['.flux'].keys()) if type(basis_case) is not dict: - if len(flux_sources) == 1: - basis_case = {flux_sources[0]:basis_case} - else: - basis_case = {'all':basis_case} + basis_case = {flux_sources[0]:basis_case} if len(flux_sources) == 1 else \ + {'all': basis_case} if len(list(basis_case.keys())) != len(flux_sources): if len(list(basis_case.keys())) == 1: - print("Using %s as the basis case for all sources" %basis_case[list(basis_case.keys())[0]]) + print(f"Using {basis_case[list(basis_case.keys())[0]]} as the basis case for all sources") else: - print("There should either only be one basis_case, or it should be a dictionary the same length\ - as the number of sources.") + print("There should either only be one basis_case, or it should be a dictionary the same length " +\ + "as the number of sources.") return None - for site in sites: - for si, source in enumerate(flux_sources): - - if source in list(basis_case.keys()): - basis_func = basis(domain = domain, basis_case = basis_case[source], basis_directory = basis_directory) - else: - basis_func = basis(domain = domain, basis_case = basis_case['all'], basis_directory = basis_directory) - + bc_source = source if source in list(basis_case.keys()) else 'all' + basis_func = basis(domain = domain, basis_case = basis_case[bc_source], basis_directory = basis_directory) + + mf_name = 'mf_mod' if source=='all' else f'mf_mod_{source}' if type(fp_and_data['.flux'][source]) == dict: if 'fp_HiTRes' in list(fp_and_data[site].keys()): - site_bf = xr.Dataset({"fp_HiTRes":fp_and_data[site]["fp_HiTRes"], - "fp":fp_and_data[site]["fp"]}) + site_bf = xr.Dataset({"fp_HiTRes": fp_and_data[site]["fp_HiTRes"], + "fp": fp_and_data[site]["fp"]}) - fp_time = (fp_and_data[site].time[1] - fp_and_data[site].time[0]).values.astype('timedelta64[h]').astype(int) + # work out the resolution of the fp, to use to estimate the timeseries + fp_time = fp_and_data[site]["time"].diff(dim="time").values.mean().astype('timedelta64[h]').astype('int') # calculate the H matrix - H_all = timeseries_HiTRes(fp_HiTRes_ds = site_bf, flux_dict = fp_and_data['.flux'][source], output_TS = False, - output_fpXflux = True, output_type = 'DataArray', - time_resolution = f'{fp_time}H', verbose = verbose) + output_TS = True if calc_timeseries else False + ts_HiTRes = timeseries_HiTRes(fp_HiTRes_ds = site_bf, + flux_dict = fp_and_data['.flux'][source], + output_TS = output_TS, + output_fpXflux = True, + output_type = 'DataArray', + time_resolution = f'{fp_time}H', + verbose = False) #verbose) + if calc_timeseries: + # estimate the timeseries + fp_and_data[site][mf_name], H_all = ts_HiTRes + # use forward fill to replace nans + fp_and_data[site][mf_name] = fp_and_data[site][mf_name].ffill(dim='time') + else: + H_all = ts_HiTRes + else: print("fp_and_data needs the variable fp_HiTRes to use the emissions dictionary with high_freq and low_freq emissions.") else: site_bf = combine_datasets(fp_and_data[site]["fp"].to_dataset(), fp_and_data['.flux'][source]) - H_all=site_bf.fp*site_bf.flux + H_all = site_bf.fp*site_bf.flux + + if calc_timeseries: + flux_reindex = fp_and_data['.flux'][source].reindex_like(fp_and_data[site], 'ffill') + fp_and_data[site][mf_name] = xr.DataArray((fp_and_data[site].fp*flux_reindex.flux).sum(["lat", "lon"]), coords = {'time':fp_and_data[site].time}) - H_all_v=H_all.values.reshape((len(site_bf.lat)*len(site_bf.lon),len(site_bf.time))) - + H_all_v = 
H_all.values.reshape((len(site_bf.lat)*len(site_bf.lon),len(site_bf.time))) if 'region' in list(basis_func.dims.keys()): - if 'time' in basis_func.basis.dims: basis_func = basis_func.isel(time=0) @@ -1269,19 +1318,15 @@ def fp_sensitivity(fp_and_data, domain, basis_case, H[i,:] = np.sum(H_all_v*base_v[:,i,np.newaxis], axis = 0) if source == all: - if (sys.version_info < (3,0)): - region_name = site_bf.region - else: - region_name = site_bf.region.decode('ascii') + region_name = site_bf.region if (sys.version_info < (3,0)) else \ + site_bf.region.decode('ascii') else: - if (sys.version_info < (3,0)): - region_name = [source+'-'+reg for reg in site_bf.region.values] - else: - region_name = [source+'-'+reg.decode('ascii') for reg in site_bf.region.values] + region_name = [source+'-'+reg for reg in site_bf.region.values] \ + if (sys.version_info < (3,0)) else \ + [source+'-'+reg.decode('ascii') for reg in site_bf.region.values] - sensitivity = xr.DataArray(H, - coords=[('region', region_name), - ('time', fp_and_data[site].coords['time'])]) + sensitivity = xr.DataArray(H, coords=[('region', region_name), + ('time', fp_and_data[site].coords['time'])]) else: print("Warning: Using basis functions without a region dimension may be deprecated shortly.") @@ -1352,7 +1397,7 @@ def fp_sensitivity(fp_and_data, domain, basis_case, return fp_and_data -def bc_sensitivity(fp_and_data, domain, basis_case, bc_basis_directory = None): +def bc_sensitivity(fp_and_data, domain, basis_case, bc_basis_directory=None, start=None, end=None): """ The bc_sensitivity adds H_bc to the sensitivity matrix, to each site xarray dataframe in fp_and_data. @@ -1372,8 +1417,9 @@ def bc_sensitivity(fp_and_data, domain, basis_case, bc_basis_directory = None): sites = [key for key in list(fp_and_data.keys()) if key[0] != '.'] basis_func = basis_boundary_conditions(domain = domain, - basis_case = basis_case, bc_basis_directory=bc_basis_directory) -# sort basis_func into time order + basis_case = basis_case, + bc_basis_directory = bc_basis_directory) + # sort basis_func into time order ind = basis_func.time.argsort() timenew = basis_func.time[ind] basis_func = basis_func.reindex({"time":timenew}) @@ -2244,58 +2290,47 @@ def __init__(self, domain, country_file=None): self.country = np.asarray(country) self.name = name -def timeseries_HiTRes(flux_dict, fp_HiTRes_ds=None, fp_file=None, output_TS = True, output_fpXflux = True, - output_type='Dataset', output_file=None, verbose=False, chunks=None, - time_resolution='1H'): - """ - The timeseries_HiTRes function computes flux * HiTRes footprints. - - HiTRes footprints record the footprint at each 2 hour period back in time for the first 24 hours. - Need a high time resolution flux to multiply the first 24 hours back of footprints. - Need a residual flux to multiply the residual integrated footprint for the remainder of the 30 - day period. - + +def timeseries_HiTRes_setup(flux_dict, fp_HiTRes_ds=None, fp_file=None, + chunks=None, time_resolution='1H', verbose=False): + ''' + Setup for timeseries_HiTRes + - Makes sure that the footprint and fluxes have the correct time resolution + - Selects the required data from the fluxes to match the fp datetimes and those needed for H_back + - Sets up the low frequency fluxes + - Reformats the flux data as a numpy or dask array - using these is faster than xarray + Args: - fp_HiTRes_ds (xarray.Dataset) - Dataset of high time resolution footprints. HiTRes footprints record the footprint at - each timestep back in time for a given amount of time - (e.g. 
hourly time steps back in time for the first 24 hours). - domain (str) - Domain name. The footprint files should be sub-categorised by the domain. flux_dict (dict) - This should be a dictionary of the form output in the format - flux_dict: {'high_freq': flux_dataset, 'low_freq': flux_dataset}. - This is because this function needs two time resolutions of fluxes as - explained in the header. - + A dictionary of the form output in the format: + flux_dict: {'high_freq': xarray.Dataset, 'low_freq': xarray.Dataset}. If there are multiple sectors, the format should be: flux_dict: {sector1 : {'high_freq' : flux_dataset, 'low_freq' : flux_dataset}, sector2 : {'high_freq' : flux_dataset, 'low_freq' : flux_dataset}} - output_TS (bool) - Output the timeseries. Default is True. - output_fpXflux (bool) - Output the sensitivity map. Default is True. - verbose (bool) - show progress bar throughout loop - chunks (dict) + fp_HiTRes_ds (xarray.Dataset) + Dataset of high time resolution footprints. HiTRes footprints record the footprint at + each timestep back in time for a given amount of time + (e.g. hourly time steps back in time for the first 24 hours). + fp_file (str, optional) + footprint filename + not needed if fp_HiTRes_ds is given + chunks (dict, optional) size of chunks for each dimension e.g. {'lat': 50, 'lon': 50} opens dataset with dask, such that it is opened 'lazily' and all of the data is not loaded into memory defaults to None - dataset is opened with out dask + time_resolution (str, optional) + required time resolution of timeseries + defaults to '1H' + verbose (bool, optional) + show progress bar throughout loop Returns: - xarray.Dataset or dict - Same format as flux_dict['high_freq']: - If flux_dict['high_freq'] is an xarray.Dataset then an xarray.Dataset is returned - If flux_dict['high_freq'] is a dict of xarray.Datasets then a dict of xarray.Datasets - is returned (an xarray.Dataset for each sector) - - If output_TS is True: - Outputs the timeseries - If output_fpXflux is True: - Outputs the sensitivity map - """ + flux, fp_HiTRes, time_array + the reformatted fluxes, footprints, and time array as either + numpy or dask arrays + ''' if verbose: print(f'\nCalculating timeseries with {time_resolution} resolution, this might take a few minutes') ### get the high time res footprint @@ -2347,11 +2382,14 @@ def timeseries_HiTRes(flux_dict, fp_HiTRes_ds=None, fp_file=None, output_TS = Tr for sector, flux_sector in flux.items(): if 'high_freq' in flux_sector.keys() and flux_sector['high_freq'] is not None: # reindex the high frequency data to match the fp - time_flux = np.arange(fp_HiTRes_ds.time[0].values - np.timedelta64(max_H_back, 'h'), - fp_HiTRes_ds.time[-1].values + np.timedelta64(time_resolution[0], - time_resolution[1].lower()), - time_resolution[0], dtype=f'datetime64[{time_resolution[1].lower()}]') - flux_sector['high_freq'] = flux_sector['high_freq'].reindex(time=time_flux, method='ffill') + try: + time_flux = np.arange(fp_HiTRes_ds.time[0].values - np.timedelta64(max_H_back, 'h'), + fp_HiTRes_ds.time[-1].values + np.timedelta64(time_resolution[0], + time_resolution[1].lower()), + time_resolution[0], dtype=f'datetime64[{time_resolution[1].lower()}]') + flux_sector['high_freq'] = flux_sector['high_freq'].reindex(time=time_flux, method='ffill') + except: + print(f'could not reindex {sector} time') else: print(f'\nWarning: no high frequency flux data for {sector}, estimating a timeseries using the low frequency data') flux_sector['high_freq'] = None @@ -2366,16 +2404,86 @@ def 
timeseries_HiTRes(flux_dict, fp_HiTRes_ds=None, fp_file=None, output_TS = Tr da.array(flux_freq) for freq, flux_freq in flux_sector.items()} for sector, flux_sector in flux.items()} + + return flux, fp_HiTRes, time_array + + +def timeseries_HiTRes(flux_dict, fp_HiTRes_ds=None, fp_file=None, output_TS=True, output_fpXflux=True, + output_type='Dataset', output_file=None, verbose=False, chunks=None, + time_resolution='1H'): + """ + The timeseries_HiTRes function computes flux * HiTRes footprints. + + HiTRes footprints record the footprint at each 2 hour period back in time for the first 24 hours. + A high time resolution flux is used to multiply the first 24 hours back of footprints, high + frequency flux can be estimated by resampling low frequency data if required + + A residual flux is used to multiply the residual integrated footprint for the remainder of the 30 + day period, the residual can be calculated from the high frequency flux. + + Args: + flux_dict (dict) + A dictionary of the form output in the format: + flux_dict: {'high_freq': xarray.Dataset, 'low_freq': xarray.Dataset}. + If there are multiple sectors, the format should be: + flux_dict: {sector1 : {'high_freq' : flux_dataset, 'low_freq' : flux_dataset}, + sector2 : {'high_freq' : flux_dataset, 'low_freq' : flux_dataset}} + fp_HiTRes_ds (xarray.Dataset) + Dataset of high time resolution footprints. HiTRes footprints record the footprint at + each timestep back in time for a given amount of time + (e.g. hourly time steps back in time for the first 24 hours). + fp_file (str, optional) + footprint filename + not needed if fp_HiTRes_ds is given + output_TS (bool, optional) + Output the timeseries. Default is True. + output_fpXflux (bool, optional) + Output the sensitivity map. Default is True. + output_type (str, optional) + object type to be returned + default is Dataset which returns an xarray.Dataset + output_file (str, optional) + filename to save data to + data is not saved by default + verbose (bool, optional) + show progress bar throughout loop + chunks (dict, optional) + size of chunks for each dimension + e.g. {'lat': 50, 'lon': 50} + opens dataset with dask, such that it is opened 'lazily' + and all of the data is not loaded into memory + defaults to None - dataset is opened with out dask + time_resolution (str, optional) + required time resolution of timeseries + defaults to '1H' + Returns: + xarray.Dataset, xarray.DataArray or dict + Same format as flux_dict['high_freq']: + If flux_dict['high_freq'] is an xarrayobject then an xarray object is returned + (either DataArray or Dataset, as specified by output_type) + If flux_dict['high_freq'] is a dict of xarray objects then a dict of xarray objects + is returned (an xarray.Dataset or xarray.DataArray for each sector) + + If output_TS is True: + Outputs a mol fraction timeseries, i.e. latlon sum of footprint * flux + If output_fpXflux is True: + Outputs a sensitivity map, i.e. 
footprint * flux + """ + # run the setup to make sure that the footprints and fluxes have the correct format + flux, fp_HiTRes, time_array = timeseries_HiTRes_setup(flux_dict=flux_dict, + fp_HiTRes_ds=fp_HiTRes_ds, + fp_file=fp_file, + verbose=verbose, + chunks=chunks, + time_resolution=time_resolution) + # Set up a numpy array to calculate the product of the footprint (H matrix) with the fluxes if output_fpXflux: - fpXflux = {sector: da.zeros((len(fp_HiTRes_ds.lat), - len(fp_HiTRes_ds.lon), - len(time_array))) - for sector in flux.keys()} + fpXflux = {sector: None for sector in flux.keys()} elif output_TS: - timeseries = {sector: da.zeros(len(time_array)) for sector in flux.keys()} + timeseries = {sector: None for sector in flux.keys()} # month and year of the start of the data - used to index the low res data start = {dd: getattr(np.datetime64(time_array[0], 'h').astype(object), dd) @@ -2424,12 +2532,16 @@ def timeseries_HiTRes(flux_dict, fp_HiTRes_ds=None, fp_file=None, output_TS = Tr for sector, fp_fl in fpXflux_time.items(): if output_fpXflux: # Sum over time (H back) to give the total mf at this timestep - fpXflux[sector][:,:,tt] = np.nansum(fp_fl, axis=2) + fpXflux[sector] = da.asarray(np.nansum(fp_fl, axis=2)) if fpXflux[sector] is None else \ + da.dstack([fpXflux[sector], np.nansum(fp_fl, axis=2)]) + timeseries = None elif output_TS: # work out timeseries by summing over lat, lon, & time (24 hrs) - timeseries[sector][tt] = np.nansum(fp_fl) - + timeseries[sector] = da.from_array([np.nansum(fp_fl)]) if timeseries[sector] is None else \ + da.concatenate([timeseries[sector], [np.nansum(fp_fl)]], axis=0) + fpXflux = None + if output_fpXflux and output_TS: # if not already done then calculate the timeseries timeseries = {sector: fp_fl.sum(axis=(0,1)) for sector, fp_fl in fpXflux.items()} diff --git a/acrg/obs/utils.py b/acrg/obs/utils.py index d9cf4f20..f14d21b2 100644 --- a/acrg/obs/utils.py +++ b/acrg/obs/utils.py @@ -49,6 +49,7 @@ "ppb": "1e-9", "ppt": "1e-12", "ppq": "1e-15", + "per meg": "4.77e-6", "else": "unknown"} # For species which need more than just a hyphen removing and/or changing to lower case diff --git a/acrg/utils.py b/acrg/utils.py index f537ee35..314d759c 100644 --- a/acrg/utils.py +++ b/acrg/utils.py @@ -49,7 +49,7 @@ def is_number(s): except ValueError: return False -def combine_diff_resolution(data_1, data_2, method='add', verbose=True): +def combine_diff_resolution(data_1, data_2, data_1_res=None, data_2_res=None, method='add', verbose=True): ''' Combine datasets which have different time resolutions Avoids having to resample data to the same resolution in order to add, @@ -73,8 +73,10 @@ def combine_diff_resolution(data_1, data_2, method='add', verbose=True): ''' if 'np' not in dir(): import numpy as np data = {dd: dat for dd, dat in enumerate([data_1, data_2])} + data_res = {dd: dat for dd, dat in enumerate([data_1_res, data_2_res])} # calculate the time step for each dataset - time_step = {dd: (dat.time[1] - dat.time[0]).values for dd, dat in data.items()} + time_step = {dd: np.timedelta64(int(data_res[dd][0]), data_res[dd][1]).astype('timedelta64[ns]') if data_res[dd] else + (dat.time[1] - dat.time[0]).values for dd, dat in data.items()} if time_step[0]==time_step[1]: # if the time steps are equal then apply the method as normal @@ -123,7 +125,7 @@ def combine_method(resolved, method): # divide by the low res data return resolved / integrated elif method.lower()=='divide_high': - return interated / resolved + return integrated / resolved else: print(f'Method 
not recognised: {method}') return(None) diff --git a/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc b/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc index c2d4d3ec..ecebebe0 100644 Binary files a/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc and b/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc differ diff --git a/tests/files/benchmark/HiTRes_sens_201801.nc b/tests/files/benchmark/HiTRes_sens_201801.nc index 75c65429..611a6ffa 100644 Binary files a/tests/files/benchmark/HiTRes_sens_201801.nc and b/tests/files/benchmark/HiTRes_sens_201801.nc differ diff --git a/tests/files/benchmark/HiTRes_timeseries_201801.nc b/tests/files/benchmark/HiTRes_timeseries_201801.nc index 1234084f..d945e4d0 100644 Binary files a/tests/files/benchmark/HiTRes_timeseries_201801.nc and b/tests/files/benchmark/HiTRes_timeseries_201801.nc differ diff --git a/tests/test_name.py b/tests/test_name.py index b28a41bc..1bf4567d 100644 --- a/tests/test_name.py +++ b/tests/test_name.py @@ -26,12 +26,12 @@ import pytest import os -import sys import glob import numpy as np import xarray as xray import pandas as pd import pickle +from datetime import datetime, timedelta import acrg.name.name as name import acrg.obs.read as read @@ -224,6 +224,22 @@ def footprint_param(fp_directory,measurement_param): return input_param +@pytest.fixture() +def footprint_param_HiTRes(fp_directory,measurement_param): + ''' Define set of input parameters for footprints() function. Based on measurement_param ''' + + input_param = {} + input_param["sitecode_or_filename"] = 'WAO' + input_param["domain"] = measurement_param["domain"] + input_param["fp_directory"] = fp_directory + input_param['HiTRes'] = True + input_param['met_model'] = 'UKV' + input_param['start'] = '2018-01-01' + input_param['end'] = '2018-02-01' + input_param['height'] = '20magl' + + return input_param + def test_footprints_from_file(fp_directory,measurement_param): ''' Test dataset can be created from footprint filename with footprints() function @@ -238,7 +254,7 @@ def test_footprints_from_file(fp_directory,measurement_param): assert out @pytest.mark.long -def test_footprints_from_site(footprint_param,flux_directory,bc_directory): +def test_footprints_from_site(footprint_param): ''' Test dataset can be created from set of parameters with footprints() function. ''' @@ -247,6 +263,16 @@ def test_footprints_from_site(footprint_param,flux_directory,bc_directory): assert out +@pytest.mark.long +def test_footprints_from_site(footprint_param_HiTRes): + ''' + Test dataset can be created from set of parameters with footprints() function. 
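The new data_1_res/data_2_res arguments above let callers state each dataset's time resolution explicitly instead of inferring it from the first two time stamps, which fails for single-element time axes. A minimal usage sketch, assuming from the indexing in the new code that a resolution is passed as a (value, unit) pair such as ('1', 'h'); the data below are invented:

    import numpy as np
    import xarray as xr
    from acrg.utils import combine_diff_resolution

    # hourly data over two days, and daily data over the same period (invented values)
    hourly = xr.DataArray(np.ones(48),
                          coords={'time': np.arange('2018-01-01', '2018-01-03',
                                                    dtype='datetime64[h]').astype('datetime64[ns]')},
                          dims='time')
    daily = xr.DataArray(np.array([10.0, 20.0]),
                         coords={'time': np.arange('2018-01-01', '2018-01-03',
                                                   dtype='datetime64[D]').astype('datetime64[ns]')},
                         dims='time')

    # explicit resolutions bypass the (time[1] - time[0]) inference entirely
    combined = combine_diff_resolution(hourly, daily,
                                       data_1_res=('1', 'h'), data_2_res=('1', 'D'),
                                       method='add')
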
diff --git a/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc b/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc
index c2d4d3ec..ecebebe0 100644
Binary files a/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc and b/tests/files/LPDM/fp_NAME_minimal/EUROPE/WAO-20magl_UKV_co2_EUROPE_201801.nc differ
diff --git a/tests/files/benchmark/HiTRes_sens_201801.nc b/tests/files/benchmark/HiTRes_sens_201801.nc
index 75c65429..611a6ffa 100644
Binary files a/tests/files/benchmark/HiTRes_sens_201801.nc and b/tests/files/benchmark/HiTRes_sens_201801.nc differ
diff --git a/tests/files/benchmark/HiTRes_timeseries_201801.nc b/tests/files/benchmark/HiTRes_timeseries_201801.nc
index 1234084f..d945e4d0 100644
Binary files a/tests/files/benchmark/HiTRes_timeseries_201801.nc and b/tests/files/benchmark/HiTRes_timeseries_201801.nc differ
diff --git a/tests/test_name.py b/tests/test_name.py
index b28a41bc..1bf4567d 100644
--- a/tests/test_name.py
+++ b/tests/test_name.py
@@ -26,12 +26,12 @@
 import pytest
 import os
-import sys
 import glob
 import numpy as np
 import xarray as xray
 import pandas as pd
 import pickle
+from datetime import datetime, timedelta

 import acrg.name.name as name
 import acrg.obs.read as read
@@ -224,6 +224,22 @@ def footprint_param(fp_directory,measurement_param):

     return input_param

+@pytest.fixture()
+def footprint_param_HiTRes(fp_directory,measurement_param):
+    ''' Define set of input parameters for footprints() function. Based on measurement_param '''
+
+    input_param = {}
+    input_param["sitecode_or_filename"] = 'WAO'
+    input_param["domain"] = measurement_param["domain"]
+    input_param["fp_directory"] = fp_directory
+    input_param['HiTRes'] = True
+    input_param['met_model'] = 'UKV'
+    input_param['start'] = '2018-01-01'
+    input_param['end'] = '2018-02-01'
+    input_param['height'] = '20magl'
+
+    return input_param
+
 def test_footprints_from_file(fp_directory,measurement_param):
     '''
     Test dataset can be created from footprint filename with footprints() function
@@ -238,7 +254,7 @@ def test_footprints_from_file(fp_directory,measurement_param):
     assert out

 @pytest.mark.long
-def test_footprints_from_site(footprint_param,flux_directory,bc_directory):
+def test_footprints_from_site(footprint_param):
     '''
     Test dataset can be created from set of parameters with footprints() function.
     '''
@@ -247,6 +263,16 @@ def test_footprints_from_site(footprint_param,flux_directory,bc_directory):

     assert out

+@pytest.mark.long
+def test_footprints_from_site_HiTRes(footprint_param_HiTRes):
+    '''
+    Test dataset can be created from set of parameters with footprints() function,
+    using HiTRes footprints.
+    '''
+    # test importing the HiTRes footprint
+    out_HiTRes = name.footprints(**footprint_param_HiTRes)
+
+    assert out_HiTRes
+
 @pytest.fixture()
 def flux_param(flux_directory,measurement_param):
@@ -266,6 +292,45 @@ def test_flux(flux_param):
     out = name.flux(**flux_param)
     assert out

+@pytest.fixture()
+def flux_param_HiTRes(flux_directory):
+    ''' Define set of input parameters for flux_for_HiTRes() function.'''
+
+    input_param = {}
+    input_param["domain"] = 'SMALL-DOMAIN'
+    input_param["emissions_dict"] = {'high_freq': 'co2-ukghg-total-1hr'}
+    input_param["flux_directory"] = flux_directory
+    input_param["start"] = '2018-01-02'
+    input_param["end"] = '2018-01-10'
+    input_param["test"] = True
+
+    return input_param
+
+@pytest.fixture()
+def flux_HiTRes(flux_param_HiTRes):
+    '''
+    Create flux dataset using flux_for_HiTRes for associated tests
+    '''
+    out = name.flux_for_HiTRes(**flux_param_HiTRes)['high_freq']
+
+    return out
+
+def test_flux_HiTRes(flux_HiTRes):
+    '''
+    Test dataset can be created by flux_for_HiTRes() function
+    '''
+    assert flux_HiTRes
+
+def test_flux_HiTRes_date(flux_param_HiTRes, flux_HiTRes):
+    ''' Test that the flux starts 24 hours before the requested start date '''
+    start = datetime.strptime(flux_param_HiTRes['start'], '%Y-%m-%d')
+    start = datetime(start.year, start.month, start.day) + timedelta(hours=-24)
+
+    out_start = datetime.strptime(flux_HiTRes.time.values[0].astype(str).split('T')[0], '%Y-%m-%d')
+
+    assert start==out_start

 @pytest.fixture()
 def bc_param(bc_directory,measurement_param):
@@ -828,18 +893,9 @@ def test_filtering_local(dummy_timeseries_dict_gen):
     out = name.filtering(dummy_timeseries_dict_gen,filters)
     assert np.isclose(out["TEST"].mf.values, np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])).all()

-@pytest.mark.long
-def test_hitres_timeseries(hitres_timeseries_benchmark_file, hitres_sens_benchmark_file):
-    '''
-    Checks the output of name.timeseries_HiTRes matches a benchmarked version saved to
-    HiTRes_timeseries_201801.nc
-    '''
-
-    with xray.open_dataset(hitres_timeseries_benchmark_file) as benchmark:
-        benchmark.load()
-    with xray.open_dataset(hitres_sens_benchmark_file) as benchmark_sens:
-        benchmark_sens.load()
-
+
+@pytest.fixture(scope='module')
+def hitres_files():
     fp_file_name = os.path.join(acrg_path, 'tests', 'files', 'LPDM', 'fp_NAME_minimal', 'EUROPE', 'WAO-20magl_UKV_co2_EUROPE_201801.nc')
     with xray.open_dataset(fp_file_name) as footprint:
         footprint.load()
@@ -848,7 +904,13 @@ def test_hitres_timeseries(hitres_timeseries_benchmark_file, hitres_sens_benchma
     with xray.open_dataset(emiss_file_name) as emiss:
         emiss.load()
     flux_dict = {'high_freq': emiss}
-
+
+    return (footprint, flux_dict)
+
+@pytest.fixture(scope="module")
+def setup_hitres_timeseries(hitres_files):
+    footprint, flux_dict = hitres_files
+
     timeseries, sens = name.timeseries_HiTRes(flux_dict = flux_dict,
                                               fp_HiTRes_ds = footprint,
                                               output_TS = True,
@@ -858,9 +920,59 @@ def test_hitres_timeseries(hitres_timeseries_benchmark_file, hitres_sens_benchma
                                               verbose = False,
                                               time_resolution = '1H')

-    assert np.allclose(timeseries['total'], benchmark['total'])
-    assert np.allclose(sens['total'], benchmark_sens['total'])
+    out = (timeseries, sens)
+    return out
+
+@pytest.mark.long
+def test_hitres_timeseries(hitres_timeseries_benchmark_file, setup_hitres_timeseries):
+    '''
+    Checks the timeseries output of name.timeseries_HiTRes matches a benchmarked version saved to
+    HiTRes_timeseries_201801.nc
+    '''
+    with xray.open_dataset(hitres_timeseries_benchmark_file) as benchmark:
+        benchmark.load()
+
+    assert np.allclose(setup_hitres_timeseries[0]['total'], benchmark['total'])
+
+@pytest.mark.long
+def test_hitres_timeseries_sens(hitres_sens_benchmark_file, setup_hitres_timeseries):
+    '''
+    Checks the sensitivity output of name.timeseries_HiTRes matches a benchmarked version saved to
+    HiTRes_sens_201801.nc
+    '''
+    with xray.open_dataset(hitres_sens_benchmark_file) as benchmark:
+        benchmark.load()
+
+    assert np.allclose(setup_hitres_timeseries[1]['total'], benchmark['total'])
+
+@pytest.mark.long
+def test_hitres_timeseries_int(hitres_files):
+    '''
+    Test that using name.timeseries_HiTRes with constant fluxes and a HiTRes footprint
+    gives the same result as calculating a timeseries with an integrated footprint
+    '''
+    footprint, flux_dict = hitres_files
+
+    # create a constant flux dataset using the mean from the high frequency one
+    flux_mean = flux_dict['high_freq'].flux.mean(dim='time')
+    flux_low_freq = flux_mean.values.tolist()
+    flux_low_freq = np.array([flux_low_freq] * len(flux_dict['high_freq'].time.values)).transpose(1, 2, 0)
+    flux_low_freq = xray.Dataset(data_vars = {'flux': (['lat', 'lon', 'time'], flux_low_freq)},
+                                 coords = {'lat': flux_dict['high_freq'].lat.values,
+                                           'lon': flux_dict['high_freq'].lon.values,
+                                           'time': flux_dict['high_freq'].time.values})
+
+    # calculate the timeseries using the HiTRes footprint
+    ts_HiTRes = name.timeseries_HiTRes(flux_dict = {'high_freq': flux_low_freq},
+                                       fp_HiTRes_ds = footprint,
+                                       output_fpXflux = False) * 1e6
+    # calculate the timeseries using the integrated footprint
+    for ll in ['lat', 'lon']:
+        footprint[ll] = flux_low_freq[ll]
+    ts_integrated = (flux_low_freq.flux * footprint.fp).sum(['lat', 'lon']) * 1e6
+
+    assert np.allclose(ts_HiTRes.total.round(1), ts_integrated.round(1))
+
 # TODO:
 # Not working yet
 #def test_filtering_pblh(fp_data_H_pblh_merge):
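For reference, a minimal usage sketch of the refactored name.timeseries_HiTRes, mirroring the setup_hitres_timeseries fixture above. The footprint path is the test file named in this patch; the flux file path is elided in the hunk above, so it is left as a placeholder here:

    import xarray as xr
    import acrg.name.name as name

    # HiTRes footprint test file from this patch
    footprint = xr.open_dataset('tests/files/LPDM/fp_NAME_minimal/EUROPE/'
                                'WAO-20magl_UKV_co2_EUROPE_201801.nc')

    emiss_file_name = '...'  # high-frequency flux file; its path is elided in the hunk above
    flux_dict = {'high_freq': xr.open_dataset(emiss_file_name)}

    # with both outputs requested, a (timeseries, sensitivity) tuple is returned
    timeseries, sens = name.timeseries_HiTRes(flux_dict=flux_dict,
                                              fp_HiTRes_ds=footprint,
                                              output_TS=True,
                                              output_fpXflux=True,
                                              output_type='DataArray',
                                              time_resolution='1H',
                                              verbose=False)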