# /*##########################################################################
# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ############################################################################*/
"""This module provides a h5py-like API to access SpecFile data.
API description
+++++++++++++++
Specfile data structure exposed by this API:
::
  /
      1.1/
          title = "…"
          start_time = "…"
          instrument/
              specfile/
                  file_header = "…"
                  scan_header = "…"
              positioners/
                  motor_name = value
                  …
              mca_0/
                  data = …
                  calibration = …
                  channels = …
                  preset_time = …
                  elapsed_time = …
                  live_time = …
              mca_1/
                  …
              …
          measurement/
              colname0 = …
              colname1 = …
              …
              mca_0/
                   data -> /1.1/instrument/mca_0/data
                   info -> /1.1/instrument/mca_0/
              …
          sample/
              ub_matrix = …
              unit_cell = …
              unit_cell_abc = …
              unit_cell_alphabetagamma = …
      2.1/
          …
``file_header`` and ``scan_header`` are the raw headers as they
appear in the original file, as a string of lines separated by newline (``\\n``) characters.
The title is the content of the ``#S`` scan header line without the leading
``#S`` and without the scan number (e.g ``"ascan  ss1vo -4.55687 -0.556875  40 0.2"``).
The start time is converted to ISO8601 format (``"2016-02-23T22:49:05Z"``),
if the original date format is standard.
Numeric datasets are stored in *float32* format, except for scalar integers
which are stored as *int64*.
Motor positions (e.g. ``/1.1/instrument/positioners/motor_name``) can be
1D numpy arrays if they are measured as scan data, or else scalars as defined
on ``#P`` scan header lines. A simple test is done to check if the motor name
is also a data column header defined in the ``#L`` scan header line.
Scan data  (e.g. ``/1.1/measurement/colname0``) is accessed by column,
the dataset name ``colname0`` being the column label as defined in the ``#L``
scan header line.
If a ``/`` character is present in a column label or in a motor name in the
original SPEC file, it will be substituted with a ``%`` character in the
corresponding dataset name.
MCA data is exposed as a 2D numpy array containing all spectra for a given
analyser. The number of analysers is calculated as the number of MCA spectra
per scan data line. Demultiplexing is then performed to assign the correct
spectra to a given analyser.
MCA calibration is an array of 3 scalars, from the ``#@CALIB`` header line.
It is identical for all MCA analysers, as there can be only one
``#@CALIB`` line per scan.
MCA channels is an array containing all channel numbers. This information is
computed from the ``#@CHANN`` scan header line (if present), or computed from
the shape of the first spectrum in a scan (``[0, … len(first_spectrum] - 1]``).
Accessing data
++++++++++++++
Data and groups are accessed in :mod:`h5py` fashion::
    from silx.io.spech5 import SpecH5
    # Open a SpecFile
    sfh5 = SpecH5("test.dat")
    # using SpecH5 as a regular group to access scans
    scan1group = sfh5["1.1"]
    instrument_group = scan1group["instrument"]
    # alternative: full path access
    measurement_group = sfh5["/1.1/measurement"]
    # accessing a scan data column by name as a 1D numpy array
    data_array = measurement_group["Pslit HGap"]
    # accessing all mca-spectra for one MCA device
    mca_0_spectra = measurement_group["mca_0/data"]
:class:`SpecH5` files and groups provide a :meth:`keys` method::
    >>> sfh5.keys()
    ['96.1', '97.1', '98.1']
    >>> sfh5['96.1'].keys()
    ['title', 'start_time', 'instrument', 'measurement']
They can also be treated as iterators:
.. code-block:: python
    from silx.io import is_dataset
    for scan_group in SpecH5("test.dat"):
        dataset_names = [item.name in scan_group["measurement"] if
                         is_dataset(item)]
        print("Found data columns in scan " + scan_group.name)
        print(", ".join(dataset_names))
You can test for existence of data or groups::
    >>> "/1.1/measurement/Pslit HGap" in sfh5
    True
    >>> "positioners" in sfh5["/2.1/instrument"]
    True
    >>> "spam" in sfh5["1.1"]
    False
"""
import datetime
import logging
import re
import io
import h5py
import numpy
from silx import version as silx_version
from .specfile import SpecFile, SfErrColNotFound
from . import commonh5
__authors__ = ["P. Knobel", "D. Naudet"]
__license__ = "MIT"
__date__ = "17/07/2018"
logger1 = logging.getLogger(__name__)
text_dtype = h5py.special_dtype(vlen=str)
def to_h5py_utf8(str_list):
    """Convert a string or a list of strings to a numpy array of
    unicode strings that can be written to HDF5 as utf-8.
    This ensures that the type will be consistent between python 2 and
    python 3, if attributes or datasets are saved to an HDF5 file.
    """
    return numpy.array(str_list, dtype=text_dtype)
