Source code for xoa.core.interp

"""
Low level interpolation routines accelerated with numba

The numerical inputs and outputs of all these routines are of scalar
or numpy.ndarray type.
"""

# Copyright 2020-2026 Shom
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import math

import numpy as np
import numba

from .geo import haversine

NOT_CI = os.environ.get("CI", "false") == "false"


# %% 2D routines


[docs] @numba.njit(parallel=True, fastmath=True) def closest2d(xxi, yyi, xo, yo): """Find indices of closest point on 2D lon/lat grid Parameters ---------- xxi: array_like(nyi, nxi) Grid longitudes in degrees yyi: array_like(nyi, nxi) Grid latitudes in degrees xo: Point longtitude yo: Point latitude Return ------ int: i index along second dim int: j Index along first dim """ nyi, nxi = xxi.shape mindist = np.pi i = 0 j = 0 for jt in numba.prange(0, nyi): for it in range(0, nxi): dist = haversine(xo, yo, xxi[jt, it], yyi[jt, it]) if dist <= mindist: i = it j = jt mindist = dist return i, j
[docs] @numba.njit(fastmath=True) def cell2relloc(x1, x2, x3, x4, y1, y2, y3, y4, x, y): """Compute coordinates of point relative to a curvilinear cell Cell shape:: 2 - 3 | | 1 - 4 Example ------- >>> cell2relloc(0., -2., 0., 2., 0., 1., 1., 0., 0., 0.5) (0.5, 0.5) See also -------- http://ntrs.nasa.gov/archive/nasa/casi.ntrs.nasa.gov/19890018062_1989018062.pdf """ small = np.finfo(np.float64).eps * 2 # Coefs a = x4 - x1 b = x2 - x1 c = x3 - x4 - x2 + x1 d = y4 - y1 e = y2 - y1 f = y3 - y4 - y2 + y1 # Solve A*p**2 + B*p + C = 0 yy = y - y1 xx = x - x1 AA = c * d - a * f BB = -c * yy + b * d + xx * f - a * e CC = -yy * b + e * xx if abs(AA) < small: p1 = -CC / BB p2 = p1 else: DD = BB**2 - 4 * AA * CC sDD = math.sqrt(DD) p1 = (-BB - sDD) / (2 * AA) p2 = (-BB + sDD) / (2 * AA) # Get q from p if abs(b + c * p1) > small: q1 = (xx - a * p1) / (b + c * p1) else: q1 = (yy - d * p1) / (e + f * p1) # Select point closest to center if p1 < 0.0 or p1 > 1.0 or q1 < 0.0 or q1 > 1.0: if abs(b + c * p2) > small: q2 = (xx - a * p2) / (b + c * p2) else: q2 = (yy - d * p2) / (e + f * p2) p = p2 q = q2 else: p = p1 q = q1 if p < -small or q < -small: p = -1.0 q = -1.0 return p, q
[docs] @numba.njit(fastmath=True, cache=NOT_CI) def grid2relloc(xxi, yyi, xo, yo): """Compute coordinates of point relative to a curvilinear grid Parameters ---------- xxi: array_like(nyi, nxi) Grid longitudes in degrees yyi: array_like(nyi, nxi) Grid latitudes in degrees xo: float Point longitude yo: float Point latitude Return ------ float: The integer part gives the grid cell index along the second dim, and the fractional part gives the coordinate relative this cell. A value of -1 means outside the grid. float: The integer part gives the grid cell index along the first dim, and the fractional part gives the coordinate relative this cell. A value of -1 means outside the grid. """ p = -1.0 q = -1.0 small = np.finfo(np.float64).eps nyi, nxi = xxi.shape # Find the closest corner ic, jc = closest2d(xxi, yyi, xo, yo) # Curvilinear to rectangular with a loop on four candidate cells for j in range(max(jc - 1, 0), min(jc + 1, nyi - 1)): for i in range(max(ic - 1, 0), min(ic + 1, nxi - 1)): # Get relative position a, b = cell2relloc( xxi[j, i], xxi[j + 1, i], xxi[j + 1, i + 1], xxi[j, i + 1], yyi[j, i], yyi[j + 1, i], yyi[j + 1, i + 1], yyi[j, i + 1], xo, yo, ) # Store absolute indices if ( (a >= 0.0 - small) and (a <= 1.0 + small) and (b >= 0.0 - small) and (b <= 1.0 + small) ): p = np.float64(i) + a q = np.float64(j) + b break else: continue break return p, q
[docs] @numba.njit(fastmath=True, cache=NOT_CI) def grid2rellocs(xxi, yyi, xo, yo): """Compute coordinates of points relative to a curvilinear grid Parameters ---------- xxi: array_like(nyi, nxi) Grid longitudes in degrees yyi: array_like(nyi, nxi) Grid latitudes in degrees xo: array_like(no) Point longitude yo: array_like(no) Point latitude Return ------ array_like(no): The integer part gives the grid cell index along the second dim, and the fractional part gives the coordinate relative this cell. A value of -1 means outside the grid. array_like(no): The integer part gives the grid cell index along the first dim, and the fractional part gives the coordinate relative this cell. A value of -1 means outside the grid. """ no = xo.size pp = np.zeros(no) qq = np.zeros(no) for i in range(xo.size): pp[i], qq[i] = grid2relloc(xxi, yyi, xo[i], yo[i]) return pp, qq
[docs] @numba.njit(parallel=True, cache=NOT_CI) def grid2locs(xxi, yyi, zzi, ti, vi, xo, yo, zo, to): """Linear interpolation of gridded data to random positions Parameters ---------- xxi: array_like(nyi, nxi) Input grid longitudes in degrees, with `nyi==1` for 1D coordinates. yyi: array_like(nyi, nxi) Input grid latitudes in degrees, with `nxi==1` for 1D coordinates. zzi: array_like(nexz, nti, nzi, nyi, nxi) Input grid depths, positive up. Non effective dimensions must be set to 1. `nexz` may be equal or a multiple of `nex`. ti: array_like(nti) Input times vi: array_like(nexz, ntiz, nzi, nyiz, nxiz) Input values. xo: array_like(no) Points longitude yo: array_like(no) Points latitude zo: array_like(no) Points depth, positive up. to: array_like(no) Points time. Return ------ array_like(nex, no) Points value. """ # Dimensions nyix, nxi = xxi.shape nyi, nxiy = yyi.shape nexz, ntiz, nzi, nyiz, nxiz = zzi.shape nexv, nti, nzi, nyi, nxi = vi.shape no = xo.shape[0] # Initalisations nex = max(nexv, nexz) vo = np.full((nex, no), np.nan, dtype=vi.dtype) bmask = np.isnan(vi) masko = np.isnan(xo) | np.isnan(yo) | np.isnan(zo) | np.isnan(to) ximin = xxi.min() ximax = xxi.max() yimin = yyi.min() yimax = yyi.max() zimin = zzi.min() zimax = zzi.max() timin = ti.min() timax = ti.max() curved = nyix != 1 # Verifications assert not curved or (nxi == nxiy and nyi == nyix), "linear4dto1: Invalid curved dimensions" assert nxiz == 1 or nxiz == nxi, "grid2locs: Invalid nxiz dimension" assert nyiz == 1 or nyiz == nyi, "grid2locs: Invalid nyiz dimension" assert ntiz == 1 or ntiz == nti, "grid2locs: Invalid ntiz dimension" # Loop on ouput points for io in numba.prange(no): # for io in range(no): if masko[io]: continue if ( (nxi != 1 and (xo[io] < ximin or xo[io] > ximax)) or (nyi != 1 and (yo[io] < yimin or yo[io] > yimax)) or (nzi != 1 and (zo[io] < zimin or zo[io] > zimax)) or (nti != 1 and (to[io] < timin or to[io] > timax)) ): continue # Weights if curved: p, q = grid2relloc(xxi, yyi, xo[io], yo[io]) if p < 0 or p > nxi - 1 or q < 0 or q > nyi - 1: continue # outside the grid i = int(p) j = int(q) a = p - i b = q - j npi = 2 npj = 2 else: # - X if nxi == 1: i = 0 a = 0.0 npi = 1 elif xxi[0, nxi - 1] == xo[io]: i = nxi - 1 a = 0.0 npi = 1 else: # i = minloc(xxi[1, :], dim=1, mask=xxi(1,:)>xo[io])-1 i = np.searchsorted(xxi[0, :], xo[io], "right") - 1 a = xo[io] - xxi[0, i] if abs(a) > 180.0: a -= 180.0 # FIXME: grid2locs: abs(a)>180. a = a / (xxi[0, i + 1] - xxi[0, i]) npi = 2 # - Y if nyi == 1: j = 0 b = 0.0 npj = 1 elif yyi[nyi - 1, 0] == yo[io]: j = nyi - 1 b = 0.0 npj = 1 else: # j = minloc(yyi[:,1], dim=1, mask=yyi[:,1]>yo[io]) - 1 j = np.searchsorted(yyi[:, 0], yo[io], "right") - 1 b = (yo[io] - yyi[j, 0]) / (yyi[j + 1, 0] - yyi[j, 0]) npj = 2 # - T if nti == 1: l = 0 d = 0.0 npl = 1 elif ti[nti - 1] == to[io]: l = nti - 1 d = 0.0 npl = 1 else: l = np.searchsorted(ti, to[io], "right") - 1 # l = minloc(ti, dim=1, mask=ti>to[io])-1 if ti[l + 1] == ti[l]: d = 0.0 npl = 1 else: d = (to[io] - ti[l]) / (ti[l + 1] - ti[l]) npl = 2 # - Z c = np.zeros(nexz) k = np.zeros(nexz, "l") npk = np.zeros(nexz, "l") if nzi == 1: k[:] = 0 c[:] = 0.0 npk[:] = 1 else: # Local zi if nxiz == 1: npiz = 1 az = 0.0 iz = 0 else: npiz = npi az = a iz = i if nyiz == 1: npjz = 1 bz = 0.0 jz = 0 else: npjz = npj bz = b jz = j if ntiz == 1: nplz = 1 dz = 0.0 lz = 0 else: nplz = npl dz = d lz = l zi = np.zeros((nexz, nzi)) for ie in range(nexz): for ll in range(nplz): for kk in range(nzi): for jj in range(0, npjz): for ii in range(0, npiz): zi[ie, kk] += ( zzi[ie, lz + ll, kk, jz + jj, iz + ii] * ((1 - az) * (1 - ii) + az * ii) * ((1 - bz) * (1 - jj) + bz * jj) * ((1 - dz) * (1 - ll) + dz * ll) ) # Normal stuff (c(nexz),zi[nexz,nzi),k(nexz) for ie in range(nexz): # extra dim if zi[ie, nzi - 1] == zo[io]: k[ie] = nzi - 1 c[ie] = 0.0 npk[ie] = 1 else: k[ie] = np.searchsorted(zi[ie], zo[io], "right") - 1 if zi[ie, k[ie] + 1] == zi[ie, k[ie]]: c[ie] = 0.0 npk[ie] = 1 else: c[ie] = (zo[io] - zi[ie, k[ie]]) / (zi[ie, k[ie] + 1] - zi[ie, k[ie]]) npk[ie] = 2 # Interpolate for ie in range(nex): if not bmask[ ie % nexv, l : l + npl, k[ie % nexz] : k[ie % nexz] + npk[ie % nexz], j : j + npj, i : i + npi, ].any(): vo[ie % nex, io] = 0.0 for ll in range(npl): for kk in range(npk[ie % nexz]): for jj in range(npj): for ii in range(npi): vo[ie % nex, io] = vo[ie % nex, io] + vi[ ie % nex, l + ll, k[ie % nexz] + kk, j + jj, i + ii, ] * ((1 - a) * (1 - ii) + a * ii) * ( (1 - b) * (1 - jj) + b * jj ) * ( (1 - c[ie % nexz]) * (1 - kk) + c[ie % nexz] * kk ) * ( (1 - d) * (1 - ll) + d * ll ) return vo
@numba.guvectorize( [ ( numba.float64[:], numba.float64[:], numba.float64[:], numba.boolean, numba.float64[:], ) ], "(nz),(nz),(),()->()", ) def isoslice(var, values, isoval, reverse, isovar): """Extract data from var where values==isoval Parameters ---------- var: array_like array from which the data are extracted values: array_like array on which a research of isoval is made isoval: float value of interest on which we perform research in values array reverse: bool search from the last index instead of the first one Return ------ isovar : array_like Sliced array based on var where values==isoval Example ------- Let's define depth and temperature variables both in 3 dimensions (i,j,k) where i and j are horizontal dimension and k the vertical one:: dep_at_t20 = isoslice(dep, temp, 20) # depth at temperature=20°C temp_at_z15 = isoslice(temp, dep, -15) # temperature at depth=-15m """ nz = var.shape[-1] isovar[0] = np.nan if reverse: istart, istop, istep = nz - 1, 1, -1 else: istart, istop, istep = 0, nz - 2, 1 # From the top for i in numba.prange(istart, istop + istep, istep): if (values[i] >= isoval[0] and values[i + istep] <= isoval[0]) or ( values[i] <= isoval[0] and values[i + istep] >= isoval[0] ): if values[i + istep] == values[i]: isovar[0] = values[i] else: isovar[0] = var[i + istep] + (isoval[0] - values[i + istep]) * ( var[i] - var[i + istep] ) / (values[i] - values[i + istep])