Source code for nxtomomill.converter.dxfile.dxfileconverter

# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2020 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/

"""
Module to convert from dx files (HDF5) to NXtomo-compliant .nx files (HDF5).
"""

__authors__ = [
    "H. Payno",
]
__license__ = "MIT"
__date__ = "18/05/2021"


import logging
import os
from typing import Union, Optional

import h5py
import numpy
from silx.io.url import DataUrl
from silx.io.utils import get_data, h5py_read_dataset
from tomoscan.io import HDF5File
from pyunitsystem import metricsystem
from nxtomo.nxobject.nxdetector import FieldOfView, ImageKey
from nxtomo.utils.frameappender import FrameAppender

from nxtomomill.converter.baseconverter import BaseConverter
from nxtomomill.converter.version import version as converter_version
from nxtomomill.io.config import DXFileConfiguration

from nxtomomill.utils.hdf5 import DatasetReader, EntryReader
from silx.utils.deprecation import deprecated

_logger = logging.getLogger(__name__)


# dev note: to ensure backward compatibility we needed to keep `from_dx_to_nx`, and we created
# `from_dx_config_to_nx` to handle the configuration file. With time we expect to remove
# `from_dx_to_nx` and then rename `from_dx_config_to_nx` to `from_dx_to_nx` in order to get something coherent.
@deprecated(
    replacement="from_dx_config_to_nx",
    since_version="0.13",
    reason="Dedicated configuration classes exist for tuning the conversion directly instead of providing n parameters to the function.",
)
def from_dx_to_nx(
    input_file: str,
    output_file: Optional[str] = None,
    file_extension: str = ".nx",
    duplicate_data: bool = True,
    input_entry: str = "/",
    output_entry: str = "entry0000",
    scan_range: tuple = (0.0, 180.0),
    pixel_size=(None, None),
    field_of_view: Optional[Union[FieldOfView, str]] = None,
    distance: Optional[float] = None,
    overwrite: bool = True,
    energy: Optional[float] = None,
) -> tuple:
    """
    :param input_file: dx file to convert
    :param output_file: output file to save
    :param file_extension: file extension to use if the output file is not provided
    :param duplicate_data: if True frames will be duplicated. Otherwise we will
                           create a (relative) link to the input file.
    :param input_entry: path to the HDF5 group to convert. For now it looks like
                        each file can only contain one dataset; kept to ensure
                        future compatibility if this evolves with time.
    :param output_entry: path used to store the NXtomo created
    :param scan_range: tuple of two elements giving the scan range (start and
                       end rotation angles in degree). Projections are expected
                       to be taken with equal angular spacing.
    :param tuple pixel_size: pixel size can be provided (in meter and as
                             (x_pixel_size, y_pixel_size))
    :param field_of_view: field of view
    :param distance: sample / detector distance in meter
    :param bool overwrite: if True and if the entry already exists in the
                           output file then it will be overwritten
    :param energy: incident beam energy
    :return: tuple of the (output_file, entry) created. For now it should
             contain at most one such tuple.
    """
    configuration = DXFileConfiguration(input_file=input_file, output_file=output_file)
    configuration.file_extension = file_extension
    configuration.copy_data = duplicate_data
    configuration.input_entry = input_entry
    configuration.output_entry = output_entry
    configuration.scan_range = scan_range
    configuration.pixel_size = pixel_size
    configuration.field_of_view = field_of_view
    configuration.distance = distance
    configuration.overwrite = overwrite
    configuration.energy = energy
    return from_dx_config_to_nx(configuration=configuration)
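
# --- illustrative usage sketch (not part of the original module) ------------
# A minimal call to the deprecated `from_dx_to_nx` entry point, showing the
# parameters documented above. The file names are hypothetical placeholders.
def _example_from_dx_to_nx():
    return from_dx_to_nx(
        input_file="scan.h5",      # hypothetical dx (HDF5) file
        output_file="scan.nx",     # hypothetical NXtomo output file
        duplicate_data=False,      # link the frames instead of copying them
        scan_range=(0.0, 180.0),   # start / end rotation angles in degree
        overwrite=True,
    )
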
def from_dx_config_to_nx(
    configuration: DXFileConfiguration,
):
    """
    Convert from dxfile to NXtomo.

    Darks and flats will be stored at the beginning; we consider they are taken
    at the start of the scan, so their rotation angle is set to scan_range[0].
    Projection rotation angles are interpolated from scan_range with an equally
    spaced distribution. We do not expect any alignment projection.
    """
    converter = _DxFileToNxConverter(configuration=configuration)
    return converter.convert()
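
# --- illustrative usage sketch (not part of the original module) ------------
# The recommended, configuration-based way to run the conversion. File names
# are hypothetical placeholders; every attribute set below is one already
# handled by `from_dx_to_nx` above.
def _example_from_dx_config_to_nx():
    configuration = DXFileConfiguration(
        input_file="scan.h5",    # hypothetical dx (HDF5) file
        output_file="scan.nx",   # hypothetical NXtomo output file
    )
    configuration.output_entry = "entry0000"
    configuration.scan_range = (0.0, 180.0)  # start / end rotation angles in degree
    configuration.copy_data = True           # duplicate the frames in the output file
    configuration.overwrite = True
    return from_dx_config_to_nx(configuration=configuration)
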
class _PathDoesNotExistsInExchange(Exception):
    pass


class _DxFileToNxConverter(BaseConverter):
    """
    Convert from dxfile to NXtomo.

    Darks and flats will be stored at the beginning; we consider they are taken
    at the start of the scan, so their rotation angle is set to scan_range[0].
    Projection rotation angles are interpolated from scan_range with an equally
    spaced distribution. We do not expect any alignment projection.
    """

    DEFAULT_DISTANCE_VALUE = 1.0 * metricsystem.MetricSystem.METER.value

    DEFAULT_PIXEL_VALUE = 1.0 * metricsystem.MetricSystem.MICROMETER.value

    DEFAULT_BEAM_ENERGY = 1.0 * metricsystem.EnergySI.KILOELECTRONVOLT.value

    def __init__(
        self,
        configuration: DXFileConfiguration,
    ):
        self._configuration = configuration
        if not len(self.scan_range) == 2:
            raise ValueError("scan_range expects to be a tuple with two elements")
        input_file = os.path.abspath(self.input_file)
        if self.output_file is None:
            input_file_basename, _ = os.path.splitext(input_file)
            if not self._configuration.file_extension.startswith("."):
                self._configuration.file_extension = (
                    "." + self._configuration.file_extension
                )
            output_file = os.path.join(
                os.path.dirname(input_file),
                os.path.basename(input_file_basename)
                + self._configuration.file_extension,
            )
        else:
            output_file = os.path.abspath(self.output_file)
        self._configuration.output_file = output_file
        if self.input_entry == "/":
            self._configuration._input_entry = ""
        else:
            self._configuration._input_entry = self.input_entry
        if self.field_of_view is not None:
            fov = self.field_of_view
            if isinstance(fov, str):
                fov = fov.title()
            self._configuration.field_of_view = FieldOfView.from_value(fov)
        self._configuration.distance = self.distance
        self._configuration.overwrite = self.overwrite
        self._n_frames = 0
        self._data_proj_url = None
        self._data_darks_url = None
        self._data_flats_url = None
        self._input_root_url = DataUrl(
            file_path=self.input_file,
            data_path=self.input_entry,
            scheme="silx",
        )

    @property
    def input_file(self):
        return self._configuration.input_file

    @property
    def input_entry(self):
        return self._configuration.input_entry

    @property
    def output_file(self):
        return self._configuration.output_file

    @property
    def output_entry(self):
        return self._configuration.output_entry

    @property
    def scan_range(self):
        return self._configuration.scan_range

    @property
    def copy_data(self):
        return self._configuration.copy_data

    @property
    def overwrite(self):
        return self._configuration.overwrite

    @property
    def distance(self) -> Union[float, None]:
        return self._configuration.distance

    @property
    def field_of_view(self) -> Union[FieldOfView, None]:
        return self._configuration.field_of_view

    @property
    def energy(self) -> Union[float, None]:
        return self._configuration.energy

    @property
    def input_root_url(self):
        return self._input_root_url

    def convert(self):
        """
        do the conversion from dxfile to NXtomo

        :return: tuple of the (output_file, entry) created. For now it should
                 contain at most one such tuple.
        :rtype: tuple
        """
        with HDF5File(self.output_file, mode="a") as h5f:
            if self.output_entry in h5f:
                if self.overwrite:
                    del h5f[self.output_entry]
                else:
                    raise OSError(
                        f"{self.output_entry} already exists: cannot create the "
                        "requested NXtomo entry. Won't overwrite it as overwrite "
                        "is not requested"
                    )
        self._n_frames = 0
        self._data_proj_url = DataUrl(
            file_path=self.input_file,
            data_path="/".join((self.input_entry, "exchange", "data")),
            scheme="silx",
        )
        self._data_darks_url = DataUrl(
            file_path=self.input_file,
            data_path="/".join((self.input_entry, "exchange", "data_dark")),
            scheme="silx",
        )
        self._data_flats_url = DataUrl(
            file_path=self.input_file,
            data_path="/".join((self.input_entry, "exchange", "data_white")),
            scheme="silx",
        )
        # convert frames
        if self.copy_data:
            self._convert_frames_with_duplication()
        else:
            self._convert_frames_without_duplication()
        # convert detector extra information
        # x pixel size
        x_pixel_size, y_pixel_size = self._configuration.pixel_size
        if x_pixel_size is None:
            try:
                x_pixel_size = self._read_x_pixel_size()
            except Exception:
                x_pixel_size = None
        if x_pixel_size is None:
            x_pixel_size = self.DEFAULT_PIXEL_VALUE
            _logger.warning(
                "No x pixel size found or provided. Set it to the default value"
            )
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            instrument_grp = root_grp.require_group("instrument")
            detector_grp = instrument_grp.require_group("detector")
            detector_grp["x_pixel_size"] = x_pixel_size
            detector_grp["x_pixel_size"].attrs["units"] = "m"
        # y pixel size
        if y_pixel_size is None:
            try:
                y_pixel_size = self._read_y_pixel_size()
            except Exception:
                y_pixel_size = None
        if y_pixel_size is None:
            y_pixel_size = self.DEFAULT_PIXEL_VALUE
            _logger.warning(
                "No y pixel size found or provided. Set it to the default value"
            )
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            instrument_grp = root_grp.require_group("instrument")
            detector_grp = instrument_grp.require_group("detector")
            detector_grp["y_pixel_size"] = y_pixel_size
            detector_grp["y_pixel_size"].attrs["units"] = "m"
        # field of view
        if self.field_of_view is not None:
            with HDF5File(self.output_file, mode="a") as h5f:
                root_grp = h5f.require_group(self.output_entry)
                instrument_grp = root_grp.require_group("instrument")
                detector_grp = instrument_grp.require_group("detector")
                detector_grp["field_of_view"] = self.field_of_view.value
        # distance
        if self.distance is None:
            try:
                self._configuration.distance = self._read_distance()
            except Exception:
                self._configuration.distance = None
        # set default distance value
        if self.distance is None:
            self._configuration.distance = self.DEFAULT_DISTANCE_VALUE
            _logger.warning(
                "No detector / sample distance found or provided. "
                "Set it to the default value"
            )
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            instrument_grp = root_grp.require_group("instrument")
            detector_grp = instrument_grp.require_group("detector")
            detector_grp["distance"] = self.distance
            detector_grp["distance"].attrs["units"] = "m"
        # energy
        if self.energy is None:
            try:
                self._configuration.energy = self._read_energy()
            except Exception:
                self._configuration.energy = None
        if self.energy is None:
            self._configuration.energy = self.DEFAULT_BEAM_ENERGY
            _logger.warning(
                "No energy found or provided. Set it to the default value"
            )
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            beam_grp = root_grp.require_group("beam")
            beam_grp["incident_energy"] = (
                self.energy / metricsystem.EnergySI.KILOELECTRONVOLT.value
            )
            beam_grp["incident_energy"].attrs["units"] = "kev"
        # count_time
        self._copy_count_time()
        # start time
        with EntryReader(self.input_root_url) as input_entry:
            if "file_creation_datetime" in input_entry:
                with HDF5File(self.output_file, mode="a") as h5f:
                    root_grp = h5f.require_group(self.output_entry)
                    root_grp["start_time"] = h5py_read_dataset(
                        input_entry["file_creation_datetime"]
                    )
        return ((self.output_file, self.output_entry),)

    def _copy_count_time(self):
        """Try to read and copy count_time / exposure_period"""

        def read_and_write_count_time(dataset: h5py.Dataset):
            count_time = h5py_read_dataset(dataset)
            if count_time == "Error: Unknown Attribute":
                _logger.info(
                    f"Count time not stored on {self.input_entry}@{self.input_file}"
                )
            else:
                try:
                    if len(count_time) == self._n_frames:
                        with HDF5File(self.output_file, mode="a") as h5f:
                            # write to the output entry (not the input one)
                            root_grp = h5f.require_group(self.output_entry)
                            instrument_grp = root_grp.require_group("instrument")
                            detector_grp = instrument_grp.require_group("detector")
                            detector_grp["count_time"] = count_time
                    else:
                        _logger.warning(
                            f"exposure_period and data frames have inconsistent sizes ({len(count_time)} vs {self._n_frames})"
                        )
                except Exception as e:
                    _logger.error(
                        f"Failed to get 'count_time' / 'exposure_period'. Reason is {e}"
                    )

        with EntryReader(self.input_root_url) as input_entry:
            if "measurement" in input_entry:
                measurement_grp = input_entry["measurement"]
                if "detector" in measurement_grp:
                    detector_grp = measurement_grp["detector"]
                    if "exposure_period" in detector_grp:
                        read_and_write_count_time(detector_grp["exposure_period"])

    def _convert_frames_with_duplication(self):
        image_key = []
        image_key_control = []
        rotation_angle = []
        data = None
        # handle darks
        try:
            data_dark = get_data(self._data_darks_url)
        except _PathDoesNotExistsInExchange:
            _logger.warning(f"No darks found in {self.input_entry}@{self.input_file}")
        else:
            image_key.extend([ImageKey.DARK_FIELD.value] * len(data_dark))
            image_key_control.extend([ImageKey.DARK_FIELD.value] * len(data_dark))
            rotation_angle.extend([self.scan_range[0]] * len(data_dark))
            if data is None:
                data = data_dark
            else:
                data = numpy.concatenate((data, data_dark), axis=0)
        # handle flats
        try:
            data_flat = get_data(self._data_flats_url)
        except _PathDoesNotExistsInExchange:
            _logger.warning(f"No flats found in {self.input_entry}@{self.input_file}")
        else:
            image_key.extend([ImageKey.FLAT_FIELD.value] * len(data_flat))
            image_key_control.extend([ImageKey.FLAT_FIELD.value] * len(data_flat))
            rotation_angle.extend([self.scan_range[0]] * len(data_flat))
            if data is None:
                data = data_flat
            else:
                data = numpy.concatenate((data, data_flat), axis=0)
        # handle projections
        data_proj = get_data(self._data_proj_url)
        image_key.extend([ImageKey.PROJECTION.value] * len(data_proj))
        image_key_control.extend([ImageKey.PROJECTION.value] * len(data_proj))
        assert (
            self.scan_range[0] is not None
        ), "scan range is expected to be a tuple of float"
        assert (
            self.scan_range[1] is not None
        ), "scan range is expected to be a tuple of float"
        rotation_angle.extend(
            numpy.linspace(
                self.scan_range[0],
                self.scan_range[1],
                num=len(data_proj),
                endpoint=True,
            )
        )
        if data is None:
            data = data_proj
        else:
            data = numpy.concatenate((data, data_proj), axis=0)
        self._n_frames = len(data)
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            instrument_grp = root_grp.require_group("instrument")
            detector_grp = instrument_grp.require_group("detector")
            detector_grp["data"] = data
            detector_grp["image_key"] = image_key
            detector_grp["image_key_control"] = image_key_control
            sample_grp = root_grp.require_group("sample")
            sample_grp["rotation_angle"] = rotation_angle
            sample_grp["rotation_angle"].attrs["units"] = "degree"
            self._path_nxtomo_attrs(root_grp)

    def _path_nxtomo_attrs(self, root_grp):
        root_grp.attrs["NX_class"] = "NXentry"
        root_grp.attrs["definition"] = "NXtomo"
        root_grp.attrs["version"] = converter_version()
        root_grp.attrs["default"] = "instrument/detector"
        instrument_grp = root_grp.require_group("instrument")
        instrument_grp.attrs["NX_class"] = "NXinstrument"
        instrument_grp.attrs["default"] = "detector"
        detector_grp = instrument_grp.require_group("detector")
        # note: the second assignment overrides the first, so the detector group
        # ends up exposed as NXdata (used as the default plottable)
        detector_grp.attrs["NX_class"] = "NXdetector"
        detector_grp.attrs["NX_class"] = "NXdata"
        detector_grp.attrs["signal"] = "data"
        detector_grp.attrs["SILX_style/axis_scale_types"] = ["linear", "linear"]
        if "data" in detector_grp:
            detector_grp["data"].attrs["interpretation"] = "image"
        sample_node = root_grp.require_group("sample")
        sample_node.attrs["NX_class"] = "NXsample"

    def _convert_frames_without_duplication(self):
        image_key = []
        image_key_control = []
        rotation_angle = []
        dataset_path = "/".join((self.output_entry, "instrument", "detector", "data"))
        n_dark = 0
        n_flat = 0
        n_proj = 0
        # handle darks
        try:
            with DatasetReader(self._data_darks_url) as dark_dataset:
                n_dark = dark_dataset.shape[0]
            # FIXME: avoid keeping some file open. not clear why this is needed
            dark_dataset = None
            FrameAppender(
                self._data_darks_url,
                self.output_file,
                data_path=dataset_path,
                where="end",
                logger=_logger,
            ).process()
        except Exception:
            _logger.error(
                f"No darks found in {self._data_darks_url.path()} or unable to add them to the dataset"
            )
        else:
            image_key.extend([ImageKey.DARK_FIELD.value] * n_dark)
            image_key_control.extend([ImageKey.DARK_FIELD.value] * n_dark)
            rotation_angle.extend([self.scan_range[0]] * n_dark)
        # handle flats
        try:
            with DatasetReader(self._data_flats_url) as flat_dataset:
                n_flat = flat_dataset.shape[0]
            # FIXME: avoid keeping some file open. not clear why this is needed
            flat_dataset = None
            FrameAppender(
                self._data_flats_url,
                self.output_file,
                data_path=dataset_path,
                where="end",
                logger=_logger,
            ).process()
        except Exception:
            _logger.warning(
                f"No flats found in {self._data_flats_url.path()} or unable to add them to the dataset"
            )
        else:
            image_key.extend([ImageKey.FLAT_FIELD.value] * n_flat)
            image_key_control.extend([ImageKey.FLAT_FIELD.value] * n_flat)
            rotation_angle.extend([self.scan_range[0]] * n_flat)
        # handle projections
        try:
            with DatasetReader(self._data_proj_url) as proj_dataset:
                n_proj = proj_dataset.shape[0]
            # FIXME: avoid keeping some file open. not clear why this is needed
            proj_dataset = None
            FrameAppender(
                self._data_proj_url,
                self.output_file,
                data_path=dataset_path,
                where="end",
                logger=_logger,
            ).process()
        except Exception:
            _logger.warning(
                f"No projections found in {self._data_proj_url.path()} or unable to add them to the dataset"
            )
        else:
            image_key.extend([ImageKey.PROJECTION.value] * n_proj)
            image_key_control.extend([ImageKey.PROJECTION.value] * n_proj)
            rotation_angle.extend(
                numpy.linspace(
                    self.scan_range[0],
                    self.scan_range[1],
                    num=n_proj,
                    endpoint=True,
                )
            )
        self._n_frames = n_flat + n_dark + n_proj
        with HDF5File(self.output_file, mode="a") as h5f:
            root_grp = h5f.require_group(self.output_entry)
            instrument_grp = root_grp.require_group("instrument")
            detector_grp = instrument_grp.require_group("detector")
            detector_grp["image_key"] = image_key
            detector_grp["image_key_control"] = image_key_control
            sample_grp = root_grp.require_group("sample")
            sample_grp["rotation_angle"] = rotation_angle
            sample_grp["rotation_angle"].attrs["units"] = "degree"
            self._path_nxtomo_attrs(root_grp)

    def _read_distance(self):
        with EntryReader(self.input_root_url) as input_entry:
            path = "/".join(
                ("measurement", "instrument", "sample", "setup", "detector_distance")
            )
            dataset = input_entry[path]
            distance = h5py_read_dataset(dataset)
            try:
                if "units" in dataset.attrs:
                    unit = dataset.attrs["units"]
                else:
                    unit = dataset.attrs["unit"]
                unit = metricsystem.MetricSystem.from_value(unit).value
            except Exception:
                unit = 1
            return distance * unit

    def _read_energy(self):
        with EntryReader(self.input_root_url) as input_entry:
            path = "/".join(("measurement", "instrument", "source", "energy"))
            dataset = input_entry[path]
            energy = float(h5py_read_dataset(dataset))
            try:
                if "units" in dataset.attrs:
                    unit = dataset.attrs["units"]
                else:
                    unit = dataset.attrs["unit"]
                # patch until next tomoscan is released
                if unit.lower() in ("mev", "megaelectronvolt"):
                    unit = metricsystem.EnergySI.ELECTRONVOLT.value * 1e6
                elif unit.lower() in ("gev", "gigaelectronvolt"):
                    unit = metricsystem.EnergySI.ELECTRONVOLT.value * 1e9
                else:
                    unit = metricsystem.EnergySI.from_value(unit).value
            except Exception:
                # fall back to no unit conversion, as for the other metadata readers
                unit = 1
            return energy * unit

    def _read_x_pixel_size(self):
        with EntryReader(self.input_root_url) as input_entry:
            path = "/".join(
                ("measurement", "instrument", "detector", "actual_pixel_size_x")
            )
            dataset = input_entry[path]
            pixel_size = float(h5py_read_dataset(dataset))
            try:
                if "units" in dataset.attrs:
                    unit = dataset.attrs["units"]
                else:
                    unit = dataset.attrs["unit"]
                # patch until next tomoscan is released
                if unit == "microns":
                    unit = "um"
                unit = metricsystem.MetricSystem.from_value(unit).value
            except Exception:
                unit = 1
            return pixel_size * unit

    def _read_y_pixel_size(self):
        with EntryReader(self.input_root_url) as input_entry:
            path = "/".join(
                ("measurement", "instrument", "detector", "actual_pixel_size_y")
            )
            dataset = input_entry[path]
            pixel_size = float(h5py_read_dataset(dataset))
            try:
                if "units" in dataset.attrs:
                    unit = dataset.attrs["units"]
                else:
                    unit = dataset.attrs["unit"]
                # patch until next tomoscan is released
                if unit == "microns":
                    unit = "um"
                unit = metricsystem.MetricSystem.from_value(unit).value
            except Exception:
                unit = 1
            return pixel_size * unit
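
# --- illustrative end-to-end sketch (not part of the original module) -------
# Builds a tiny synthetic dx-style file exposing the `exchange` layout read by
# `_DxFileToNxConverter` (exchange/data, exchange/data_dark, exchange/data_white)
# and converts it. File names and frame contents are hypothetical; the missing
# metadata (pixel size, distance, energy) falls back to the converter default
# values, with a warning logged for each of them.
def _example_build_and_convert_minimal_dx_file(output_dir="."):
    dx_file = os.path.join(output_dir, "minimal_dx.h5")
    with h5py.File(dx_file, mode="w") as h5f:
        exchange_grp = h5f.require_group("exchange")
        exchange_grp["data"] = numpy.random.random((4, 8, 8))        # 4 projections
        exchange_grp["data_dark"] = numpy.random.random((1, 8, 8))   # 1 dark frame
        exchange_grp["data_white"] = numpy.random.random((1, 8, 8))  # 1 flat frame

    configuration = DXFileConfiguration(
        input_file=dx_file,
        output_file=os.path.join(output_dir, "minimal_dx.nx"),
    )
    configuration.scan_range = (0.0, 180.0)
    configuration.copy_data = True
    return from_dx_config_to_nx(configuration=configuration)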