def _get_number_of_mca_analysers(scan):
    """
    :param SpecFile sf: :class:`SpecFile` instance
    """
    number_of_mca_spectra = len(scan.mca)
    # Scan.data is transposed
    number_of_data_lines = scan.data.shape[1]
    if not number_of_data_lines == 0:
        # Number of MCA spectra must be a multiple of number of data lines
        assert number_of_mca_spectra % number_of_data_lines == 0
        return number_of_mca_spectra // number_of_data_lines
    elif number_of_mca_spectra:
        # Case of a scan without data lines, only MCA.
        # Our only option is to assume that the number of analysers
        # is the number of #@CHANN lines
        return len(scan.mca.channels)
    else:
        return 0
def _motor_in_scan(sf, scan_key, motor_name):
    """
    :param sf: :class:`SpecFile` instance
    :param scan_key: Scan identification key (e.g. ``1.1``)
    :param motor_name: Name of motor as defined in file header lines
    :return: ``True`` if motor exists in scan, else ``False``
    :raise: ``KeyError`` if scan_key not found in SpecFile
    """
    if scan_key not in sf:
        raise KeyError(
            "Scan key %s " % scan_key + "does not exist in SpecFile %s" % sf.filename
        )
    ret = motor_name in sf[scan_key].motor_names
    if not ret and "%" in motor_name:
        motor_name = motor_name.replace("%", "/")
        ret = motor_name in sf[scan_key].motor_names
    return ret
def _column_label_in_scan(sf, scan_key, column_label):
    """
    :param sf: :class:`SpecFile` instance
    :param scan_key: Scan identification key (e.g. ``1.1``)
    :param column_label: Column label as defined in scan header
    :return: ``True`` if data column label exists in scan, else ``False``
    :raise: ``KeyError`` if scan_key not found in SpecFile
    """
    if scan_key not in sf:
        raise KeyError(
            "Scan key %s " % scan_key + "does not exist in SpecFile %s" % sf.filename
        )
    ret = column_label in sf[scan_key].labels
    if not ret and "%" in column_label:
        column_label = column_label.replace("%", "/")
        ret = column_label in sf[scan_key].labels
    return ret
def _parse_UB_matrix(header_line):
    """Parse G3 header line and return UB matrix
    :param str header_line: G3 header line
    :return: UB matrix
    :raises ValueError: For malformed UB matrix header line
    """
    values = list(map(float, header_line.split()))  # Can raise ValueError
    if len(values) < 9:
        raise ValueError("Not enough values in UB matrix")
    return numpy.array(values).reshape((1, 3, 3))
def _ub_matrix_in_scan(scan):
    """Return True if scan header has a G3 line and all values are not 0.
    :param scan: specfile.Scan instance
    :return: True or False
    """
    header_line = scan.scan_header_dict.get("G3", None)
    if header_line is None:
        return False
    try:
        ub_matrix = _parse_UB_matrix(header_line)
    except ValueError:
        logger1.warning("Malformed G3 header line")
        return False
    return numpy.any(ub_matrix)
def _parse_unit_cell(header_line):
    """Parse G1 header line and return unit cell
    :param str header_line: G1 header line
    :return: unit cell
    :raises ValueError: For malformed unit cell header line
    """
    values = list(map(float, header_line.split()[0:6]))  # can raise ValueError
    if len(values) < 6:
        raise ValueError("Not enough values in unit cell")
    return numpy.array(values).reshape((1, 6))
