# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2020 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""
Base class for tomography acquisition (defined by Bliss)
"""
__authors__ = [
"H. Payno",
]
__license__ = "MIT"
__date__ = "27/11/2020"
import os
import h5py
from nxtomo.application.nxtomo import NXtomo
from nxtomomill.utils.h5pyutils import from_data_url_to_virtual_source
from nxtomomill.utils.hdf5 import EntryReader
from nxtomomill.utils.utils import embed_url, get_file_name # noqa F401
try:
import hdf5plugin # noqa F401
except ImportError:
pass
import logging
import typing
from collections import OrderedDict
import numpy
from silx.io.url import DataUrl
from silx.io.utils import h5py_read_dataset
from pyunitsystem import metricsystem
from nxtomomill.io.config import TomoHDF5Config
_logger = logging.getLogger(__name__)
def _ask_for_file_removal(file_path):
res = input(f"Overwrite {file_path} ? (Y/n)")
return res == "Y"
[docs]class BaseAcquisition:
"""
Util class to group several hdf5 group together and to write the data
Nexus / NXtomo compliant
"""
_SCAN_NUMBER_PATH = "measurement/scan_numbers"
_ENERGY_PATH = "technique/scan/energy"
_DATASET_NAME_PATH = ("technique/scan/name",)
_GRP_SIZE_PATH = ("technique/scan/nb_scans",)
_SAMPLE_NAME_PATH = ("sample/name",)
_TITLE_PATH = ("title",)
_INSTRUMENT_NAME_PATH = (
"technique/saving/beamline",
"instrument/title",
)
_FOV_PATH = "technique/scan/field_of_view"
_NB_LOOP_PATH = ("technique/scan/nb_loop", "technique/proj/nb_loop")
_NB_TOMO_PATH = ("technique/scan/nb_tomo", "technique/proj/nb_tomo")
_NB_TURNS_PATH = ("technique/proj/nb_turns",)
_TOMO_N_PATH = "technique/proj/tomo_n", "technique/scan/tomo_n"
_START_TIME_PATH = ("start_time",)
_END_TIME_PATH = ("end_time",)
_TECHNIQUE_MOTOR_PATHS = ("technique/scan/motor", "technique/proj/motor")
_SOURCE_NAME = ("instrument/machine/name",)
_SOURCE_TYPE = ("instrument/machine/type",)
_FRAME_FLIP_PATHS = (
"technique/detector/{detector_name}/flipping",
"technique/detector/flipping",
)
[docs] def __init__(
self,
root_url: typing.Union[DataUrl, None],
configuration: TomoHDF5Config,
detector_sel_callback,
start_index: int,
):
self._root_url = root_url
self._detector_sel_callback = detector_sel_callback
self._registered_entries = OrderedDict()
self._copy_frames = OrderedDict()
# key is the entry, value his type
self._entries_o_path = dict()
# key is the entry, value his the entry path from the original file.
# this value is different from `.name`. Don't know if this is a bug ?
"""user can have defined already some parameter values as energy.
The idea is to avoid asking him if """
self._configuration = configuration
self._start_index = start_index
@property
def configuration(self):
return self._configuration
@property
def start_index(self) -> int:
return self._start_index
def write_as_nxtomo(
self,
shift_entry: int,
input_file_path: str,
request_input: bool,
divide_into_sub_files,
input_callback=None,
) -> tuple:
nx_tomos = self.to_NXtomos(
request_input=request_input,
input_callback=input_callback,
check_tomo_n=True,
)
# preprocessing to define output file name
possible_extensions = (".hdf5", ".h5", ".nx", ".nexus")
output_file_basename = os.path.basename(self.configuration.output_file)
file_extension_ = None
for possible_extension in possible_extensions:
if output_file_basename.endswith(possible_extension):
output_file_basename.rstrip(possible_extension)
file_extension_ = possible_extension
def get_file_name_and_entry(index, divide_sub_files):
entry = "entry" + str(index).zfill(4)
if self.configuration.single_file or not divide_sub_files:
en_output_file = self.configuration.output_file
else:
ext = file_extension_ or self.configuration.file_extension
file_name = (
os.path.splitext(output_file_basename)[0]
+ "_"
+ str(index).zfill(4)
+ ext
)
en_output_file = os.path.join(
os.path.dirname(self.configuration.output_file), file_name
)
if os.path.exists(en_output_file):
if self.configuration.overwrite is True:
_logger.warning(en_output_file + " will be removed")
_logger.info("remove " + en_output_file)
os.remove(en_output_file)
elif _ask_for_file_removal(en_output_file) is False:
raise OSError(f"unable to overwrite {en_output_file}, exit")
else:
os.remove(en_output_file)
return en_output_file, entry
result = []
for i_nx_tomo, nx_tomo in enumerate(nx_tomos):
output_file, data_path = get_file_name_and_entry(
(shift_entry + i_nx_tomo), divide_sub_files=divide_into_sub_files
)
output_file = os.path.abspath(os.path.relpath(output_file, os.getcwd()))
output_file = os.path.abspath(output_file)
# For pcotomo for example the data path is modified in order to handle with the splitting
# that does not fit at 100 % with the current API. for now there is
# no convenient way to handle this in a better way
assert isinstance(nx_tomo, NXtomo)
# embed data if requested
if len(nx_tomo.instrument.detector.data) == 0:
_logger.warning(
f"No frame found for NXtomo number {i_nx_tomo}. Won't write any output for it"
)
continue
vs_0 = nx_tomo.instrument.detector.data[0]
if nx_tomo.detector_data_is_defined_by_url():
new_urls = []
for url in nx_tomo.instrument.detector.data:
if self._copy_frames[url.path()]:
created_url = embed_url(
url=url,
output_file=output_file,
)
new_urls.append(created_url)
else:
new_urls.append(url)
nx_tomo.instrument.detector.data = new_urls
elif nx_tomo.detector_data_is_defined_by_virtual_source():
new_vs = []
for vs in nx_tomo.instrument.detector.data:
assert isinstance(vs, h5py.VirtualSource)
assert isinstance(vs_0, h5py.VirtualSource)
url = DataUrl(file_path=vs.path, data_path=vs.name, scheme="silx")
if (
url.path() in self._copy_frames
and self._copy_frames[url.path()]
):
new_url = embed_url(url, output_file=output_file)
n_vs, _, _ = from_data_url_to_virtual_source(new_url)
new_vs.append(n_vs)
else:
new_vs.append(vs)
nx_tomo.instrument.detector.data = new_vs
# provide some extra data like origin of the input_file
if input_file_path is not None:
nx_tomo.bliss_original_files = (os.path.abspath(input_file_path),)
# save data
nx_tomo.save(file_path=output_file, data_path=data_path)
# check rotation angle
if nx_tomo.sample.rotation_angle is not None:
unique_angles = numpy.unique(nx_tomo.sample.rotation_angle)
if len(unique_angles) == 1:
_logger.warning(
f"NXtomo {data_path}@{output_file} seems to have a single value ({unique_angles}) for rotation angle. Seems it fails to find correct path to the rotation angle dataset"
)
result.append((output_file, data_path))
return tuple(result)
def to_NXtomos(self, request_input, input_callback, check_tomo_n=True) -> tuple:
raise NotImplementedError("Base class")
@property
def raise_error_if_issue(self):
"""
Should we raise an error if we encounter or an issue or should we
just log an error message
"""
return self.configuration.raises_error
@property
def is_xrd_ct(self):
"""Is this an XRD-CT acquisition"""
raise NotImplementedError("Base class")
@property
def require_x_translation(self):
"""is `x_translation` expected"""
raise NotImplementedError("Base class")
@property
def require_z_translation(self):
"""is `z_translation` expected"""
raise NotImplementedError("Base class")
@property
def has_diode(self):
"""is the acquisition expect to have a diode (instead of an energy
field)"""
raise NotImplementedError("Base class")
@property
def root_url(self):
return self._root_url
[docs] def get_expected_nx_tomo(self):
"""
Return the expected number of nxtomo created for this acquisition.
This is required to get consistent entry and file name. At lest for automation
"""
raise NotImplementedError("Base class")
def read_entry(self):
return EntryReader(self._root_url)
[docs] def is_different_sequence(self, entry):
"""
Can we have several entries 1.1, 1.2, 1.3... to consider.
This is the case for XRD-CT where 1.1, 1.2, 1.3 should be consider as
being part of the same sequence. Not for 'standard tomography'
"""
raise ValueError("Base class")
@staticmethod
def _get_node_values_for_frame_array(
node: h5py.Group,
n_frame: int,
keys: typing.Iterable,
info_retrieve,
expected_unit,
):
def get_values():
# this is a two step process: first step we parse all the
# the keys until we found one with the expected length
# if first iteration fails then we return the first existing key
for respect_length in (True, False):
for possible_key in keys:
if possible_key in node and isinstance(
node[possible_key], h5py.Dataset
):
values_ = h5py_read_dataset(node[possible_key])
unit_ = BaseAcquisition._get_unit(
node[possible_key], default_unit=expected_unit
)
# skip values containing '*DIS*'
if isinstance(values_, str) and values_ == "*DIS*":
continue
if n_frame is not None and respect_length is True:
if numpy.isscalar(values_):
length = 1
else:
length = len(values_)
if length in (n_frame, n_frame + 1):
return values_, unit_
else:
return values_, unit_
return None, None
values, unit = get_values()
if values is None:
raise ValueError(
f"Unable to retrieve {info_retrieve} for {node.name}. Was looking for {keys} datasets"
)
elif n_frame is None:
return values, unit
elif numpy.isscalar(values):
return numpy.array([values] * n_frame), unit
elif len(values) == n_frame:
return values.tolist(), unit
elif len(values) == (n_frame + 1):
# for now we can have one extra position for rotation,
# x_translation...
# because saved after the last projection. It is recording the
# motor position. For example in this case: 1 is the motor movement
# (saved) and 2 is the acquisition
#
# 1 2 1 2 1
# ----- -----
# ----- ----- -----
#
return values[:-1].tolist(), unit
elif len(values) > n_frame:
_logger.warning(
f"Incoherent number of values found for {info_retrieve}. Can come from an acquisition canceled. Else please investigate."
)
# in this case only get the values which have a frame
return values[0:n_frame], unit
elif len(values) < n_frame:
_logger.warning(
f"Incoherent number of values found for {info_retrieve}. Can come from an acquisition canceled. Else please investigate."
)
# in this case append 0 to existing values. Maybe -1 would be better ?
return list(values) + [0] * (n_frame - values), unit
elif len(values) == 1:
return numpy.array([values[0]] * n_frame), unit
else:
raise ValueError("incoherent number of angle position vs number of frame")
[docs] def register_step(self, url: DataUrl, entry_type, copy_frames) -> None:
"""
Add a bliss entry to the acquisition
:param url:
:param entry_type:
"""
raise NotImplementedError("Base class")
@staticmethod
def _get_unit(node: h5py.Dataset, default_unit):
"""Simple process to retrieve unit from an attribute"""
if "unit" in node.attrs:
return node.attrs["unit"]
elif "units" in node.attrs:
return node.attrs["units"]
else:
_logger.info(
f"no unit found for {node.name}, take default unit: {default_unit}"
)
return default_unit
@staticmethod
def _get_instrument_node(entry_node: h5py.Group) -> h5py.Group:
if not isinstance(entry_node, h5py.Group):
raise TypeError("entry_node: h5py.group expected")
return entry_node["instrument"]
@staticmethod
def _get_positioners_node(entry_node):
if not isinstance(entry_node, h5py.Group):
raise TypeError("entry_node is expected to be a h5py.Group")
parent_node = BaseAcquisition._get_instrument_node(entry_node)
if "positioners" in parent_node:
return parent_node["positioners"]
else:
return None
@staticmethod
def _get_measurement_node(entry_node):
if not isinstance(entry_node, h5py.Group):
raise TypeError("entry_node is expected to be a h5py.Group")
if "measurement" in entry_node:
return entry_node["measurement"]
else:
return None
@staticmethod
def _get_machine_node(entry_node):
if not isinstance(entry_node, h5py.Group):
raise TypeError("entry_node is expected to be a h5py.Group")
if "instrument/machine" in entry_node:
return entry_node["instrument/machine"]
else:
return None
def _read_rotation_motor_name(self) -> typing.Union[str, None]:
"""read rotation motor from root_url/technique/scan/motor
:return: name of the motor used for rotation. None if cannot find
:rtype: Union[tuple, None]
"""
if self._root_url is None:
_logger.warning("no root url. Unable to read rotation motor")
return None
else:
with EntryReader(self._root_url) as entry:
for motor_path in self._TECHNIQUE_MOTOR_PATHS:
if motor_path in entry:
try:
rotation_motor = get_dataset_name_from_motor(
motors=h5py_read_dataset(
numpy.asarray(entry[motor_path])
),
motor_name="rotation",
)
except Exception as e:
_logger.error(e)
else:
return rotation_motor
else:
_logger.warning(
f"{motor_path} unable to find rotation motor from {self._root_url}"
)
return None
def _get_electric_current(self, root_node) -> list:
"""retrieve electric current provide a time stamp for each of them"""
if root_node is None:
_logger.warning("no root url. Unable to read electric current")
return None, None
else:
grps = [
root_node,
]
measurement_node = self._get_measurement_node(root_node)
if measurement_node is not None:
grps.append(measurement_node)
machine_node = self._get_machine_node(root_node)
if machine_node is not None:
grps.append(machine_node)
for grp in grps:
try:
elec_current, unit = self._get_node_values_for_frame_array(
node=grp,
keys=self.configuration.machine_electric_current_keys,
info_retrieve="machine electric current",
expected_unit="mA",
n_frame=None,
)
except (ValueError, KeyError):
pass
else:
# handle case where elec_current is a scalar. Cast it to list before return
if numpy.isscalar(elec_current):
elec_current = [
elec_current,
]
return elec_current, unit
else:
_logger.warning(
f"Unable to retrieve machine electric current for {root_node.name}"
)
return None, None
def _get_rotation_angle(self, root_node, n_frame) -> tuple:
"""return the list of rotation angle for each frame"""
if not isinstance(root_node, h5py.Group):
raise TypeError("root_node is expected to be a h5py.Group")
for grp in (
self._get_positioners_node(root_node),
root_node,
self._get_measurement_node(root_node),
):
try:
angles, unit = self._get_node_values_for_frame_array(
node=grp,
n_frame=n_frame,
keys=self.configuration.rotation_angle_keys,
info_retrieve="rotation angle",
expected_unit="degree",
)
except (ValueError, KeyError):
pass
else:
return angles, unit
mess = f"Unable to find rotation angle for {root_node.name}"
if self.raise_error_if_issue:
raise ValueError(mess)
else:
mess += "default value will be set. (0)"
_logger.warning(mess)
return [0] * n_frame, "degree"
def _get_x_translation(self, root_node, n_frame) -> tuple:
"""return the list of translation for each frame"""
for grp in self._get_positioners_node(root_node), root_node:
try:
x_tr, unit = self._get_node_values_for_frame_array(
node=grp,
n_frame=n_frame,
keys=self.configuration.x_trans_keys,
info_retrieve="x translation",
expected_unit="mm",
)
x_tr = (
numpy.asarray(x_tr)
* metricsystem.MetricSystem.from_value(unit).value
)
except (ValueError, KeyError):
pass
else:
return x_tr, "m"
mess = f"Unable to find x translation for {self.root_url.path()}"
if self.raise_error_if_issue:
raise ValueError(mess)
else:
mess += "default value will be set. (0)"
_logger.warning(mess)
return [0] * n_frame, "m"
def _get_y_translation(self, root_node, n_frame) -> tuple:
"""return the list of translation for each frame"""
for grp in self._get_positioners_node(root_node), root_node:
try:
y_tr, unit = self._get_node_values_for_frame_array(
node=grp,
n_frame=n_frame,
keys=self.configuration.y_trans_keys,
info_retrieve="y translation",
expected_unit="mm",
)
y_tr = (
numpy.asarray(y_tr)
* metricsystem.MetricSystem.from_value(unit).value
)
except (ValueError, KeyError):
pass
else:
return y_tr, "m"
mess = f"Unable to find y translation for {self.root_url.path()}"
if self.raise_error_if_issue:
raise ValueError(mess)
else:
mess += "default value will be set. (0)"
_logger.warning(mess)
return [0] * n_frame, "m"
@staticmethod
def get_z_translation_frm(root_node, n_frame: int, configuration: TomoHDF5Config):
for grp in BaseAcquisition._get_positioners_node(root_node), root_node:
try:
z_tr, unit = BaseAcquisition._get_node_values_for_frame_array(
node=grp,
n_frame=n_frame,
keys=configuration.z_trans_keys,
info_retrieve="z translation",
expected_unit="mm",
)
z_tr = (
numpy.asarray(z_tr)
* metricsystem.MetricSystem.from_value(unit).value
)
except (ValueError, KeyError):
pass
else:
return z_tr, "m"
mess = f"Unable to find z translation on node {root_node.name}"
if configuration.raises_error:
raise ValueError(mess)
else:
mess += "default value will be set. (0)"
_logger.warning(mess)
return [0] * n_frame, "m"
def _get_z_translation(self, root_node, n_frame) -> tuple:
"""return the list of translation for each frame"""
return self.get_z_translation_frm(
root_node=root_node,
n_frame=n_frame,
configuration=self.configuration,
)
def _get_expo_time(self, root_node, n_frame, detector_node) -> tuple:
"""return expo time for each frame"""
for grp in detector_node["acq_parameters"], root_node:
try:
expo, unit = self._get_node_values_for_frame_array(
node=grp,
n_frame=n_frame,
keys=self.configuration.exposition_time_keys,
info_retrieve="exposure time",
expected_unit="s",
)
except (ValueError, KeyError):
pass
else:
return expo, unit
mess = f"Unable to find frame exposure time on entry {self.root_url.path()}"
if self.raise_error_if_issue:
raise ValueError(mess)
else:
mess += "default value will be set. (0)"
_logger.warning(mess)
return 0, "s"
[docs] def get_axis_scale_types(self):
"""
Return axis display for the detector data to be used by silx view
"""
return ["linear", "linear"]
def __str__(self):
if self.root_url is None:
return "NXTomo"
else:
return self.root_url.path()
[docs]def get_dataset_name_from_motor(motors, motor_name):
motors = numpy.asarray(motors)
indexes = numpy.where(motors == motor_name)[0]
if len(indexes) == 0:
return None
elif len(indexes) == 1:
index = indexes[0]
index_dataset_id = index + 1
if index_dataset_id < len(motors):
return motors[index_dataset_id]
else:
raise ValueError(
f"{motor_name} found but unable to find dataset name from {motors}"
)
else:
raise ValueError(f"More than one instance of {motor_name} as been found.")