Source code for xoa.meta
"""Meta-data and conventions management"""
import os
import fnmatch
from platformdirs import user_config_dir
from .. import exceptions
from .. import misc
from . import configs
from . import general
# Directory that contains this module; packaged resources are located relative to it
_THISDIR = os.path.dirname(__file__)
# Joint variables and coords config specification file
INI_FILE = os.path.join(_THISDIR, "meta.ini")
#: User Meta config file, located in the per-user configuration directory of "xoa"
USER_META_FILE = os.path.join(user_config_dir("xoa"), "meta.cfg")
# In-memory cache dict: it starts empty and is filled by get_cache() on first access
_META_CACHE = {}
[docs]
def get_cache():
    """Return the module-level meta cache, initialising it on first access

    Return
    ------
    dict
        The shared cache dict with the keys ``current``, ``default``,
        ``loaded_dicts`` and ``registered``.
    """
    if _META_CACHE:
        return _META_CACHE
    # First access: install the empty slots
    _META_CACHE["current"] = None  # current active specs
    _META_CACHE["default"] = None  # default xoa specs
    _META_CACHE["loaded_dicts"] = {}  # for pure caching of dicts by key
    _META_CACHE["registered"] = []  # for registration and matching purpose
    return _META_CACHE
[docs]
def get_matching_item_specs(da, loc="any"):
    """Return the specs of the Meta item that matches a data array

    The Meta specs are obtained with :func:`get_meta_specs` from the array
    itself, then their ``match`` method is used to identify the item.

    Parameters
    ----------
    da: xarray.DataArray
    loc: str
        Passed as is to ``MetaSpecs.match``.

    Return
    ------
    dict or None
        ``None`` when no category is matched.

    See also
    --------
    MetaSpecs.match
    """
    specs = get_meta_specs(da)
    category, item_name = specs.match(da, loc=loc)
    if not category:
        return None
    return specs[category][item_name]
def _same_attr_(da0, da1, attr):
    """Tell if both arrays own attribute ``attr`` with the same case-insensitive value"""
    if attr not in da0.attrs:
        return False
    if attr not in da1.attrs:
        return False
    return da0.attrs[attr].lower() == da1.attrs[attr].lower()
[docs]
def are_similar(da0, da1):
    """Tell whether two DataArrays can be considered as similar

    The following criteria are tested in turn, and the first one
    that is satisfied makes the arrays similar:

    - same ``standard_name`` attribute, ignoring case,
    - same name for their matching MetaSpecs item,
    - same non-empty array name,
    - same ``long_name`` attribute, ignoring case.

    Parameters
    ----------
    da0: xarray.DataArray
    da1: xarray.DataArray

    Return
    ------
    bool
    """
    # 1. Standard name
    if _same_attr_(da0, da1, "standard_name"):
        return True

    # 2. Name of the matching meta item
    specs0 = get_matching_item_specs(da0)
    specs1 = get_matching_item_specs(da1)
    if specs0 and specs1 and specs0.name == specs1.name:
        return True

    # 3. Array name: a non-empty da0.name equal to da1.name implies a non-empty da1.name
    if da0.name and da0.name == da1.name:
        return True

    # 4. Long name
    return _same_attr_(da0, da1, "long_name")
[docs]
def search_similar(obj, da):
    """Search ``obj`` for the first array that is similar to ``da``

    See :func:`are_similar` for what "similar" means.

    Parameters
    ----------
    obj: xarray.Dataset, xarray.DataArray
        Object whose arrays are scanned.
    da: xarray.DataArray
        Array that is compared to the content of ``obj``.

    Return
    ------
    xarray.DataArray or None
        ``None`` when no similar array is found.

    See also
    --------
    are_similar
    get_matching_item_specs
    """
    for name in misc.list_xr_names(obj, dims=False):
        candidate = obj[name]
        if are_similar(candidate, da):
            return candidate
    return None
