
temporal scaling methods #72

Merged · 8 commits · Aug 19, 2024
tethys/datareader/gridded.py (27 changes: 16 additions & 11 deletions)

@@ -2,7 +2,7 @@
 import xarray as xr


-def load_file(filename, target_resolution, years, variables=None, flags=None, regrid_method='extensive'):
+def load_file(filename, target_resolution, years, variables=None, flags=(), bounds=None, regrid_method='extensive'):
     """Prepare a dataset from single file to be merged into a dataset of all proxies

     handles many oddities found in proxies
@@ -12,13 +12,12 @@ def load_file(...)
     :param years: years to extract from the file
     :param variables: variables to extract from the file
     :param flags: list potentially containing 'cell_area_share' or 'short_name_as_name'
+    :param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
     :param regrid_method: passed along to regrid
     :return: preprocessed data set
     """

-    flags = [] if flags is None else flags
-
-    ds = xr.open_dataset(filename, chunks='auto')
+    ds = xr.open_mfdataset(filename, chunks='auto')

     # handle tif (can only handle single band single variable single year currently)
     if 'band' in ds.coords:
@@ -33,19 +32,18 @@ def load_file(...)

     # filter to desired variables
     if variables is not None:
-        ds = ds[variables]
         if len(ds) == 1 and len(variables) > 1:  # use single layer for all variables
             ds[variables] = [ds[variables[0]] for i in variables]
+        else:
+            ds = ds[variables]

     # create a year dimension if missing, with the years reported for this file in the catalog
     if 'year' not in ds.coords:
         ds = ds.expand_dims(year=len(years))
-        ds['year'] = years
-
-    # do the year filtering
-    if years is not None and 'year' in ds.coords:
-        ds = ds.sel(year=years, method='nearest').chunk(chunks=dict(year=1))
-        ds['year'] = years
+    else:
+        ds = ds.sel(year=years, method='nearest')  # nearest used for temporal files
+    ds['year'] = years
+    ds = ds.chunk(year=1)

     # numeric stuff
     ds = ds.fillna(0).astype(np.float32)
@@ -59,10 +57,12 @@ def load_file(...)
     # spatial aligning
     ds = pad_global(ds)
     ds = regrid(ds, target_resolution, method=regrid_method)
+    if bounds is not None:
+        ds = crop(ds, bounds)

     ds = ds.chunk(chunks=dict(lat=-1, lon=-1))
     if 'month' in ds.coords:
-        ds = ds.chunk(chunks=dict(month=12))
+        ds = ds.chunk(month=12)

     return ds

@@ -133,6 +133,11 @@ def pad_global(ds):
     return ds


+def crop(ds, bounds):
+    lat_min, lat_max, lon_min, lon_max = bounds
+    return ds.sel(lon=slice(lon_min, lon_max), lat=slice(lat_max, lat_min))
+
+
 def regrid(ds, target_resolution, method='extensive'):
     """Simple regridding algorithm

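One subtlety in crop worth calling out: lat is sliced from lat_max down to lat_min. That works because the grids store latitude in descending (north-to-south) order, and xarray requires slice bounds to follow the coordinate's storage order. A self-contained sketch (hypothetical 0.5 degree global grid, not code from this PR):

import numpy as np
import xarray as xr

# hypothetical global grid with latitude descending, matching the proxy convention
da = xr.DataArray(
    np.ones((360, 720), dtype=np.float32),
    coords=dict(lat=np.linspace(89.75, -89.75, 360), lon=np.linspace(-179.75, 179.75, 720)),
    dims=('lat', 'lon'),
)

# slice bounds must follow the coordinate's order: lat_max first, then lat_min
subset = da.sel(lon=slice(-10, 40), lat=slice(60, 30))
print(dict(subset.sizes))  # {'lat': 60, 'lon': 100}
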
tethys/datareader/maps.py (9 changes: 7 additions & 2 deletions)

@@ -2,16 +2,17 @@
 import pandas as pd
 import xarray as xr

-from tethys.datareader.gridded import regrid
+from tethys.datareader.gridded import regrid, pad_global, crop


-def load_region_map(mapfile, masks=False, namefile=None, target_resolution=None, nodata=None, flip_lat=False):
+def load_region_map(mapfile, masks=False, namefile=None, target_resolution=None, bounds=None, nodata=None, flip_lat=False):
     """ Load region map.

     :param mapfile: path to map file
     :param masks: bool whether to convert categorical map to layer of region masks
     :param namefile: optional path to csv with region names
     :param target_resolution: resolution to coerce map to. If None (default), use base resolution
+    :param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
     :param nodata: nodata value (like 9999), will be replaced with 0
     :param flip_lat: bool, whether the map is "upside down"
     """
@@ -41,9 +42,13 @@ def load_region_map(...)
         da = da.squeeze('band').drop_vars('band')

     if target_resolution is not None:
+        da = pad_global(da)
         da = regrid(da, target_resolution, method='label')
         da = da.chunk(chunks=dict(lat=-1, lon=-1))

+    if bounds is not None:
+        da = crop(da, bounds)
+
     # use region names provided in CSV namefile, otherwise check metadata for a names dict
     if namefile is not None:
         df = pd.read_csv(namefile)
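And the corresponding call on the maps side, showing how bounds threads through load_region_map after the regrid (paths and window hypothetical):

from tethys.datareader.maps import load_region_map

region_masks = load_region_map(
    'maps/basins.tif',                # hypothetical map file
    masks=True,
    namefile='maps/basin_names.csv',  # hypothetical name lookup
    target_resolution=0.125,
    bounds=[30, 60, -10, 40],         # same [lat_min, lat_max, lon_min, lon_max] order as elsewhere
)
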
tethys/datareader/regional.py (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@ def load_region_data(gcam_db, sectors, demand_type='withdrawals'):
     # remove subsector and technology if present, convert friendly water demand sector names to internal
     search_sectors = list(set(unfriendly_sector_name(i.split('/')[0]) for i in sectors))

-    inputs = {'withdrawals': ['water_td_*_W', '*_water withdrawals', '*_desalinated water'],
+    inputs = {'withdrawals': ['water_td_*_W', '*_water withdrawals'],
              'consumption': ['water_td_*_C', '*_water consumption']}.get(demand_type)

     df = conn.runQuery(easy_query('demand-physical', sector=search_sectors, technology='!water_td_*', input=inputs))
tethys/model.py (117 changes: 62 additions & 55 deletions)

@@ -24,44 +24,44 @@ class Tethys:
"""Model wrapper for Tethys"""

def __init__(
self,
config_file=None,
years=None,
resolution=0.125,
self,
config_file=None,
years=None,
resolution=0.125,
bounds=None,
demand_type='withdrawals',
perform_temporal=False,
gcam_db=None,
csv=None,
output_file=None,
downscaling_rules=None,
proxy_files=None,
map_files=None,
temporal_files=None,
temporal_methods=None
gcam_db=None,
csv=None,
output_dir=None,
supersector_iterations=0,
downscaling_rules=None,
proxy_files=None,
map_files=None,
temporal_config=None
):
"""Parameters can be specified in a YAML file or passed directly, with the config file taking precedence

:param config_file: path to YAML configuration file containing these parameters
:param years: list of years to be included spatial downscaling
:param resolution: resolution in degrees for spatial downscaling
:param bounds: list [lat_min, lat_max, lon_min, lon_max] to crop to
:param demand_type: choice between “withdrawals” (default) or “consumption”
:param perform_temporal: choice between False (default) or True
:param gcam_db: relative path to a GCAM database
:param csv: relative path to csv file containing inputs
:param output_file: name of file to write outputs to
:param output_dir: directory to write outputs to
:param supersector_iterations: number of times to repeat applying individual and total sector constraints, default 0
:param downscaling_rules: mapping from water demand sectors to proxy variables
:param proxy_files: mapping of spatial proxy files to their years/variables
:param map_files: list of files containing region maps
:param temporal_files: mapping of sector to temporal downscaling method
:param temporal_methods: files that will be accessible during temporal downscaling
:param temporal_config: mapping of sector to temporal downscaling method and arguments
"""
         self.root = None

         # project level settings
         self.years = years
         self.resolution = resolution
+        self.bounds = bounds
         self.demand_type = demand_type
-        self.perform_temporal = perform_temporal

         # GCAM database info
         self.gcam_db = gcam_db
@@ -70,15 +70,15 @@ def __init__(
         self.csv = csv

         # outputs
-        self.output_file = output_file
+        self.output_dir = output_dir

         self.downscaling_rules = downscaling_rules
+        self.supersector_iterations = supersector_iterations

         self.proxy_files = proxy_files
         self.map_files = map_files
-        self.temporal_files = temporal_files
-
-        self.temporal_methods = temporal_methods
+        self.temporal_config = temporal_config

         # data we'll load or generate later
         self.region_masks = None
@@ -99,29 +99,16 @@ def __init__(
         else:
             self.root = os.getcwd()

-        if self.temporal_methods is None:
-            self.temporal_methods = {
-                'domestic': 'domestic',
-                'municipal': 'domestic',
-                'electricity': 'electricity',
-                'irrigation': 'irrigation'
-            }
-        else:
-            self.temporal_methods = {k.lower(): v for k, v in self.temporal_methods.items()}
-
-        if self.temporal_files is not None:
-            self.temporal_files = {k: os.path.join(self.root, v) for k, v in self.temporal_files.items()}
-
-        if self.output_file is not None:
-            self.output_file = os.path.join(self.root, self.output_file)
+        if self.output_dir is not None:
+            self.output_dir = os.path.join(self.root, self.output_dir)

         self._parse_proxy_files()

     def _parse_proxy_files(self):
         """Handle several shorthand expressions in the proxy catalog"""
         out = dict()

-        # name may be something like "ssp1_[YEAR].tif", which actually refers to multiple files
+        # name may be something like "ssp1_{YEAR}.tif", which actually refers to multiple files
         # such as "ssp1_2010.tif" and "ssp1_2020.tif" when info['years'] == [2010, 2020]
         for name, info in self.proxy_files.items():
             # promote strs to list
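
For context on the corrected comment: the {YEAR} placeholder makes one catalog entry stand for several files. A rough sketch of the expansion _parse_proxy_files performs (simplified; the real method also handles other shorthands):

# hypothetical catalog entry using the {YEAR} placeholder
proxy_files = {'ssp1_{YEAR}.tif': {'variables': ['population'], 'years': [2010, 2020]}}

# expands to roughly one entry per year:
expanded = {
    name.replace('{YEAR}', str(year)): dict(info, years=[year])
    for name, info in proxy_files.items()
    for year in info['years']
}
# {'ssp1_2010.tif': {'variables': ['population'], 'years': [2010]},
#  'ssp1_2020.tif': {'variables': ['population'], 'years': [2020]}}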
@@ -162,17 +149,17 @@ def _load_proxies(self):
         """Load all proxies from the catalog, regrid to target spatial resolution, and interpolate to target years"""
         print('Loading Proxy Data')
         # align each variable spatially
-        dataarrays = [da for filename, info in self.proxy_files.items() for da in
-                      load_file(os.path.join(self.root, filename), self.resolution, **info).values()]
+        dataarrays = [da for filename, info in self.proxy_files.items() for da in load_file(
+            os.path.join(self.root, filename), self.resolution, bounds=self.bounds, **info).values()]

         print('Interpolating Proxies')
         # interpolate each variable, then merge to one array
-        self.proxies = xr.merge(interp_helper(xr.concat([da for da in dataarrays if da.name == variable], 'year'),
+        self.proxies = xr.merge(interp_helper(xr.concat([da for da in dataarrays if da.name == variable], 'year', coords='minimal'),
                                               self.years) for variable in set(da.name for da in dataarrays)).to_array()

     def _load_region_masks(self):
         self.region_masks = xr.concat([load_region_map(os.path.join(self.root, filename), masks=True,
-                                                       target_resolution=self.resolution)
+                                                       target_resolution=self.resolution, bounds=self.bounds)
                                        for filename in self.map_files], dim='region')

     def _load_inputs(self):
@@ -186,6 +173,8 @@ def _load_inputs(self):
         # filter inputs to valid regions and years
         self.inputs = self.inputs[(self.inputs.region.isin(self.region_masks.region.data)) &
                                   (self.inputs.year.isin(self.years))]
+        # replace "/" with "_" because it causes problems with netcdf variable names
+        self.inputs['sector'] = self.inputs.sector.str.replace('/', '_')

     def downscale(self, distribution, inputs, region_masks):
         """Actual spatial downscaling happens here
@@ -224,7 +213,8 @@ def run_model(self):
                 rules = {supersector: rules}

             proxies = xr.Dataset(
-                {sector: self.proxies.sel(variable=proxy if isinstance(proxy, list) else [proxy]).sum('variable')
+                {sector.replace('/', '_'):
+                     self.proxies.sel(variable=proxy if isinstance(proxy, list) else [proxy]).sum('variable')
                  for sector, proxy in rules.items()}
             ).to_array(dim='sector')

@@ -246,29 +236,46 @@ def run_model(self):

             region_masks_total = self.region_masks.sel(region=inputs_total.region)

-            downscaled = self.downscale(downscaled, inputs_total, region_masks_total)
+            # alternate between applying total and individual sector constraints so that both are met
+            for i in range(self.supersector_iterations):
+                downscaled = self.downscale(downscaled, inputs_total, region_masks_total)
+                downscaled = self.downscale(downscaled, inputs, region_masks)

             # in a lot of cases this could be optimized by solving the intersections at region scale first,
             # then downscaling once, but harder to implement, especially if differing regions are not subsets

-            if self.perform_temporal:
+            # write spatial downscaling outputs
+            if self.output_dir is not None:
+                filename = os.path.join(self.output_dir, f'{supersector}_{self.demand_type}.nc')
+                encoding = {sector: {'zlib': True, 'complevel': 5} for sector in downscaled.sector.data}
+                downscaled.to_dataset(dim='sector').to_netcdf(filename, encoding=encoding)
+                downscaled = xr.open_dataset(filename).to_array(dim='sector')  # hopefully this keeps dask happy
+
+            if self.temporal_config is not None:
                 # calculate the monthly distributions (share of annual) for each year
-
-                # this is how we'll do this for now
-                if supersector.lower() in self.temporal_methods:
-                    module = f'tethys.tdmethods.{self.temporal_methods[supersector.lower()]}'
-                    distribution = getattr(importlib.import_module(module), 'temporal_distribution')(self)
+                if supersector in self.temporal_config:
+                    module = f'tethys.tdmethods.' + self.temporal_config[supersector]['method']
+                    temporal_distribution = getattr(importlib.import_module(module), 'temporal_distribution')
+                    years = range(self.years[0], self.years[-1] + 1)
+                    kwargs = self.temporal_config[supersector]['kwargs']
+                    distribution = temporal_distribution(years=years, resolution=self.resolution,
+                                                         bounds=self.bounds, **kwargs)
                 else:
+                    # fall back to uniform distribution
                     distribution = xr.DataArray(np.full(12, 1/12, np.float32), coords=dict(month=range(1, 13)))

                 downscaled = interp_helper(downscaled) * distribution

-                self.outputs.update(downscaled.to_dataset(dim='sector'))
-
-        if self.output_file is not None:
-            print('Writing Outputs')
-            # cannot have '/' in netcdf variable name
-            self.outputs = self.outputs.rename({name: name.replace('/', '_') for name in list(self.outputs)})
-            # compression
-            encoding = {variable: {'zlib': True, 'complevel': 5} for variable in self.outputs}
-            self.outputs.to_netcdf(self.output_file, encoding=encoding)
+                # write temporal downscaling outputs
+                if self.output_dir is not None:
+                    filename = os.path.join(self.output_dir, f'{supersector}_{self.demand_type}_monthly.nc')
+                    encoding = {sector: {'zlib': True, 'complevel': 5} for sector in downscaled.sector.data}
+                    downscaled.to_dataset(dim='sector').to_netcdf(filename, encoding=encoding)
+                    downscaled = xr.open_dataset(filename).to_array(dim='sector')  # hopefully this keeps dask happy
+
+            self.outputs.update(downscaled.to_dataset(dim='sector'))

     def reaggregate(self, region_masks=None):
         """Reaggregate from grid cells to regions
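Putting the new pieces together: run_model looks up temporal_config[supersector]['method'] to import a module from tethys.tdmethods, then calls its temporal_distribution(years=..., resolution=..., bounds=..., **kwargs). So a config entry presumably takes this shape (sector key, paths, and values hypothetical):

temporal_config = {
    'Domestic': {
        'method': 'domestic',  # imports tethys.tdmethods.domestic
        'kwargs': {
            # hypothetical file paths forwarded to temporal_distribution
            'tasfile': 'climate/tas_monthly.nc',
            'rfile': 'climate/amplitude.nc',
        },
    },
}

Sectors without an entry fall back to the uniform 1/12 monthly distribution shown in the diff.
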
tethys/tdmethods/domestic.py (7 changes: 3 additions & 4 deletions)

@@ -2,12 +2,11 @@
 from tethys.datareader.gridded import load_file


-def temporal_distribution(model):
+def temporal_distribution(years, resolution, tasfile, rfile, tasvar='tas', rvar='amplitude', bounds=None):
     """Temporal downscaling for domestic water demand using algorithm from Wada et al. (2011)"""

-    years = range(model.years[0], model.years[-1] + 1)
-    tas = load_file(model.temporal_files['tas'], model.resolution, years, regrid_method='intensive')['tas']
-    amplitude = load_file(model.temporal_files['domr'], model.resolution, years, regrid_method='label')['amplitude']
+    tas = load_file(tasfile, resolution, years, bounds=bounds, regrid_method='intensive', variables=[tasvar])[tasvar]
+    amplitude = load_file(rfile, resolution, years, bounds=bounds, regrid_method='label', variables=[rvar])[rvar]

     ranges = tas.max(dim='month') - tas.min(dim='month')
     ranges = xr.where(ranges != 0, ranges, 1)  # avoid 0/0
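
The remainder of temporal_distribution is below the fold, but given the setup visible here (monthly temperature range with a 0/0 guard, plus a regional amplitude factor), the Wada et al. (2011) monthly share presumably comes out as the deviation from the annual mean temperature, scaled by the amplitude and normalized so the twelve months sum to one. A hedged sketch of that final step, not the verbatim file contents:

    # share of annual demand per month: months sum to 1 because the
    # temperature deviations from the annual mean sum to 0
    distribution = ((tas - tas.mean(dim='month')) / ranges * amplitude + 1) / 12

    return distribution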