diff --git a/.gitignore b/.gitignore index 70a1387..b382093 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ *.egg-info/ .eggs/ /doc/_generated +/doc/_static/example_nxxas_data.h5 diff --git a/README.md b/README.md index e84a352..3a0438f 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,6 @@ Library for reading and writing XAS data in NeXus format. + +

diff --git a/doc/_ext/myhdf5_inline_role.py b/doc/_ext/myhdf5_inline_role.py new file mode 100644 index 0000000..09a67a0 --- /dev/null +++ b/doc/_ext/myhdf5_inline_role.py @@ -0,0 +1,50 @@ +import re +import os +from docutils import nodes +from pynxxas.io.convert import convert_files + + +def setup(app): + app.add_role("myhdf5", myhdf5_role) + app.connect("html-page-context", inject_dynamic_url_js) + app.connect("builder-inited", generate_example_nxxas_data) + + +def myhdf5_role(name, rawtext, text, lineno, inliner, options={}, content=[]): + matches = re.match(r"(\S+)\s*<([^<>]+)>", text) + display_text = matches.group(1) + filename = matches.group(2) + + url_template = f"https://myhdf5.hdfgroup.org/view?url=placeholder{filename}" + + link = f'{display_text}' + + node = nodes.raw("", link, format="html") + return [node], [] + + +def inject_dynamic_url_js(app, pagename, templatename, context, doctree): + if app.builder.name != "html" or doctree is None: + return + + script = """ + + """ + + context["body"] += script + + +def generate_example_nxxas_data(app): + output_filename = os.path.join(app.srcdir, "_static", "example_nxxas_data.h5") + file_pattern1 = os.path.join(app.srcdir, "..", "xdi_files", "*") + file_pattern2 = os.path.join(app.srcdir, "..", "xas_beamline_data", "*") + convert_files([file_pattern1, file_pattern2], output_filename, "nexus") diff --git a/doc/conf.py b/doc/conf.py index 2519940..33d8f4d 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -4,8 +4,12 @@ # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information +import os +import sys from pynxxas import __version__ as release +sys.path.append(os.path.abspath("./_ext")) + project = "pynxxas" version = ".".join(release.split(".")[:2]) copyright = "2024-present, ESRF" @@ -20,6 +24,7 @@ "sphinx.ext.autosummary", "sphinx.ext.viewcode", "sphinx_autodoc_typehints", + "myhdf5_inline_role", ] templates_path = ["_templates"] exclude_patterns = ["build"] @@ -39,7 +44,8 @@ # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output html_theme = "pydata_sphinx_theme" -html_static_path = [] +html_static_path = ["_static"] +html_extra_path = ["_static"] html_theme_options = { "icon_links": [ { diff --git a/doc/howtoguides.rst b/doc/howtoguides.rst new file mode 100644 index 0000000..b9d659f --- /dev/null +++ b/doc/howtoguides.rst @@ -0,0 +1,7 @@ +How-to Guides +============= + +.. toctree:: + + howtoguides/install + howtoguides/convert_files diff --git a/doc/howtoguides/convert_files.rst b/doc/howtoguides/convert_files.rst new file mode 100644 index 0000000..1e6d2b7 --- /dev/null +++ b/doc/howtoguides/convert_files.rst @@ -0,0 +1,8 @@ +Convert file formats +==================== + +Convert all files in the *xdi_files* and *xas_beamline_data* to *HDF5/NeXus* format + +.. code-block:: bash + + nxxas-convert xdi_files/*.* xas_beamline_data/*.* ./converted/data.h5 diff --git a/doc/howtoguides/install.rst b/doc/howtoguides/install.rst new file mode 100644 index 0000000..69956a9 --- /dev/null +++ b/doc/howtoguides/install.rst @@ -0,0 +1,6 @@ +Install +======= + +.. code-block:: bash + + pip install pynxxas diff --git a/doc/index.rst b/doc/index.rst index 8ccdcb8..7c9e2da 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -3,7 +3,11 @@ pynxxas |version| Library for reading and writing XAS data in `NeXus format `_. +An example HDF5 file can be found :myhdf5:`here `. + .. toctree:: :hidden: + howtoguides + tutorials api diff --git a/doc/tutorials.rst b/doc/tutorials.rst new file mode 100644 index 0000000..68f6f41 --- /dev/null +++ b/doc/tutorials.rst @@ -0,0 +1,6 @@ +Tutorials +========= + +.. toctree:: + + tutorials/models diff --git a/doc/tutorials/models.rst b/doc/tutorials/models.rst new file mode 100644 index 0000000..8b22179 --- /dev/null +++ b/doc/tutorials/models.rst @@ -0,0 +1,34 @@ +Data models +=========== + +Data from different data formats are represented in memory as a *pydantic* models. +You can convert between different models and save/load models from file. + +NeXus models +------------ + +Build an *NXxas* model instance in steps + +.. code-block:: python + + from pynxxas.models import NxXasModel + + nxxas_model = NxXasModel(element="Fe", absorption_edge="K", mode="transmission") + nxxas_model.energy = [7, 7.1], "keV" + nxxas_model.intensity = [10, 20] + +Create an *NXxas* model instance from a dictionary and convert back to a dictionary + +.. code-block:: python + + data_in = { + "NX_class": "NXsubentry", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": [[7, 7.1], "keV"], + "intensity": [10, 20], + } + + nxxas_model = NxXasModel(**data_in) + data_out = nxxas_model.model_dump() diff --git a/setup.cfg b/setup.cfg index 872fee0..3016783 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,11 +23,13 @@ package_dir= packages=find: python_requires = >=3.8 install_requires = + typing_extensions; python_version < "3.9" + strenum; python_version < "3.11" numpy h5py pydantic >=2.6 pint - typing_extensions; python_version < "3.9" + periodictable [options.packages.find] where=src diff --git a/src/pynxxas/apps/__init__.py b/src/pynxxas/apps/__init__.py index e69de29..2c9e51d 100644 --- a/src/pynxxas/apps/__init__.py +++ b/src/pynxxas/apps/__init__.py @@ -0,0 +1,2 @@ +"""Command-Line Interface (CLI) +""" diff --git a/src/pynxxas/apps/nxxas_convert.py b/src/pynxxas/apps/nxxas_convert.py index 751be61..c31a093 100644 --- a/src/pynxxas/apps/nxxas_convert.py +++ b/src/pynxxas/apps/nxxas_convert.py @@ -1,12 +1,14 @@ import sys import logging import argparse -from glob import glob -from ..io.xdi import read_xdi +from .. import models +from ..io.convert import convert_files +logger = logging.getLogger(__name__) -def main(argv=None): + +def main(argv=None) -> int: if argv is None: argv = sys.argv @@ -14,20 +16,31 @@ def main(argv=None): prog="nxxas_convert", description="Convert data to NXxas format" ) - parser.add_argument("--output", type=str, default=None, help="Path to HDF5 file") parser.add_argument( - "patterns", + "--output-format", + type=str, + default="nexus", + choices=list(models.MODELS), + help="Output format", + ) + + parser.add_argument( + "file_patterns", type=str, - nargs="+", - help="Glob file name patterns", + nargs="*", + help="Files to convert", + ) + + parser.add_argument( + "output_filename", type=str, help="Convert destination filename" ) args = parser.parse_args(argv[1:]) logging.basicConfig() - for pattern in args.patterns: - for filename in glob(pattern): - read_xdi(filename) + convert_files( + args.file_patterns, args.output_filename, args.output_format, interactive=True + ) if __name__ == "__main__": diff --git a/src/pynxxas/io/__init__.py b/src/pynxxas/io/__init__.py index e69de29..0a4300e 100644 --- a/src/pynxxas/io/__init__.py +++ b/src/pynxxas/io/__init__.py @@ -0,0 +1,31 @@ +"""File formats +""" + +from typing import Generator + +import pydantic + +from .url_utils import UrlType +from . import xdi +from . import nexus +from .. import models + + +def load_models(url: UrlType) -> Generator[pydantic.BaseModel, None, None]: + if xdi.is_xdi_file(url): + yield from xdi.load_xdi_file(url) + elif nexus.is_nexus_file(url): + yield from nexus.load_nexus_file(url) + else: + raise NotImplementedError(f"File format not supported: {url}") + + +def save_model(model_instance: pydantic.BaseModel, url: UrlType) -> None: + if isinstance(model_instance, models.NxXasModel): + nexus.save_nexus_file(model_instance, url) + elif isinstance(model_instance, models.XdiModel): + xdi.save_xdi_file(model_instance, url) + else: + raise NotImplementedError( + f"Saving of {type(model_instance).__name__} not implemented" + ) diff --git a/src/pynxxas/io/convert.py b/src/pynxxas/io/convert.py new file mode 100644 index 0000000..3e76ab1 --- /dev/null +++ b/src/pynxxas/io/convert.py @@ -0,0 +1,92 @@ +import logging +import pathlib +from glob import glob +from contextlib import contextmanager +from typing import Iterator, Generator + +import pydantic + +from .. import io +from .. import models +from ..models import convert + +logger = logging.getLogger(__name__) + + +def convert_files( + file_patterns: Iterator[str], + output_filename: str, + output_format: str, + interactive: bool = False, +) -> int: + model_type = models.MODELS[output_format] + + output_filename = pathlib.Path(output_filename) + if output_filename.exists(): + if interactive: + result = input(f"Overwrite {output_filename}? (y/[n])") + if not result.lower() in ("y", "yes"): + return 1 + output_filename.unlink() + output_filename.parent.mkdir(parents=True, exist_ok=True) + + state = {"return_code": 0, "scan_number": 0, "filename": None} + scan_number = 0 + for model_in in _iter_load_models(file_patterns, state): + scan_number += 1 + for model_out in _iter_convert_model(model_in, model_type, state): + if output_format == "nexus": + output_url = f"{output_filename}?path=/dataset{scan_number:02}" + if model_out.NX_class == "NXsubentry": + breakpoint() + output_url = f"{output_url}/{model_out.mode.replace(' ', '_')}" + else: + basename = f"{output_filename.stem}_{scan_number:02}" + if model_out.NX_class == "NXsubentry": + basename = f"{basename}_{model_out.mode.replace(' ', '_')}" + output_url = output_filename.parent / basename + output_filename.suffix + + with _handle_error("saving", state): + io.save_model(model_out, output_url) + + return state["return_code"] + + +def _iter_load_models( + file_patterns: Iterator[str], state: dict +) -> Generator[pydantic.BaseModel, None, None]: + for file_pattern in file_patterns: + for filename in glob(file_pattern): + filename = pathlib.Path(filename).absolute() + state["filename"] = filename + it_model_in = io.load_models(filename) + while True: + with _handle_error("loading", state): + try: + yield next(it_model_in) + except StopIteration: + break + + +def _iter_convert_model( + model_in: Iterator[pydantic.BaseModel], model_type: str, state: dict +) -> Generator[pydantic.BaseModel, None, None]: + it_model_out = convert.convert_model(model_in, model_type) + while True: + with _handle_error("converting", state): + try: + yield next(it_model_out) + except StopIteration: + break + + +@contextmanager +def _handle_error(action: str, state: dict) -> Generator[None, None, None]: + try: + yield + except NotImplementedError as e: + state["return_code"] = 1 + logger.warning("Error when %s '%s': %s", action, state["filename"], e) + except Exception: + state["return_code"] = 1 + logger.error("Error when %s '%s'", action, state["filename"], exc_info=True) diff --git a/src/pynxxas/io/hdf5_utils.py b/src/pynxxas/io/hdf5_utils.py new file mode 100644 index 0000000..80fda85 --- /dev/null +++ b/src/pynxxas/io/hdf5_utils.py @@ -0,0 +1,41 @@ +import os +from typing import Optional, Union + +import h5py + + +def create_hdf5_link( + h5group: h5py.Group, + target_name: str, + target_filename: Optional[str], + absolute: bool = False, +) -> Union[h5py.SoftLink, h5py.ExternalLink]: + """Create HDF5 soft link (supports relative down paths) or external link (supports relative paths).""" + this_name = h5group.name + this_filename = h5group.file.filename + + target_filename = target_filename or this_filename + + if os.path.isabs(target_filename): + rel_target_filename = os.path.relpath(target_filename, this_filename) + else: + rel_target_filename = target_filename + target_filename = os.path.abs(os.path.join(this_filename, target_filename)) + + if "." not in target_name: + rel_target_name = os.path.relpath(target_name, this_name) + else: + rel_target_name = target_name + target_name = os.path.abspath(os.path.join(this_name, target_name)) + + # Internal link + if rel_target_filename == ".": + if absolute or ".." in rel_target_name: + # h5py.SoftLink does not support relative links upwards + return h5py.SoftLink(target_name) + return h5py.SoftLink(rel_target_name) + + # External link + if absolute: + return h5py.ExternalLink(target_filename, target_name) + return h5py.ExternalLink(rel_target_filename, target_name) diff --git a/src/pynxxas/io/nexus.py b/src/pynxxas/io/nexus.py new file mode 100644 index 0000000..5bf4cff --- /dev/null +++ b/src/pynxxas/io/nexus.py @@ -0,0 +1,157 @@ +"""NeXus/HDF5 file format +""" + +from typing import Generator, Any, Tuple + +try: + from enum import StrEnum +except ImportError: + from strenum import StrEnum + + +import h5py +import pint +import pydantic + +from . import url_utils +from . import hdf5_utils +from ..models import nexus + + +def is_nexus_file(url: url_utils.UrlType) -> bool: + filename = url_utils.as_url(url).path + with open(filename, "rb") as file: + try: + with h5py.File(file, mode="r"): + return True + except Exception: + return False + + +def load_nexus_file(url: url_utils.UrlType) -> Generator[nexus.NxGroup, None, None]: + raise NotImplementedError(f"File format not supported: {url}") + + +def save_nexus_file(nxgroup: nexus.NxXasModel, url: url_utils.UrlType) -> None: + if not isinstance(nxgroup, nexus.NxXasModel): + raise TypeError(f"nxgroup is not of type NxXasModel ({type(nxgroup)})") + if not nxgroup.has_data(): + return + filename = url_utils.as_url(url).path + url = url_utils.as_url(url) + + with h5py.File(filename, mode="a", track_order=True) as nxroot: + nxparent = _prepare_nxparent(nxgroup, url, nxroot) + _save_nxgroup(nxgroup, nxparent) + + +def _save_nxgroup(nxgroup: nexus.NxGroup, nxparent: h5py.Group) -> None: + if not isinstance(nxgroup, nexus.NxGroup): + raise TypeError(f"nxgroup is not of type NxGroup ({type(nxgroup)})") + for field_name, field, field_value in _iter_model_fields(nxgroup): + if field_value is None: + continue + elif isinstance(field_value, nexus.NxGroup): + nxchild = nxparent.require_group(field_name) + _save_nxgroup(field_value, nxchild) + if isinstance(field_value, nexus.NxDataModel): + _set_default(nxchild) + elif field.alias and field.alias.startswith("@"): + try: + _save_attribute(nxparent, field_name, field_value) + except Exception as e: + raise ValueError( + f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 attribute" + ) from e + else: + try: + _save_dataset(nxparent, field_name, field_value) + except Exception as e: + raise ValueError( + f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 dataset" + ) from e + + +def _iter_model_fields( + model: pydantic.BaseModel, +) -> Generator[Tuple[str, pydantic.Field, Any], None, None]: + for field_name, field in model.__fields__.items(): + field_value = getattr(model, field_name) + yield field_name, field, field_value + + +def _save_dataset(nxparent: h5py.Group, field_name: str, field_value: Any) -> None: + if isinstance(field_value, nexus.NxField): + nxparent[field_name] = field_value.value + for attr_name, attr, attr_value in _iter_model_fields(field_value): + if attr.alias and attr.alias.startswith("@"): + nxparent[field_name].attrs[attr_name] = attr_value + elif isinstance(field_value, StrEnum): + nxparent[field_name] = str(field_value) + elif isinstance(field_value, pint.Quantity): + if field_value.size: + nxparent[field_name] = field_value.magnitude + units = str(field_value.units) + if units: + nxparent[field_name].attrs["units"] = units + elif isinstance(field_value, nexus.NxLinkModel): + link = hdf5_utils.create_hdf5_link( + nxparent, field_value.target_name, field_value.target_filename + ) + nxparent[field_name] = link + else: + nxparent[field_name] = field_value + + +def _save_attribute(nxparent: h5py.Group, field_name: str, field_value: Any) -> None: + if isinstance(field_value, StrEnum): + nxparent.attrs[field_name] = str(field_value) + else: + nxparent.attrs[field_name] = field_value + + +def _set_default(h5group: h5py.Group) -> None: + while h5group.name != "/": + h5group.parent.attrs["default"] = h5group.name.split("/")[-1] + h5group = h5group.parent + + +def _prepare_nxparent( + nxgroup: nexus.NxGroup, + url: url_utils.ParsedUrlType, + nxroot: h5py.File, +) -> h5py.Group: + """Creates and returns the parent group of `nxgroup`""" + internal_path = url_utils.as_url(url).internal_path + parts = [s for s in internal_path.split("/") if s] + nparts = len(parts) + + if nxgroup.NX_class == "NXroot": + if nparts != 0: + raise ValueError( + f"NXroot URL cannot have an internal path ({internal_path})" + ) + nxclasses = [] + elif nxgroup.NX_class == "NXentry": + if nparts != 1: + raise ValueError( + f"NXentry URL must have an internal path of 1 level deep ({internal_path})" + ) + nxclasses = ["NXentry"] + elif nxgroup.NX_class == "NXsubentry": + if nparts != 2: + raise ValueError( + f"NXsubentry URL must have an internal path of 2 levels deep ({internal_path})" + ) + nxclasses = ["NXentry", "NXsubentry"] + else: + nxclasses = ["NXentry"] + ["NXsubentry"] * (len(parts) - 1) + + nxroot.attrs.setdefault("NX_class", "NXroot") + + nxparent = nxroot + for part, nxclass in zip(parts, nxclasses): + nxparent = nxparent.require_group(part) + nxparent.attrs.setdefault("NX_class", nxclass) + + return nxparent diff --git a/src/pynxxas/io/url_utils.py b/src/pynxxas/io/url_utils.py new file mode 100644 index 0000000..82a414a --- /dev/null +++ b/src/pynxxas/io/url_utils.py @@ -0,0 +1,44 @@ +import os +import sys +import pathlib +import urllib.parse +import urllib.request +from typing import Union, NamedTuple + + +class ParsedUrlType(NamedTuple): + path: str + internal_path: str + + +UrlType = Union[str, pathlib.Path, urllib.parse.ParseResult, ParsedUrlType] + + +_WIN32 = sys.platform == "win32" + + +def as_url(url: UrlType) -> ParsedUrlType: + if isinstance(url, ParsedUrlType): + return url + + if isinstance(url, urllib.parse.ParseResult): + parsed = url + else: + url_str = str(url) + parsed = urllib.parse.urlparse(url_str) + if not parsed.scheme or (_WIN32 and len(parsed.scheme) == 1): + url_str = "file://" + os.path.abspath(url_str).replace("\\", "/") + parsed = urllib.parse.urlparse(url_str) + + if parsed.scheme != "file": + raise ValueError("URL is not a file") + + if parsed.netloc: + path = f"{parsed.netloc}{parsed.path}" + else: + path = parsed.path + + query = urllib.parse.parse_qs(parsed.query) + internal_path = query.get("path", [""])[0] + + return ParsedUrlType(path=path, internal_path=internal_path) diff --git a/src/pynxxas/io/xdi.py b/src/pynxxas/io/xdi.py index d172d9c..2836dae 100644 --- a/src/pynxxas/io/xdi.py +++ b/src/pynxxas/io/xdi.py @@ -1,29 +1,37 @@ +"""XAS Data Interchange (XDI) file format +""" + import re -import pathlib import datetime -from typing import Union, Tuple, Optional +from typing import Union, Tuple, Optional, Generator import pint import numpy +from . import url_utils from ..models import units from ..models.xdi import XdiModel -def is_xdi_file(filename: Union[str, pathlib.Path]) -> bool: +def is_xdi_file(url: url_utils.UrlType) -> bool: + filename = url_utils.as_url(url).path with open(filename, "r") as file: - for line in file: - line = line.strip() - if not line: - continue - return line.startswith("# XDI") + try: + for line in file: + line = line.strip() + if not line: + continue + return line.startswith("# XDI") + except Exception: + return False -def read_xdi(filename: Union[str, pathlib.Path]) -> XdiModel: +def load_xdi_file(url: url_utils.UrlType) -> Generator[XdiModel, None, None]: """Specs described in https://github.com/XraySpectroscopy/XAS-Data-Interchange/blob/master/specification/spec.md """ + filename = url_utils.as_url(url).path content = {"comments": [], "column": dict(), "data": dict()} with open(filename, "r") as file: @@ -78,10 +86,8 @@ def read_xdi(filename: Union[str, pathlib.Path]) -> XdiModel: key = _parse_xdi_value(key) content[key] = value - # Data - table = numpy.loadtxt(file, dtype=float) - - # Parse data in dictionary of pint Quantity objects + # Data + table = numpy.loadtxt(filename, dtype=float) columns = [ name for _, name in sorted(content.pop("column").items(), key=lambda tpl: tpl[0]) @@ -90,7 +96,13 @@ def read_xdi(filename: Union[str, pathlib.Path]) -> XdiModel: name, quant = _parse_xdi_column_name(name) content["data"][name] = array, quant - return XdiModel(**content) + yield XdiModel(**content) + + +def save_xdi_file(model_instance: XdiModel, url: url_utils.UrlType) -> None: + raise NotImplementedError( + f"Saving of {type(model_instance).__name__} not implemented" + ) _XDI_FIELD_REGEX = re.compile(r"#\s*([\w.]+):\s*(.*)") @@ -134,10 +146,13 @@ def _parse_xdi_value( def _parse_xdi_column_name( name: str, -) -> Union[Tuple[str, Optional[pint.Unit]]]: +) -> Union[Tuple[str, Optional[str]]]: parts = _SPACES_REGEX.split(name) if len(parts) == 1: return name, None - if len(parts) == 2: - return tuple(parts) - raise ValueError(f"XDI column name '{name}' is not valid") + try: + units.as_units(parts[-1]) + except pint.UndefinedUnitError: + return name, None + name = " ".join(parts[:-1]) + return name, parts[-1] diff --git a/src/pynxxas/models/__init__.py b/src/pynxxas/models/__init__.py index e69de29..9ef6a64 100644 --- a/src/pynxxas/models/__init__.py +++ b/src/pynxxas/models/__init__.py @@ -0,0 +1,7 @@ +"""Data models +""" + +from .xdi import XdiModel +from .nexus import NxXasModel + +MODELS = {"xdi": XdiModel, "nexus": NxXasModel} diff --git a/src/pynxxas/models/convert/__init__.py b/src/pynxxas/models/convert/__init__.py new file mode 100644 index 0000000..b060393 --- /dev/null +++ b/src/pynxxas/models/convert/__init__.py @@ -0,0 +1,27 @@ +from typing import Type, Generator +import pydantic + +from . import xdi +from . import nexus +from .. import XdiModel +from .. import NxXasModel + + +def convert_model( + instance: pydantic.BaseModel, model_type: Type[pydantic.BaseModel] +) -> Generator[pydantic.BaseModel, None, None]: + if isinstance(instance, model_type): + yield instance + + mod_to = _CONVERT_MODULE.get(type(instance)) + mod_from = _CONVERT_MODULE.get(model_type) + if mod_to is None or mod_from is None: + raise NotImplementedError( + f"Conversion from {type(instance).__name__} to {model_type.__name__} is not implemented" + ) + + for nxxas_model in mod_to.to_nxxas(instance): + yield from mod_from.from_nxxas(nxxas_model) + + +_CONVERT_MODULE = {XdiModel: xdi, NxXasModel: nexus} diff --git a/src/pynxxas/models/convert/nexus.py b/src/pynxxas/models/convert/nexus.py new file mode 100644 index 0000000..9a59b77 --- /dev/null +++ b/src/pynxxas/models/convert/nexus.py @@ -0,0 +1,10 @@ +from typing import Generator +from .. import NxXasModel + + +def to_nxxas(nxxas_model: NxXasModel) -> Generator[NxXasModel, None, None]: + yield nxxas_model + + +def from_nxxas(nxxas_model: NxXasModel) -> Generator[NxXasModel, None, None]: + yield nxxas_model diff --git a/src/pynxxas/models/convert/xdi.py b/src/pynxxas/models/convert/xdi.py new file mode 100644 index 0000000..7fef8c4 --- /dev/null +++ b/src/pynxxas/models/convert/xdi.py @@ -0,0 +1,63 @@ +from typing import Generator + +from .. import XdiModel +from .. import NxXasModel + + +def to_nxxas(xdi_model: XdiModel) -> Generator[NxXasModel, None, None]: + has_mu = xdi_model.data.mutrans is not None or xdi_model.data.normtrans is not None + has_fluo = ( + xdi_model.data.mufluor is not None or xdi_model.data.normfluor is not None + ) + if not has_mu and not has_fluo: + return + + data = { + "element": xdi_model.element.symbol, + "absorption_edge": xdi_model.element.edge, + } + + if has_mu and has_fluo: + data["NX_class"] = "NXsubentry" + else: + data["NX_class"] = "NXentry" + + if xdi_model.facility and xdi_model.facility.name: + if xdi_model.beamline and xdi_model.beamline.name: + name = { + "value": f"{xdi_model.facility.name}-{xdi_model.beamline.name}", + "@short_name": xdi_model.beamline.name, + } + else: + name = {"value": xdi_model.facility.name} + data["instrument"] = {"name": name} + + if has_mu: + nxxas_model = NxXasModel(mode="transmission", **data) + nxxas_model.energy = xdi_model.data.energy + if xdi_model.data.mutrans is not None: + nxxas_model.intensity = xdi_model.data.mutrans + else: + nxxas_model.intensity = xdi_model.data.normtrans + yield nxxas_model + + if has_fluo: + nxxas_model = NxXasModel(mode="fluorescence yield", **data) + nxxas_model.energy = xdi_model.data.energy + if xdi_model.data.mufluor is not None: + nxxas_model.intensity = xdi_model.data.mufluor + else: + nxxas_model.intensity = xdi_model.data.normfluor + yield nxxas_model + + +def from_nxxas(nxxas_model: NxXasModel) -> Generator[XdiModel, None, None]: + xdi_model = XdiModel() + xdi_model.element.symbol = nxxas_model.element + xdi_model.element.edge = nxxas_model.absorption_edge + xdi_model.data.energy = nxxas_model.energy + if nxxas_model.mode == "transmission": + xdi_model.data.mutrans = nxxas_model.intensity + elif nxxas_model.mode == "fluorescence yield": + xdi_model.data.mufluor = nxxas_model.intensity + yield xdi_model diff --git a/src/pynxxas/models/nexus.py b/src/pynxxas/models/nexus.py new file mode 100644 index 0000000..40b698f --- /dev/null +++ b/src/pynxxas/models/nexus.py @@ -0,0 +1,103 @@ +"""NeXus data model_instance +""" + +from typing import Dict, Literal, List, Optional, Any + +try: + from enum import StrEnum +except ImportError: + from strenum import StrEnum + +import pydantic +import periodictable + +from . import units + + +class NxGroup(pydantic.BaseModel, extra="allow"): + pass + + +class NxField(pydantic.BaseModel): + value: Any + + +class NxClass: + _NXCLASSES: Dict[str, "NxClass"] = dict() + + def __init_subclass__(cls, nx_class: str, **kwargs): + super().__init_subclass__(**kwargs) + NxClass._NXCLASSES[nx_class] = cls + + +class NxLinkModel(pydantic.BaseModel): + target_name: str + target_filename: Optional[str] = None + + +class NxDataModel(NxClass, NxGroup, nx_class="NxData"): + NX_class: Literal["NXdata"] = pydantic.Field(default="NXdata", alias="@NX_class") + signal: Literal["intensity"] = pydantic.Field(default="intensity", alias="@signal") + axes: List[str] = pydantic.Field(default=["energy"], alias="@axes") + energy: NxLinkModel + intensity: NxLinkModel + + +class NxInstrumentName(NxField): + value: Optional[str] + short_name: Optional[str] = pydantic.Field(alias="@short_name") + + +class NxInstrument(NxClass, NxGroup, nx_class="NxInstrument"): + NX_class: Literal["NxInstrument"] = pydantic.Field( + default="NxInstrument", alias="@NX_class" + ) + name: Optional[NxInstrumentName] = None + + +class NxEntryClass(StrEnum): + NXentry = "NXentry" + NXsubentry = "NXsubentry" + + +class NxXasMode(StrEnum): + transmission = "transmission" + fluorescence_yield = "fluorescence yield" + + +ChemicalElement = StrEnum( + "ChemicalElement", {el.symbol: el.symbol for el in periodictable.elements} +) + +XRayCoreExcitationState = StrEnum( + "XRayCoreExcitationState", {s: s for s in ("K", "L1", "L2", "L3")} +) + + +class NxXasModel(NxClass, NxGroup, nx_class="NXxas"): + NX_class: NxEntryClass = pydantic.Field(alias="@NX_class", default="NXentry") + definition: Literal["NXxas"] = "NXxas" + mode: NxXasMode + element: ChemicalElement + absorption_edge: XRayCoreExcitationState + energy: units.PydanticQuantity = units.as_quantity([]) + intensity: units.PydanticQuantity = units.as_quantity([]) + title: Optional[str] = None + plot: Optional[NxDataModel] = None + instrument: Optional[NxInstrument] = None + + @pydantic.model_validator(mode="after") + def set_title(self) -> "NxXasModel": + if self.element is not None and self.absorption_edge is not None: + title = f"{self.element} {self.absorption_edge}" + if self.instrument is not None and self.instrument.name is not None: + title = f"{self.instrument.name.value}: {title}" + self.title = f"{title} ({self.mode})" + if self.plot is None: + energy = NxLinkModel(target_name="../energy") + intensity = NxLinkModel(target_name="../intensity") + self.plot = NxDataModel(energy=energy, intensity=intensity) + return self + + def has_data(self) -> bool: + return bool(self.energy.size and self.intensity.size) diff --git a/src/pynxxas/models/units.py b/src/pynxxas/models/units.py index 9c13f48..82cecb3 100644 --- a/src/pynxxas/models/units.py +++ b/src/pynxxas/models/units.py @@ -43,8 +43,9 @@ def __get_pydantic_core_schema__( _source_type: Any, _handler: pydantic.GetCoreSchemaHandler, ) -> core_schema.CoreSchema: - def serialize(value: pint.Quantity) -> List: - return list(value.to_tuple()) + def serialize(value: Any) -> List: + value = as_quantity(value) + return [value.magnitude.tolist(), str(value.units)] json_schema = core_schema.chain_schema( [ diff --git a/src/pynxxas/models/xdi.py b/src/pynxxas/models/xdi.py index cc63d79..7353d07 100644 --- a/src/pynxxas/models/xdi.py +++ b/src/pynxxas/models/xdi.py @@ -1,3 +1,6 @@ +"""XAS Data Interchange (XDI) data model_instance +""" + import datetime from typing import Optional, List, Any, Mapping @@ -110,12 +113,12 @@ class XdiData(XdiBaseModel): class XdiModel(XdiBaseModel): - element: Optional[XdiElementNamespace] = None - scan: Optional[XdiScanNamespace] = None - mono: Optional[XdiMonoNamespace] = None - beamline: Optional[XdiBeamlineNamespace] = None - facility: Optional[XdiFacilityNamespace] = None - detector: Optional[XdiDetectorNamespace] = None - sample: Optional[XdiSampleNamespace] = None - comments: Optional[List[str]] = None - data: Optional[XdiData] = None + element: XdiElementNamespace = XdiElementNamespace() + scan: XdiScanNamespace = XdiScanNamespace() + mono: XdiMonoNamespace = XdiMonoNamespace() + beamline: XdiBeamlineNamespace = XdiBeamlineNamespace() + facility: XdiFacilityNamespace = XdiFacilityNamespace() + detector: XdiDetectorNamespace = XdiDetectorNamespace() + sample: XdiSampleNamespace = XdiSampleNamespace() + comments: List[str] = list() + data: XdiData = XdiData() diff --git a/src/pynxxas/tests/conftest.py b/src/pynxxas/tests/conftest.py index a555a75..142e104 100644 --- a/src/pynxxas/tests/conftest.py +++ b/src/pynxxas/tests/conftest.py @@ -1,4 +1,6 @@ import pytest +from ..models import NxXasModel +from ..io.xdi import load_xdi_file @pytest.fixture() @@ -9,6 +11,24 @@ def xdi_file(tmp_path): return filename +@pytest.fixture() +def xdi_model(xdi_file): + return next(load_xdi_file(xdi_file)) + + +@pytest.fixture() +def nxxas_model(): + return NxXasModel(**_NXXAS_CONTENT) + + +_NXXAS_CONTENT = { + "element": "Co", + "absorption_edge": "K", + "mode": "transmission", + "energy": [[7509, 7519], "eV"], + "intensity": [[-0.51329170, -0.78493490], ""], +} + _XDI_CONTENT = """ # XDI/1.0 GSE/1.0 # Column.1: energy eV diff --git a/src/pynxxas/tests/test_convert.py b/src/pynxxas/tests/test_convert.py new file mode 100644 index 0000000..29303e0 --- /dev/null +++ b/src/pynxxas/tests/test_convert.py @@ -0,0 +1,54 @@ +from .. import models +from ..models import convert + + +def test_xdi_to_xdi(xdi_model): + xdi_model = next(convert.convert_model(xdi_model, models.XdiModel)) + _assert_model(xdi_model) + + +def test_nxxas_to_nxxas(nxxas_model): + nxxas_model = next(convert.convert_model(nxxas_model, models.NxXasModel)) + _assert_model(nxxas_model) + + +def test_xdi_to_nexus(xdi_model): + nxxas_model = next(convert.convert_model(xdi_model, models.NxXasModel)) + _assert_model(nxxas_model) + + +def test_nexus_to_xdi(nxxas_model): + xdi_model = next(convert.convert_model(nxxas_model, models.XdiModel)) + _assert_model(xdi_model) + + +def _assert_xdi_model(xdi_model: models.XdiModel): + xdi_model.element.symbol = "Co" + assert str(xdi_model.data.energy.units) == "eV" + + assert xdi_model.data.energy.magnitude.tolist() == [7509, 7519] + assert str(xdi_model.data.energy.units) == "eV" + + assert xdi_model.data.mutrans.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(xdi_model.data.mutrans.units) == "" + + +def _assert_nxxas_model(xdi_model: models.NxXasModel): + xdi_model.element = "Co" + assert str(xdi_model.energy.units) == "eV" + + assert xdi_model.energy.magnitude.tolist() == [7509, 7519] + assert str(xdi_model.energy.units) == "eV" + + assert xdi_model.intensity.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(xdi_model.intensity.units) == "" + + +_ASSERT_MODEL = { + models.XdiModel: _assert_xdi_model, + models.NxXasModel: _assert_nxxas_model, +} + + +def _assert_model(model_instance): + _ASSERT_MODEL[type(model_instance)](model_instance) diff --git a/src/pynxxas/tests/test_nexus.py b/src/pynxxas/tests/test_nexus.py new file mode 100644 index 0000000..672471b --- /dev/null +++ b/src/pynxxas/tests/test_nexus.py @@ -0,0 +1,72 @@ +from ..models import NxXasModel + + +def test_nxxas(): + data = { + "@NX_class": "NXsubentry", + "definition": "NXxas", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": [[7, 7.1], "keV"], + "intensity": [10, 20], + } + model_instance = NxXasModel(**data) + + expected = _expected_content("NXsubentry", [[7, 7.1], "keV"], [[10, 20], ""]) + assert model_instance.model_dump() == expected + + +def test_nxxas_defaults(): + data = { + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + } + model_instance = NxXasModel(**data) + + expected = _expected_content("NXentry", [[], ""], [[], ""]) + assert model_instance.model_dump() == expected + + +def test_nxxas_fill_data(): + data = { + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + } + model_instance = NxXasModel(**data) + model_instance.energy = [7, 7.1], "keV" + model_instance.intensity = [10, 20] + + expected = _expected_content("NXentry", [[7, 7.1], "keV"], [[10, 20], ""]) + assert model_instance.model_dump() == expected + + +def _expected_content(nx_class, energy, intensity): + return { + "NX_class": nx_class, + "definition": "NXxas", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": energy, + "intensity": intensity, + "title": "Fe K (transmission)", + "instrument": None, + "plot": { + "NX_class": "NXdata", + "axes": [ + "energy", + ], + "energy": { + "target_filename": None, + "target_name": "../energy", + }, + "intensity": { + "target_filename": None, + "target_name": "../intensity", + }, + "signal": "intensity", + }, + } diff --git a/src/pynxxas/tests/test_xdi.py b/src/pynxxas/tests/test_xdi.py index 38db020..bee2bf3 100644 --- a/src/pynxxas/tests/test_xdi.py +++ b/src/pynxxas/tests/test_xdi.py @@ -5,12 +5,14 @@ def test_is_xdi(xdi_file): assert xdi.is_xdi_file(xdi_file) -def test_read_xdi(xdi_file): - model = xdi.read_xdi(xdi_file) +def test_load_xdi_file(xdi_file): + models = list(xdi.load_xdi_file(xdi_file)) + assert len(models) == 1 + model_instance = models[0] # Fields - assert model.facility.energy.magnitude == 7 - assert str(model.facility.energy.units) == "GeV" + assert model_instance.facility.energy.magnitude == 7 + assert str(model_instance.facility.energy.units) == "GeV" # User ccomments comments = [ @@ -18,14 +20,14 @@ def test_read_xdi(xdi_file): "measured at beamline 13-ID-C", "vert slits = 0.3 x 0.3mm (at ~50m)", ] - assert model.comments == comments + assert model_instance.comments == comments # XAS data - assert model.data.energy.magnitude.tolist() == [7509, 7519] - assert str(model.data.energy.units) == "eV" + assert model_instance.data.energy.magnitude.tolist() == [7509, 7519] + assert str(model_instance.data.energy.units) == "eV" - assert model.data.mutrans.magnitude.tolist() == [-0.51329170, -0.78493490] - assert str(model.data.mutrans.units) == "" + assert model_instance.data.mutrans.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(model_instance.data.mutrans.units) == "" - assert model.data.i0.magnitude.tolist() == [165872.70, 161255.70] - assert str(model.data.i0.units) == "" + assert model_instance.data.i0.magnitude.tolist() == [165872.70, 161255.70] + assert str(model_instance.data.i0.units) == ""