[docs]
class set_meta_specs(object):
    """Make some Meta specs the current ones

    The change is effective as soon as the instance is created.
    When used as a context manager, the previous current specs
    are restored on exit.

    Parameters
    ----------
    meta_source: MetaSpecs, str, list, dict
        Either a :class:`MetaSpecs` instance or the name of a registered one,
        or an argument to instantiate one.

    See also
    --------
    get_meta_specs
    register_meta_specs
    get_registered_meta_specs
    """

    def __init__(self, meta_source):
        # A string may be the name of specs that are already registered
        if isinstance(meta_source, str):
            registered = get_meta_specs_from_name(meta_source, errors="ignore")
            if registered:
                meta_source = registered

        # Anything that is not yet a MetaSpecs is given to its constructor
        if not isinstance(meta_source, general.MetaSpecs):
            meta_source = general.MetaSpecs(meta_source)

        # Remember the previous specs and activate the new ones
        cache = get_cache()
        self.meta_cache = cache
        self.old_specs = cache["current"]
        self.specs = meta_source
        cache["current"] = meta_source

    def __enter__(self):
        return self.specs

    def __exit__(self, exc_type, exc_value, traceback):
        # Restore what was current before, which may be None
        self.meta_cache["current"] = self.old_specs
[docs]
def reset_cache(memory=False, **kwargs):
    """Reset the in-memory meta specs cache

    Nothing is reset unless ``memory`` is true.

    Parameters
    ----------
    memory: bool
        Remove the in-memory cache: the loaded dicts, the current specs,
        the default specs and the registered specs.

        .. warning:: This may lead to unpredicted behaviors.
    kwargs: dict
        Only the legacy ``disk`` keyword is checked: passing it emits
        a deprecation warning.
    """
    if "disk" in kwargs:
        exceptions.xoa_warn("Disk caching is no longer supported", category="deprecation")
    if not memory:
        return
    meta_cache = get_cache()
    # The containers are emptied in place so that existing references stay valid
    meta_cache["loaded_dicts"].clear()
    meta_cache["current"] = None
    meta_cache["default"] = None
    meta_cache["registered"].clear()
[docs]
def show_cache():
    """Deprecated: only emit a warning since disk caching is no longer supported"""
    exceptions.xoa_warn("Disk caching is no longer supported", category="deprecation")
[docs]
def get_meta_config_file(name):
    """Get the path of a meta config file given its short name

    Parameters
    ----------
    name: str
        Short name of the config. A trailing ``.cfg`` extension is ignored.

    Return
    ------
    The entry of ``configs.META_CONFIGS`` registered under this name

    Raise
    -----
    XoaMetaError
        When ``name`` is not one of the known meta configs.
    """
    if name.endswith(".cfg"):
        name = name[:-4]
    if name not in configs.META_CONFIGS:
        # The message must be a real f-string so that the name is interpolated
        raise exceptions.XoaMetaError(
            f"Invalid meta config name '{name}'.\n"
            + "Please use one of: "
            + ", ".join(configs.META_CONFIGS)
        )
    return configs.META_CONFIGS[name]
[docs]
@misc.ERRORS.format_function_docstring
def get_meta_specs_from_name(name, errors="warn"):
    """Find a Meta specs instance from its registration name

    The registered specs are scanned first, starting from the most
    recently registered ones. Then the internal configs are checked:
    when the name is one of them, the specs are created, registered
    and returned.

    Parameters
    ----------
    name: str
    {errors}

    Return
    ------
    MetaSpecs or None
        ``None`` when not found and no exception is raised
    """
    # Registered specs, the last registered first
    for candidate in reversed(get_cache()["registered"]):
        registered_name = candidate["register"]["name"]
        if registered_name and registered_name == name.lower():
            return candidate

    # Internal specs
    if name in configs.META_CONFIGS:
        builtin_specs = general.MetaSpecs(configs.META_CONFIGS[name])
        register_meta_specs(builtin_specs)
        return builtin_specs

    # Not found
    action = misc.ERRORS[errors]
    message = f"Unknown registration name for Meta specs: {name}"
    if action == "raise":
        raise exceptions.XoaMetaError(message)
    if action == "warn":
        exceptions.xoa_warn(message)
    return None
[docs]
def get_meta_specs_encoding(ds):
    """Read the ``meta_specs`` value of a dataset or data array

    The encoding is searched first, then the attributes.
    The key is compared in a case-insensitive way.

    Parameters
    ----------
    ds: xarray.DataArray, xarray.Dataset

    Return
    ------
    str or None
        ``None`` when ``ds`` is ``None`` or a string, or when the key is not found.

    See also
    --------
    get_meta_specs_from_encoding
    """
    if ds is None or isinstance(ds, str):
        return None
    for container in (ds.encoding, ds.attrs):
        for key, value in container.items():
            if key.lower() == "meta_specs":
                return value
    return None
