diff --git a/tethys/datareader/gridded.py b/tethys/datareader/gridded.py
index 23d4059..2bf0012 100644
--- a/tethys/datareader/gridded.py
+++ b/tethys/datareader/gridded.py
@@ -2,7 +2,7 @@
 import xarray as xr
 
 
-def load_file(filename, target_resolution, years, variables=None, flags=None, regrid_method='extensive'):
+def load_file(filename, target_resolution, years, variables=None, flags=(), bounds=None, regrid_method='extensive'):
     """Prepare a dataset from single file to be merged into a dataset of all proxies
 
     handles many oddities found in proxies
@@ -12,13 +12,12 @@ def load_file(filename, target_resolution, years, variables=None, flags=None, re
     :param years: years to extract from the file
     :param variables: variables to extract from the file
     :param flags: list potentially containing 'cell_area_share' or 'short_name_as_name'
+    :param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
     :param regrid_method: passed along to regrid
     :return: preprocessed data set
     """
 
-    flags = [] if flags is None else flags
-
-    ds = xr.open_dataset(filename, chunks='auto')
+    ds = xr.open_mfdataset(filename, chunks='auto')
 
     # handle tif (can only handle single band single variable single year currently)
     if 'band' in ds.coords:
@@ -33,7 +32,10 @@ def load_file(filename, target_resolution, years, variables=None, flags=None, re
 
     # filter to desired variables
     if variables is not None:
-        ds = ds[variables]
+        if len(ds) == 1 and len(variables) > 1:  # use single layer for all variables
+            ds[variables] = [ds[variables[0]] for _ in variables]
+        else:
+            ds = ds[variables]
 
     # create a year dimension if missing, with the years reported for this file in the catalog
     if 'year' not in ds.coords:
@@ -41,11 +43,7 @@ def load_file(filename, target_resolution, years, variables=None, flags=None, re
     else:
         ds = ds.sel(year=years, method='nearest')  # nearest used for temporal files
         ds['year'] = years
-
-    # do the year filtering
-    if years is not None and 'year' in ds.coords:
-        ds = ds.sel(year=years, method='nearest').chunk(chunks=dict(year=1))
-        ds['year'] = years
+    ds = ds.chunk(year=1)
 
     # numeric stuff
     ds = ds.fillna(0).astype(np.float32)
@@ -59,10 +57,12 @@ def load_file(filename, target_resolution, years, variables=None, flags=None, re
 
     # spatial aligning
     ds = pad_global(ds)
     ds = regrid(ds, target_resolution, method=regrid_method)
+    if bounds is not None:
+        ds = crop(ds, bounds)
     ds = ds.chunk(chunks=dict(lat=-1, lon=-1))
 
     if 'month' in ds.coords:
-        ds = ds.chunk(chunks=dict(month=12))
+        ds = ds.chunk(month=12)
 
     return ds
@@ -133,6 +133,12 @@ def pad_global(ds):
     return ds
 
 
+def crop(ds, bounds):
+    """Crop ds to the box [lat_min, lat_max, lon_min, lon_max] given by bounds"""
+    lat_min, lat_max, lon_min, lon_max = bounds
+    return ds.sel(lon=slice(lon_min, lon_max), lat=slice(lat_max, lat_min))  # lat is ordered north to south
+
+
 def regrid(ds, target_resolution, method='extensive'):
     """Simple regridding algorithm
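A minimal usage sketch of the new bounds path through load_file; the file name, variable name, and bounding box below are hypothetical, not part of this patch:

    from tethys.datareader.gridded import load_file

    # crop a global proxy to the continental US while regridding to 0.125 degrees
    ds = load_file(
        'example_proxy.nc',                  # hypothetical proxy file
        target_resolution=0.125,
        years=[2010, 2020],
        variables=['example_var'],           # hypothetical variable name
        bounds=[24.0, 50.0, -125.0, -66.0],  # [lat_min, lat_max, lon_min, lon_max]
    )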
diff --git a/tethys/datareader/maps.py b/tethys/datareader/maps.py
index 04ad026..9b24cc7 100644
--- a/tethys/datareader/maps.py
+++ b/tethys/datareader/maps.py
@@ -2,16 +2,17 @@
 import pandas as pd
 import xarray as xr
 
-from tethys.datareader.gridded import regrid
+from tethys.datareader.gridded import regrid, pad_global, crop
 
 
-def load_region_map(mapfile, masks=False, namefile=None, target_resolution=None, nodata=None, flip_lat=False):
+def load_region_map(mapfile, masks=False, namefile=None, target_resolution=None, bounds=None, nodata=None, flip_lat=False):
     """Load region map.
 
     :param mapfile: path to map file
     :param masks: bool whether to convert categorical map to layer of region masks
     :param namefile: optional path to csv with region names
     :param target_resolution: resolution to coerce map to. If None (default), use base resolution
+    :param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
     :param nodata: nodata value (like 9999), will be replaced with 0
     :param flip_lat: bool, whether the map is "upside down"
     """
@@ -41,9 +42,13 @@ def load_region_map(mapfile, masks=False, namefile=None, target_resolution=None,
         da = da.squeeze('band').drop_vars('band')
 
     if target_resolution is not None:
+        da = pad_global(da)
         da = regrid(da, target_resolution, method='label')
         da = da.chunk(chunks=dict(lat=-1, lon=-1))
 
+    if bounds is not None:
+        da = crop(da, bounds)
+
     # use region names provided in CSV namefile, otherwise check metadata for a names dict
     if namefile is not None:
         df = pd.read_csv(namefile)
diff --git a/tethys/datareader/regional.py b/tethys/datareader/regional.py
index 083eab7..ea6b9a6 100644
--- a/tethys/datareader/regional.py
+++ b/tethys/datareader/regional.py
@@ -18,7 +18,7 @@ def load_region_data(gcam_db, sectors, demand_type='withdrawals'):
     # remove subsector and technology if present, convert friendly water demand sector names to internal
     search_sectors = list(set(unfriendly_sector_name(i.split('/')[0]) for i in sectors))
 
-    inputs = {'withdrawals': ['water_td_*_W', '*_water withdrawals', '*_desalinated water'],
+    inputs = {'withdrawals': ['water_td_*_W', '*_water withdrawals'],
              'consumption': ['water_td_*_C', '*_water consumption']}.get(demand_type)
 
     df = conn.runQuery(easy_query('demand-physical', sector=search_sectors, technology='!water_td_*', input=inputs))
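Note the ordering in load_region_map: the map is padded and label-regridded first, then cropped, so the regrid still sees the full global grid. A usage sketch with hypothetical file names:

    from tethys.datareader.maps import load_region_map

    # one boolean mask layer per region, cropped to the same bounds as the proxies;
    # 'regions.tif' and 'region_names.csv' are hypothetical inputs
    region_masks = load_region_map(
        'regions.tif',
        masks=True,
        namefile='region_names.csv',
        target_resolution=0.125,
        bounds=[24.0, 50.0, -125.0, -66.0],
    )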
diff --git a/tethys/model.py b/tethys/model.py
index fd7db58..54f6365 100644
--- a/tethys/model.py
+++ b/tethys/model.py
@@ -24,44 +24,44 @@ class Tethys:
     """Model wrapper for Tethys"""
 
     def __init__(
-        self,
-        config_file=None,
-        years=None,
-        resolution=0.125,
+        self,
+        config_file=None,
+        years=None,
+        resolution=0.125,
+        bounds=None,
         demand_type='withdrawals',
-        perform_temporal=False,
-        gcam_db=None,
-        csv=None,
-        output_file=None,
-        downscaling_rules=None,
-        proxy_files=None,
-        map_files=None,
-        temporal_files=None,
-        temporal_methods=None
+        gcam_db=None,
+        csv=None,
+        output_dir=None,
+        supersector_iterations=0,
+        downscaling_rules=None,
+        proxy_files=None,
+        map_files=None,
+        temporal_config=None
    ):
        """Parameters can be specified in a YAML file or passed directly, with the config file taking precedence

        :param config_file: path to YAML configuration file containing these parameters
        :param years: list of years to be included spatial downscaling
        :param resolution: resolution in degrees for spatial downscaling
+        :param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
        :param demand_type: choice between “withdrawals” (default) or “consumption”
-        :param perform_temporal: choice between False (default) or True
        :param gcam_db: relative path to a GCAM database
        :param csv: relative path to csv file containing inputs
-        :param output_file: name of file to write outputs to
+        :param output_dir: directory to write outputs to
+        :param supersector_iterations: number of times to repeat applying individual and total sector constraints, default 0
        :param downscaling_rules: mapping from water demand sectors to proxy variables
        :param proxy_files: mapping of spatial proxy files to their years/variables
        :param map_files: list of files containing region maps
-        :param temporal_files: mapping of sector to temporal downscaling method
-        :param temporal_methods: files that will be accessible during temporal downscaling
+        :param temporal_config: mapping of sector to temporal downscaling method and arguments
        """
        self.root = None

        # project level settings
        self.years = years
        self.resolution = resolution
+        self.bounds = bounds
        self.demand_type = demand_type
-        self.perform_temporal = perform_temporal

        # GCAM database info
        self.gcam_db = gcam_db
@@ -70,15 +70,15 @@ def __init__(
        self.csv = csv

        # outputs
-        self.output_file = output_file
+        self.output_dir = output_dir

        self.downscaling_rules = downscaling_rules
+        self.supersector_iterations = supersector_iterations

        self.proxy_files = proxy_files
        self.map_files = map_files
-        self.temporal_files = temporal_files
-        self.temporal_methods = temporal_methods
+        self.temporal_config = temporal_config

        # data we'll load or generate later
        self.region_masks = None
@@ -99,21 +99,8 @@ def __init__(
        else:
            self.root = os.getcwd()

-        if self.temporal_methods is None:
-            self.temporal_methods = {
-                'domestic': 'domestic',
-                'municipal': 'domestic',
-                'electricity': 'electricity',
-                'irrigation': 'irrigation'
-            }
-        else:
-            self.temporal_methods = {k.lower(): v for k, v in self.temporal_methods.items()}
-
-        if self.temporal_files is not None:
-            self.temporal_files = {k: os.path.join(self.root, v) for k, v in self.temporal_files.items()}
-
-        if self.output_file is not None:
-            self.output_file = os.path.join(self.root, self.output_file)
+        if self.output_dir is not None:
+            self.output_dir = os.path.join(self.root, self.output_dir)

        self._parse_proxy_files()

@@ -121,7 +108,7 @@ def _parse_proxy_files(self):
        """Handle several shorthand expressions in the proxy catalog"""
        out = dict()

-        # name may be something like "ssp1_[YEAR].tif", which actually refers to multiple files
+        # name may be something like "ssp1_{YEAR}.tif", which actually refers to multiple files
        # such as "ssp1_2010.tif" and "ssp1_2020.tif" when info['years'] == [2010, 2020]
        for name, info in self.proxy_files.items():
            # promote strs to list
@@ -162,17 +149,17 @@ def _load_proxies(self):
        """Load all proxies from the catalog, regrid to target spatial resolution, and interpolate to target years"""
        print('Loading Proxy Data')
        # align each variable spatially
-        dataarrays = [da for filename, info in self.proxy_files.items() for da in
-                      load_file(os.path.join(self.root, filename), self.resolution, **info).values()]
+        dataarrays = [da for filename, info in self.proxy_files.items() for da in load_file(
+            os.path.join(self.root, filename), self.resolution, bounds=self.bounds, **info).values()]

        print('Interpolating Proxies')
        # interpolate each variable, then merge to one array
-        self.proxies = xr.merge(interp_helper(xr.concat([da for da in dataarrays if da.name == variable], 'year'),
+        self.proxies = xr.merge(interp_helper(xr.concat([da for da in dataarrays if da.name == variable], 'year', coords='minimal'),
                                              self.years) for variable in set(da.name for da in dataarrays)).to_array()

    def _load_region_masks(self):
        self.region_masks = xr.concat([load_region_map(os.path.join(self.root, filename), masks=True,
-                                                       target_resolution=self.resolution)
+                                                       target_resolution=self.resolution, bounds=self.bounds)
                                       for filename in self.map_files], dim='region')

    def _load_inputs(self):
@@ -186,6 +173,8 @@
        # filter inputs to valid regions and years
        self.inputs = self.inputs[(self.inputs.region.isin(self.region_masks.region.data)) &
                                  (self.inputs.year.isin(self.years))]
+        # replace "/" with "_" because it causes problems with netcdf variable names
+        self.inputs['sector'] = self.inputs.sector.str.replace('/', '_')

    def downscale(self, distribution, inputs, region_masks):
        """Actual spatial downscaling happens here
@@ -224,7 +213,8 @@ def run_model(self):
            rules = {supersector: rules}

            proxies = xr.Dataset(
-                {sector: self.proxies.sel(variable=proxy if isinstance(proxy, list) else [proxy]).sum('variable')
+                {sector.replace('/', '_'):
+                    self.proxies.sel(variable=proxy if isinstance(proxy, list) else [proxy]).sum('variable')
                 for sector, proxy in rules.items()}
            ).to_array(dim='sector')
@@ -246,29 +236,46 @@ def run_model(self):

            region_masks_total = self.region_masks.sel(region=inputs_total.region)

-            downscaled = self.downscale(downscaled, inputs_total, region_masks_total)
+            # alternate between applying total and individual sector constraints so that both are met
+            for _ in range(self.supersector_iterations):
+                downscaled = self.downscale(downscaled, inputs_total, region_masks_total)
+                downscaled = self.downscale(downscaled, inputs, region_masks)
+
+            # in a lot of cases this could be optimized by solving the intersections at region scale first,
+            # then downscaling once, but harder to implement, especially if differing regions are not subsets

-            if self.perform_temporal:
+            # write spatial downscaling outputs
+            if self.output_dir is not None:
+                filename = os.path.join(self.output_dir, f'{supersector}_{self.demand_type}.nc')
+                encoding = {sector: {'zlib': True, 'complevel': 5} for sector in downscaled.sector.data}
+                downscaled.to_dataset(dim='sector').to_netcdf(filename, encoding=encoding)
+                downscaled = xr.open_dataset(filename).to_array(dim='sector')  # hopefully this keeps dask happy
+
+            if self.temporal_config is not None:
                # calculate the monthly distributions (share of annual) for each year
                # this is how we'll do this for now
-                if supersector.lower() in self.temporal_methods:
-                    module = f'tethys.tdmethods.{self.temporal_methods[supersector.lower()]}'
-                    distribution = getattr(importlib.import_module(module), 'temporal_distribution')(self)
+                if supersector in self.temporal_config:
+                    module = 'tethys.tdmethods.' + self.temporal_config[supersector]['method']
+                    temporal_distribution = getattr(importlib.import_module(module), 'temporal_distribution')
+                    years = range(self.years[0], self.years[-1] + 1)
+                    kwargs = self.temporal_config[supersector].get('kwargs', {})
+                    distribution = temporal_distribution(years=years, resolution=self.resolution,
+                                                         bounds=self.bounds, **kwargs)
                else:
+                    # fall back to uniform distribution
                    distribution = xr.DataArray(np.full(12, 1/12, np.float32), coords=dict(month=range(1, 13)))

                downscaled = interp_helper(downscaled) * distribution

-                self.outputs.update(downscaled.to_dataset(dim='sector'))
+                # write temporal downscaling outputs
+                if self.output_dir is not None:
+                    filename = os.path.join(self.output_dir, f'{supersector}_{self.demand_type}_monthly.nc')
+                    encoding = {sector: {'zlib': True, 'complevel': 5} for sector in downscaled.sector.data}
+                    downscaled.to_dataset(dim='sector').to_netcdf(filename, encoding=encoding)
+                    downscaled = xr.open_dataset(filename).to_array(dim='sector')  # hopefully this keeps dask happy

-            if self.output_file is not None:
-                print('Writing Outputs')
-                # cannot have '/' in netcdf variable name
-                self.outputs = self.outputs.rename({name: name.replace('/', '_') for name in list(self.outputs)})
-                # compression
-                encoding = {variable: {'zlib': True, 'complevel': 5} for variable in self.outputs}
-                self.outputs.to_netcdf(self.output_file, encoding=encoding)
+            self.outputs.update(downscaled.to_dataset(dim='sector'))

    def reaggregate(self, region_masks=None):
        """Reaggregate from grid cells to regions
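Taken together, the constructor changes replace output_file, perform_temporal, temporal_files, and temporal_methods with output_dir and a per-supersector temporal_config. A sketch of the new call under those assumptions; the paths, sector name, and kwargs values are illustrative only, and note that temporal_config keys are matched against supersector names exactly, without the old .lower() normalization:

    from tethys.model import Tethys

    model = Tethys(
        years=[2010, 2020],
        resolution=0.125,
        bounds=[24.0, 50.0, -125.0, -66.0],
        demand_type='withdrawals',
        gcam_db='database_basexdb',      # hypothetical path
        output_dir='outputs',            # one netcdf per supersector replaces the single output_file
        supersector_iterations=2,        # alternate total/sector constraints twice
        temporal_config={
            'Municipal': {               # must match the supersector name exactly
                'method': 'domestic',    # resolves to tethys.tdmethods.domestic
                'kwargs': {'tasfile': 'tas.nc', 'rfile': 'amplitude.nc'},
            },
        },
    )
    model.run_model()

(downscaling_rules, proxy_files, and map_files would be supplied as before.)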
diff --git a/tethys/tdmethods/domestic.py b/tethys/tdmethods/domestic.py
index ac1afbf..443683b 100644
--- a/tethys/tdmethods/domestic.py
+++ b/tethys/tdmethods/domestic.py
@@ -2,12 +2,11 @@
 from tethys.datareader.gridded import load_file
 
 
-def temporal_distribution(model):
+def temporal_distribution(years, resolution, tasfile, rfile, tasvar='tas', rvar='amplitude', bounds=None):
     """Temporal downscaling for domestic water demand using algorithm from Wada et al. (2011)"""
 
-    years = range(model.years[0], model.years[-1] + 1)
-    tas = load_file(model.temporal_files['tas'], model.resolution, years, regrid_method='intensive')['tas']
-    amplitude = load_file(model.temporal_files['domr'], model.resolution, years, regrid_method='label')['amplitude']
+    tas = load_file(tasfile, resolution, years, bounds=bounds, regrid_method='intensive', variables=[tasvar])[tasvar]
+    amplitude = load_file(rfile, resolution, years, bounds=bounds, regrid_method='label', variables=[rvar])[rvar]
 
     ranges = tas.max(dim='month') - tas.min(dim='month')
     ranges = xr.where(ranges != 0, ranges, 1)  # avoid 0/0
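Because the method no longer receives the model object, it can be called standalone, e.g. to inspect one sector's monthly shares. A sketch with hypothetical input files:

    from tethys.tdmethods.domestic import temporal_distribution

    # monthly share of annual domestic demand per grid cell;
    # 'tas_monthly.nc' and 'amplitude.nc' are hypothetical inputs
    shares = temporal_distribution(
        years=range(2010, 2021),
        resolution=0.125,
        tasfile='tas_monthly.nc',
        rfile='amplitude.nc',
        bounds=[24.0, 50.0, -125.0, -66.0],
    )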
diff --git a/tethys/tdmethods/electricity.py b/tethys/tdmethods/electricity.py
index 67830b6..e8aedcb 100644
--- a/tethys/tdmethods/electricity.py
+++ b/tethys/tdmethods/electricity.py
@@ -1,25 +1,25 @@
-import os
 import numpy as np
 import pandas as pd
 import xarray as xr
 from tethys.datareader.regional import elec_sector_weights
 from tethys.datareader.gridded import load_file, interp_helper
+from tethys.datareader.maps import load_region_map
 
 
-def temporal_distribution(model):
+def temporal_distribution(years, resolution, hddfile, cddfile, regionfile, gcam_db, hddvar='hdd', cddvar='cdd',
+                          bounds=None):
     """Temporal downscaling of water demand for electricity generation using algorithm from Voisin et al. (2013)"""
-    years = range(model.years[0], model.years[-1] + 1)
-    hdd = load_file(model.temporal_files['hdd'], model.resolution, years, regrid_method='intensive')['hdd']
-    cdd = load_file(model.temporal_files['cdd'], model.resolution, years, regrid_method='intensive')['cdd']
-
-    weights = elec_sector_weights(os.path.join(model.root, model.gcam_db))
-    weights = weights[(weights.region.isin(model.inputs.region[model.inputs.sector == 'Electricity'])) &
-                      (weights.region.isin(model.region_masks.region.data)) &
-                      (weights.year.isin(model.years))].set_index(['region', 'sector', 'year']
-                                                                  )['value'].to_xarray().fillna(0).astype(np.float32)
+
+    # get weights of heating/cooling/other by location and time
+    regions = load_region_map(regionfile, masks=True, target_resolution=resolution, bounds=bounds)
+    weights = elec_sector_weights(gcam_db)
+    weights = weights[(weights.region.isin(regions.region.data)) & (weights.year.isin(years))]
+    weights = weights.set_index(['region', 'sector', 'year'])['value'].to_xarray().fillna(0).astype(np.float32)
+    weights = weights.dot(regions.sel(region=weights.region), dims='region')
     weights = interp_helper(weights)
 
-    region_masks = model.region_masks.sel(region=weights.region)
+    hdd = load_file(hddfile, resolution, years, bounds=bounds, regrid_method='intensive', variables=[hddvar])[hddvar]
+    cdd = load_file(cddfile, resolution, years, bounds=bounds, regrid_method='intensive', variables=[cddvar])[cddvar]
 
     # this formula is annoying to implement because of the hdd/cdd thresholds and reallocation of weights
     hdd_sums = hdd.sum(dim='month')
@@ -33,19 +33,13 @@
     hdd = xr.where((hdd_sums < 650) & (cdd_sums < 450), 1 / 12, hdd)
     cdd = xr.where((hdd_sums < 650) & (cdd_sums < 450), 1 / 12, cdd)
 
-    # redo sums based on reallocation
-    hdd_sums = hdd.sum(dim='month')
-    cdd_sums = cdd.sum(dim='month')
     # prevent 0/0
-    hdd_sums = xr.where(hdd_sums != 0, hdd_sums, 1)
-    cdd_sums = xr.where(cdd_sums != 0, cdd_sums, 1)
-    hdd /= hdd_sums
-    cdd /= cdd_sums
+    hdd /= hdd.sum(dim='month').where(lambda x: x != 0, 1)
+    cdd /= cdd.sum(dim='month').where(lambda x: x != 0, 1)
 
     distribution = xr.concat([hdd, cdd, xr.full_like(hdd, 1/12)],
                              dim=pd.Series(['Heating', 'Cooling', 'Other'], name='sector'))
 
-    distribution = distribution.where(region_masks, 0)
-    distribution = distribution.dot(weights, dims=('sector', 'region'))
+    distribution = distribution.dot(weights, dims='sector')
 
     return distribution
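The restructuring here moves region masking out of the distribution: per-region heating/cooling/other weights are projected onto the grid once with weights.dot(region_masks, dims='region'), and the monthly shares are then collapsed with a single dot over 'sector'. A toy illustration of the projection step on synthetic data (not Tethys inputs):

    import numpy as np
    import xarray as xr

    # two regions on a 2x2 grid, encoded as mask layers
    regions = xr.DataArray(
        np.array([[[1, 1], [0, 0]],   # region A covers the top row
                  [[0, 0], [1, 1]]],  # region B covers the bottom row
                 dtype=np.float32),
        dims=('region', 'lat', 'lon'),
        coords={'region': ['A', 'B']},
    )

    # heating/cooling/other shares per region
    weights = xr.DataArray(
        np.array([[0.7, 0.2, 0.1],
                  [0.3, 0.5, 0.2]], dtype=np.float32),
        dims=('region', 'sector'),
        coords={'region': ['A', 'B'], 'sector': ['Heating', 'Cooling', 'Other']},
    )

    # contract the region dimension: sector weights now live on the grid
    gridded = weights.dot(regions, dims='region')  # dims: (sector, lat, lon)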
diff --git a/tethys/tdmethods/irrigation.py b/tethys/tdmethods/irrigation.py
index e9275b4..4915bad 100644
--- a/tethys/tdmethods/irrigation.py
+++ b/tethys/tdmethods/irrigation.py
@@ -1,16 +1,13 @@
 from tethys.datareader.gridded import load_file
+from tethys.datareader.maps import load_region_map
 
 
-def temporal_distribution(model):
+def temporal_distribution(years, resolution, regionfile, irrfile, irrvar='pirrww', bounds=None):
     """Temporal downscaling of irrigation water demand"""
-    years = range(model.years[0], model.years[-1] + 1)
-    irr = load_file(model.temporal_files['irr'], model.resolution, years, regrid_method='label')['pirrww']
+    irr = load_file(irrfile, resolution, years, bounds=bounds, regrid_method='label', variables=[irrvar])[irrvar]
 
-    irr_regions = model.inputs.region[(model.inputs.sector == 'Irrigation') &
-                                      (model.inputs.region.isin(model.region_masks.region.data))].unique()
-
-    region_masks = model.region_masks.sel(region=irr_regions)
+    region_masks = load_region_map(regionfile, masks=True, target_resolution=resolution, bounds=bounds)
 
     irr_grouped = irr.where(region_masks, 0)
 
     month_sums = irr_grouped.sum(dim=('lat', 'lon'))
diff --git a/tethys/tdmethods/weights.py b/tethys/tdmethods/weights.py
new file mode 100644
index 0000000..bc65799
--- /dev/null
+++ b/tethys/tdmethods/weights.py
@@ -0,0 +1,13 @@
+from tethys.datareader.gridded import load_file
+
+
+def temporal_distribution(years, resolution, weightfile, weightvar='weight', regrid_method='intensive',
+                          prenormalized=False, bounds=None):
+    """Load a monthly distribution from a gridded weights file"""
+
+    distribution = load_file(weightfile, resolution, years, bounds=bounds, regrid_method=regrid_method,
+                             variables=[weightvar])[weightvar]
+    if not prenormalized:
+        distribution /= distribution.sum(dim='month').where(lambda x: x != 0, 1)
+
+    return distribution
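The new generic method means any monthly weight grid can drive temporal downscaling without a dedicated tdmethods module. A hypothetical temporal_config entry wiring a sector to it (the sector, file, and variable names are made up):

    temporal_config = {
        'Livestock': {
            'method': 'weights',    # resolves to tethys.tdmethods.weights
            'kwargs': {
                'weightfile': 'livestock_monthly.nc',
                'weightvar': 'weight',
                'prenormalized': False,  # shares will be normalized to sum to 1 over months
            },
        },
    }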