# Source code for xncml.core

"""
Core features of xncml.

This module exposes the ``Dataset`` class, which is used to manipulate NcML files.
"""

from collections import OrderedDict
from enum import Enum
from pathlib import Path
from typing import Any
from warnings import warn

import xmltodict


class Dataset:
    """
    A class for reading and manipulating NcML file.

    Note that NcML documents are used for two distinct purposes:

    - an XML description of NetCDF structure and metadata;
    - create virtual NetCDF datasets, e.g. an aggregation of multiple files.

    This class supports both types of uses.

    The document is kept in ``ncroot`` as nested dictionaries in the layout
    used by ``xmltodict``: XML attributes are stored under keys prefixed with
    ``@`` and child elements under their tag name.
    """

    def __init__(self, filepath: str = None, location: str = None):
        """
        Create a Dataset.

        Parameters
        ----------
        filepath : str
            File path to dataset NcML file. If it does not exist, an empty NcML document will be
            created and this will be the default filename when writing to disk with `to_ncml`.
        location : str
            NetCDF file location. Set this to create a NcML file modifying an existing NetCDF
            document. Only applied when a new (empty) document is created.
        """
        self.filepath = Path(filepath) if filepath is not None else None

        if self.filepath and self.filepath.exists():
            # Convert all dictionaries to lists of dicts to simplify the internal logic.
            self.ncroot = self._parse_xml(self.filepath.read_text())
        else:
            self.ncroot = OrderedDict()
            self.ncroot["netcdf"] = OrderedDict({"@xmlns": "http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2"})
            if location is not None:
                self.ncroot["netcdf"]["@location"] = str(location)

    @classmethod
    def from_text(cls, xml: str):
        """Create Dataset from xml string."""
        self = cls()
        self.ncroot = self._parse_xml(xml)
        return self

    @staticmethod
    def _parse_xml(xml: str) -> dict:
        """Return dictionary from xml."""
        return xmltodict.parse(
            xml,
            force_list=["variable", "attribute", "group", "dimension"],
            process_namespaces=True,
            namespaces={
                "http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2": None,
                "https://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2": None,
            },
        )

    @staticmethod
    def _as_list(value) -> list:
        """
        Return ``value`` as a list of elements.

        ``None`` becomes an empty list and a single mapping is wrapped in a
        list. An existing list is returned as is (same object), so in-place
        edits remain visible to whoever holds it. This is needed because
        elements that are not part of ``force_list`` in `_parse_xml`, and a few
        entries created by this class, are stored as a bare mapping.
        """
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    def _default_output_path(self) -> str:
        """
        Return the path used by `to_ncml` when no path is given.

        If the dataset file already exists, ``_modified.ncml`` replaces the
        ``.ncml`` suffix so the original file is not overwritten; otherwise the
        dataset file path itself is returned.

        Raises
        ------
        ValueError
            If the dataset was created without a file path.
        """
        if self.filepath is None:
            raise ValueError("No output path given and the dataset has no default file path.")
        if self.filepath.exists():
            # `str.strip` would remove *characters*, not the suffix, and mangle the stem.
            return f"{str(self.filepath).removesuffix('.ncml')}_modified.ncml"
        return str(self.filepath)

    def __repr__(self) -> str:
        """Return a string representation of the parsed xml"""
        return xmltodict.unparse(self.ncroot, pretty=True)

    # Aggregations and scans

    def add_aggregation(self, dim_name: str, type_: str, recheck_every: str = None, time_units_change: bool = None):
        """
        Add aggregation.

        If an aggregation already exists for ``dim_name`` it is updated,
        otherwise a new one is appended.

        Parameters
        ----------
        dim_name : str
            Dimension name.
        ``type_`` : str
            Aggregation type. Must be a valid ``AggregationType`` value.
        recheck_every : str
            Time interval for rechecking the aggregation. Omitted from the document when None.
        time_units_change : bool
            Whether the time units change. Omitted from the document when None.
        """
        at = AggregationType(type_)
        item = preparse(
            OrderedDict(
                {
                    "@dimName": dim_name,
                    "@type": at.value,
                    "@recheckEvery": recheck_every,
                    "@timeUnitsChange": time_units_change,
                }
            )
        )

        netcdf = self.ncroot["netcdf"]
        aggregations = self._as_list(netcdf.get("aggregation"))
        for agg in aggregations:
            # `.get` because an aggregation may have been stored without a dimName.
            if agg.get("@dimName") == dim_name:
                agg.update(item)
                break
        else:
            aggregations.append(item)
        netcdf["aggregation"] = aggregations

    def add_variable_agg(self, dim_name: str, name: str):
        """
        Add variable aggregation.

        Parameters
        ----------
        dim_name: str
            Dimension name for the aggregation.
        name : str
            Variable name.

        Raises
        ------
        ValueError
            If no aggregation exists for ``dim_name``.
        """
        item = OrderedDict({"@name": name})
        matched = False

        for agg in self._as_list(self.ncroot["netcdf"].get("aggregation")):
            if agg.get("@dimName") != dim_name:
                continue
            matched = True
            variables = self._as_list(agg.get("variableAgg"))
            for var in variables:
                if var["@name"] == name:
                    var.update(item)
                    break
            else:
                variables.append(item)
            agg["variableAgg"] = variables

        if not matched:
            # Same contract as `add_scan`: the aggregation must exist first.
            raise ValueError(f"No aggregation found for dimension {dim_name}.")

    def add_scan(
        self,
        dim_name: str,
        location: str,
        reg_exp: str = None,
        suffix: str = None,
        subdirs: bool = True,
        older_than: str = None,
        date_format_mark: str = None,
        enhance: bool = None,
    ):
        """
        Add scan element.

        Parameters
        ----------
        dim_name : str
            Dimension name.
        location : str
            Location of the files to scan.
        reg_exp : str
            Regular expression to match the full pathname of files.
        suffix : str
            File suffix.
        subdirs : bool
            Whether to scan subdirectories.
        older_than : str
            Older than time interval.
        date_format_mark : str
            Date format mark.
        enhance : bool
            Whether to enhance the scan.

        Raises
        ------
        ValueError
            If no aggregation exists for ``dim_name``.
        """
        item = preparse(
            OrderedDict(
                {
                    "@location": location,
                    "@regExp": reg_exp,
                    "@suffix": suffix,
                    "@subdirs": subdirs,
                    "@olderThan": older_than,
                    "@dateFormatMark": date_format_mark,
                    "@enhance": enhance,
                }
            )
        )

        # An aggregation must exist for the scan to be added.
        for agg in self._as_list(self.ncroot["netcdf"].get("aggregation")):
            if agg.get("@dimName") == dim_name:
                scans = self._as_list(agg.get("scan"))
                scans.append(item)
                agg["scan"] = scans
                break
        else:
            raise ValueError(f"No aggregation found for dimension {dim_name}.")

    # Variable

    def add_variable_attribute(self, variable, key, value, type_="String"):
        """
        Add variable attribute.

        An existing attribute with the same name is updated in place. If the
        variable is not yet part of the document, an entry is created for it.

        Parameters
        ----------
        variable : str
            Variable name
        key : str
            Attribute name
        value : object
            Attribute value. Must be a serializable Python Object
        ``type_`` : str, default: 'String'
            String describing attribute type.
        """
        item = OrderedDict({"@name": key, "@type": type_, "@value": value})
        netcdf = self.ncroot["netcdf"]
        variables = self._as_list(netcdf.get("variable"))

        for var in variables:
            if var["@name"] == variable:
                # The attribute entry may be a bare mapping; normalise before iterating.
                attrs = self._as_list(var.get("attribute"))
                for attr in attrs:
                    if attr["@name"] == key:
                        attr.update(item)
                        break
                else:
                    attrs.append(item)
                var["attribute"] = attrs
                break
        else:
            variables.append(OrderedDict({"@name": variable, "attribute": item}))

        netcdf["variable"] = variables

    def remove_variable_attribute(self, variable, key):
        """
        Remove variable attribute.

        Several attributes of the same variable can be removed by calling this
        method repeatedly; removals accumulate.

        Parameters
        ----------
        variable : str
            Variable name.
        key : str
            Name of the attribute to remove.
        """
        item = OrderedDict({"@name": key, "@type": "attribute"})
        netcdf = self.ncroot["netcdf"]
        variables = self._as_list(netcdf.get("variable"))

        for var in variables:
            if var["@name"] == variable:
                removals = self._as_list(var.get("remove"))
                known = any(r.get("@name") == key and r.get("@type") == "attribute" for r in removals)
                if not removals:
                    var["remove"] = item
                elif not known:
                    # Keep earlier removals instead of overwriting them.
                    var["remove"] = [*removals, item]
                break
        else:
            variables.append(OrderedDict({"@name": variable, "remove": item}))

        netcdf["variable"] = variables

    def rename_variable(self, variable, new_name):
        """
        Rename variable.

        Parameters
        ----------
        variable : str
            Original variable name.
        new_name : str
            New variable name.
        """
        netcdf = self.ncroot["netcdf"]
        variables = self._as_list(netcdf.get("variable"))

        for var in variables:
            if var["@name"] == variable:
                var["@name"] = new_name
                var["@orgName"] = variable
                break
        else:
            variables.append(OrderedDict({"@name": new_name, "@orgName": variable}))

        netcdf["variable"] = variables

    def remove_variable(self, variable):
        """
        Remove dataset variable.

        Parameters
        ----------
        variable : str
            Name of the variable to remove.
        """
        netcdf = self.ncroot["netcdf"]
        removes = self._as_list(netcdf.get("remove"))

        # Compare fields rather than whole mappings so key order does not matter.
        known = any(r.get("@name") == variable and r.get("@type") == "variable" for r in removes)
        if not known:
            removes.append(OrderedDict({"@name": variable, "@type": "variable"}))

        netcdf["remove"] = removes

    def rename_variable_attribute(self, variable, old_name, new_name):
        """
        Rename variable attribute.

        Parameters
        ----------
        variable : str
            Variable name.
        old_name : str
            Original attribute name.
        new_name : str
            New attribute name.
        """
        item = OrderedDict({"@name": new_name, "@orgName": old_name})
        netcdf = self.ncroot["netcdf"]
        variables = self._as_list(netcdf.get("variable"))

        for var in variables:
            if var["@name"] == variable:
                attrs = self._as_list(var.get("attribute"))
                for attr in attrs:
                    if attr["@name"] == old_name:
                        attr["@name"] = new_name
                        attr["@orgName"] = old_name
                        break
                else:
                    attrs.append(item)
                # Write back: `attrs` is a fresh list when the variable had no attribute yet.
                var["attribute"] = attrs
                break
        else:
            variables.append(OrderedDict({"@name": variable, "attribute": item}))

        netcdf["variable"] = variables

    # Dimensions

    def rename_dimension(self, dimension, new_name):
        """
        Rename dimension.

        Parameters
        ----------
        dimension: str
            Original dimension name.
        new_name: str
            New dimension name.
        """
        netcdf = self.ncroot["netcdf"]
        dimensions = self._as_list(netcdf.get("dimension"))

        for dim in dimensions:
            if dim["@name"] == dimension:
                dim["@name"] = new_name
                dim["@orgName"] = dimension
                break
        else:
            dimensions.append(OrderedDict({"@name": new_name, "@orgName": dimension}))

        # Stored under the same key it is read from ("dimension", singular).
        netcdf["dimension"] = dimensions

    # Dataset

    def add_dataset_attribute(self, key, value, type_="String"):
        """
        Add dataset attribute

        Parameters
        ----------
        key : str
            Attribute name.
        value : object
            Attribute value. Must be a serializable Python Object.
        ``type_`` : str, default: 'String'
            String describing attribute type.
        """
        item = OrderedDict({"@name": key, "@type": type_, "@value": value})
        netcdf = self.ncroot["netcdf"]
        attributes = self._as_list(netcdf.get("attribute"))

        for attr in attributes:
            if attr["@name"] == key:
                attr.update(item)
                break
        else:
            attributes.append(item)

        netcdf["attribute"] = attributes

    def remove_dataset_attribute(self, key):
        """
        Remove dataset attribute.

        Parameters
        ----------
        key : str
            Name of the attribute to remove.
        """
        netcdf = self.ncroot["netcdf"]
        removals = self._as_list(netcdf.get("remove"))

        known = any(r.get("@type") == "attribute" and r.get("@name") == key for r in removals)
        if not known:
            removals.append(OrderedDict({"@name": key, "@type": "attribute"}))

        netcdf["remove"] = removals

    def rename_dataset_attribute(self, old_name, new_name):
        """
        Rename dataset attribute.

        Parameters
        ----------
        old_name: str
            Original attribute name.
        new_name: str
            New attribute name.
        """
        # "@orgName" (with the "@") so it is written as an XML attribute, as in the branch below.
        item = OrderedDict({"@name": new_name, "@orgName": old_name})
        netcdf = self.ncroot["netcdf"]
        existing = netcdf.get("attribute", None)

        if not existing:
            netcdf["attribute"] = item
            return

        attributes = self._as_list(existing)
        for attr in attributes:
            if attr["@name"] == old_name:
                attr["@name"] = new_name
                attr["@orgName"] = old_name
                break
        else:
            netcdf["attribute"] = [*attributes, item]

    def to_ncml(self, path=None):
        """
        Write NcML file to disk.

        Parameters
        ----------
        path: str
            Path to write NcML document. When omitted, the dataset file path is used; if that
            file already exists, ``_modified.ncml`` replaces its ``.ncml`` suffix.

        Raises
        ------
        ValueError
            If no path is given and the dataset has no file path.
        """
        if not path:
            path = self._default_output_path()

        xml_output = xmltodict.unparse(self.ncroot, pretty=True)
        with open(path, "w", encoding="utf-8") as fd:
            fd.write(xml_output)

    def to_cf_dict(self):
        """
        Convert internal representation to a CF-JSON dictionary.

        The CF-JSON specification includes `data` for variables, but if the data is not within the NcML,
        it cannot be included in the JSON representation.

        Returns
        -------
        Dictionary with `dimensions` and `variables` keys. May also optionally include an `attributes` key and a
        `groups` key. Additional keys prefixed with `@` may be included for <netcdf> tag attributes, for example
        `@location`.

        References
        ----------
        http://cf-json.org/specification
        """
        converters = {
            "dimension": _dims_to_json,
            "group": _groups_to_json,
            "attribute": _attributes_to_json,
            "variable": _variables_to_json,
        }

        res = OrderedDict()
        for key, val in self.ncroot["netcdf"].items():
            if key.startswith("@"):
                # <netcdf> tag attributes are copied through unchanged.
                res[key] = val
            elif key in converters:
                res.update(converters[key](val))
        return res