[docs]
def get_meta_specs_from_encoding(ds):
    """Get the Meta specs named by the ``meta_specs`` value of a dataset or data array

    The name is read with :func:`get_meta_specs_encoding` and the specs
    are retrieved with :func:`get_meta_specs_from_name`, which warns
    when the name is unknown.

    Parameters
    ----------
    ds: xarray.DataArray, xarray.Dataset

    Return
    ------
    MetaSpecs or None

    See also
    --------
    get_meta_specs_encoding
    """
    if ds is None or isinstance(ds, str):
        return None
    name = get_meta_specs_encoding(ds)
    if name is None:
        return None
    return get_meta_specs_from_name(name, errors="warn")
[docs]
def get_default_meta_specs(**kwargs):
    """Get the default Meta specifications

    The instance is created once, stored in the in-memory cache
    and registered if needed.

    Parameters
    ----------
    kwargs: dict
        Only the legacy ``cache`` keyword is checked: passing it emits
        a deprecation warning.

    Return
    ------
    MetaSpecs
    """
    # The key must not contain any space, otherwise a regular keyword never matches
    if "cache" in kwargs:
        exceptions.xoa_warn("Disk caching is no longer supported", category="deprecation")

    meta_cache = get_cache()
    if meta_cache["default"] is not None:
        return meta_cache["default"]

    # Setup: the cache is filled before the registration check because
    # get_registered_meta_specs() calls this function again
    meta_specs = general.MetaSpecs()
    meta_cache["default"] = meta_specs
    if not is_registered_meta_specs(meta_specs):
        register_meta_specs(meta_specs)
    return meta_specs
[docs]
def get_meta_specs(name=None, cache="rw"):
    """Get the current, the default or some registered Meta specifications

    Parameters
    ----------
    name: str, "current", "default", None, xarray.Dataset, xarray.DataArray
        "default" means the default xoa specs.
        "current" is equivalent to None and means the current specs,
        which default to the xoa defaults.
        Any other string is taken as a registration name.
        Anything that is not a string is expected to be a data array or dataset,
        from which the specs are inferred with :func:`infer_meta_specs`.
    cache: str, bool, None
        This argument is not read by this function.

    Return
    ------
    MetaSpecs
        None may be returned when the specs are inferred and nothing is found

    Raise
    -----
    XoaError
        When ``name`` is provided as a string and is invalid.
    """
    if name is None:
        name = "current"

    # A dataset or a data array: infer the specs
    if not isinstance(name, str):
        return infer_meta_specs(name)

    # Default xoa specs
    if name == "default":
        return get_default_meta_specs()

    # Registration name
    if name != "current":
        return get_meta_specs_from_name(name, errors="raise")

    # Current specs, which fall back to the default ones
    meta_cache = get_cache()
    if meta_cache.get("current") is None:
        meta_cache["current"] = get_default_meta_specs()
    return meta_cache["current"]
[docs]
def register_meta_specs(*args, **kwargs):
    """Register :class:`MetaSpecs` in a bank optionally with a name

    Parameters
    ----------
    args:
        :class:`MetaSpecs` instances, or arguments to instantiate them.
    kwargs:
        Same as ``args``, except that each keyword is assigned
        to the ``name`` attribute of the specs before registration.

    Specs that are already in the bank are left untouched.
    When the new specs have a name, all the registered specs
    that have the same name are removed first.
    """
    # Named arguments
    to_register = list(args)
    for name, meta_specs in kwargs.items():
        if not isinstance(meta_specs, general.MetaSpecs):
            meta_specs = general.MetaSpecs(meta_specs)
        meta_specs.name = name
        to_register.append(meta_specs)

    # Update the cache
    for meta_specs in to_register:
        if not isinstance(meta_specs, general.MetaSpecs):
            meta_specs = general.MetaSpecs(meta_specs)
        registered = get_cache()["registered"]
        if meta_specs in registered:
            continue
        if meta_specs.name:  # replace if same name. warn?
            # The list is rebuilt in place instead of removing items while
            # iterating over it, which would skip the item after each removal
            registered[:] = [
                rmetas
                for rmetas in registered
                if not (rmetas.name and rmetas.name == meta_specs.name)
            ]
        registered.append(meta_specs)