def _unit_cell_in_scan(scan):
    """Return True if scan header has a G1 line and all values are not 0.
    :param scan: specfile.Scan instance
    :return: True or False
    """
    header_line = scan.scan_header_dict.get("G1", None)
    if header_line is None:
        return False
    try:
        unit_cell = _parse_unit_cell(header_line)
    except ValueError:
        logger1.warning("Malformed G1 header line")
        return False
    return numpy.any(unit_cell)
def _parse_ctime(ctime_lines, analyser_index=0):
    """
    :param ctime_lines: e.g ``@CTIME %f %f %f``, first word ``@CTIME`` optional
        When multiple CTIME lines are present in a scan header, this argument
        is a concatenation of them separated by a ``\\n`` character.
    :param analyser_index: MCA device/analyser index, when multiple devices
        are in a scan.
    :return: (preset_time, live_time, elapsed_time)
    """
    ctime_lines = ctime_lines.lstrip("@CTIME ")
    ctimes_lines_list = ctime_lines.split("\n")
    if len(ctimes_lines_list) == 1:
        # single @CTIME line for all devices
        ctime_line = ctimes_lines_list[0]
    else:
        ctime_line = ctimes_lines_list[analyser_index]
    if not len(ctime_line.split()) == 3:
        raise ValueError(
            "Incorrect format for @CTIME header line " + '(expected "@CTIME %f %f %f").'
        )
    return list(map(float, ctime_line.split()))
def spec_date_to_iso8601(date, zone=None):
    """Convert SpecFile date to Iso8601.
    :param date: Date (see supported formats below)
    :type date: str
    :param zone: Time zone as it appears in a ISO8601 date
    Supported formats:
    * ``DDD MMM dd hh:mm:ss YYYY``
    * ``DDD YYYY/MM/dd hh:mm:ss YYYY``
    where `DDD` is the abbreviated weekday, `MMM` is the month abbreviated
    name, `MM` is the month number (zero padded), `dd` is the weekday number
    (zero padded) `YYYY` is the year, `hh` the hour (zero padded), `mm` the
    minute (zero padded) and `ss` the second (zero padded).
    All names are expected to be in english.
    Examples::
        >>> spec_date_to_iso8601("Thu Feb 11 09:54:35 2016")
        '2016-02-11T09:54:35'
        >>> spec_date_to_iso8601("Sat 2015/03/14 03:53:50")
        '2015-03-14T03:53:50'
    """
    months = [
        "Jan",
        "Feb",
        "Mar",
        "Apr",
        "May",
        "Jun",
        "Jul",
        "Aug",
        "Sep",
        "Oct",
        "Nov",
        "Dec",
    ]
    days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    days_rx = "(?P<day>" + "|".join(days) + ")"
    months_rx = "(?P<month>" + "|".join(months) + ")"
    year_rx = r"(?P<year>\d{4})"
    day_nb_rx = r"(?P<day_nb>[0-3 ]\d)"
    month_nb_rx = r"(?P<month_nb>[0-1]\d)"
    hh_rx = r"(?P<hh>[0-2]\d)"
    mm_rx = r"(?P<mm>[0-5]\d)"
    ss_rx = r"(?P<ss>[0-5]\d)"
    tz_rx = r"(?P<tz>[+-]\d\d:\d\d){0,1}"
    # date formats must have either month_nb (1..12) or month (Jan, Feb, ...)
    re_tpls = [
        "{days} {months} {day_nb} {hh}:{mm}:{ss}{tz} {year}",
        "{days} {year}/{month_nb}/{day_nb} {hh}:{mm}:{ss}{tz}",
    ]
    grp_d = None
    for rx in re_tpls:
        full_rx = rx.format(
            days=days_rx,
            months=months_rx,
            year=year_rx,
            day_nb=day_nb_rx,
            month_nb=month_nb_rx,
            hh=hh_rx,
            mm=mm_rx,
            ss=ss_rx,
            tz=tz_rx,
        )
        m = re.match(full_rx, date)
        if m:
            grp_d = m.groupdict()
            break
    if not grp_d:
        raise ValueError("Date format not recognized : {0}".format(date))
    year = grp_d["year"]
    month = grp_d.get("month_nb")
    if not month:
        month = "{0:02d}".format(months.index(grp_d.get("month")) + 1)
    day = grp_d["day_nb"]
    tz = grp_d["tz"]
    if not tz:
        tz = zone
    time = "{0}:{1}:{2}".format(grp_d["hh"], grp_d["mm"], grp_d["ss"])
    full_date = "{0}-{1}-{2}T{3}{4}".format(year, month, day, time, tz if tz else "")
    return full_date