def _dims_to_json(dims: list) -> dict:
    """
    The dimensions object has dimension id:size as its key:value members.

    Only dimensions whose length is greater than 1 are included. Entries
    without a ``@length`` (for instance rename-only entries) are skipped.
    """
    out = OrderedDict()
    for dim in dims:
        length = dim.get("@length")
        if length is None:
            continue
        size = int(length)
        if size > 1:
            out[dim["@name"]] = size
    return {"dimensions": out}


def _groups_to_json(groups: list) -> dict:
    """Return ``{"groups": {name: {...}}}`` with the attributes and sub-groups of each group."""
    out = OrderedDict()
    for group in groups:
        name = group["@name"]
        out[name] = OrderedDict()
        if "attribute" in group:
            out[name].update(_attributes_to_json(group["attribute"]))
        if "group" in group:
            out[name].update(_groups_to_json(group["group"]))
    return {"groups": out}


def _attributes_to_json(attrs: list) -> dict:
    """
    The attributes object contains arbitrary attributes as its key:value members.

    Attributes whose value cannot be cast raise a warning and are left out.
    """
    out = OrderedDict()
    for attr in attrs:
        try:
            out[attr["@name"]] = _cast(attr)
        except ValueError as exc:
            warn(f"Could not cast {attr['@name']}:\n{exc}", stacklevel=2)
    return {"attributes": out}