[docs]
def get_registered_meta_specs(current=True, reverse=True, named=False):
    """Get the list of registered MetaSpecs

    The returned list is always a new list, so it can be modified
    without altering the registration bank.

    Parameters
    ----------
    current: bool
        Also append the current specs if any, after the registered ones
    reverse: bool
        Reverse the list of registered specs
    named: bool
        Make sure the returned MetaSpecs have a valid registration name.
        When exactly ``False``, the default specs are appended at the end
        if they are not already in the list.

    Return
    ------
    list

    See also
    --------
    register_meta_specs
    """
    meta_cache = get_cache()
    # Work on a copy: the appends below must never alter the registration bank
    metal = list(meta_cache["registered"])
    if reverse:
        metal.reverse()
    if current and meta_cache["current"] is not None:
        metal.append(meta_cache["current"])
    if named is False:
        metadef = get_default_meta_specs()
        if metadef not in metal:
            metal.append(metadef)
    else:
        metal = [c for c in metal if c.name]
    return metal
[docs]
def is_registered_meta_specs(name):
    """Tell whether some meta specs are registered

    A string is compared to the registration names,
    while a :class:`MetaSpecs` instance is searched by identity.

    Parameters
    ----------
    name: str, MetaSpecs

    Return
    ------
    bool
    """
    by_name = isinstance(name, str)
    by_identity = isinstance(name, general.MetaSpecs)
    for candidate in get_registered_meta_specs():
        if by_name:
            registered_name = candidate["register"]["name"]
            if registered_name and registered_name == name:
                return True
        if by_identity and candidate is name:
            return True
    return False
[docs]
def get_meta_specs_matching_score(ds, meta_specs):
    """Score how well the names found in ``ds`` match those of a MetaSpecs instance

    For both the ``data_vars`` and ``coords`` categories, the names of ``ds``
    are parsed with the locator of the specs and compared to the specialized
    names of the specs. A data array, which has no ``data_vars`` attribute,
    contributes its own name, if any, to each category.

    Parameters
    ----------
    ds: xarray.Dataset, xarray.DataArray
    meta_specs: MetaSpecs

    Return
    ------
    float
        A percentage of the number of identified data arrays vs
        the total number of data arrays. It is 0 when there are no names.
    """
    is_dataset = hasattr(ds, "data_vars")
    n_hits = 0
    n_names = 0
    for category in ("data_vars", "coords"):
        cat_specs = meta_specs[category]
        specs_names = [cat_specs.get_name(name, specialize=True) for name in cat_specs.names]
        if is_dataset:
            raw_names = list(getattr(ds, category).keys())
        elif ds.name:  # DataArray with a name
            raw_names = [ds.name]
        else:
            raw_names = []
        parsed_names = [meta_specs.sglocator.parse_attr("name", raw)[0] for raw in raw_names]
        n_names += len(parsed_names)
        n_hits += len(set(parsed_names).intersection(specs_names))
    if n_names == 0:
        return 0
    return 100 * n_hits / n_names
[docs]
def infer_meta_specs(ds, named=False, from_attrs=True, from_score=True):
    """Find the registered MetaSpecs that best match a dataset

    This is accomplished with some heuristics, in this order:

    1. The ``meta_specs`` encoding or attribute of the dataset is used
       as a registration name.
    2. The attributes then the encoding of the dataset are compared
       to the patterns declared in the ``[register]`` section, item ``attrs``,
       of the registered specs. Patterns follow the :mod:`fnmatch` syntax
       and are compared in a case-insensitive way.
    3. A score based on the number of data_vars and coord names
       that are both in the specs and in the dataset is computed
       with :func:`get_meta_specs_matching_score`,
       and the specs with the highest non-zero score are selected.
    4. The current specs are used as a fallback.

    Parameters
    ----------
    ds: xarray.Dataset, xarray.DataArray
    named: bool
        Make sure the candidate MetaSpecs have a name
    from_attrs: bool
        Scan attributes to infer specs
    from_score: bool
        Compute the matching score to infer specs

    Return
    ------
    MetaSpecs or None
        The matching meta specs or the current ones.
        None is returned when ``named`` is true and the fallback
        current specs have no name.

    See also
    --------
    register_meta_specs
    get_registered_meta_specs
    get_meta_specs_matching_score
    get_meta_specs
    get_meta_specs_from_name
    get_meta_specs_from_encoding
    """
    # 1. By registration name
    found = get_meta_specs_from_encoding(ds)
    if found:
        return found

    candidates = get_registered_meta_specs(named=named)

    # 2. By attributes
    if from_attrs:
        for attrs in (ds.attrs, ds.encoding):
            if not attrs:
                continue
            for candidate in candidates:
                for attr, patterns in candidate["register"]["attrs"].items():
                    if attr not in attrs:
                        continue
                    if isinstance(patterns, str):
                        patterns = [patterns]
                    value = str(attrs[attr]).lower()
                    if any(fnmatch.fnmatch(value, pat.lower()) for pat in patterns):
                        return candidate

    # 3. By matching score
    if from_score:
        best_candidate = None
        best_score = -1
        for candidate in candidates:
            score = get_meta_specs_matching_score(ds, candidate)
            if score != 0 and score > best_score:
                best_candidate = candidate
                best_score = score
        if best_score != -1:
            return best_candidate

    # 4. Fallback to the current specs
    current_specs = get_meta_specs("current")
    if named and not current_specs.name:
        return None
    return current_specs
