Source code for nxtomomill.converter.hdf5.acquisition.utils

# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2020 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/

"""
Utils related to bliss-HDF5
"""


import typing
from typing import Iterable

import h5py
import numpy
from silx.io.url import DataUrl
from silx.io.utils import h5py_read_dataset
from silx.io.utils import open as open_hdf5

from nxtomo.nxobject.nxdetector import ImageKey
from nxtomomill.io.acquisitionstep import AcquisitionStep
from nxtomomill.io.config import TomoHDF5Config

try:
    import hdf5plugin  # noqa F401
except ImportError:
    pass
import logging

_logger = logging.getLogger(__name__)


[docs]def has_valid_detector(node, detectors_names): """ :return True if the node looks like a valid nx detector """ for key in node.keys(): if ( "NX_class" in node[key].attrs and node[key].attrs["NX_class"] == "NXdetector" ): if detectors_names is None or key in detectors_names: return True return False
[docs]def get_entry_type( url: DataUrl, configuration: TomoHDF5Config ) -> typing.Optional[AcquisitionStep]: """ :param DataUrl url: bliss scan url to type :return: return the step of the acquisition or None if cannot find it. """ if not isinstance(url, DataUrl): raise TypeError(f"DataUrl is expected. Not {type(url)}") if url.data_slice() is not None: raise ValueError( "url expect to provide a link to bliss scan. Slice " "are not handled" ) def _get_entry_type_from_title(entry: h5py.Group): """ try to determine the entry type from the title """ try: title = h5py_read_dataset(entry["title"]) except Exception: _logger.error(f"fail to find title for {entry.name}, skip this group") return None else: init_titles = list(configuration.init_titles) init_titles.extend(configuration.zserie_init_titles) init_titles.extend(configuration.pcotomo_init_titles) step_titles = { AcquisitionStep.INITIALIZATION: init_titles, AcquisitionStep.DARK: configuration.dark_titles, AcquisitionStep.FLAT: configuration.flat_titles, AcquisitionStep.PROJECTION: configuration.projections_titles, AcquisitionStep.ALIGNMENT: configuration.alignment_titles, } for step, titles in step_titles.items(): for title_start in titles: if title.startswith(title_start): return step return None def _get_entry_type_from_technique(entry: h5py.Group): """ try to determine entry type from the scan/technique sub groups. If this is a flat then we expect to have a "flat" group. If this is a set of projection we expect to have a "proj" group. For now alignment / return are unfiled """ group_technique = entry.get("technique", dict()) if "image_key" not in group_technique: return None image_key = h5py_read_dataset(group_technique["image_key"]) if image_key is None: return None else: try: image_key = ImageKey.from_value(image_key) except ValueError: _logger.error(f"unrecognized image key: '{image_key}'") return None else: connections = { ImageKey.DARK_FIELD: AcquisitionStep.DARK, ImageKey.FLAT_FIELD: AcquisitionStep.FLAT, ImageKey.PROJECTION: AcquisitionStep.PROJECTION, ImageKey.ALIGNMENT: AcquisitionStep.ALIGNMENT, } return connections.get(image_key, None) with open_hdf5(url.file_path()) as h5f: if url.data_path() not in h5f: raise ValueError(f"Provided path does not exists: {url}") entry = h5f[url.data_path()] if not isinstance(entry, h5py.Group): raise ValueError( f"Expected path is not related to a h5py.Group ({entry}) when expect to target a bliss entry." ) return _get_entry_type_from_technique(entry) or _get_entry_type_from_title( entry )
[docs]def get_nx_detectors(node: h5py.Group) -> tuple: """ :param h5py.Group node: node to inspect :return: tuple of NXdetector (h5py.Group) contained in `node` (expected to be the `instrument` group) :rtype: tuple """ if not isinstance(node, h5py.Group): raise TypeError("node should be an instance of h5py.Group") nx_detectors = [] for _, subnode in node.items(): if isinstance(subnode, h5py.Group) and "NX_class" in subnode.attrs: if subnode.attrs["NX_class"] == "NXdetector": if "data" in subnode and hasattr(subnode["data"], "ndim"): if subnode["data"].ndim == 3: nx_detectors.append(subnode) nx_detectors = sorted(nx_detectors, key=lambda det: det.name) return tuple(nx_detectors)
[docs]def guess_nx_detector(node: h5py.Group) -> tuple: """ Try to guess what can be an nx_detector without using the "NXdetector" NX_class attribute. Expect to find a 3D dataset named 'data' under a subnode """ if not isinstance(node, h5py.Group): raise TypeError("node should be an instance of h5py.Group") nx_detectors = [] for _, subnode in node.items(): if isinstance(subnode, h5py.Group) and "data" in subnode: if isinstance(subnode["data"], h5py.Dataset) and subnode["data"].ndim == 3: nx_detectors.append(subnode) nx_detectors = sorted(nx_detectors, key=lambda det: det.name) return tuple(nx_detectors)
[docs]def deduce_machine_electric_current( timestamps: tuple, known_machine_electric_current: dict ) -> dict: """ :param dict knowned_machine_electric_current: keys are electric timestamp. Value is electric current :param tuple timestamp: keys are frame index. timestamp. Value is electric current """ if not isinstance(known_machine_electric_current, dict): raise TypeError("knowned_machine_electric_current is expected to be a dict") for elmt in timestamps: if not isinstance(elmt, numpy.datetime64): raise TypeError( f"elmts of timestamps are expected to be {numpy.datetime64} and not {type(elmt)}" ) if len(known_machine_electric_current) == 0: raise ValueError( "knowned_machine_electric_current should at least contains one element" ) for key, value in known_machine_electric_current.items(): if not isinstance(key, numpy.datetime64): raise TypeError( f"knowned_machine_electric_current keys are expected to be instances of {numpy.datetime64} and not {type(key)}" ) if not isinstance(value, (float, numpy.number)): raise TypeError( "knowned_machine_electric_current values are expected to be instances of float" ) # 1. order **knowned** electric current by time stamps (key) known_machine_electric_current = dict( sorted(known_machine_electric_current.items()) ) known_timestamps = tuple(known_machine_electric_current.keys()) known_electric_currents = tuple(known_machine_electric_current.values()) # 3. order input timestamps timestamp_input_ordering = numpy.argsort(numpy.array(timestamps)) ordered_timestamps = numpy.take_along_axis( numpy.array(timestamps), indices=timestamp_input_ordering, axis=0 ) left_indexes = numpy.searchsorted( known_timestamps, # input array ordered_timestamps, side="left", ) def compute(timestamp, left_know_index): if left_know_index == 0: return known_electric_currents[0] elif left_know_index > len(known_timestamps) - 1: return known_electric_currents[-1] else: ec1 = known_electric_currents[left_know_index - 1] ec2 = known_electric_currents[left_know_index] left_timestamp = known_timestamps[left_know_index - 1] right_timestamp = known_timestamps[left_know_index] delta = right_timestamp - left_timestamp assert right_timestamp >= left_timestamp w1 = 1 - (timestamp - left_timestamp) / delta w2 = 1 - (right_timestamp - timestamp) / delta return ec1 * w1 + ec2 * w2 res = tuple( [ compute(timestamp, left_know_index) for timestamp, left_know_index in zip(ordered_timestamps, left_indexes) ] ) assert len(res) == len( timestamp_input_ordering ), f"incoherent number of computed electric current ({len(res)}) vs input time stamp({len(timestamp_input_ordering)})" assert len(res) == len(timestamp_input_ordering) # 4. reorder resulting electrical current to fit the order of provided timestamp original_order_res = [None] * len(res) for i, o_pos in enumerate(timestamp_input_ordering): original_order_res[o_pos] = res[i] return tuple(original_order_res)
[docs]def split_timestamps(my_array: Iterable, n_part: int): """ split given array into n_part (as equal as possible) :param Iterable my_array: """ array_size = len(my_array) if array_size < n_part: yield my_array else: start = 0 for _ in range(n_part): end = max(start + int(array_size / n_part) + 1, array_size) yield my_array[start:end] start = end