Source code for nxtomomill.converter.hdf5.acquisition.standardacquisition

# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2020 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/

"""
module to define a standard tomography acquisition (made by bliss)
"""

__authors__ = [
    "H. Payno",
]
__license__ = "MIT"
__date__ = "14/02/2022"


from datetime import datetime
from typing import Optional, Union

import h5py
from silx.io.url import DataUrl
from silx.io.utils import h5py_read_dataset
from pyunitsystem import ElectricCurrentSystem, metricsystem

from nxtomo.utils.transformation import UDDetTransformation, LRDetTransformation
from nxtomo.nxobject.nxsource import SourceType
from nxtomo.nxobject.nxdetector import ImageKey

from nxtomomill.io.acquisitionstep import AcquisitionStep
from nxtomomill.utils.utils import str_datetime_to_numpy_datetime64

from .baseacquisition import BaseAcquisition, EntryReader
from .utils import (
    deduce_machine_electric_current,
    get_entry_type,
    get_nx_detectors,
    guess_nx_detector,
)

try:
    import hdf5plugin  # noqa F401
except ImportError:
    pass
import fnmatch
import logging
import os

import numpy

from nxtomomill.converter.hdf5.acquisition.blisstomoconfig import (
    TomoConfig as BlissTomoConfig,
)
from nxtomomill.io.config import TomoHDF5Config
from nxtomo.application.nxtomo import NXtomo

_logger = logging.getLogger(__name__)