def _variables_to_json(variables: list) -> dict:
    """
    The variables definition object has variable id:object as its key:value members.

    Each variable object MUST include shape, attributes and data objects.
    The shape field is an array of dimension IDs which correspond to the array ordering of the variable data.
    """
    out = OrderedDict()

    # Put coordinate variables first: reserving the key fixes its position in the mapping.
    for var in variables:
        if _is_coordinate(var):
            out[var["@name"]] = None

    for var in variables:
        name = var["@name"]
        out[name] = OrderedDict()

        if "@shape" in var:
            # `split()` so an empty shape gives [] rather than [''].
            out[name]["shape"] = var["@shape"].split()

        if "@type" in var:
            out[name]["type"] = var["@type"]

        if "attribute" in var:
            out[name].update(_attributes_to_json(var["attribute"]))

        if "values" in var:
            out[name]["data"] = _cast(var)

    return {"variables": out}


def _cast(obj: dict) -> Any:
    """
    Cast attribute value to the appropriate type.

    The value is read from ``@value`` or, failing that, from ``values``, and
    the type from ``@type`` (default ``String``). Strings of a string type
    are returned unchanged, other strings are split on single spaces and each
    item is converted. Returns None when there is no value.

    Raises
    ------
    NotImplementedError
        If the value is a mapping.
    """
    from xncml.parser import DataType, nctype

    value = obj.get("@value") or obj.get("values")
    typ = DataType(obj.get("@type", "String"))

    if value is None:
        return None

    if isinstance(value, str):
        if typ in [DataType.STRING, DataType.STRING_1]:
            return value
        sep = " "
        values = value.split(sep)
        return list(map(nctype(typ), values))

    if isinstance(value, dict):
        raise NotImplementedError(obj)

    return value


