Source code for xoa.sigma

# -*- coding: utf-8 -*-
"""
Terrain following parametric vertical coordinates

This follows the CF conventions for
`Parametric Vertical Coordinates <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#parametric-v-coord>`_.

"""
# Copyright 2020-2026 Shom
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

import numpy as np
import xarray as xr

from . import exceptions
from . import misc
from . import meta
from . import coords as xcoords
from .core import sigma as core_sigma

# %% Constants

#: To convert from formula terms to CF names
FORMULA_TERMS_TO_CF_NAMES = {
    'c': 'cs',  # C
    's': 'sig',
    'sigma': 'sig',
    'eta': 'ssh',
    'depth': 'bathy',
    'depth_c': 'hc',
    'a': 'thetas',
    'b': 'thetab',
}

#: CF names that are known to have horizontal dimensions
HORIZONTAL_TERMS = ["ssh", "bathy", "hc"]


# %% Low level routines
#
# Core computation functions are now guvectorized functions imported from xoa.core.sigma
# These are universal functions that work efficiently with xarray.apply_ufunc

_atmosphere_sigma_ = core_sigma.atmosphere_sigma_coordinate
_ocean_sigma_ = core_sigma.ocean_sigma_coordinate
_ocean_s_ = core_sigma.ocean_s_coordinate
_ocean_s_g1_ = core_sigma.ocean_s_coordinate_g1
_ocean_s_g2_ = core_sigma.ocean_s_coordinate_g2


# %% Wrapper


def _apply_ocean_s_(func, sig, ssh, bathy, hc, thetas, thetab, cs, cs_type, dask):
    # Stretching curve
    if cs is None:
        if None in (thetas, thetab):
            raise exceptions.XoaSigmaError("thetas and thetab must be provided when cs is not")
        cs = get_cs(sig, thetas, thetab, cs_type)

    # Checks dims
    if np.ndim(hc) and set(hc.dims) != set(bathy.dims):
        raise exceptions.XoaSigmaError("Incompatible dimensions between bathy and hc")

    # Call core routine
    zdim = sig.dims[0]
    depth = xr.apply_ufunc(
        func,
        sig,
        ssh,
        bathy,
        hc,
        cs,
        input_core_dims=[[zdim], [], [], [], [zdim]],
        output_core_dims=[[zdim]],
        exclude_dims={zdim},
        dask=dask,
        dask_gufunc_kwargs={"output_sizes": {zdim: sig.shape[0]}},
    )

    # Format
    return meta.get_meta_specs(sig).format_data_var(
        depth, "depth", format_coords=False, rename_dims=False
    )


# %% High level routines