[docs]
def assign_meta_specs(ds, name=None, register=False, set_encoding=True):
    """Set the ``meta_specs`` encoding to ``name`` in all data vars and coords

    Parameters
    ----------
    ds: xarray.DataArray, xarray.Dataset
    name: None, str, MetaSpecs, xarray.DataArray, xarray.Dataset
        If a :class:`MetaSpecs`, it must have a registration name :

        .. code-block:: ini

            [register]
            name=registration_name

        If a data array or a dataset, the name is read from its own
        ``meta_specs`` encoding or attribute.
        If not provided, :func:`infer_meta_specs` is called to infer
        the best named registered specs.
    register: bool
        Register the specs if name is a named, unregistered :class:`MetaSpecs` instance.
    set_encoding: bool
        Set the "meta_specs" encoding to name.

    Return
    ------
    xarray.Dataset, xarray.DataArray
        ``ds`` itself, which is left unchanged when no name can be determined.

    Example
    -------
    .. ipython:: python

        @suppress
        from xoa.meta import assign_meta_specs
        @suppress
        import xarray as xr
        ds = xr.Dataset({'temp': ('lon', [5])}, coords={'lon': [6]})
        assign_meta_specs(ds, "mycroco");
        ds.encoding
        ds.temp.encoding
        ds.lon.encoding
    """
    if name is None:
        # Infer the best named specs: None is returned when there are none
        meta_specs = infer_meta_specs(ds, named=True)
        if meta_specs is None or not meta_specs.name:
            return ds
        name = meta_specs.name
    elif not isinstance(name, (str, general.MetaSpecs)) and hasattr(name, "coords"):
        # From a dataset/dataarray: the name is read from this source object.
        # MetaSpecs instances are excluded explicitly in case they also
        # expose a ``coords`` attribute.
        name = get_meta_specs_encoding(name)
        if name is None:
            return ds

    # Name as a MetaSpecs instance
    if not isinstance(name, str):
        if not name.name:
            exceptions.xoa_warn("MetaSpecs instance has no registration name")
            return ds
        if register and not is_registered_meta_specs(name):
            register_meta_specs(name)
        name = name.name

    # Set as encoding
    if set_encoding:
        targets = [ds] + [ds[xr_name] for xr_name in misc.list_xr_names(ds, dims=False)]
        for target in targets:
            target.encoding.update(meta_specs=name)
    return ds
[docs]
def infer_coords(ds):
    """Infer which of the data arrays of a dataset are coordinates

    The Meta specs of the dataset are obtained with :func:`get_meta_specs`
    and the job is delegated to their ``infer_coords`` method.

    Parameters
    ----------
    ds: xarray.Dataset

    Return
    ------
    The result of ``MetaSpecs.infer_coords``

    See also
    --------
    MetaSpecs.infer_coords
    """
    specs = get_meta_specs(ds)
    return specs.infer_coords(ds)
# infer_coords.__doc__ = MetaSpecs.infer_coords.__doc__
[docs]
@misc.ERRORS.format_function_docstring
def get_variant(ds, variants, errors="ignore"):
    """Try to find a unique generic data array in a dataset

    The Meta specs of the dataset are obtained with :func:`get_meta_specs`
    and the search is delegated to their ``get`` method.

    Parameters
    ----------
    ds: xarray.Dataset
    variants: str, list(str)
        A single or a list of meta names
    {errors}

    Returns
    -------
    xarray.DataArray, None
    """
    specs = get_meta_specs(ds)
    found = specs.get(ds, variants, errors=errors)
    return found