[docs]class StandardAcquisition(BaseAcquisition): """ Class to collect information from a bliss - hdf scan (see https://bliss.gitlab-pages.esrf.fr/fscan). Once all data is collected a set of NXtomo will be created. Then NXtomo instances will be saved to disk. :param DataUrl root_url: url of the acquisition. Can be None if this is the initialization entry :param TomoHDF5Config configuration: configuration to use to collect raw data and generate outputs :param Optional[Function] detector_sel_callback: possible callback to retrieve missing information """
[docs] def __init__( self, root_url: Union[DataUrl, None], configuration: TomoHDF5Config, detector_sel_callback, start_index, parent=None, ): super().__init__( root_url=root_url, configuration=configuration, detector_sel_callback=detector_sel_callback, start_index=start_index, ) self._parent = parent # possible parent. Like for z series self._nx_tomos = [NXtomo()] self._image_key_control = None self._rotation_angle = None """list of rotation angles""" self._x_translation = None """x_translation""" self._y_translation = None """y_translation""" self._z_translation = None self._x_flipped = None self._y_flipped = None self._unique_detector_names = list() # register names self._virtual_sources = None self._acq_expo_time = None self._copied_dataset = {} "register dataset copied. Key if the original location as" "DataUrl.path. Value is the DataUrl it has been moved to" self._known_machine_electric_current = None # store all registred amchine electric current self._frames_timestamp = None
# try to deduce time stamp of each frame def parent_root_url(self) -> Optional[DataUrl]: if self._parent is not None: return self._parent.root_url else: return None
[docs] def get_expected_nx_tomo(self): return 1
@property def image_key_control(self): return self._image_key_control @property def rotation_angle(self): return self._rotation_angle @property def x_translation(self): return self._x_translation @property def y_translation(self): return self._y_translation @property def z_translation(self): return self._z_translation @property def x_flipped(self): return self._x_flipped @property def y_flipped(self): return self._y_flipped @property def n_frames(self): return self._n_frames @property def n_frames_actual_bliss_scan(self): return self._n_frames_actual_bliss_scan @property def dim_1(self): return self._dim_1 @property def dim_2(self): return self._dim_2 @property def data_type(self): return self._data_type @property def expo_time(self): return self._acq_expo_time @property def known_machine_electric_current(self) -> Optional[dict]: """ Return the dict of all know machine electric current. Key is the time stamp, value is the electric current """ return self._known_machine_electric_current @property def is_xrd_ct(self): return False @property def require_x_translation(self): return True @property def require_y_translation(self): return True @property def require_z_translation(self): return True @property def has_diode(self): return False
[docs] def is_different_sequence(self, entry): return True
[docs] def register_step( self, url: DataUrl, entry_type: Optional[AcquisitionStep] = None, copy_frames=False, ) -> None: """ :param DataUrl url: entry to be registered and contained in the acquisition :param entry_type: type of the entry if know. Overwise will be 'evaluated' """ if entry_type is None: entry_type = get_entry_type(url=url, configuration=self.configuration) assert ( entry_type is not AcquisitionStep.INITIALIZATION ), "Initialization are root node of a new sequence and not a scan of a sequence" if entry_type is None: _logger.warning(f"{url} not recognized, skip it") else: self._registered_entries[url.path()] = entry_type self._copy_frames[url.path()] = copy_frames self._entries_o_path[url.path()] = url.data_path()
# path from the original file. Haven't found another way to get it ?! def _get_valid_camera_names(self, instrument_grp: h5py.Group): # 1: try to get detector from nx property detectors = get_nx_detectors(instrument_grp) detectors = [grp.name.split("/")[-1] for grp in detectors] def filter_detectors(det_grps): if len(det_grps) > 0: _logger.info(f"{len(det_grps)} detector found from NX_class attribute") if len(det_grps) > 1: # if an option: pick the first one once orderered # else ask user if self._detector_sel_callback is None: sel_det = det_grps[0] _logger.warning( f"several detector found. Only one is managed for now. Will pick {sel_det}" ) else: sel_det = self._detector_sel_callback(det_grps) if sel_det is None: _logger.warning("no detector given, avoid conversion") det_grps = (sel_det,) return det_grps return None detectors = filter_detectors(det_grps=detectors) if detectors is not None: return detectors # 2: get nx detector from shape... detectors = guess_nx_detector(instrument_grp) detectors = [grp.name.split("/")[-1] for grp in detectors] return filter_detectors(det_grps=detectors) def __get_data_from_camera( self, data_dataset: h5py.Dataset, data_name, frame_type, entry, entry_path, camera_dataset_url, ): if data_dataset.ndim == 2: shape = (1, data_dataset.shape[0], data_dataset.shape[1]) elif data_dataset.ndim != 3: err = f"dataset {data_name} is expected to be 3D when {data_dataset.ndim}D found." if data_dataset.ndim == 1: err = "\n".join( [ err, "This might be a bliss-EDF dataset. Those are not handled by nxtomomill", ] ) _logger.error(err) return 0 else: shape = data_dataset.shape n_frame = shape[0] self._n_frames += n_frame self._n_frames_actual_bliss_scan = n_frame if self.dim_1 is None: self._dim_2 = shape[1] self._dim_1 = shape[2] else: if self._dim_1 != shape[2] or self._dim_2 != shape[1]: raise ValueError("Inconsistency in detector shapes") if self._data_type is None: self._data_type = data_dataset.dtype elif self._data_type != data_dataset.dtype: raise ValueError("detector frames have incoherent " "data types") # update image_key and image_key_control # Note: for now there is no image_key on the master file # should be added later. image_key_control = frame_type.to_image_key_control() self._image_key_control.extend([image_key_control.value] * n_frame) data_dataset_path = data_dataset.name.replace(entry.name, entry_path, 1) # replace data_dataset name by the original entry_path. # this is a workaround to use the dataset path on the # "treated file". Because .name if the name on the 'target' # file of the virtual dataset v_source = h5py.VirtualSource( camera_dataset_url.file_path(), data_dataset_path, data_dataset.shape, dtype=self._data_type, ) self._virtual_sources.append(v_source) self._virtual_sources_len.append(n_frame) return n_frame def _treate_valid_camera( self, detector_node, entry, frame_type, input_file_path, entry_path, entry_url, ) -> bool: """ return True if the entry contains frames """ if "data_cast" in detector_node: _logger.warning( f"!!! looks like this data has been cast. Take cast data for {detector_node}!!!" ) data_dataset = detector_node["data_cast"] data_name = "/".join((detector_node.name, "data_cast")) else: data_dataset = detector_node["data"] data_name = "/".join((detector_node.name, "data")) camera_dataset_url = DataUrl( file_path=entry_url.file_path(), data_path=data_name, scheme="silx" ) n_frame = self.__get_data_from_camera( data_dataset, data_name=data_name, frame_type=frame_type, entry=entry, entry_path=entry_path, camera_dataset_url=camera_dataset_url, ) # save information if this url must be embed / copy or not. Will be used later at nxtomo side self._copy_frames[camera_dataset_url.path()] = self._copy_frames[ entry_url.path() ] x_flipped, y_flipped = self._get_flipped_frame() if x_flipped is not None and y_flipped is not None: if self._x_flipped is None and self._y_flipped is None: self._x_flipped, self._y_flipped = bool(x_flipped), bool(y_flipped) elif x_flipped != self._x_flipped or y_flipped != self._y_flipped: raise ValueError( f"Found different detector flips inside the same sequence on {entry}. Unable to handle it." ) # store rotation rots = self._get_rotation_angle(root_node=entry, n_frame=n_frame)[0] self._rotation_angle.extend(rots) if self.require_x_translation: self._x_translation.extend( self._get_x_translation(root_node=entry, n_frame=n_frame)[0] ) else: self._x_translation = None if self.require_y_translation: self._y_translation.extend( self._get_y_translation(root_node=entry, n_frame=n_frame)[0] ) else: self._y_translation = None if self.require_z_translation: self._z_translation.extend( self._get_z_translation(root_node=entry, n_frame=n_frame)[0] ) else: self._z_translation = None # store acquisition time self._acq_expo_time.extend( self._get_expo_time( root_node=entry, detector_node=detector_node, n_frame=n_frame, )[0] ) self._current_scan_n_frame = n_frame def camera_is_valid(self, det_name): assert isinstance(det_name, str) if self.configuration.valid_camera_names is None: return True for vcm in self.configuration.valid_camera_names: if fnmatch.fnmatch(det_name, vcm): return True return False def _preprocess_registered_entry(self, entry_url, type_): with EntryReader(entry_url) as entry: entry_path = self._entries_o_path[entry_url.path()] input_file_path = entry_url.file_path() input_file_path = os.path.abspath( os.path.relpath(input_file_path, os.getcwd()) ) input_file_path = os.path.abspath(input_file_path) if type_ is AcquisitionStep.INITIALIZATION: raise RuntimeError( "no initialization should be registered." "There should be only one per acquisition." ) if "instrument" not in entry: _logger.error( f"no instrument group found in {entry.name}, unable to retrieve frames" ) return instrument_grp = entry["instrument"] # if we don't get a valid camera (not provided by the user or not found on the bliss tomo metadata) if self.configuration.valid_camera_names is None: # if we need to guess detector name(s) # ignore in case we read information from bliss config det_grps = self._get_valid_camera_names(instrument_grp) # update valid camera names self.configuration.valid_camera_names = det_grps has_frames = False for key, _ in instrument_grp.items(): if ( "NX_class" in instrument_grp[key].attrs and instrument_grp[key].attrs["NX_class"] == "NXdetector" ): _logger.debug(f"Found one detector at {key} for {entry.name}.") # diode if self.has_diode: try: diode_vals, diode_unit = self._get_diode( root_node=entry, n_frame=self.n_frames ) except Exception: pass else: self._diode.extend(diode_vals) self._diode_unit = diode_unit if not self.camera_is_valid(key): _logger.debug(f"ignore {key}, not a `valid` camera name") continue else: detector_node = instrument_grp[key] if key not in self._unique_detector_names: self._unique_detector_names.append(key) self._treate_valid_camera( detector_node, entry=entry, frame_type=type_, input_file_path=input_file_path, entry_path=entry_path, entry_url=entry_url, ) has_frames = True # try to get some other metadata # handle frame time stamp start_time = self._get_start_time(entry) if start_time is not None: start_time = datetime.fromisoformat(start_time) end_time = self._get_end_time(entry) if end_time is not None: end_time = datetime.fromisoformat(end_time) if has_frames: self._register_frame_timestamp(entry, start_time, end_time) # handle electric current. Can retrieve some current even on bliss scan entry doesn;t containing directly frames self._register_machine_electric_current(entry, start_time, end_time) def _register_machine_electric_current( self, entry: h5py.Group, start_time, end_time ): """Update machine electric current for provided entry (bliss scan""" ( electric_currents, electric_current_unit, ) = self._get_electric_current(root_node=entry) electric_current_unit_ref = ElectricCurrentSystem.AMPERE # electric current will be saved as Ampere if electric_currents is not None and len(electric_currents) > 0: if electric_current_unit is None: electric_current_unit = ElectricCurrentSystem.MILLIAMPERE _logger.warning( "No unit found for electric current. Consider it as mA." ) unit_factor = ( ElectricCurrentSystem.from_str(electric_current_unit).value / electric_current_unit_ref.value ) new_know_electric_currents = {} if start_time is None or end_time is None: if start_time != end_time: _logger.warning( f"Unable to find {'start_time' if start_time is None else 'end_time'}. Will pick the first available electric_current for the frame" ) t_time = start_time or end_time # if at least one can find out new_know_electric_currents[ str_datetime_to_numpy_datetime64(t_time) ] = (electric_currents[0] * unit_factor) else: _logger.error( "Unable to find start_time and end_time. Will not register any machine electric current" ) elif len(electric_currents) == 1: # if we have only one value, consider the machine electric current is constant during this time # might be improved later if we can know if current is determine at the # beginning or the end. But should have no impact # as the time slot is short new_know_electric_currents[ str_datetime_to_numpy_datetime64(start_time) ] = (electric_currents[0] * unit_factor) else: # linspace from datetime within ms precision. # see https://localcoder.org/creating-numpy-linspace-out-of-datetime#credit_4 # and https://stackoverflow.com/questions/37964100/creating-numpy-linspace-out-of-datetime timestamps = numpy.linspace( start=str_datetime_to_numpy_datetime64(start_time).astype( numpy.float128 ), stop=str_datetime_to_numpy_datetime64(end_time).astype( numpy.float128 ), num=len(electric_currents), endpoint=True, dtype="<M8[ms]", ) for timestamp, mach_electric_current in zip( timestamps, electric_currents ): new_know_electric_currents[timestamp.astype(numpy.datetime64)] = ( mach_electric_current * unit_factor ) self._known_machine_electric_current.update(new_know_electric_currents) def _register_frame_timestamp(self, entry: h5py.Group, start_time, end_time): """ update frame time stamp for the provided entry (bliss scan) """ if start_time is None or end_time is None: if start_time != end_time: t_time = str_datetime_to_numpy_datetime64(start_time or end_time) message = f"Unable to find start_time and / or end_time. Takes {t_time} as frame time stamp for {entry} " self._frames_timestamp.extend( [t_time] * self._n_frames_actual_bliss_scan ) _logger.warning(message) else: message = f"Unable to find start_time and end_time. Can't deduce frames time stamp for {entry}" _logger.error(message) else: frames_times_stamps_as_f8 = numpy.linspace( start=str_datetime_to_numpy_datetime64(start_time).astype( numpy.float128 ), stop=str_datetime_to_numpy_datetime64(end_time).astype(numpy.float128), num=self._n_frames_actual_bliss_scan, endpoint=True, dtype="<M8[ms]", ) frames_times_stamps_as_f8 = [ timestamp.astype("<M8[ms]") for timestamp in frames_times_stamps_as_f8 ] self._frames_timestamp.extend(frames_times_stamps_as_f8) def _preprocess_registered_entries(self): """parse all frames of the different steps and retrieve data, image_key...""" self._n_frames = 0 self._n_frames_actual_bliss_scan = 0 # number of frame contains in X.1 self._dim_1 = None self._dim_2 = None self._data_type = None self._x_translation = [] self._y_translation = [] self._z_translation = [] self._image_key_control = [] self._rotation_angle = [] self._known_machine_electric_current = {} self._frames_timestamp = [] self._virtual_sources = [] self._instrument_name = None self._virtual_sources_len = [] self._diode = [] self._acq_expo_time = [] self._diode_unit = None self._copied_dataset = {} self._x_flipped = None self._y_flipped = None # if rotation motor is not defined try to deduce it from root_url/technique/scan/motor if self.configuration.rotation_angle_keys is None: rotation_motor = self._read_rotation_motor_name() if rotation_motor is not None: self.configuration.rotation_angle_keys = (rotation_motor,) else: self.configuration.rotation_angle_keys = tuple() # list of data virtual source for the virtual dataset for entry_url, type_ in self._registered_entries.items(): url = DataUrl(path=entry_url) self._preprocess_registered_entry(url, type_) if len(self._diode) == 0: self._diode = None if self._diode is not None: self._diode = numpy.asarray(self._diode) self._diode = self._diode / self._diode.mean() def _get_diode(self, root_node, n_frame) -> tuple: values, unit = self._get_node_values_for_frame_array( node=root_node["measurement"], n_frame=n_frame, keys=self.configuration.diode_keys, info_retrieve="diode", expected_unit="volt", ) return values, unit def get_already_defined_params(self, key): defined = self.__get_extra_param(key=key) if len(defined) > 1: raise ValueError("{} are aliases. Only one should be defined") elif len(defined) == 0: return None else: return list(defined.values())[0] def __get_extra_param(self, key) -> dict: """return already defined parameters for one key. A key as aliases so it returns a dict""" aliases = list(TomoHDF5Config.EXTRA_PARAMS_ALIASES[key]) aliases.append(key) res = {} for alias in aliases: if alias in self.configuration.param_already_defined: res[alias] = self.configuration.param_already_defined[alias] return res def _generic_path_getter(self, paths: tuple, message, level="warning", entry=None): """ :param str level: level can be logging.level values : "warning", "error", "info" :param H5group entry: user can provide directly an entry to be used as an open h5Group """ if not isinstance(paths, tuple): raise TypeError url = self.parent_root_url() or self.root_url if url is not None: self._check_has_metadata(url) def process(h5_group): for path in paths: if h5_group is not None and path in h5_group: return h5py_read_dataset(h5_group[path]) if message is not None: getattr(_logger, level)(message) if entry is None: if url is None: return None with EntryReader(url) as h5_group: return process(h5_group) else: return process(entry) def _get_source_name(self): """ """ return self._generic_path_getter( paths=self._SOURCE_NAME, message="Unable to find source name", level="info" ) def _get_source_type(self): """ """ return self._generic_path_getter( paths=self._SOURCE_TYPE, message="Unable to find source type", level="info" ) def _get_title(self): """return acquisition title""" return self._generic_path_getter( paths=self._TITLE_PATH, message="Unable to find title" ) def _get_instrument_name(self): """:return instrument instrument name (aka beamline name)""" name = self._generic_path_getter( paths=self._INSTRUMENT_NAME_PATH, message="Unable to find instrument name", level="info", ) # on some path / old hdf5 the name is prefixed by "ESRF:". clean those if name is not None and name.startswith("ESRF:"): name = name.replace("ESRF:", "") return name def _get_dataset_name(self): """return name of the acquisition""" return self._generic_path_getter( paths=self._DATASET_NAME_PATH, message="No name describing the acquisition has been " "found, Name dataset will be skip", ) def _get_sample_name(self): """return sample name""" return self._generic_path_getter( paths=self._SAMPLE_NAME_PATH, message="No sample name has been " "found, Sample name dataset will be skip", ) def _get_grp_size(self): """return the nb_scans composing the zseries if is part of a group of sequence""" return self._generic_path_getter( paths=self._GRP_SIZE_PATH, message=None, ) def _get_tomo_n(self): return self._generic_path_getter( paths=self._TOMO_N_PATH, message="unable to find information regarding tomo_n", ) def _get_start_time(self, entry=None): return self._generic_path_getter( paths=self._START_TIME_PATH, message="Unable to find start time", level="info", entry=entry, ) def _get_end_time(self, entry=None): return self._generic_path_getter( paths=self._END_TIME_PATH, message="Unable to find end time", level="info", entry=entry, ) def _get_flipped_frame(self): url = self.parent_root_url() or self.root_url if url is None: return None, None self._check_has_metadata(url) with EntryReader(url) as entry: for flip_path in self._FRAME_FLIP_PATHS: if len(self._unique_detector_names) > 0: key = flip_path.format(detector_name=self._unique_detector_names[0]) else: key = flip_path if key in entry: return h5py_read_dataset(entry[key]) else: return None, None def _get_energy(self, ask_if_0, input_callback): """return tuple(energy, unit)""" url = self.parent_root_url() or self.root_url if url is None: return None, None self._check_has_metadata() with EntryReader(url) as entry: if self._ENERGY_PATH in entry: energy = h5py_read_dataset(entry[self._ENERGY_PATH]) unit = self._get_unit(entry[self._ENERGY_PATH], default_unit="kev") if energy == 0 and ask_if_0: desc = ( "Energy has not been registered. Please enter " "incoming beam energy (in kev):" ) if input_callback is None: en = input(desc) else: en = input_callback("energy", desc) if energy is not None: energy = float(en) return energy, unit else: mess = f"unable to find energy for entry {entry}." if self.raise_error_if_issue: raise ValueError(mess) else: mess += " Default value will be set (19kev)" _logger.warning(mess) return 19.0, "kev" def _get_distance(self): """return tuple(distance, unit)""" url = self.parent_root_url() or self.root_url if url is None: return None, None self._check_has_metadata(url) with EntryReader(url) as entry: for key in self.configuration.sample_detector_distance_paths: if key in entry: node = entry[key] distance = h5py_read_dataset(node) unit = self._get_unit(node, default_unit="cm") # convert to meter distance = ( distance * metricsystem.MetricSystem.from_value(unit).value ) return distance, "m" mess = f"unable to find distance for entry {entry}." if self.raise_error_if_issue: raise ValueError(mess) else: mess += "Default value will be set (1m)" _logger.warning(mess) return 1.0, "m" def _get_pixel_size(self, axis): """return tuple(pixel_size, unit)""" url = self.parent_root_url() or self.root_url if url is None: return None, None if axis not in ("x", "y"): raise ValueError self._check_has_metadata() if axis == "x": keys = self.configuration.x_pixel_size_paths elif axis == "y": keys = self.configuration.y_pixel_size_paths else: raise ValueError(f"axis {axis} is invalid") with EntryReader(url) as entry: for key in keys: if key in entry: node = entry[key] node_item = h5py_read_dataset(node) # if the pixel size is provided as x, y if isinstance(node_item, numpy.ndarray): if len(node_item) > 1 and axis == "y": size_ = node_item[1] else: size_ = node_item[0] # if this is a single value else: size_ = node_item unit = self._get_unit(node, default_unit="micrometer") # convert to meter size_ = size_ * metricsystem.MetricSystem.from_value(unit).value return size_, "m" mess = f"unable to find {axis} pixel size for entry {entry}" if self.raise_error_if_issue: raise ValueError(mess) else: mess += "Value will be set to default (10-6m)" _logger.warning(mess) return 10e-6, "m" def _get_field_of_fiew(self): if self.configuration.field_of_view is not None: return self.configuration.field_of_view.value url = self.parent_root_url() or self.root_url if url is None: return None with EntryReader(url) as entry: if self._FOV_PATH in entry: return h5py_read_dataset(entry[self._FOV_PATH]) else: # FOV is optional: don't raise an error _logger.warning( f"unable to find information regarding field of view for entry {entry}. set it to default value (Full)" ) return "Full" def _get_estimated_cor_from_motor(self, pixel_size): """given pixel is expected to be given in meter""" if self.root_url is None: return None, None with self.read_entry() as entry: if self.configuration.y_rot_key in entry: y_rot = h5py_read_dataset(entry[self.configuration.y_rot_key]) else: _logger.warning( f"unable to find information on positioner {self.configuration.y_rot_key}" ) return None, None # y_rot is provided in mm when pixel size is in meter. unit = self._get_unit( entry[self.configuration.y_rot_key], default_unit="millimeter" ) unit = metricsystem.MetricSystem.from_value(unit).value y_rot = y_rot * unit if pixel_size is None: mess = ( "pixel size is required to estimate the cor from the " "motor position on pixels" ) if self.raise_error_if_issue: raise ValueError(mess) else: mess += " Set default value (0m)" _logger.warning(mess) return 0, "m" else: return y_rot / pixel_size, "pixels" def _update_configuration_from_tomo_config(self): """ force some values from EBS tomo 'tomoconfig' group to make sure correct dataset are read """ if self.configuration.ignore_bliss_tomo_config: return url = self.parent_root_url() or self.root_url if url is None: # case of entries are made manually and user do not provide an init node. return with EntryReader(url) as entry: technique_grp = entry.get("technique", None) if technique_grp is None: _logger.warning( f"Unable to find a technique group in {entry}. Unable to reach EBStomo metadata" ) return bliss_tomo_version = technique_grp.attrs.get("tomo_version", None) # read metadata try: bliss_metadata = BlissTomoConfig.from_technique_group( technique_group=technique_grp ) except KeyError: if bliss_tomo_version is not None: _logger.warning( f"Unable to find bliss 'tomo_config' when expected (tomo_version={bliss_tomo_version}). Fallback to conversion based on list of paths to check" ) else: # check if some metadata are missing metadata_values = { "detector": bliss_metadata.tomo_detector, "translation_x": bliss_metadata.translation_x, "translation_y": bliss_metadata.translation_y, "translation_z": bliss_metadata.translation_z, "rotation": bliss_metadata.rotation, } missing_metadata = list( [k for k, v in metadata_values.items() if v is None] ) _logger.info(f"read tomo config from bliss. Get {metadata_values}") if len(missing_metadata) > 0: _logger.warning( f"couldn't find {missing_metadata} in bliss 'technique/tomoconfig' dataset" ) if bliss_metadata.tomo_detector is not None: self.configuration.valid_camera_names = bliss_metadata.tomo_detector if bliss_metadata.translation_x is not None: self.configuration.x_trans_keys = bliss_metadata.translation_x if bliss_metadata.translation_y is not None: self.configuration.y_trans_keys = bliss_metadata.translation_y if bliss_metadata.translation_z is not None: self.configuration.z_trans_keys = bliss_metadata.translation_z if bliss_metadata.rotation is not None: self.configuration.rotation_angle_keys = bliss_metadata.rotation def to_NXtomos(self, request_input, input_callback, check_tomo_n=True) -> tuple: self._update_configuration_from_tomo_config() self._preprocess_registered_entries() nx_tomo = NXtomo() # 1. root level information # start and end time nx_tomo.start_time = self._get_start_time() nx_tomo.end_time = self._get_end_time() # title nx_tomo.title = self._get_dataset_name() # group size nx_tomo.group_size = self._get_grp_size() # 2. define beam try: energy, unit = self._get_user_settable_parameter( param_key=TomoHDF5Config.EXTRA_PARAMS_ENERGY_DK, fallback_fct=self._get_energy, dtype=float, input_callback=input_callback, ask_if_0=request_input, ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get energy") energy = None unit = None if energy is not None: # TODO: better manamgent of energy ? might be energy.beam or energy.instrument.beam ? nx_tomo.energy = energy nx_tomo.energy.unit = unit # 3. define instrument nx_tomo.instrument.name = self._get_instrument_name() nx_tomo.instrument.detector.data = self._virtual_sources nx_tomo.instrument.detector.image_key_control = self.image_key_control nx_tomo.instrument.detector.count_time = self._acq_expo_time nx_tomo.instrument.detector.count_time.unit = "s" # distance try: distance, unit = self._get_user_settable_parameter( param_key=TomoHDF5Config.EXTRA_PARAMS_DISTANCE, fallback_fct=self._get_distance, dtype=float, ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get sample/detector distance") distance = None unit = None if distance is not None: nx_tomo.instrument.detector.distance = distance if nx_tomo.instrument.detector.distance is not None: nx_tomo.instrument.detector.distance.unit = unit # x and y pixel size try: x_pixel_size, unit = self._get_user_settable_parameter( param_key=TomoHDF5Config.EXTRA_PARAMS_X_PIXEL_SIZE_DK, fallback_fct=self._get_pixel_size, dtype=float, axis="x", ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get x_pixel_size") unit = None x_pixel_size = None else: nx_tomo.instrument.detector.x_pixel_size = x_pixel_size if unit is not None: nx_tomo.instrument.detector.x_pixel_size.unit = unit try: y_pixel_size, unit = self._get_user_settable_parameter( param_key=TomoHDF5Config.EXTRA_PARAMS_Y_PIXEL_SIZE_DK, fallback_fct=self._get_pixel_size, dtype=float, axis="y", ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get y_pixel_size") unit = None y_pixel_size = None else: nx_tomo.instrument.detector.y_pixel_size = y_pixel_size if unit is not None: nx_tomo.instrument.detector.y_pixel_size.unit = unit # flips if self.x_flipped: nx_tomo.instrument.detector.transformations.add_transformation( LRDetTransformation() ) if self.y_flipped: nx_tomo.instrument.detector.transformations.add_transformation( UDDetTransformation() ) # fov fov = self._get_field_of_fiew() if fov is not None: nx_tomo.instrument.detector.field_of_view = fov # estimated cor from motor (from yrot) estimated_cor, unit = self._get_estimated_cor_from_motor( pixel_size=y_pixel_size ) if estimated_cor is not None: nx_tomo.instrument.detector.estimated_cor_from_motor = estimated_cor # define tomo_n nx_tomo.instrument.detector.tomo_n = self._get_tomo_n() # 4. define nx source source_name = self._get_source_name() nx_tomo.instrument.source.name = source_name source_type = self._get_source_type() if source_type is not None: if "synchrotron" in source_type.lower(): source_type = SourceType.SYNCHROTRON_X_RAY_SOURCE.value # drop a warning if the source type is invalid if source_type not in SourceType.values(): _logger.warning( f"Source type ({source_type}) is not a 'standard value'" ) nx_tomo.instrument.source.type = source_type # 5. define sample nx_tomo.sample.name = self._get_sample_name() nx_tomo.sample.rotation_angle = self.rotation_angle nx_tomo.sample.x_translation.value = self.x_translation nx_tomo.sample.x_translation.unit = "m" nx_tomo.sample.y_translation.value = self.y_translation nx_tomo.sample.y_translation.unit = "m" nx_tomo.sample.z_translation.value = self.z_translation nx_tomo.sample.z_translation.unit = "m" # 6. define control if ( self.configuration.handle_machine_current and self.known_machine_electric_current not in (None, dict()) ): nx_tomo.control.data = deduce_machine_electric_current( timestamps=self._frames_timestamp, known_machine_electric_current=self._known_machine_electric_current, ) nx_tomo.control.data.unit = ElectricCurrentSystem.AMPERE types = set() if nx_tomo.control.data.value is not None: for d in nx_tomo.control.data.value: types.add(type(d)) # 7. define diode if self.has_diode: nx_tomo.instrument.diode.data = self._diode nx_tomo.instrument.diode.data.unit = self._diode_unit if check_tomo_n: self.check_tomo_n() return (nx_tomo,) def check_tomo_n(self): # check scan is complete tomo_n = self._get_tomo_n() if self.configuration.check_tomo_n and tomo_n is not None: image_key_control = numpy.asarray(self._image_key_control) proj_found = len( image_key_control[image_key_control == ImageKey.PROJECTION.value] ) if proj_found < tomo_n: mess = f"Incomplete scan. Expect {tomo_n} projection but only {proj_found} found" if self.configuration.raises_error is True: raise ValueError(mess) else: _logger.error(mess) def _check_has_metadata(self, url: Optional[DataUrl] = None): url = url or self.root_url if url is None: raise ValueError( "no initialization entry specify, unable to" "retrieve energy" ) def _get_user_settable_parameter( self, param_key, fallback_fct, dtype: Optional[type] = None, *fallback_args, **fallback_kwargs, ): """ return value, unit """ value = self.get_already_defined_params(param_key) if value is not None: unit = TomoHDF5Config.get_extra_params_default_unit(param_key) else: value, unit = fallback_fct(*fallback_args, **fallback_kwargs) if dtype is None or value is None: return value, unit else: return dtype(value), unit