def _is_coordinate(var):
    """
    Return True if variable is a coordinate.

    A variable is considered a coordinate when it is 1D and named after its
    dimension, when its ``units`` are latitude or longitude units, or when its
    ``long_name`` or ``standard_name`` is one of the known coordinate names.
    """
    # Variable is 1D and has same name as dimension
    if var.get("@shape", "").split() == [var["@name"]]:
        return True

    lat_units = ["degrees_north", "degreeN", "degree_N", "degree_north", "degreesN", "degrees_N"]
    lon_units = ["degrees_east", "degreeE", "degree_E", "degree_east", "degreesE", "degrees_E"]
    names = [
        "latitude",
        "longitude",
        "time",
        "air_pressure",
        "altitude",
        "depth",
        "geopotential_height",
        "height",
        "height_above_geopotential_datum",
        "height_above_mean_sea_level",
        "height_above_reference_ellipsoid",
    ]

    raw = var.get("attribute", [])
    if isinstance(raw, dict):
        raw = [raw]
    # Only plain string values matter here, so read "@value" directly instead of
    # casting every attribute (which would also emit cast warnings twice).
    attrs = {attr.get("@name"): attr.get("@value") for attr in raw}

    # Check units
    if attrs.get("units") in lon_units + lat_units:
        return True

    # Check long_name and standard_name
    return any(attrs.get(key) in names for key in ("long_name", "standard_name"))
def preparse(obj: dict) -> dict:
    """
    Return a cleaned copy of a dictionary of XML attributes.

    - Remove None values from dictionary.
    - Convert booleans to strings.

    Parameters
    ----------
    obj : dict
        Dictionary to clean. It is not modified.

    Returns
    -------
    dict
        New dictionary, in the same key order, without the entries whose
        value is None and with booleans replaced by ``"true"`` / ``"false"``.
    """
    out = {}
    for key, value in obj.items():
        if value is None:
            continue
        # bool must be tested explicitly: other values are passed through as is.
        out[key] = str(value).lower() if isinstance(value, bool) else value
    return out
class AggregationType(Enum):
    """
    Type of aggregation.

    The value of each member is the string stored in the ``@type`` entry of
    an aggregation by ``Dataset.add_aggregation``.
    """

    FORECAST_MODEL_RUN_COLLECTION = "forecastModelRunCollection"
    FORECAST_MODEL_RUN_SINGLE_COLLECTION = "forecastModelRunSingleCollection"
    JOIN_EXISTING = "joinExisting"
    JOIN_NEW = "joinNew"
    TILED = "tiled"
    UNION = "union"