def _demultiplex_mca(scan, analyser_index):
    """Return MCA data for a single analyser.
    Each MCA spectrum is a 1D array. For each analyser, there is one
    spectrum recorded per scan data line. When there are more than a single
    MCA analyser in a scan, the data will be multiplexed. For instance if
    there are 3 analysers, the consecutive spectra for the first analyser must
    be accessed as ``mca[0], mca[3], mca[6]…``.
    :param scan: :class:`Scan` instance containing the MCA data
    :param analyser_index: 0-based index referencing the analyser
    :type analyser_index: int
    :return: 2D numpy array containing all spectra for one analyser
    """
    number_of_analysers = _get_number_of_mca_analysers(scan)
    number_of_spectra = len(scan.mca)
    number_of_spectra_per_analyser = number_of_spectra // number_of_analysers
    len_spectrum = len(scan.mca[analyser_index])
    mca_array = numpy.empty((number_of_spectra_per_analyser, len_spectrum))
    for i in range(number_of_spectra_per_analyser):
        mca_array[i, :] = scan.mca[analyser_index + i * number_of_analysers]
    return mca_array
# Node classes
[docs]
class SpecH5Dataset(object):
    """This convenience class is to be inherited by all datasets, for
    compatibility purpose with code that tests for
    ``isinstance(obj, SpecH5Dataset)``.
    This legacy behavior is deprecated. The correct way to test
    if an object is a dataset is to use :meth:`silx.io.utils.is_dataset`.
    Datasets must also inherit :class:`SpecH5NodeDataset` or
    :class:`SpecH5LazyNodeDataset` which actually implement all the
    API."""
    pass 
[docs]
class SpecH5NodeDataset(commonh5.Dataset, SpecH5Dataset):
    """This class inherits :class:`commonh5.Dataset`, to which it adds
    little extra functionality. The main additional functionality is the
    proxy behavior that allows to mimic the numpy array stored in this
    class.
    """
    def __init__(self, name, data, parent=None, attrs=None):
        # get proper value types, to inherit from numpy
        # attributes (dtype, shape, size)
        if isinstance(data, str):
            # use unicode (utf-8 when saved to HDF5 output)
            value = to_h5py_utf8(data)
        elif isinstance(data, float):
            # use 32 bits for float scalars
            value = numpy.float32(data)
        elif isinstance(data, int):
            value = numpy.int_(data)
        else:
            # Enforce numpy array
            array = numpy.array(data)
            data_kind = array.dtype.kind
            if data_kind in ["S", "U"]:
                value = numpy.asarray(array, dtype=text_dtype)
            elif data_kind in ["f"]:
                value = numpy.asarray(array, dtype=numpy.float32)
            else:
                value = array
        commonh5.Dataset.__init__(self, name, value, parent, attrs)
[docs]
    def __getattr__(self, item):
        """Proxy to underlying numpy array methods."""
        if hasattr(self[()], item):
            return getattr(self[()], item)
        raise AttributeError("SpecH5Dataset has no attribute %s" % item) 
 
class SpecH5LazyNodeDataset(commonh5.LazyLoadableDataset, SpecH5Dataset):
    """This class inherits :class:`commonh5.LazyLoadableDataset`,
    to which it adds a proxy behavior that allows to mimic the numpy
    array stored in this class.
    The class has to be inherited and the :meth:`_create_data` method has to be
    implemented to return the numpy data exposed by the dataset. This factory
    method is only called once, when the data is needed.
    """
    def __getattr__(self, item):
        """Proxy to underlying numpy array methods."""
        if hasattr(self[()], item):
            return getattr(self[()], item)
        raise AttributeError("SpecH5Dataset has no attribute %s" % item)
    def _create_data(self):
        """
        Factory to create the data exposed by the dataset when it is needed.
        It has to be implemented for the class to work.
        :rtype: numpy.ndarray
        """
        raise NotImplementedError()