[docs] def atmosphere_sigma_coordinate(sig, ps, ptop, dask="parallelized", cache=None): """Convert from sigma [0, 1] to pressure in an atmospheric model .. note:: This function is dask-aware since it delegates the core computation to :func:`xarray.apply_ufunc`. Source: `Atmosphere sigma coordinate <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#_atmosphere_sigma_coordinate>`_ Formula: .. math:: p = p_{top} + \\sigma * (p_{surf}-p_{top}) Sigma standard name: ``atmosphere_sigma_coordinate`` Formula terms: ``sigma: var1 ps: var2 ptop: var3`` Parameters ---------- sig: xarray.DataArray Sigma coordinates range from 0 to 1 (:math:`\\sigma` | ``sigma``) ps: xarray.DataArray Surface air pressure (:math:`p_{surf}` | ``ps``) ptop: xarray.DataArray Air pressure at top of model (:math:`p_{top}` | ``ptop``) Returns ------- xarray.DataArray Air pressure in Pa (:math:`p`) """ if cache is not None: exceptions.xoa_warn("The `cache` parameter is currently not used.") # Call core routine zdim = sig.dims[0] p = xr.apply_ufunc( _atmosphere_sigma_, sig, ps, ptop, input_core_dims=[[zdim], [], []], output_core_dims=[[zdim]], exclude_dims={zdim}, dask=dask, dask_gufunc_kwargs={"output_sizes": {zdim: sig.shape[0]}}, ) # Format return meta.get_meta_specs(sig).format_data_var( p, "plev", format_coords=False, rename_dims=False )
[docs] def ocean_sigma_coordinate(sig, ssh, bathy, dask="parallelized", cache=None): """Convert from sigma [-1, 0] to negative depths in an ocean model .. note:: This function is dask-aware since it delegates the core computation to :func:`xarray.apply_ufunc`. Source: `Ocean sigma coordinate <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#_ocean_sigma_coordinate>`_ Formula: .. math:: z = \\eta + \\sigma * (\\eta+h) Sigma standard name: ``ocean_sigma_coordinate`` Formula terms: ``sigma: var1 eta: var2 depth: var3`` Parameters ---------- sig: xarray.DataArray Sigma coordinates ranging from -1 to 0 (:math:`\\sigma` | ``sigma``) ssh: xarray.DataArray Sea surface height (:math:`\\eta` | ``eta``) bathy: xarray.DataArray Positive sea floor depth (:math:`h` | ``depth``) Returns ------- xarray.DataArray Negative depth below surface in m (:math:`z`) """ if cache is not None: exceptions.xoa_warn("The `cache` parameter is currently not used.") # Compute if not set(bathy.dims) <= set(ssh.dims): raise exceptions.XoaSigmaError("Incompatible dimensions between bathy and ssh") # Call core routine zdim = sig.dims[0] depth = xr.apply_ufunc( _ocean_sigma_, sig, ssh, bathy, input_core_dims=[[zdim], [], []], output_core_dims=[[zdim]], exclude_dims={zdim}, dask=dask, dask_gufunc_kwargs={"output_sizes": {zdim: sig.shape[0]}}, ) # Format return meta.get_meta_specs(sig).format_data_var( depth, "depth", format_coords=False, rename_dims=False )
[docs] def get_cs(sig, thetas, thetab, cs_type=None): """Get a s-coordinate stretching curve Parameters ---------- sig: xarray.DataArray Sigma coordinates range from 0 to 1 (:math:`s` | ``s``) thetas: xarray.DataArray Surface control parameter (:math:`a` | ``a``) thetab: xarray.DataArray Bottom control parameter (:math:`b` | ``b``) cs_type: str, None Stretching type: ``None`` (default): .. math:: C & = (1-b)*\\frac{\\sinh(a*s)}{\\sinh(a)} + b*\\left[\\frac{\\tanh(a*(s+0.5))}{2*\\tanh(0.5*a)} - 0.5\\right] Returns ------- xarray.DataArray Stretching curve (:math:`C` | ``C``) Example ------- .. code-block:: python >>> sig = xr.DataArray(np.linspace(-1, 0, 10), dims="sig") >>> cs = get_cs(sig, thetas=7.0, thetab=2.0) See also -------- ocean_s_coordinate ocean_s_coordinate_g1 ocean_s_coordinate_g2 """ s, a, b = sig, thetas, thetab cs = np.sinh(s * a) * (1 - b) / np.sinh(a) cs = cs + b * (np.tanh(a * (s + 0.5)) / (2 * np.tanh(0.5 * a)) - 0.5) if hasattr(cs, "coords"): cs.name = None cs = meta.get_meta_specs(sig).format_data_var( cs, "cs", format_coords=False, rename_dims=False ) return cs
[docs] def ocean_s_coordinate( sig, ssh, bathy, hc, thetas=None, thetab=None, cs=None, cs_type=None, cache=None, dask="parallelized", ): """Convert from s [-1, 0] to depths in an ocean model .. note:: This function is dask-aware since it delegates the core computation to :func:`xarray.apply_ufunc`. Source: `Ocean s-coordinate <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#_ocean_s_coordinate>`_ Formula: .. math:: z & = \\eta*(1+s) + h_c*s + (h-h_c)*C C & = (1-b)*\\frac{\\sinh(a*s)}{\\sinh(a)} + b*\\left[\\frac{\\tanh(a*(s+0.5))}{2*\\tanh(0.5*a)} - 0.5\\right] Sigma standard name: ``ocean_s_coordinate`` Formula terms: ``s: var1 eta: var2 depth: var3 a: var4 b: var5 depth_c: var6`` Parameters ---------- sig: xarray.DataArray Sigma coordinates ranging from -1 to 0 (:math:`s` | ``s``) ssh: xarray.DataArray Sea surface height (:math:`\\eta` | ``eta``) bathy: xarray.DataArray Positive sea floor depth (:math:`h` | ``depth``) hc: xarray.DataArray, float Positive critical depth (:math:`h_c` | ``depth_c``) thetas: xarray.DataArray, None Surface control parameter (:math:`a` | ``a``). Required if ``cs`` is not provided. thetab: xarray.DataArray, None Bottom control parameter (:math:`b` | ``b``). Required if ``cs`` is not provided. cs: xarray.DataArray, None Stretching curve, which defaults to the formula above computed by :func:`get_cs` (:math:`C` | ``C``) cs_type: str, None Stretching type (see :func:`get_cs`) cache: dict Deprecated, currently unused. Returns ------- xarray.DataArray Negative depth below surface in m (:math:`z`) """ if cache is not None: exceptions.xoa_warn("The `cache` parameter is currently not used.") return _apply_ocean_s_(_ocean_s_, sig, ssh, bathy, hc, thetas, thetab, cs, cs_type, dask)
[docs] def ocean_s_coordinate_g1( sig, ssh, bathy, hc, thetas=None, thetab=None, cs=None, cs_type=None, cache=None, dask="parallelized", ): """Convert from s [-1, 0] generic form 1 to depths in an ocean model .. note:: This function is dask-aware since it delegates the core computation to :func:`xarray.apply_ufunc`. Source: `Ocean s-coordinate, generic form 1 <http://cfconventions.org/cf-conventions/cf-conventions.html#_ocean_s_coordinate_generic_form_1>`_ Formula: .. math:: z & = S + \\eta*(1+s) + (1 + S / h) S & = h_c s + (h - h_c) C C & = (1-b)*\\frac{\\sinh(a*s)}{\\sinh(a)} + b*\\left[\\frac{\\tanh(a*(s+0.5))}{2*\\tanh(0.5*a)} - 0.5\\right] Sigma standard name: ``ocean_s_coordinate_g1`` Formula terms: ``s: var1 C: var2 eta: var3 depth: var4 depth_c: var5`` Parameters ---------- sig: xarray.DataArray Sigma coordinates ranging from -1 to 0 (:math:`s` | ``s``) ssh: xarray.DataArray Sea surface height (:math:`\\eta` | ``eta``) bathy: xarray.DataArray Positive sea floor depth (:math:`h` | ``depth``) hc: xarray.DataArray, float Positive critical depth (:math:`h_c` | ``depth_c``) thetas: xarray.DataArray, None Surface control parameter (:math:`a` | ``a``). Optional if ``cs`` is provided. thetab: xarray.DataArray, None Bottom control parameter (:math:`b` | ``b``). Optional if ``cs`` is provided. cs: xarray.DataArray, None Stretching curve, which defaults to the formula above computed by :func:`get_cs` (:math:`C` | ``C``) cs_type: str, None Stretching type (see :func:`get_cs`) cache: dict Deprecated, currently unused. Returns ------- xarray.DataArray Negative depth below surface in m (:math:`z`) """ if cache is not None: exceptions.xoa_warn("The `cache` parameter is currently not used.") return _apply_ocean_s_(_ocean_s_g1_, sig, ssh, bathy, hc, thetas, thetab, cs, cs_type, dask)
[docs] def ocean_s_coordinate_g2( sig, ssh, bathy, hc, thetas=None, thetab=None, cs=None, cs_type=None, cache=None, dask="parallelized", ): """Convert from s [-1, 0] generic form 2 to depths in an ocean model .. note:: This function is dask-aware since it delegates the core computation to :func:`xarray.apply_ufunc`. Source: `Ocean s-coordinate, generic form 2 <http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#_ocean_s_coordinate_generic_form_2>`_ Formula: .. math:: z & = \\eta + (\\eta + h) * S S & = \\frac{h_c s + h C}{h_c + h} C & = (1-b)*\\frac{\\sinh(a*s)}{\\sinh(a)} + b*\\left[\\frac{\\tanh(a*(s+0.5))}{2*\\tanh(0.5*a)} - 0.5\\right] Sigma standard name: ``ocean_s_coordinate_g2`` Formula terms: ``s: var1 C: var2 eta: var3 depth: var4 depth_c: var5`` Parameters ---------- sig: xarray.DataArray Sigma coordinates ranging from -1 to 0 (:math:`s` | ``s``) ssh: xarray.DataArray Sea surface height (:math:`\\eta` | ``eta``) bathy: xarray.DataArray Positive sea floor depth (:math:`h` | ``depth``) hc: xarray.DataArray, float Positive critical depth (:math:`h_c` | ``depth_c``) thetas: xarray.DataArray, None Surface control parameter (:math:`a` | ``a``). Optional if ``cs`` is provided. thetab: xarray.DataArray, None Bottom control parameter (:math:`b` | ``b``). Optional if ``cs`` is provided. cs: xarray.DataArray, None Stretching curve, which defaults to the formula above computed by :func:`get_cs` (:math:`C` | ``C``) cs_type: str, None Stretching type (see :func:`get_cs`) cache: dict Deprecated, currently unused. Returns ------- xarray.DataArray Negative depth below surface in m (:math:`z`) """ if cache is not None: exceptions.xoa_warn("The `cache` parameter is currently not used.") return _apply_ocean_s_(_ocean_s_g2_, sig, ssh, bathy, hc, thetas, thetab, cs, cs_type, dask)
#: Supported sigma coordinates SIGMA_COORDINATE_FUNCTIONS = { "atmosphere_sigma_coordinate": atmosphere_sigma_coordinate, "ocean_sigma_coordinate": ocean_sigma_coordinate, "ocean_s_coordinate": ocean_s_coordinate, "ocean_s_coordinate_g1": ocean_s_coordinate_g1, "ocean_s_coordinate_g2": ocean_s_coordinate_g2, } # %% File decoding def _ds_search_ci_(ds, name): """Case insensitive search in data_vars, coords and attrs of a dataset Parameters ---------- ds: xarray.Dataset name: str Requested name Returns ------- str Real name """ lname = name.lower() for cat in "data_vars", "coords", "attrs": pool = getattr(ds, cat) names = [nm for nm in pool.keys()] lnames = [nm.lower() for nm in names] if lname in lnames: return names[lnames.index(lname)] _re_ft_split_terms = re.compile(r'\b\s+\b').split _re_ft_split_item = re.compile(r'\s*:\s*').split
[docs] def decode_formula_terms(attr): """Parse the formula_terms attribute Parameters ---------- attr: str Attribute value Returns ------- dict Example ------- .. ipython:: python @suppress from xoa.sigma import decode_formula_terms decode_formula_terms( 's: sc_r C: Cs_r eta: zeta depth: h depth_c: hc') """ terms = {} for item in _re_ft_split_terms(attr): item = _re_ft_split_item(item) if len(item) != 2: raise exceptions.XoaSigmaError("Malformed formula_terms attribute: " + attr) terms[item[0]] = item[1] return terms
[docs] def get_sigma_terms(ds, vloc=None, hlocs=None, rename=False): """Get sigma terms from a dataset as another dataset It operates like this: 1. Search for the sigma variables. 2. Parse their ``formula_terms`` attribute. 3. Create a dict for each locations from names in datasets to :mod:`xoa.meta` compliant names that are also used in conversion functions. Parameters ---------- ds: xarray.Dataset vloc: str, {"any", None}, False Staggered grid vertical location. If any or None, results for all locations are returned. hlocs: None, list of str A list of horizontal grid locations Returns ------- dict, dict of dict, dict of dict of dict A dict is generated for a given sigma variable, whose keys are array names, like ``"sc_r"``, and values are :mod:`~xoa.meta` names, like ``"sig"``. A special key is the ``"type"`` whose corresponding value is the ``standard_name``, stripped from its potential staggered grid location indicator. If ``loc`` is ``"any"`` or ``None``, each dict is embedded in a master dict whose keys are staggered grid location. If no location is found, the key is set ``None``. Raises ------ xoa.sigma.exceptions.XoaSigmaError In case of: - inconsistent staggered grid location in dataarrays as checked by :meth:`xoa.meta.SGLocator.get_location` - no standard_name in sigma/s variable - a malformed formula - a formula term variable that is not found in the dataset - an unknown formula term name """ # Get sigma arrays meta_specs = meta.get_meta_specs(ds) vsingle = vloc not in ("any", None) hsingle = not isinstance(hlocs, list) if hsingle: hlocs = [hlocs] sigs = meta_specs.search(ds, 'sig', loc=vloc, single=False) terms = {} for sig in sigs: # Check standard_name and get loc if "standard_name" not in sig.attrs: raise exceptions.XoaSigmaError( "No standard_name attribute found in sigma/s " "variable name: " + sig.name ) standard_name, loc = meta_specs.sglocator.parse_attr("standard_name", sig.standard_name) # skip this one if standard_name not in SIGMA_COORDINATE_FUNCTIONS: continue # Get formula terms if "formula_terms" not in sig.attrs: raise exceptions.XoaSigmaError( f"Sigma/s type variable {sig.name} " "has no formula_term attribute" ) formula_terms = decode_formula_terms(sig.formula_terms) # Loop in horizontal locations subterms = terms[loc] = {} for hloc in hlocs: # Check terms subsubterms = subterms[hloc] = {sig.name: "sig", "type": standard_name} for fname, fvname in formula_terms.items(): # xoa.meta name # TODO: handle mising cs and fallback with thetas and thetab if fname.lower() not in FORMULA_TERMS_TO_CF_NAMES: raise exceptions.XoaSigmaError("Unknown formula term name: " + fname) meta_name = FORMULA_TERMS_TO_CF_NAMES[fname.lower()] # Real name if meta_name in HORIZONTAL_TERMS: fvname = meta_specs.sglocator.format_attr("name", fvname, hloc) vname = _ds_search_ci_(ds, fvname) if vname is None: raise exceptions.XoaSigmaError("Formula array not found: " + fvname) subsubterms[vname] = meta_name if hsingle: for subterms in terms.values(): subsubterms = subterms[hloc] del subterms[hloc] subterms.update(subsubterms) if vsingle: return subterms if sigs else None return terms
# # Rename terms in dict or ds # ds = ds.rename({sigma.name: "sig"}) # for term_name, da_name_ in terms.items(): # da_name = _ds_search_ci_(ds, da_name_) # if da_name is None: # exceptions.xoa_warn("Formula term dataarray not found: " + da_name_) # ds = ds.rename({da_name: term_name}) # return ds
[docs] @misc.ERRORS.format_function_docstring def decode_sigma(ds, rename=False, hlocs=None, errors="raise"): """Compute heights from sigma-like variable in a dataset using CF conventions This makes use of the :class:`~xoa.meta.MetaSpecs` instance that is retrieved with :func:`xoa.meta.get_meta_specs` with ``ds`` as an argument in order to find needed variables. If the dataset is not found to have sigma-like coordinates, a simple copy is returned. When a data variable that has the same dimensions is found, the computed coordinate is transposed properly and assigned to the variable as a coordinate array. Parameters ---------- ds: xarray.Dataset Dataset that contains everything needed to compute heights rename: bool Rename and format arrays to make them compliant with :mod:`xoa.meta` hlocs: None, list of str Horizontal staggered grid locations to search for "bathy" and "ssh". By default, no location is assumed. {errors} Return ------ xarray.Dataset """ # Init ds = ds.copy() meta_specs = meta.get_meta_specs(ds) errors = misc.ERRORS[errors] # Decode formula terms try: all_terms = get_sigma_terms(ds, vloc=None, hlocs=hlocs) if all_terms is None: return ds except exceptions.XoaSigmaError as e: if errors == "raise": raise e if errors == "warn": exceptions.xoa_warn("Error while decoding sigma coords: {}. Skipping...".format(e)) return ds # Loop on locations sigma_type = None hsingle = not isinstance(hlocs, (list, tuple)) if hsingle: hlocs = [hlocs] for vloc, vterms in all_terms.items(): for hloc in hlocs: terms = vterms if hsingle else vterms[hloc] hloc = meta_specs.sglocator.parse_loc_arg(hloc) sigma_type = terms.pop("type") # Args: keys are meta names, values are dataarrays kwargs = {} for vname, meta_name in terms.items(): if meta_name in HORIZONTAL_TERMS: vname = meta_specs.sglocator.format_attr("name", vname, hloc) kwargs[meta_name] = ds[vname] # Compute depth/altitude/pressure func = SIGMA_COORDINATE_FUNCTIONS[sigma_type] # cache = {} # kwargs['cache'] = cache height = func(**kwargs) # Format loc = (hloc or "") + (vloc or "") height = meta_specs.sglocator.format_dataarray(height, loc) # Transpose if appropriate and set as coordinate transposed = False as_coord = False for da in ds.data_vars.values(): if set(height.dims) <= set(da.dims): if not transposed: height = xcoords.transpose(height, da, "compat") transposed = True ds[da.name] = da.assign_coords({height.name: height}) as_coord = True # Make sure it is in the dataset if not as_coord: ds.coords[height.name] = height if sigma_type: ds.encoding["xoa_sigma_type"] = sigma_type return ds
[docs] def decode_cf_sigma(*args, **kwargs): exceptions.xoa_warn( "decode_cf_sigma is deprecated. Please use decode_sigma instead.", "deprecation" ) return decode_sigma(*args, **kwargs)