Source code for xoa.coords

# -*- coding: utf-8 -*-
"""
Coordinates and dimensions utilities
"""
# Copyright 2020-2021 Shom
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping

import xarray as xr

from .__init__ import XoaError, xoa_warn
from . import misc
from . import cf


[docs]@misc.ERRORS.format_function_docstring def get_lon(da, errors="raise"): """Get the longitude coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).search(da, 'lon', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_lat(da, errors="raise"): """Get the latitude coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).search(da, 'lat', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_depth(da, errors="raise"): """Get the depth coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).search(da, 'depth', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_altitude(da, errors="raise"): """Get the altitude coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).search(da, 'altitude', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_level(da, errors="raise"): """Get the level coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).coords.search(da, 'level', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_vertical(da, errors="raise"): """Get either depth or altitude Parameters ---------- {errors} Return ------ xarray.DataArray or None """ cfspecs = cf.get_cf_specs() height = cfspecs.search(da, 'depth', errors="ignore") if height is None: height = cfspecs.search(da, 'altitude', errors="ignore") if height is None: errors = misc.ERRORS[errors] msg = "No vertical coordinate found" if errors == "raise": raise cf.XoaCFError(msg) elif errors == "warn": xoa_warn(msg) else: return height
[docs]@misc.ERRORS.format_function_docstring def get_time(da, errors="raise"): """Get the time coordinate Parameters ---------- {errors} Return ------ xarray.DataArray or None """ return cf.get_cf_specs(da).coords.search(da, 'time', errors=errors)
[docs]@misc.ERRORS.format_function_docstring def get_cf_coords(da, coord_names, errors="raise"): """Get several coordinates Parameters ---------- {errors} Return ------ list(xarray.DataArray) """ cfspecs = cf.get_cf_specs(da) return [cfspecs.search_coord(da, coord_name, errors=errors) for coord_name in coord_names]
[docs]@misc.ERRORS.format_function_docstring def get_dims(da, dim_types, allow_positional=False, positions='tzyx', errors="warn"): """Get the data array dimensions names from their type Parameters ---------- da: xarray.DataArray Array to scan dim_types: str, list Letters among "x", "y", "z", "t" and "f". allow_positional: bool Fall back to positional dimension of types if unkown. positions: str Default position per type starting from the end. {errors} Return ------ tuple Tuple of dimension name or None when the dimension if not found See also -------- xoa.cf.CFSpecs.get_dims """ return cf.get_cf_specs(da).get_dims( da, dim_types, allow_positional=allow_positional, positions=positions, errors=errors)
[docs]class transpose_modes(misc.IntEnumChoices, metaclass=misc.DefaultEnumMeta): """Supported :func:`transpose` modes""" #: Basic xarray transpose with :meth:`xarray.DataArray.transpose` classic = 0 basic = 0 xarray = 0 #: Transpose skipping incompatible dimensions compat = -1 #: Transpose adding missing dimensions with a size of 1 insert = 1 #: Transpose resizing to missing dimensions. #: Note that dims must be an array or a dict of sizes #: otherwise new dimensions will have a size of 1. resize = 2
[docs]def transpose(da, dims, mode='compat'): """Transpose an array Parameters ---------- da: xarray.DataArray Array to tranpose dims: tuple(str), xarray.DataArray, dict Target dimensions or array with dimensions mode: str, int Transpose mode as one of the following: {transpose_modes.rst_with_links} Return ------ xarray.DataArray Transposed array Example ------- .. ipython:: python @suppress import xarray as xr, numpy as np @suppress from xoa.coords import transpose a = xr.DataArray(np.ones((2, 3, 4)), dims=('y', 'x', 't')) b = xr.DataArray(np.ones((10, 3, 2)), dims=('m', 'y', 'x')) # classic transpose(a, (Ellipsis, 'y', 'x'), mode='classic') # insert transpose(a, ('m', 'y', 'x', 'z'), mode='insert') transpose(a, b, mode='insert') # resize transpose(a, b, mode='resize') transpose(a, b.sizes, mode='resize') # with dict # compat mode transpose(a, ('y', 'x'), mode='compat').dims transpose(a, b.dims, mode='compat').dims transpose(a, b, mode='compat').dims # same as with b.dims See also -------- xarray.DataArray.transpose """ # Inits if hasattr(dims, 'dims'): sizes = dims.sizes dims = dims.dims elif isinstance(dims, Mapping): sizes = dims dims = list(dims.keys()) else: sizes = None mode = str(transpose_modes[mode]) kw = dict(transpose_coords=True) # Classic if mode == "classic": return da.transpose(*dims, **kw) # Get specs odims = () expand_dims = {} with_ell = False for dim in dims: if dim is Ellipsis: with_ell = True odims += dim, elif dim in da.dims: odims += dim, elif mode == "insert": expand_dims[dim] = 1 odims += dim, elif mode == "resize": if sizes is None or dim not in sizes: xoa_warn(f"new dim '{dim}' in transposition is set to one" " since no size is provided to it") size = 1 else: size = sizes[dim] expand_dims[dim] = size odims += dim, # Expand if expand_dims: da = da.expand_dims(expand_dims) # Input dimensions that were not specified in transposition # are flushed to the left if not with_ell and set(odims) < set(da.dims): odims = (...,) + odims # Transpose return da.transpose(*odims, **kw)
transpose.__doc__ = transpose.__doc__.format(**locals())
[docs]class DimFlusher1D(object):
[docs] def __init__(self, da_in, coord_out, dim=None, coord_in_name=None): """Right-flush the working dimension Parameters ---------- da_in: xarray.DataArray Input data array coord_out: xarray.DataArray Output coordinate array dim: str, tuple, None Working dimension coord_in_name: str, None Input coordinate name. If not provided, it is infered. """ # Get the working dimensions if not isinstance(dim, (tuple, list)): dim = (dim, dim) dim0, dim1 = dim if None in dim or coord_in_name is None: cfspecs = cf.get_cf_specs(da_in) # - dim1 (out) if dim1 is None: # get dim1 from coord_out dim1 = cfspecs.search_dim(coord_out) if dim1 is None: raise cf.XoaCFError("No CF dimension found for output coord. " "Please specifiy the working dimension.") dim1, dim_type = dim1 else: # dim1 is provided dim_type = cfspecs.coords.get_dim_type(dim1, coord_out) # - dim0 (in) if dim0 is None: if dim_type: for c0 in da_in.coords.values(): dim0 = cfspecs.coords.search_dim(c0, dim_type) if dim0: if dim_type is None: dim0 = dim0[0] break else: raise cf.XoaCFError( "No CF {dim_type }dimension found for datarray. " "Please specifiy the working dimension.") else: dim0 = dim1 # be cafeful, dim1 must be in input! assert dim0 in da_in.dims assert dim1 in coord_out.dims # Input coordinate if coord_in_name: assert coord_in_name in da_in.coords, 'Invalid coordinate' else: coord_in = cfspecs.search_coord_from_dim(da_in, dim0) if coord_in is None: raise cf.XoaCFError( f"No coordinate found matching dimension '{dim0}'") coord_in_name = coord_in.name # Check dims # - non-common other dimensions odims0 = set(da_in.dims).difference({dim0}) odims1 = set(coord_out.dims).difference({dim1}) if odims0.difference(odims1) and odims1.difference(odims0): raise XoaError("Conflicting non working dimensions") # - common dims, with size checking cdims = odims0.intersection(odims1).difference({dim0}) for cdim in cdims: assert da_in.sizes[cdim] == coord_out.sizes[cdim] # - common dims in the order of da_in cdims0 = [] for cdim0 in da_in.dims: if cdim0 in cdims: cdims0.append(cdim0) # - input dims with output dim dims01 = list(da_in.dims) if dim0 != dim1: dims01[dims01.index(dim0)] = dim1 dims01 = tuple(dims01) # Store self._dim0, self._dim1 = dim0, dim1 self._da_in = da_in self.coord_out = transpose(coord_out, (Ellipsis,) + dims01, "compat") self.coord_out_name = self.coord_out.name or coord_in.name # self._odims0 = odims0 # self._odims1 = odims1 # self._cdims0 = cdims0 self.da_in = da_in # Transpose to push work dim right da_in = da_in.transpose( Ellipsis, *(cdims0 + [self._dim0]), transpose_coords=True) coord_out = coord_out.transpose( Ellipsis, *(cdims0 + [self._dim1])) # Broadcast data array # - data var if set(da_in.dims[:-1]) < set(coord_out.dims[:-1]): da_in = da_in.broadcast_like(coord_out, exclude=(self._dim0, self._dim1)) # - input coordinate # if (set(coord_out.dims[:-1])set(da_in.coords[coord_in_name].dims[:-1]) # < set(coord_out.dims[:-1])): if (coord_out.ndim > 1 and set(coord_out.dims[:-1]) not in set(da_in.coords[coord_in_name].dims[:-1])): #set(coord_out.dims[:-1]) > set(da_in.coords[coord_in_name].dims[:-1]): if da_in.coords[coord_in_name].ndim == 1: coord_in_name, old_coord_in_name = ( coord_in_name + '_dimflush1d', coord_in_name) else: old_coord_in_name = coord_in_name da_in.coords[coord_in_name] = ( da_in.coords[old_coord_in_name].broadcast_like( coord_out, exclude=(self._dim0, self._dim1))) coord_in = da_in.coords[coord_in_name] # - output coordinate if (coord_out.ndim > 1 and set(coord_in.dims[:-1]) > set(coord_out.dims[:-1])): coord_out = coord_out.broadcast_like( coord_in, exclude=(self._dim0, self._dim1)) # Info reverse transfoms # - input coords that doesn't have dim0 inside and must copied self.extra_coords = dict([(name, coord) for name, coord in da_in.coords.items() if dim0 not in coord.dims]) # - da shape after reshape + broadcast self.work_shape = da_in.shape[:-1] + (coord_out.sizes[dim1], ) self.work_dims = da_in.dims[:-1] + (dim1, ) self.final_dims = list(self._da_in.dims) idim0 = self.final_dims.index(dim0) self.final_dims[idim0] = dim1 self.final_dims = tuple(self.final_dims) # self.final_shape = list(self._da_in.shape) # self.final_shape[idim0] = coord_out.sizes[dim1] # Convert to numpy 2D self.da_in_data = da_in.data.reshape(-1, da_in.shape[-1]) self.coord_in_data = coord_in.data.reshape(-1, coord_in.shape[-1]) self.coord_out_data = coord_out.data.reshape(-1, coord_out.shape[-1])
[docs] def get_back(self, data_out): data_out = data_out.reshape(self.work_shape) da_out = xr.DataArray(data_out, dims=self.work_dims) da_out = da_out.transpose(Ellipsis, *self.final_dims) da_out[self.coord_out_name] = self.coord_out da_out.attrs.update(self.da_in.attrs) da_out.coords.update(self.extra_coords) da_out.name = self.da_in.name return da_out
[docs]def get_dim_types(da, unknown=None, asdict=False): """Get dimension types Parameters ---------- da: xarray.DataArray or tuple(str) Data array or tuple of dimensions unknown: Value to assign to unknown types asdict: bool Get the result as dictionary Return ------ tuple """ return cf.get_cf_specs(da).coords.get_dim_types( da, unknown=unknown, asdict=asdict)
[docs]def get_order(da): """Like :func:`get_dim_types` but returning a string""" return "".join(get_dim_types(da, unknown="-", asdict=False))
[docs]def reorder(da, order): """Transpose an array to match a given order Parameters ---------- da: xarray.DataArray Data array to transpose order: str A combination of x, y, z, t, f and - symbols and their upper case value. Letters refer to the dimension type. When the value is -, it may match any dimension type. Return ------ xarray.DataArray """ # Convert from dim_types if isinstance(order, dict): order = tuple(order.values()) if isinstance(order, tuple): order = ''.join([ ('-' if o not in "ftzyx" else o) for o in order]) # From order to dims to_dims = () dim_types = get_dim_types(da, asdict=True) ndim = len(dim_types) for i, o in enumerate(order[::-1]): if i+1 == ndim: break for dim in da.dims: if o == dim_types[dim]: to_dims = (dim, ) + to_dims break else: raise XoaError( f"Coordinate type not found: {o}. Dims are: {da.dims}") # Final transpose return transpose(da, to_dims)
[docs]def get_coords_compat_with_dims(da, include_dims=None, exclude_dims=None): """Return the coordinates that are compatible with dims Parameters ---------- da: xarray.DataArray Data array include_dims: set(str) If provided, the coordinates must have at least one of these dimensions exclude_dims: set(str) If provided, the coordinates must not have one of these dimnesions Return ------ list(str) List of coordinates """ if isinstance(include_dims, str): include_dims = {include_dims} if isinstance(exclude_dims, str): exclude_dims = {exclude_dims} coords = [] for coord in da.coords.values(): dims = set(coord.dims) if include_dims and not include_dims.intersection(dims): continue if exclude_dims and exclude_dims.intersection(dims): continue coords.append(coord) return coords
[docs]def change_index(da, dim, values): """Change the values of a dataset or data array index Parameters ---------- da: xarray.Dataset, xarray.DataArray dim: str values: array_like Return ------ xarray.Dataset, xarray.DataArray See also -------- xarray.DataArray.reset_index xarray.DataArray.assign_coords """ attrs = da.coords[dim].attrs if hasattr(values, "attrs"): attrs.update(attrs) if dim in da.indexes: da = da.reset_index([dim], drop=True) coord = xr.DataArray(values, dims=dim, attrs=attrs) return da.assign_coords({dim: coord})
[docs]def drop_dim_coords(da, dim): """Drop coords that have a particular dim""" return da.drop([c.name for c in da.coords.values() if dim in c.dims])
[docs]class positive_attr(misc.IntEnumChoices, metaclass=misc.DefaultEnumMeta): """Allowed value for the positive attribute argument""" #: Infer it from the axis coordinate infer = 0 guess = 0 #: Coordinates are increasing up up = 1 #: Coordinates are increasing down down = -1
[docs]def get_positive_attr(da, zdim=None): """Get the positive attribute of a dataset Parameters ---------- da: xarray.Dataset, xarray.DataArray zdim: None, str The index coordinate name that is supposed to have this attribute, which is usually the vertical dimension Return ------ None, "up" or "down" """ # Targets if zdim is None: zdim = get_dims(da, "z", errors="ignore") if zdim: zdim = zdim[0] if zdim and zdim in da.coords: targets = [da.coords[zdim]] else: targets = list(da.coords.values()) if isinstance(da, xr.Dataset): targets.extend(da.data_vars.values()) # Loop on targets for target in targets: if "positive" in target.attrs: positive = da.coords[zdim].attrs["positive"] return positive_attr[positive].name # Fall back to current CFSpecs cfspecs = cf.get_cf_specs(da) return cfspecs["vertical"]["positive"]