"""
# NcML parser for xarray
The `open_ncml` function parse an XML document compliant with the NcML-2.2 schema and returns an xarray Dataset.
The XML is parsed into a Python objects using `xsdata`. The parser converts XML elements into classes instances
defined in an autogenerated data model (`generated.ncml_2_2`). This datamodel was created using:
```bash
xsdata generate -ds NumPy --compound-fields -mll=119 --postponed-annotations schemas/ncml-2.2.xsd
```
The code below converts the NcML instructions into xarray instructions. Not all NcML instructions are currently
supported.
## TODO
Support for these elements is missing:
- <cacheVariable>
- <logicalReduce>
- <logicalSection>
- <logicalSlice>
- <promoteGlobalAttribute>
- <scanFmrc>
Support for these attributes is missing:
- dateFormatMark
- olderThan
- tiled aggregations
"""
from __future__ import annotations
import datetime as dt
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
from warnings import warn
import numpy as np
import xarray as xr
from xsdata.formats.dataclass.parsers import XmlParser
from .generated import (
Aggregation,
AggregationType,
Attribute,
DataType,
Dimension,
EnumTypedef,
Group,
Netcdf,
ObjectType,
Remove,
Values,
Variable,
)
if TYPE_CHECKING:
from collections.abc import Iterator
__author__ = "David Huard, Abel Aoun"
__date__ = "July 2022"
__contact__ = "huard.david@ouranos.ca"
FLATTEN_GROUPS = "*"
ROOT_GROUP = "/"
[docs]
def parse(path: Path) -> Netcdf:
"""
Parse NcML file using NetCDF datamodel based on NcML-2.2 Schema.
Parameters
----------
path : Path
Path to NcML file.
Returns
-------
Netcdf instance.
Object description of NcML content.
"""
parser = XmlParser()
return parser.from_path(path, Netcdf)
[docs]
def open_ncml(ncml: str | Path, group: str = ROOT_GROUP) -> xr.Dataset:
"""
Convert NcML document to a dataset.
Parameters
----------
ncml : str | Path
Path to NcML file.
group : str
Path of the group to parse within the ncml.
The special value ``*`` opens every group and flattens the variables into a single
dataset, renaming variables and dimensions if conflicting names are found.
Returns
-------
xr.Dataset
Dataset holding variables and attributes defined in NcML document.
"""
# Parse NcML document
ncml = Path(ncml)
obj = parse(ncml)
return read_netcdf(xr.Dataset(), xr.Dataset(), obj, ncml, group)
[docs]
def read_netcdf(target: xr.Dataset, ref: xr.Dataset, obj: Netcdf, ncml: Path, group: str) -> xr.Dataset:
"""
Return content of <netcdf> element.
Parameters
----------
target : xr.Dataset
Target dataset to be updated with <netcdf>'s content.
ref : xr.Dataset
Reference dataset used to copy content into `target`.
obj : Netcdf
<netcdf> object description.
ncml : Path
Path to NcML document, sometimes required to follow relative links.
group : str
Path of the group to parse within the ncml.
The special value ``*`` opens every group and flattens the variables into a single
dataset.
Returns
-------
xr.Dataset
Dataset holding variables and attributes defined in <netcdf> element.
"""
# Open location if any
ref = read_ds(obj, ncml) or ref
# <explicit/> element means that only content specifically mentioned in NcML document is included in dataset.
if obj.explicit is not None:
pass
else:
# By default, all metadata from the reference dataset is read: <readMetadata/>
target = ref
for item in filter_by_class(obj.choice, Aggregation):
target = read_aggregation(target, item, ncml)
if group == FLATTEN_GROUPS:
target = _flatten_groups(target, ref, obj)
else:
if not group.startswith("/"):
group = f"/{group}"
target = read_group(target, ref, obj, groups_to_read=[group])
return target
[docs]
def read_aggregation(target: xr.Dataset, obj: Aggregation, ncml: Path) -> xr.Dataset:
"""
Return merged or concatenated content of <aggregation> element.
Parameters
----------
target : xr.Dataset
Target dataset to be updated with <netcdf>'s content.
obj : Aggregation
<aggregation> object description.
ncml : Path
Path to NcML document, sometimes required to follow relative links.
Returns
-------
xr.Dataset
Dataset holding variables and attributes defined in <aggregation> element.
"""
from xarray.coding.times import CFDatetimeCoder
# Names of variables to be aggregated. All variables if undefined.
names = [v.name for v in filter_by_class(obj.choice, Aggregation.VariableAgg)]
for attr in obj.promote_global_attribute:
msg = f"{attr} in <promoteGlobalAttribute> not implemented yet."
raise NotImplementedError(msg)
# Create list of datasets to aggregate.
datasets = []
closers = []
for item in obj.netcdf:
# Open dataset defined in <netcdf>'s `location` attribute
tar = read_netcdf(xr.Dataset(), ref=xr.Dataset(), obj=item, ncml=ncml, group=ROOT_GROUP)
closers.append(tar._close)
# Select variables
if names:
tar = tar[names]
# Handle coordinate values
if item.coord_value is not None:
dtypes = [i[obj.dim_name].dtype.type for i in [tar, target] if obj.dim_name in i]
coords = read_coord_value(item, obj, dtypes=dtypes)
tar = tar.assign_coords({obj.dim_name: coords})
datasets.append(tar)
# Handle <scan> element
for item in obj.scan:
dss = read_scan(item, ncml)
datasets.extend([ds.chunk() for ds in dss])
closers.extend([ds._close for ds in dss])
# Need to decode time variable
if obj.time_units_change:
for i, ds in enumerate(datasets):
t = xr.as_variable(ds[obj.dim_name], obj.dim_name) # Maybe not the same name...
encoded = CFDatetimeCoder(use_cftime=True).decode(t, name=t.name)
datasets[i] = ds.assign_coords({obj.dim_name: encoded})
# Translate different types of aggregation into xarray instructions.
if obj.type == AggregationType.JOIN_EXISTING:
agg = xr.concat(datasets, obj.dim_name)
elif obj.type == AggregationType.JOIN_NEW:
agg = xr.concat(datasets, obj.dim_name)
elif obj.type == AggregationType.UNION:
agg = xr.merge(datasets)
else:
raise NotImplementedError
agg = read_group(agg, ref=None, obj=obj, groups_to_read=[ROOT_GROUP])
out = target.merge(agg, combine_attrs="no_conflicts")
out.set_close(partial(_multi_file_closer, closers))
return out
[docs]
def read_ds(obj: Netcdf, ncml: Path) -> xr.Dataset:
"""
Return dataset defined in <netcdf> element.
Parameters
----------
obj : Netcdf
<netcdf> object description.
ncml : Path
Path to NcML document, sometimes required to follow relative links.
Returns
-------
xr.Dataset
Dataset defined at <netcdf>' `location` attribute.
"""
if obj.location:
try:
# Python >= 3.9
location = obj.location.removeprefix("file:")
except AttributeError:
location = obj.location.strip("file:")
if not Path(location).is_absolute():
location = ncml.parent / location
return xr.open_dataset(location, decode_times=False)
def _get_leaves(group: Netcdf | Group, parent: str | None = None) -> Iterator[str]:
group_children = [child for child in group.choice if isinstance(child, Group)]
current_path = ROOT_GROUP if parent is None else f"{parent}{group.name}/"
if len(group_children) == 0:
yield current_path
for child in group_children:
yield from _get_leaves(child, parent=current_path)
def _flatten_groups(target: xr.Dataset, ref: xr.Dataset, root_group: Netcdf) -> xr.Dataset:
dims = {}
enums = {}
leaves_group = list(_get_leaves(root_group))
read_group(target, ref, root_group, groups_to_read=leaves_group, dims=dims, enums=enums)
return target
[docs]
def read_group(
target: xr.Dataset,
ref: xr.Dataset | None,
obj: Group | Netcdf,
groups_to_read: list[str],
parent_group_path: str = ROOT_GROUP,
dims: dict = None,
enums: dict = None,
) -> xr.Dataset:
"""
Parse <group> items, typically <dimension>, <variable>, <attribute> and <remove> elements.
Parameters
----------
target : xr.Dataset
Target dataset to be updated.
ref : xr.Dataset | None
Reference dataset used to copy content into `target`.
obj : Group | Netcdf
<netcdf> object description.
groups_to_read : list[str]
List of groups that must be read and included in `target`.
parent_group_path : str
Path of parent group, by default the root group '/'.
dims : dict[str, Dimension]
Dictionary of the dimensions of this dataset.
Returns
-------
xr.Dataset
Dataset holding variables and attributes defined in <netcdf> element.
"""
dims = {} if dims is None else dims
enums = {} if enums is None else enums
for item in obj.choice:
if isinstance(item, Dimension):
target = rename_dimension(target, ref, item)
dim_name = item.name
if dims.get(dim_name):
dims[dim_name].append(read_dimension(item))
else:
dims[dim_name] = [read_dimension(item)]
elif isinstance(item, Variable):
target = read_variable(target, ref, item, dims, enums, group_path=parent_group_path)
elif isinstance(item, Attribute):
read_attribute(target, item, ref)
elif isinstance(item, Remove):
target = read_remove(target, item)
elif isinstance(item, EnumTypedef):
enums[item.name] = read_enum(item)
elif isinstance(item, Group):
if any(item.name in group_name for group_name in groups_to_read):
target = read_group(
target,
ref,
item,
parent_group_path=f"{parent_group_path}{item.name}/",
dims=dims,
groups_to_read=groups_to_read,
)
else:
# ignore group
continue
elif isinstance(item, Aggregation):
pass # <aggregation> elements are parsed in `read_netcdf`
else:
raise AttributeError
return target
[docs]
def read_scan(obj: Aggregation.Scan, ncml: Path) -> list[xr.Dataset]:
"""
Return list of datasets defined in <scan> element.
Parameters
----------
obj : Aggregation.Scan instance
<scan> object description.
ncml : Path
Path to NcML document, sometimes required to follow relative links.
Returns
-------
list
List of datasets found by scan.
"""
import glob
import re
if obj.date_format_mark:
raise NotImplementedError
path = Path(obj.location)
if not path.is_absolute():
path = ncml.parent / path
files = list(path.rglob("*") if obj.subdirs else path.glob("*"))
if not files:
raise ValueError(f"No files found in {path}")
fns = map(str, files)
if obj.reg_exp:
pat = re.compile(obj.reg_exp)
files = list(filter(pat.match, fns))
elif obj.suffix:
pat = "*" + obj.suffix
files = glob.fnmatch.filter(fns, pat)
if not files:
raise ValueError("regular expression or suffix matches no file.")
files.sort()
return [xr.open_dataset(f, decode_times=False) for f in files]
[docs]
def read_coord_value(nc: Netcdf, agg: Aggregation, dtypes: list = ()):
"""
Read `coordValue` attribute of <netcdf> element.
Parameters
----------
nc : Netcdf instance
<netcdf> object description.
agg : Aggregation instance
<aggregation> object description
dtypes : tuple
List of preferred type for coordinate value.
Returns
-------
str, np.array, scalar
Coordinate value cast to proper type.
Notes
-----
The casting logic is most likely not up to spec.
"""
val = nc.coord_value
# A JOIN_NEW aggregation has exactly one coordinate value
if agg.type == AggregationType.JOIN_NEW:
coord = val
elif agg.type == AggregationType.JOIN_EXISTING:
coord = val.replace(",", " ").split()
else:
raise NotImplementedError
# Cast to dtype, not clear what the spec is exactly for this.
if dtypes:
typ = dtypes[0]
else:
try:
dt.datetime.strptime(coord, "%Y-%m-%d %H:%M:%SZ")
typ = str
except ValueError:
typ = np.float64
return typ(coord)
[docs]
def read_enum(obj: EnumTypedef) -> dict[str, list]:
"""
Parse <enumTypeDef> element.
Parameters
----------
obj : EnumTypeDef
<enumTypeDef> object.
Returns
-------
dict
A dictionary describing the Enum.
Examples
--------
.. code-block: xml
<enumTypedef name="trilean" type="enum1">
<enum key="0">false</enum>
<enum key="1">true</enum>
<enum key="2">undefined</enum>
</enumTypedef>
"""
return {e.content[0]: e.key for e in obj.content}
[docs]
def read_variable(
target: xr.Dataset,
ref: xr.Dataset,
obj: Variable,
dimensions: dict,
enums: dict[str, dict[str, int]],
group_path: str,
) -> xr.Dataset:
"""
Parse <variable> element.
Parameters
----------
target : xr.Dataset
Target dataset to be updated.
ref : xr.Dataset
Reference dataset used to copy content into `target`.
obj : Variable
<variable> object description.
dimensions : dict
Dimension attributes keyed by name.
enums : dict[str, dict]
The enums types that have been read in the parent groups.
group_path : str
Path to the parent group.
Returns
-------
xr.Dataset
Dataset holding variable defined in <variable> element.
"""
# Handle logic for variable name change
if obj.org_name:
if obj.org_name in target:
target = target.rename({obj.org_name: obj.name})
ref_var = None
elif obj.org_name in ref:
ref_var = xr.as_variable(ref[obj.org_name])
else:
raise ValueError
else:
ref_var = None
var_name = obj.name
# Read existing data or create empty DataArray
if (existing_var := target.get(var_name)) is not None and existing_var.attrs.get("group_path") in [None, group_path]:
out = xr.as_variable(target[var_name])
if obj.type:
out = out.astype(nctype(obj.type))
ref_var = None
elif (existing_var := ref.get(var_name)) is not None and existing_var.attrs.get("group_path") in [None, group_path]:
out = xr.as_variable(ref[var_name])
if obj.type:
out = out.astype(nctype(obj.type))
ref_var = ref[var_name]
elif obj.shape:
var_dims = []
shape = []
for dim in obj.shape.split(" "):
if dimensions.get(dim) is None:
err = f"Unknown dimension '{dim}'. Make sure it is declared before being used in the NCML."
raise ValueError(err)
shape.append(dimensions[dim][-1].length)
if (dim_count := len(dimensions[dim])) > 1:
dim = f"{dim}__{dim_count - 1}"
var_dims.append(dim)
out = xr.Variable(data=np.empty(shape, dtype=nctype(obj.type)), dims=var_dims)
elif obj.shape == "":
out = build_scalar_variable(var_name=var_name, values_tag=obj.values, var_type=obj.type)
else:
error_msg = f"Could not build variable `{var_name}`."
raise ValueError(error_msg)
# Set variable attributes
for item in obj.attribute:
read_attribute(out, item, ref=ref_var)
out.attrs["group_path"] = group_path
# Remove attributes or dimensions
for item in obj.remove:
read_remove(out, item)
# Read values for arrays (already done for a scalar)
if obj.values and obj.shape != "":
data = read_values(var_name, out.size, obj.values)
data = out.dtype.type(data)
out = xr.Variable(
out.dims,
data,
out.attrs,
)
if obj.logical_section:
raise NotImplementedError
if obj.logical_slice:
raise NotImplementedError
if obj.logical_reduce:
raise NotImplementedError
if obj.typedef in enums.keys():
dtype = out.dtype
new_dtype = np.dtype(dtype, metadata={"enum": enums[obj.typedef], "enum_name": obj.typedef})
out.encoding["dtype"] = new_dtype
out = out.astype(new_dtype)
elif obj.typedef is not None:
raise NotImplementedError
import re
reg = re.compile(f"^{var_name}__|{var_name}")
similar_vars_but_diff_path = [v for v in target.data_vars if reg.match(v) and target[v].attrs.get("group_path") not in [None, group_path]]
if len(similar_vars_but_diff_path) > 0:
var_name = f"{var_name}__{len(similar_vars_but_diff_path)}"
target[var_name] = out
return target
[docs]
def read_values(var_name: str, expected_size: int, values_tag: Values) -> list:
"""
Read values for <variable> element.
Parameters
----------
var_name : str
The variable name.
size : int
The variable expected size.
values_tag : Values instance
<values> object description
Returns
-------
list
A list filled with values from <values> element.
"""
if values_tag.from_attribute is not None:
error_msg = f"xncml cannot yet fetch values from a global or a variable attribute using <from_attribute>, here on variable {var_name}."
raise NotImplementedError(error_msg)
if values_tag.start is not None and values_tag.increment is not None:
number_of_values = int(values_tag.npts or expected_size)
return values_tag.start + np.arange(number_of_values) * values_tag.increment
if not isinstance(values_tag.content, list):
error_msg = f"Unsupported format of the <values> tag from variable {var_name}."
raise NotImplementedError(error_msg)
if len(values_tag.content) == 0:
error_msg = f"No values found for variable {var_name}, but a {expected_size} values were expected."
raise ValueError(error_msg)
if not isinstance(values_tag.content[0], str):
error_msg = f"Unsupported format of the <values> tag from variable {var_name}."
raise NotImplementedError(error_msg)
separator = values_tag.separator or " "
data = values_tag.content[0].split(separator)
if len(data) > expected_size:
error_msg = f"The expected size for variable {var_name} was {expected_size}, but {len(data)} values were found in its <values> tag."
raise ValueError(error_msg)
return data
[docs]
def build_scalar_variable(var_name: str, values_tag: Values, var_type: str) -> xr.Variable:
"""
Build an xr.Variable for scalar variables.
Parameters
----------
var_name : str
The variable name.
values_tag : Values instance
<values> object description
var_type : str
The variable expected type.
Returns
-------
xr.Variable
A xr.Variable filled with values from <values> element.
Raises
------
ValueError
If the <values> tag is not a valid scalar.
"""
if values_tag is None:
default_value = nctype(var_type)()
warn(
f"The scalar variable {var_name} has no values set within"
f" <values></values>. A default value of {default_value} is set"
" to preserve the type.",
stacklevel=2,
)
return xr.Variable(data=default_value, dims=())
values_content = read_values(var_name, expected_size=1, values_tag=values_tag)
if len(values_content) == 1:
return xr.Variable(data=np.array(values_content[0], dtype=nctype(var_type))[()], dims=())
if len(values_content) > 1:
error_msg = f'Multiple values found for variable {var_name} but its shape is "" thus a single scalar is expected within its <values> tag.'
raise ValueError(error_msg)
[docs]
def read_remove(target: xr.Dataset | xr.Variable, obj: Remove) -> xr.Dataset:
"""
Remove item from dataset.
Parameters
----------
target : xr.Dataset | xr.Variable
Target dataset or variable to be updated.
obj : Remove instance
<remove> object description.
Returns
-------
xr.Dataset or xr.Variable
Dataset with attribute, variable or dimension removed, or variable with attribute removed.
"""
if obj.type == ObjectType.ATTRIBUTE:
target.attrs.pop(obj.name)
elif obj.type == ObjectType.VARIABLE:
target = target.drop_vars(obj.name)
elif obj.type == ObjectType.DIMENSION:
target = target.drop_dims(obj.name)
return target
[docs]
def read_attribute(target: xr.Dataset | xr.Variable, obj: Attribute, ref: xr.Dataset = None):
"""
Update target dataset in place with new or modified attribute.
Parameters
----------
target : xr.Dataset | xr.Variable
Target dataset to be updated.
obj : Attribute instance
<attribute> object description.
ref : xr.Dataset
Reference dataset.
"""
if obj.value is not None:
target.attrs[obj.name] = cast(obj)
elif obj.org_name is not None:
try:
target.attrs[obj.name] = ref.attrs[obj.org_name]
except (AttributeError, KeyError):
target.attrs[obj.name] = target.attrs.get(obj.org_name)
target.attrs.pop(obj.org_name)
elif ref is not None:
target.attrs[obj.name] = ref.attrs.get(obj.name)
else:
raise NotImplementedError
[docs]
def read_dimension(obj: Dimension) -> Dimension:
"""Return dimension object with its length cast to an integer."""
if obj.length is not None:
obj.length = int(obj.length)
return obj
[docs]
def rename_dimension(target: xr.Dataset, ref: xr.Dataset, obj: Dimension) -> xr.Dataset:
"""Rename dimension in target dataset."""
if obj.org_name:
if obj.org_name in target.dims:
target = target.rename_dims({obj.org_name: obj.name})
elif obj.org_name in ref.dims:
target = target.expand_dims({obj.name: ref.dims[obj.org_name]})
else:
raise ValueError(f"Dimension '{obj.org_name}' not found in either the target or reference dataset.")
return target
[docs]
def nctype(typ: DataType) -> type:
"""Return Python type corresponding to the NcML DataType of object."""
if typ in [DataType.STRING, DataType.STRING_1]:
return str
elif typ in [DataType.BYTE, DataType.ENUM1]:
return np.int8
elif typ in [DataType.SHORT, DataType.ENUM2]:
return np.int16
elif typ in [DataType.INT, DataType.ENUM4]:
return np.int32
elif typ == DataType.LONG:
return int
elif typ == DataType.FLOAT:
return np.float32
elif typ == DataType.DOUBLE:
return np.float64
elif typ == DataType.UBYTE:
return np.ubyte
elif typ == DataType.USHORT:
return np.ushort
elif typ == DataType.UINT:
return np.uintc
elif typ == DataType.LONG:
return np.uint
raise NotImplementedError
[docs]
def cast(obj: Attribute) -> tuple | str:
"""Cast attribute value to the appropriate type."""
value = obj.value or obj.content
if value:
if obj.type in [DataType.STRING, DataType.STRING_1]:
return value
sep = obj.separator or " "
values = value.split(sep)
if len(values) == 1:
return nctype(obj.type)(values[0])
return tuple(map(nctype(obj.type), values))
return ""
[docs]
def filter_by_class(iterable, klass):
"""Return generator filtering on class."""
for item in iterable:
if isinstance(item, klass):
yield item
def _multi_file_closer(closers):
"""Close multiple files."""
# Note that if a closer is None, it probably means an alteration was made to the original dataset. Make sure
# that the `_close` attribute is obtained directly from the object returned by `open_dataset`.
for closer in closers:
if closer is not None:
closer()