"""
Filtering utilities
"""
# Copyright 2020-2021 Shom
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import xarray as xr
from .__init__ import XoaError, xoa_warn
from .coords import transpose, get_dims
[docs]def get_window_func(window, *args, **kwargs):
"""Get a window function from its name
Parameters
----------
window: str, callable, list, array_like
Specification to get the window function:
- `callable`: used as is.
- `str`: supposed to be function name of the :mod:`numpy` or
:mod:`scipy.signal.windows` modules.
- `list`, `array_like`: transformed to an array and interpolated
onto ``size`` points.
args, kwargs:
Argument passed to the low level window function at calling time.
Return
------
callable
A function that only a ``size`` argument
Example
-------
.. ipython:: python
@suppress
import matplotlib.pyplot as plt
@suppress
from xoa.filter import get_window_func
func0 = get_window_func("gaussian", 22, sym=True)
func1 = get_window_func([1, 2, 5, 2, 1])
plt.plot(func0(100), label='Gaussian');
plt.plot(func1(100), label='List/array');
@savefig api.filter.get_window_func.png
plt.legend();
"""
# Explicit values so create a wrapper that interpolate them
if isinstance(window, (list, np.ndarray)):
def window_func(size):
x = np.linspace(0, 1, size)
xp = np.linspace(0, 1, len(window))
return np.interp(x, xp, window)
return window_func
# Base function
if isinstance(window, str):
if hasattr(np, window):
func = getattr(np, window)
else:
import scipy.signal.windows as sw
func = getattr(sw, window, None)
if func is None:
raise XoaError(f'Invalid window name: {window}')
else:
func = window
# Wrapper with args
if args or kwargs:
def window_func(size):
return func(size, *args, **kwargs)
return window_func
return func
# def get_window_kernel(size, window, *args, **kwargs):
# """Generate an 1d kernel from a window name or function
# It first get the window function, then returns::
# window_func(size)
# Parameters
# ----------
# size: int
# Size of the output kernel
# window: str, callable, list, array_like
# Specification to get the window function:
# - `callable`: used as is.
# - `str`: supposed to be function name of the :mod:`numpy` or
# :mod:`scipy.signal.windows` modules.
# - `list`, `array_like`: transformed to an array and interpolated
# onto ``size`` points.
# Return
# ------
# np.ndarray
# 1D kernel
# See also
# --------
# get_window_func
# """
# if isinstance(window, str):
# window = get_window_func(window, *args, **kwargs)
# elif isinstance(window, (list, np.ndarray)):
# x = np.linspace(0, 1, size)
# xp = np.linspace(0, 1, len(window))
# return np.interp(x, xp, window)
# return window(size)
[docs]def generate_isotropic_kernel(shape, window_func, fill_value=0, npt=None):
"""Generate an nD istropic kernel given a shape and a window function
Parameters
----------
shape: int, tuple
Shape of the desired kernel
window_func: str, callable
Function that take a size parameter as unique argument.
If a string, it is expected to be a numpy window function.
fill_value: float
Value to set when outside window bounds, near the corners
npt: int, None
Number of interpolation point to get the window value
at all positions. It is infered from shape if not given.
Return
------
np.ndarray
Isotropic kernel array
Example
-------
.. ipython:: python
@suppress
import numpy as np, matplotlib.pyplot as plt
@suppress
from xoa.filter import generate_isotropic_kernel
kernel = generate_isotropic_kernel((20, 30), "bartlett", np.nan)
plt.matshow(kernel, cmap="cmo.solar");
@savefig api.filter.generate_isotropic_kernel.png
plt.colorbar();
See also
--------
generate_orthogonal_kernel
generate_kernel
numpy.bartlett
numpy.blackman
numpy.hamming
numpy.hanning
numpy.kaiser
numpy.interp
"""
# Window function
window_func = get_window_func(window_func)
# Normalised indices
indices = np.indices(shape).astype('d')
for i, width in enumerate(shape):
indices[i] /= (width-1)
indices[i] -= 0.5
# Distance from bounds with 0.5 at center and < 0 outside bounds
x = 0.5 - np.sqrt((indices**2).sum(axis=0))
# Window values
if npt is None:
npt = 2*max(shape)
fp = window_func(npt)
xp = np.linspace(0, 1, npt)
# Interpolation
kernel = np.interp(x.ravel(), xp, fp).reshape(x.shape)
kernel[x < 0] = fill_value
return kernel
[docs]def generate_orthogonal_kernel(kernels, window_func="ones", fill_value=0.):
"""Generate an nD kernel from orthogonal 1d kernels
Parameters
----------
kernels: list
List of scalars and/or 1d kernels.
In case of a scalar, it is converted to an 1d kernel with
the ``window_func`` parameter.
window_func: str, callable
Function that take a size parameter as unique argument.
If a string, it is expected to be a numpy window function.
fill_value: float
Value to set when outside window bounds, when creating an 1d kernel
from a floating size
Return
------
np.ndarray
Orthogonal kernel array
Example
-------
.. ipython:: python
@suppress
import numpy as np, matplotlib.pyplot as plt
@suppress
from matplotlib.gridspec import GridSpec
@suppress
from xoa.filter import generate_orthogonal_kernel
# From sizes and functions
ny, nx = (21, 31)
kernel = generate_orthogonal_kernel((ny, nx), "bartlett")
j, i = 5, 20
plt.plot([3, 4])
fig = plt.figure(constrained_layout=True)
gs = GridSpec(3, 3, figure=fig)
ax0 = fig.add_subplot(gs[1:, :2])
ax0.matshow(kernel)
ax0.axhline(j, color='tab:red')
ax1 = fig.add_subplot(gs[0, :2], sharex=ax0)
ax1.plot(np.arange(nx), kernel[j], color='tab:red')
ax2 = fig.add_subplot(gs[1:, -1], sharey=ax0)
ax2.plot(kernel[:, i], np.arange(ny), color='tab:orange');
@savefig api.filter.generate_orthogonal_kernel_0.png
ax0.axvline(i, color='tab:orange')
# From 1d kernels
kernel = generate_orthogonal_kernel(([1, 1, 1], [1, 2, 3, 2, 1]))
plt.figure()
@savefig api.filter.generate_orthogonal_kernel_1.png
plt.matshow(kernel);
See also
--------
generate_isotropic_kernel
generate_kernel
numpy.bartlett
numpy.blackman
numpy.hamming
numpy.hanning
numpy.kaiser
numpy.interp
"""
kernel = None
for k1d in kernels:
# From scalar to 1d
if np.isscalar(k1d):
window_func = get_window_func(window_func)
if isinstance(k1d, int):
k1d = window_func(k1d)
else: # Float case
size = int(k1d)
if size != k1d:
size += 2
k1d = np.full(size, fill_value)
k1d[1:-1] = window_func(size-2)
else:
k1d = np.asarray(k1d)
# Orthogonal merge
if kernel is None:
kernel = k1d
else:
kernel = np.tensordot(kernel[:, None], k1d[None], axes=1)
return kernel
[docs]def generate_kernel(
kernel, data, isotropic=False, window_func="ones", fill_value=0.,
window_args=None, window_kwargs=None):
"""Generate a kernel that is compatible with a given data array
Parameters
----------
kernel: xarray.DataArray, np.ndarray, int, list, dictorthokernels
Ready to use kernel or specs to generate it.
- If an int, the kernel built with ones with a size
of `kernel` along all dimensions.
- If a tuple, the kernel is built with ones and a shape
equal to `kernel`.
- If a numpy array, it is used as is.
The final data array is transposed and/or expanded with
:func:`xoa.coords.transpose` to fit into the input data array.
data: xarray.DataArray
Data array that the kernel must be compatible with
isotropic: bool, tuple
Tuple of the dimensions over which must be computed isotropically.
Return
------
xarray.DataArray
Kernel array with suitable dimensions and shape
See also
--------
generate_isotropic_kernel
generate_orthogonal_kernel
xoa.coords.transpose
"""
# Isotropic
if isotropic is True:
isotropic = data.dims
# Convert to tuple
if isinstance(kernel, int):
kernel = (kernel,)*data.ndim
# Convert tuple to dict with dims
if isinstance(kernel, tuple):
if len(kernel) > data.ndim:
raise XoaError("Too many dimensions for your kernel: {} > {}"
.format(len(kernel), data.ndim))
kernel = dict(item for item in
zip(data.dims[-len(kernel):], kernel))
# Convert list to dict with dims
elif isinstance(kernel, list):
kernel = dict((dim, kernel) for dim in data.dims)
# From an size or 1d kernel for given dimensions
if isinstance(kernel, dict):
# Isotropic parameter
if isotropic:
if isotropic is True:
isotropic = data.dims
elif not set(isotropic).issubset(data.dims):
raise XoaError("invalid dimensions for isotropic keyword")
# Split into consistant isotropic and orthogonal kernels
isokernels_sizes = {}
isokernel = None
orthokernels = {}
for dim, kn in kernel.items():
if not isotropic or dim not in isotropic:
orthokernels[dim] = kn
else:
if isokernel:
if ((np.isscalar(isokernel) and not np.isscalar(kn)) or
(not np.isscalar(isokernel) and np.isscalar(kn))):
raise XoaError("Inconsistant mix of 1d and scalar "
"kernel specs for building isotropic "
"kernel")
if (not np.isscalar(kn) and not np.isscalar(isokernel)
and not np.allclose(kn, isokernel)):
raise XoaError("Inconsistant 1d kernels for building "
"isotropic kernel")
else:
isokernel = kn
size = kn if np.isscalar(kn) else len(kn)
isokernels_sizes[dim] = size
# Merge orthogonal kernels
dims = ()
kernel = None
if orthokernels:
dims += tuple(orthokernels.keys())
window_args = [] if window_args is None else window_args
window_kwargs = {} if window_kwargs is None else window_kwargs
window_func = get_window_func(
window_func, *window_args, **window_kwargs)
sizes = tuple(orthokernels.values())
kernel = generate_orthogonal_kernel(
sizes, window_func=window_func, fill_value=fill_value)
# Build isotropic kernel
if isokernel:
# List/array
if not np.isscalar(isokernel):
window_func = get_window_func(isokernel)
# nD isotropic kernel
isokernels = generate_isotropic_kernel(
tuple(isokernels_sizes.values()), window_func,
fill_value=fill_value)
# Update final kernel
dims += tuple(isokernels_sizes.keys())
if kernel is None:
kernel = isokernels
else:
kernel = np.tensordot(kernel[..., None], isokernels[None, ...])
# Numpy
elif isinstance(kernel, np.ndarray):
if kernel.ndim > data.ndim:
raise XoaError("too many dimensions for your numpy kernel: {} > {}"
.format(kernel.dim, data.ndim))
dims = data.dims[-kernel.ndim:]
# Data array
if not isinstance(kernel, xr.DataArray):
kernel = xr.DataArray(kernel, dims=dims)
elif not set(kernel.dims).issubset(set(data.dims)):
raise XoaError(f"kernel dimensions {kernel.dims} "
f"are not a subset of {data.dims}")
# Finalize
return transpose(kernel, data, mode="insert").astype(data.dtype)
[docs]def shapiro_kernel(dims):
"""Generate a shapiro kernel
Parameters
----------
dims: str, tuple
Dimension names
Return
------
xarray.DataArray
The kernel as a data array with provided dims and a shape of
``(3,)*len(dims)``
Example
-------
.. ipython:: python
@suppress
from xoa.filter import shapiro_kernel
shapiro_kernel('nx')
shapiro_kernel(('ny', 'nx'))
shapiro_kernel(('nt', 'ny', 'nx'))
"""
if isinstance(dims, str):
dims = (dims, )
ndim = len(dims)
kernel = np.zeros((3,)*ndim, dtype='d')
indices = np.indices(kernel.shape)
for idx in indices:
idx[idx == 2] = 0
kernel += idx
return xr.DataArray(kernel, dims=dims)
def _convolve_(data, kernel, normalize):
"""Pure numpy convolution that takes care of nans"""
import scipy.signal as ss
# Kernel
assert data.ndim == kernel.ndim
if kernel.dtype is not data.dtype:
xoa_warn(
"The dtype of your kernel is not the same as that of your data. "
"Converting it...")
kernel = kernel.astype(data.dtype)
# Guess mask
bad = np.isnan(data)
with_mask = bad.any()
if with_mask:
data = np.where(bad, 0, data)
# Convolutions
cdata = ss.convolve(data, kernel, mode='same')
if normalize or with_mask:
weights = ss.convolve((~bad).astype('i'), kernel, mode='same')
weights = np.clip(weights, 0, kernel.sum())
# Weigthing and masking
if with_mask:
bad = np.isclose(weights, 0, atol=float(1e-6*kernel.sum()))
if normalize:
if with_mask:
weights[bad] = 1
cdata /= weights
if with_mask:
cdata[bad.data] = np.nan
return cdata
[docs]def convolve(data, kernel, normalize=False):
"""N-dimensional convolution that takes care of nans
Parameters
----------
data: xarray.DataArray
Array to filter
kernel: int, tuple, numpy.ndarray, xarray.DataArray
Convolution kernel. See :func:`generate_kernel`.
normalize: bool
Divide the convolution product by the local sum weights.
The result is then a weighted average.
Return
------
xarray.DataArray
The filtered array with the same shape, attributes and coordinates
as the input array.
See also
--------
scipy.signal.convolve
generate_kernel
Example
-------
.. ipython:: python
@savefig api.filter.convolve.png
@suppress
import xarray as xr, numpy as np, matplotlib.pyplot as plt
@suppress
from xoa.filter import convolve
data = xr.DataArray(np.random.normal(size=(50, 70)), dims=('y', 'x'))
data[10:20, 10:20] = np.nan # introduce missing data
kernel = dict(x=[1, 2, 5, 2, 1], y=[1, 2, 1])
datac = convolve(data, kernel, normalize=True)
fig, (ax0, ax1) = plt.subplots(ncols=2, figsize=(7, 3))
kw = dict(vmin=data.min(), vmax=data.max())
data.plot.pcolormesh(ax=ax0, **kw)
datac.plot.pcolormesh(ax=ax1, **kw)
"""
# Adapt the kernel to the data
kernel = generate_kernel(kernel, data)
# Numpy convolution
datac = _convolve_(data.data, kernel.data, normalize)
# Format
return xr.DataArray(datac, coords=data.coords, attrs=data.attrs)
def _get_xydims_(data, xdim, ydim):
if xdim is None:
xdim = get_dims(
data, 'x', allow_positional=True, errors="raise")[0]
if ydim is None:
ydim = get_dims(
data, 'y', allow_positional=True, errors="raise")[0]
return xdim, ydim
def _reduce_mask_(data, excluded_dims):
rmask = data.isnull()
for dim in set(data.dims) - set(excluded_dims):
rmask = rmask.any(dim=dim)
return rmask
def _convolve_and_fill_(data, kernel):
return data.fillna(convolve(data, kernel, normalize=True))
[docs]def erode_mask(data, until=1, kernel=None, xdim=None, ydim=None):
"""Erode the horizontal mask using smoothing
Missing values are filled with the smoothed field in a iterative way.
Two cases:
- Erode a fixed number of times.
- Erode the data mask until there is no missing value where
a given horirizontal mask is False.
Parameters
----------
data: xarray.DataArray
Array of at least 2 dimensions, that are supposed to be horizontal.
until: array_like, int
Either a minimal mask, or a max number of iteration.
kernel:
Defaults to a :func:`shapiro <shapiro_kernel>` kernel.
In this case, ``xdim`` and ``ydim`` can be set to the
horizontal dimensions, otherwise they are inferred.
If ``kernel`` is provided, it must a compatible with
:func:`generate_kernel`.
Return
xarray.DataArray
Data array similar to input array, with its eroded
along x and y dimensions.
"""
if data.ndim < 2:
raise XoaError("input array must have at least 2 dimensions")
# Iteration or mask
if isinstance(until, int):
niter = until
mask = None
else:
mask = until
if isinstance(mask, np.ndarray):
xdim, ydim = _get_xydims_(data, xdim, ydim)
if mask.ndim > 2:
raise XoaError("mask must be two-dimensional")
mask = xr.DataArray(mask, dims=(ydim, xdim))
if not set(mask.dims).issubset(data.dims):
raise XoaError('mask dims must be a subset of data dims')
mask = transpose(mask, data, mode="compat")
# Kernel
if kernel is None:
xdim, ydim = _get_xydims_(data, xdim, ydim)
kernel = shapiro_kernel((ydim, xdim))
kernel = generate_kernel(kernel, data)
kdims = kernel.squeeze().dims
if len(kdims) > 2:
raise XoaError("Kernel must be two-dimensional")
# Filter
if mask is not None:
nmask_min = int(mask.sum())
nmask = np.inf
while nmask > nmask_min:
data = _convolve_and_fill_(data, kernel)
rmask = _reduce_mask_(data, kdims)
nmask = (mask | rmask).sum()
else:
for i in range(niter):
data = _convolve_and_fill_(data, kernel)
return data