[docs]
class SpecH5Group(object):
    """This convenience class is to be inherited by all groups, for
    compatibility purposes with code that tests for
    ``isinstance(obj, SpecH5Group)``.
    This legacy behavior is deprecated. The correct way to test
    if an object is a group is to use :meth:`silx.io.utils.is_group`.
    Groups must also inherit :class:`silx.io.commonh5.Group`, which
    actually implements all the methods and attributes."""
    pass 
[docs]
class SpecH5(commonh5.File, SpecH5Group):
    """This class opens a SPEC file and exposes it as a *h5py.File*.
    It inherits :class:`silx.io.commonh5.Group` (via :class:`commonh5.File`),
    which implements most of its API.
    """
    def __init__(self, filename):
        """
        :param filename: Path to SpecFile in filesystem
        :type filename: str
        """
        if isinstance(filename, io.IOBase):
            # see https://github.com/silx-kit/silx/issues/858
            filename = filename.name
        self._sf = SpecFile(filename)
        attrs = {
            "NX_class": to_h5py_utf8("NXroot"),
            "file_time": to_h5py_utf8(datetime.datetime.now().isoformat()),
            "file_name": to_h5py_utf8(filename),
            "creator": to_h5py_utf8("silx spech5 %s" % silx_version),
        }
        commonh5.File.__init__(self, filename, attrs=attrs)
        for scan_key in self._sf.keys():
            scan = self._sf[scan_key]
            scan_group = ScanGroup(scan_key, parent=self, scan=scan)
            self.add_node(scan_group)
[docs]
    def close(self):
        self._sf.close()
        self._sf = None 
 
class ScanGroup(commonh5.Group, SpecH5Group):
    def __init__(self, scan_key, parent, scan):
        """
        :param parent: parent Group
        :param str scan_key: Scan key (e.g. "1.1")
        :param scan: specfile.Scan object
        """
        commonh5.Group.__init__(
            self, scan_key, parent=parent, attrs={"NX_class": to_h5py_utf8("NXentry")}
        )
        # take title in #S after stripping away scan number and spaces
        s_hdr_line = scan.scan_header_dict["S"]
        title = s_hdr_line.lstrip("0123456789").lstrip()
        self.add_node(
            SpecH5NodeDataset(name="title", data=to_h5py_utf8(title), parent=self)
        )
        if "D" in scan.scan_header_dict:
            try:
                start_time_str = spec_date_to_iso8601(scan.scan_header_dict["D"])
            except (IndexError, ValueError):
                logger1.warning(
                    "Could not parse date format in scan %s header."
                    + " Using original date not converted to ISO-8601",
                    scan_key,
                )
                start_time_str = scan.scan_header_dict["D"]
        elif "D" in scan.file_header_dict:
            logger1.warning(
                "No #D line in scan %s header. " + "Using file header for start_time.",
                scan_key,
            )
            try:
                start_time_str = spec_date_to_iso8601(scan.file_header_dict["D"])
            except (IndexError, ValueError):
                logger1.warning(
                    "Could not parse date format in scan %s header. "
                    + "Using original date not converted to ISO-8601",
                    scan_key,
                )
                start_time_str = scan.file_header_dict["D"]
        else:
            logger1.warning(
                "No #D line in %s header. Setting date to empty string.", scan_key
            )
            start_time_str = ""
        self.add_node(
            SpecH5NodeDataset(
                name="start_time", data=to_h5py_utf8(start_time_str), parent=self
            )
        )
        self.add_node(InstrumentGroup(parent=self, scan=scan))
        self.add_node(MeasurementGroup(parent=self, scan=scan))
        if _unit_cell_in_scan(scan) or _ub_matrix_in_scan(scan):
            self.add_node(SampleGroup(parent=self, scan=scan))
class InstrumentGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: specfile.Scan object
        """
        commonh5.Group.__init__(
            self,
            name="instrument",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXinstrument")},
        )
        self.add_node(InstrumentSpecfileGroup(parent=self, scan=scan))
        self.add_node(PositionersGroup(parent=self, scan=scan))
        num_analysers = _get_number_of_mca_analysers(scan)
        for anal_idx in range(num_analysers):
            self.add_node(
                InstrumentMcaGroup(parent=self, analyser_index=anal_idx, scan=scan)
            )
class InstrumentSpecfileGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, scan):
        commonh5.Group.__init__(
            self,
            name="specfile",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXcollection")},
        )
        self.add_node(
            SpecH5NodeDataset(
                name="file_header",
                data=to_h5py_utf8(scan.file_header),
                parent=self,
                attrs={},
            )
        )
        self.add_node(
            SpecH5NodeDataset(
                name="scan_header",
                data=to_h5py_utf8(scan.scan_header),
                parent=self,
                attrs={},
            )
        )
class PositionersGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, scan):
        commonh5.Group.__init__(
            self,
            name="positioners",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXcollection")},
        )
        dataset_info = []  # Store list of positioner's (name, value)
        is_error = False  # True if error encountered
        for motor_name in scan.motor_names:
            safe_motor_name = motor_name.replace("/", "%")
            if motor_name in scan.labels and scan.data.shape[0] > 0:
                # return a data column if one has the same label as the motor
                motor_value = scan.data_column_by_name(motor_name)
            else:
                # Take value from #P scan header.
                # (may return float("inf") if #P line is missing from scan hdr)
                try:
                    motor_value = scan.motor_position_by_name(motor_name)
                except SfErrColNotFound:
                    is_error = True
                    motor_value = float("inf")
            dataset_info.append((safe_motor_name, motor_value))
        if is_error:  # Filter-out scalar values
            logger1.warning("Mismatching number of elements in #P and #O: Ignoring")
            dataset_info = [
                (name, value)
                for name, value in dataset_info
                if not isinstance(value, float)
            ]
        for name, value in dataset_info:
            self.add_node(SpecH5NodeDataset(name=name, data=value, parent=self))
class InstrumentMcaGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, analyser_index, scan):
        name = "mca_%d" % analyser_index
        commonh5.Group.__init__(
            self,
            name=name,
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXdetector")},
        )
        mcaDataDataset = McaDataDataset(
            parent=self, analyser_index=analyser_index, scan=scan
        )
        self.add_node(mcaDataDataset)
        spectrum_length = mcaDataDataset.shape[-1]
        mcaDataDataset = None
        if len(scan.mca.channels) == 1:
            # single @CALIB line applying to multiple devices
            calibration_dataset = scan.mca.calibration[0]
            channels_dataset = scan.mca.channels[0]
        else:
            calibration_dataset = scan.mca.calibration[analyser_index]
            channels_dataset = scan.mca.channels[analyser_index]
        channels_length = len(channels_dataset)
        if (channels_length > 1) and (spectrum_length > 0):
            logger1.info("Spectrum and channels length mismatch")
            # this should always be the case
            if channels_length > spectrum_length:
                channels_dataset = channels_dataset[:spectrum_length]
            elif channels_length < spectrum_length:
                # only trust first channel and increment
                channel0 = channels_dataset[0]
                increment = channels_dataset[1] - channels_dataset[0]
                channels_dataset = numpy.linspace(
                    channel0,
                    channel0 + increment * spectrum_length,
                    spectrum_length,
                    endpoint=False,
                )
        self.add_node(
            SpecH5NodeDataset(name="calibration", data=calibration_dataset, parent=self)
        )
        self.add_node(
            SpecH5NodeDataset(name="channels", data=channels_dataset, parent=self)
        )
        if "CTIME" in scan.mca_header_dict:
            ctime_line = scan.mca_header_dict["CTIME"]
            preset_time, live_time, elapsed_time = _parse_ctime(
                ctime_line, analyser_index
            )
            self.add_node(
                SpecH5NodeDataset(name="preset_time", data=preset_time, parent=self)
            )
            self.add_node(
                SpecH5NodeDataset(name="live_time", data=live_time, parent=self)
            )
            self.add_node(
                SpecH5NodeDataset(name="elapsed_time", data=elapsed_time, parent=self)
            )
class McaDataDataset(SpecH5LazyNodeDataset):
    """Lazy loadable dataset for MCA data"""
    def __init__(self, parent, analyser_index, scan):
        commonh5.LazyLoadableDataset.__init__(
            self,
            name="data",
            parent=parent,
            attrs={
                "interpretation": to_h5py_utf8("spectrum"),
            },
        )
        self._scan = scan
        self._analyser_index = analyser_index
        self._shape = None
        self._num_analysers = _get_number_of_mca_analysers(self._scan)
    def _create_data(self):
        return _demultiplex_mca(self._scan, self._analyser_index)
    @property
    def shape(self):
        if self._shape is None:
            num_spectra_in_file = len(self._scan.mca)
            num_spectra_per_analyser = num_spectra_in_file // self._num_analysers
            len_spectrum = len(self._scan.mca[self._analyser_index])
            self._shape = num_spectra_per_analyser, len_spectrum
        return self._shape
    @property
    def size(self):
        return numpy.prod(self.shape, dtype=numpy.intp)
    @property
    def dtype(self):
        # we initialize the data with numpy.empty() without specifying a dtype
        # in _demultiplex_mca()
        return numpy.empty((1,)).dtype
    def __len__(self):
        return self.shape[0]
    def __getitem__(self, item):
        # optimization for fetching a single spectrum if data not already loaded
        if not self._is_initialized:
            if isinstance(item, int):
                if item < 0:
                    # negative indexing
                    item += len(self)
                return self._scan.mca[self._analyser_index + item * self._num_analysers]
            # accessing a slice or element of a single spectrum [i, j:k]
            try:
                spectrum_idx, channel_idx_or_slice = item
                assert isinstance(spectrum_idx, int)
            except (ValueError, TypeError, AssertionError):
                pass
            else:
                if spectrum_idx < 0:
                    item += len(self)
                idx = self._analyser_index + spectrum_idx * self._num_analysers
                return self._scan.mca[idx][channel_idx_or_slice]
        return super(McaDataDataset, self).__getitem__(item)
class MeasurementGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: specfile.Scan object
        """
        commonh5.Group.__init__(
            self,
            name="measurement",
            parent=parent,
            attrs={
                "NX_class": to_h5py_utf8("NXcollection"),
            },
        )
        for label in scan.labels:
            safe_label = label.replace("/", "%")
            self.add_node(
                SpecH5NodeDataset(
                    name=safe_label, data=scan.data_column_by_name(label), parent=self
                )
            )
        num_analysers = _get_number_of_mca_analysers(scan)
        for anal_idx in range(num_analysers):
            self.add_node(MeasurementMcaGroup(parent=self, analyser_index=anal_idx))
class MeasurementMcaGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, analyser_index):
        basename = "mca_%d" % analyser_index
        commonh5.Group.__init__(self, name=basename, parent=parent, attrs={})
        target_name = self.name.replace("measurement", "instrument")
        self.add_node(
            commonh5.SoftLink(name="data", path=target_name + "/data", parent=self)
        )
        self.add_node(commonh5.SoftLink(name="info", path=target_name, parent=self))
class SampleGroup(commonh5.Group, SpecH5Group):
    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: specfile.Scan object
        """
        commonh5.Group.__init__(
            self,
            name="sample",
            parent=parent,
            attrs={
                "NX_class": to_h5py_utf8("NXsample"),
            },
        )
        if _unit_cell_in_scan(scan):
            self.add_node(
                SpecH5NodeDataset(
                    name="unit_cell",
                    data=_parse_unit_cell(scan.scan_header_dict["G1"]),
                    parent=self,
                    attrs={"interpretation": to_h5py_utf8("scalar")},
                )
            )
            self.add_node(
                SpecH5NodeDataset(
                    name="unit_cell_abc",
                    data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 0:3],
                    parent=self,
                    attrs={"interpretation": to_h5py_utf8("scalar")},
                )
            )
            self.add_node(
                SpecH5NodeDataset(
                    name="unit_cell_alphabetagamma",
                    data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 3:6],
                    parent=self,
                    attrs={"interpretation": to_h5py_utf8("scalar")},
                )
            )
        if _ub_matrix_in_scan(scan):
            self.add_node(
                SpecH5NodeDataset(
                    name="ub_matrix",
                    data=_parse_UB_matrix(scan.scan_header_dict["G3"]),
                    parent=self,
                    attrs={"interpretation": to_h5py_utf8("scalar")},
                )
            )