# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""Utility functions to inspect and patch NXtomo (HDF5) files."""
__authors__ = ["T. Vincent", "H. Payno"]
__license__ = "MIT"
__date__ = "29/04/2019"
import logging
import os
import typing
from datetime import datetime
import numpy
from silx.io.url import DataUrl
from silx.io.utils import get_data
from silx.io.utils import open as open_hdf5
from silx.utils.deprecation import deprecated
from silx.utils.enum import Enum as _Enum
from tomoscan.esrf.scan.utils import cwd_context
from tomoscan.io import HDF5File
from nxtomo.nxobject.nxdetector import ImageKey
from nxtomo.utils.frameappender import FrameAppender
from nxtomo.application.nxtomo import NXtomo
try:
import hdf5plugin # noqa F401
except ImportError:
pass
import uuid
from collections.abc import Iterable
from silx.io.utils import h5py_read_dataset
def embed_url(url: DataUrl, output_file: str) -> DataUrl:
    """
    Copy the dataset pointed to by `url` inside `output_file`.

    The copy is stored under the '/duplicate_data' group with a random
    (uuid1) name. The path of the original url is kept in the
    'original_url' attribute of the copied dataset.

    :param DataUrl url: dataset to be copied. Any value which is not a
                        DataUrl is returned untouched.
    :param str output_file: file in which the dataset must be stored
    :return: `url` itself when it is not a DataUrl or when it already points
             to `output_file`, else a DataUrl (scheme 'silx') pointing to the
             copied dataset
    """
    # nothing to embed: not an url, or the data already lives in output_file
    if not isinstance(url, DataUrl) or url.file_path() == output_file:
        return url

    embed_data_path = "/".join(("/duplicate_data", str(uuid.uuid1())))
    output_folder = os.path.dirname(os.path.abspath(output_file))
    # NOTE(review): the file is opened while the working directory is the
    # folder of `output_file` - confirm relative paths behave as expected.
    with cwd_context(output_folder):
        with HDF5File(output_file, "a") as h5s:
            h5s[embed_data_path] = get_data(url)
            h5s[embed_data_path].attrs["original_url"] = url.path()
        return DataUrl(
            file_path=output_file, data_path=embed_data_path, scheme="silx"
        )
class FileExtension(_Enum):
    """File extensions known by :func:`get_file_name`."""

    # values are lower case and include the leading dot
    H5 = ".h5"
    HDF5 = ".hdf5"
    NX = ".nx"
def get_file_name(file_name, extension, check=True):
    """
    Return `file_name` with the requested extension appended when needed.

    :param str file_name: name of the file
    :param extension: extension to give, either as a string (lower-cased
                      before conversion, e.g. ".nx") or as a FileExtension
                      member
    :type extension: Union[str, FileExtension]
    :param bool check: if True and `file_name` already ends (case
                       insensitive) with one of the FileExtension values
                       then `file_name` is returned unchanged
    :return: the file name, ending with an extension
    :rtype: str
    :raises TypeError: if `extension` is neither a str nor a FileExtension
    """
    if isinstance(extension, str):
        # presumably raises ValueError on unknown extension - see silx Enum
        extension = FileExtension.from_value(extension.lower())
    if not isinstance(extension, FileExtension):
        raise TypeError(
            "extension is expected to be a str or an instance of "
            f"FileExtension. Not {type(extension)}"
        )
    if check and file_name.lower().endswith(tuple(FileExtension.values())):
        return file_name
    # `value` is an attribute of an enum member, not a method
    return file_name + extension.value
def get_tuple_of_keys_from_cmd(cmd_value: str) -> tuple:
    """
    Split a comma separated command line value into a tuple of keys.

    :param str cmd_value: value such as "key1,key2"
    :return: one item per comma separated field (fields are not stripped)
    :rtype: tuple
    """
    keys = cmd_value.split(",")
    return tuple(keys)
def is_nx_tomo_entry(file_path, entry):
    """
    Tell whether `entry` of `file_path` is an NXtomo entry.

    :param str file_path: hdf5 file path
    :param str entry: entry to check
    :return: False if the file or the entry does not exist, else the result
             of `NXtomo.node_is_nxtomo` for the node at `entry`
    """
    if os.path.exists(file_path):
        with open_hdf5(file_path) as h5s:
            if entry in h5s:
                # the node must be tested while the file is still open
                return NXtomo.node_is_nxtomo(h5s[entry])
    return False
def add_dark_flat_nx_file(
    file_path: str,
    entry: str,
    darks_start: typing.Union[None, numpy.ndarray, DataUrl] = None,
    flats_start: typing.Union[None, numpy.ndarray, DataUrl] = None,
    darks_end: typing.Union[None, numpy.ndarray, DataUrl] = None,
    flats_end: typing.Union[None, numpy.ndarray, DataUrl] = None,
    extras: typing.Union[None, dict] = None,
    logger: typing.Union[None, logging.Logger] = None,
    embed_data: bool = False,
):
    """
    This will get all data from entry@input_file and patch them with provided
    dark and / or flat(s).
    We consider the sequence as: dark, start_flat, projections, end_flat.

    Behavior regarding data type and target dataset:

    * if dataset at `entry` already exists:
        * if dataset at `entry` is a 'standard' dataset:
            * data will be loaded if necessary and `entry` will be updated
        * if dataset at `entry` is a virtual dataset:
            * if `data` is a numpy array then we raise an error: the data should
              already be saved somewhere and you should provide a DataUrl
            * if `data` is a DataUrl then the virtual dataset is updated and
              a virtual source pointing to the
              DataUrl.file_path()@DataUrl.data_path() is added to the layout
    * if a new dataset `entry` need to be added:
        * if `data` is a numpy array then we create a new 'standard' Dataset
        * if `data` is a DataUrl then a new virtual dataset will be created

    note: Datasets `image_key`, `image_key_control`, `rotation_angle` and
    `count_time` will be copied each time.

    :param file_path: NXTomo file containing data to be patched
    :type file_path: str
    :param entry: entry to be patched
    :type entry: str
    :param darks_start: (3D) numpy array containing the first dark serie if any
    :type darks_start: Union[None, numpy.ndarray, DataUrl]
    :param flats_start: (3D) numpy array containing the first flat if any
    :type flats_start: Union[None, numpy.ndarray, DataUrl]
    :param darks_end: (3D) numpy array containing the second dark serie if
                      any
    :type darks_end: Union[None, numpy.ndarray, DataUrl]
    :param flats_end: (3D) numpy array containing the second flat if any
    :type flats_end: Union[None, numpy.ndarray, DataUrl]
    :param extras: dictionary to specify some parameters for flats and dark
                   like rotation angle.
                   valid keys: 'darks_start', 'darks_end', 'flats_start',
                   'flats_end'.
                   Values should be a dictionary of 'NXTomo' keys with
                   values to be set instead of 'default values'.
                   Possible values are:
                   * `count_time`
                   * `rotation_angle`
                   * `x_translation`, `y_translation`, `z_translation`
                   * `control/data`
                   Each value is either a scalar (repeated for each inserted
                   frame) or an iterable with one item per inserted frame.
    :type extras: Union[None, dict]
    :param Union[None, logging.Logger] logger: object for logs
    :param bool embed_data: if True then each external data will be copy
                            under a 'duplicate_data' folder
    :raises ValueError: if `extras` contains an unknown key, if an url slice
                        has a step other than None or 1 (when data is not
                        embedded), if a value given in `extras` has a length
                        different from the number of frames to insert or if
                        no default value can be found for a mandatory key
    :raises KeyError: if the data path of a DataUrl is not in its file
    :raises TypeError: if a frame set is neither a 3D numpy array nor a
                       DataUrl (or a str converted to a DataUrl)
    """
    valid_extra_keys = ("darks_start", "darks_end", "flats_start", "flats_end")
    if extras is None:
        extras = {}
    for key in extras:
        if key not in valid_extra_keys:
            raise ValueError(
                f"{key} is not recognized. Valid values are {valid_extra_keys}"
            )

    if embed_data is True:
        darks_start = embed_url(darks_start, output_file=file_path)
        darks_end = embed_url(darks_end, output_file=file_path)
        flats_start = embed_url(flats_start, output_file=file_path)
        flats_end = embed_url(flats_end, output_file=file_path)
    else:
        for url in (darks_start, darks_end, flats_start, flats_end):
            if not isinstance(url, DataUrl):
                continue
            data_slice = url.data_slice()
            if isinstance(data_slice, slice) and data_slice.step not in (None, 1):
                raise ValueError(
                    "When data is not embed slice `step` "
                    "must be None or 1. Other values are "
                    f"not handled. Failing url is {url}"
                )

    def get_default_value(location, where_):
        """Return the first ('start') or last value stored at `location`
        in `file_path`, or None if `location` does not exist."""
        with open_hdf5(file_path) as h5s:
            if location not in h5s:
                return None
            existing_data = h5s[location]
            if where_ == "start":
                return existing_data[0]
            else:
                return existing_data[-1]

    # paths only depend on `entry`: compute them once
    data_path = os.path.join(entry, "instrument", "detector", "data")
    # image_key and image_key_control get the same values (the 'alignment'
    # projections are not managed here)
    ik_path = os.path.join(entry, "instrument", "detector", "image_key")
    ikc_path = os.path.join(entry, "instrument", "detector", "image_key_control")
    mandatory_keys = (
        "count_time",
        "rotation_angle",
    )
    # key usable in `extras` -> dataset location. Mandatory keys first, then
    # the optional ones.
    metadata_paths = {
        "count_time": os.path.join(entry, "instrument", "detector", "count_time"),
        "rotation_angle": os.path.join(entry, "sample", "rotation_angle"),
        "x_translation": os.path.join(entry, "sample", "x_translation"),
        "y_translation": os.path.join(entry, "sample", "y_translation"),
        "z_translation": os.path.join(entry, "sample", "z_translation"),
        "control/data": os.path.join(entry, "control", "data"),
    }

    # !!! warning: order of dark / flat treatments is important. Frames
    # inserted at the 'start' are put before the existing ones, so
    # flats_start is inserted before darks_start; frames inserted at the
    # 'end' are put after the existing ones, so darks_end is inserted before
    # flats_end.
    data_names = "flats_start", "darks_end", "flats_end", "darks_start"
    datas = flats_start, darks_end, flats_end, darks_start
    keys_value = (
        ImageKey.FLAT_FIELD.value,
        ImageKey.DARK_FIELD.value,
        ImageKey.FLAT_FIELD.value,
        ImageKey.DARK_FIELD.value,
    )
    wheres = "start", "end", "end", "start"  # warning: order is important

    for d_n, data, key, where in zip(data_names, datas, keys_value, wheres):
        if data is None:
            continue
        n_frames_to_insert = 1
        if isinstance(data, str):
            data = DataUrl(path=data)
        # NOTE(review): a numpy array which is not 3D ends up in the
        # TypeError branch below - confirm this is intended.
        if isinstance(data, numpy.ndarray) and data.ndim == 3:
            n_frames_to_insert = data.shape[0]
        elif isinstance(data, DataUrl):
            with open_hdf5(data.file_path()) as h5s:
                if data.data_path() not in h5s:
                    raise KeyError(
                        f"Path given ({data.data_path()}) is not in {data.file_path()}"
                    )
            data_node = get_data(data)
            if data_node.ndim == 3:
                n_frames_to_insert = data_node.shape[0]
        else:
            raise TypeError(f"{type(data)} as input is not managed")

        if logger is not None:
            logger.info(f"insert {type(data)} frame of type {key} at the {where}")

        # update 'data' dataset
        FrameAppender(
            data, file_path, data_path=data_path, where=where, logger=logger
        ).process()

        # update image_key and image_key_control
        for path in (ik_path, ikc_path):
            FrameAppender(
                [key] * n_frames_to_insert,
                file_path,
                data_path=path,
                where=where,
                logger=logger,
            ).process()

        # add 'other' necessaries key
        for data_key, data_key_path in metadata_paths.items():
            data_to_insert = None
            if d_n in extras and data_key in extras[d_n]:
                provided_value = extras[d_n][data_key]
                if isinstance(provided_value, Iterable):
                    if len(provided_value) != n_frames_to_insert:
                        raise ValueError(
                            "Given value to store from extras has"
                            f" incoherent length({len(provided_value)}) compare to "
                            f"the number of frame to save ({n_frames_to_insert})"
                        )
                    data_to_insert = provided_value
                else:
                    try:
                        data_to_insert = [provided_value] * n_frames_to_insert
                    except Exception as e:
                        # `logger` is optional: do not fail on logging
                        if logger is not None:
                            logger.error(
                                f"Fail to create data to insert. Error is {e}"
                            )
                        return
            else:
                # no value provided: duplicate the neighbouring existing value
                try:
                    default_value = get_default_value(data_key_path, where)
                except Exception:
                    # best effort: an unreadable location is handled as a
                    # missing one
                    default_value = None
                if default_value is None:
                    msg = f"Unable to define a default value for {data_key_path}. Location empty in {file_path}"
                    if data_key in mandatory_keys:
                        raise ValueError(msg)
                    elif logger:
                        logger.warning(msg)
                    continue
                elif logger:
                    logger.debug(
                        f"No value(s) provided for {data_key_path}. Extract some default value ({default_value})."
                    )
                data_to_insert = [default_value] * n_frames_to_insert

            if data_to_insert is not None:
                FrameAppender(
                    data_to_insert,
                    file_path,
                    data_path=data_key_path,
                    where=where,
                    logger=logger,
                ).process()
@deprecated(replacement="_FrameAppender", since_version="0.5.0")
def _insert_frame_data(data, file_path, data_path, where, logger=None):
    """
    Deprecated wrapper inserting some frame(s) into an existing dataset,
    before or after the existing frames. The work is delegated to
    :class:`FrameAppender`.

    :param data: frame(s) to insert (numpy 2D or 3D array, or a DataUrl)
    :param file_path: file containing the dataset to update
    :param data_path: If the path points to a virtual dataset then this one
                      will be updated but data should be a DataUrl of the
                      same shape. Else the dataset at data_path is extended.
    :param where: position of the new frame(s) relative to the existing
                  ones; forwarded unchanged to FrameAppender
    :param logger: optional logger forwarded to FrameAppender
    :return: result of `FrameAppender.process`
    :raises TypeError: In the case the data type and existing data_path are
                       incompatible.
    """
    appender_kwargs = {
        "data": data,
        "file_path": file_path,
        "data_path": data_path,
        "where": where,
        "logger": logger,
    }
    return FrameAppender(**appender_kwargs).process()
def change_image_key_control(
    file_path: str,
    entry: str,
    frames_indexes: typing.Union[slice, Iterable],
    image_key_control_value: typing.Union[int, ImageKey],
    logger=None,
):
    """
    Will modify image_key and image_key_control values for the requested
    frames.

    `image_key_control` receives `image_key_control_value`. `image_key`
    receives the same value, except for ALIGNMENT which is stored as
    PROJECTION.

    :param str file_path: path the nexus file
    :param str entry: name of the entry to modify
    :param frames_indexes: index of the frame for which we want to modify
                           the image key. For a slice: a missing `start`
                           means 0, a missing `step` means 1 and a `stop`
                           of None or -1 means 'up to the last frame'.
                           Indexes outside [0, number of frames[ are ignored.
    :type frames_indexes: Union[slice, Iterable]
    :param image_key_control_value: new value of the image key control
    :type image_key_control_value: Union[int, ImageKey]
    :param logging.Logger logger: logger
    :raises TypeError: if `frames_indexes` is neither an Iterable nor a slice
    """
    if not isinstance(frames_indexes, (Iterable, slice)):
        raise TypeError(
            "`frames_indexes` should be an instance of Iterable or slice"
        )
    if logger:
        logger.info(
            f"Update frames {frames_indexes} to "
            f"{image_key_control_value} of {entry}@{file_path}"
        )

    image_key_control_value = ImageKey.from_value(image_key_control_value)

    with HDF5File(file_path, mode="a") as h5s:
        node = h5s[entry]
        image_keys_path = "/".join(("instrument", "detector", "image_key"))
        image_keys = h5py_read_dataset(node[image_keys_path])
        image_keys_control_path = "/".join(
            ("instrument", "detector", "image_key_control")
        )
        image_keys_control = h5py_read_dataset(node[image_keys_control_path])

        # convert a slice to explicit indexes
        if isinstance(frames_indexes, slice):
            start = frames_indexes.start
            if start is None:
                # range() does not accept None as start
                start = 0
            step = frames_indexes.step
            if step is None:
                step = 1
            stop = frames_indexes.stop
            if stop in (None, -1):
                stop = len(image_keys)
            frames_indexes = range(start, stop, step)

        # only keep valid indexes: the upper bound is exclusive, an index
        # equal to the number of frames would be out of bounds
        n_frames = len(image_keys_control)
        frames_indexes = [index for index in frames_indexes if 0 <= index < n_frames]

        # manage image_key_control
        image_keys_control[frames_indexes] = image_key_control_value.value
        node[image_keys_control_path][:] = image_keys_control

        # manage image_key. In this case we should get rid of Alignment values
        # and replace it by Projection values
        image_key_value = image_key_control_value
        if image_key_value is ImageKey.ALIGNMENT:
            image_key_value = ImageKey.PROJECTION
        image_keys[frames_indexes] = image_key_value.value
        node[image_keys_path][:] = image_keys
def str_datetime_to_numpy_datetime64(
    my_datetime: typing.Union[str, datetime]
) -> numpy.datetime64:
    """
    Convert a date time to a time zone free `numpy.datetime64` with a
    millisecond resolution.

    numpy deprecates the conversion of time zone aware values to
    numpy.datetime64, so the time zone information is removed first: the
    value is expressed in the system local time zone
    (`datetime.astimezone(None)`) and its `tzinfo` is then dropped.

    :param my_datetime: ISO formatted string (as understood by
                        `datetime.fromisoformat`) or datetime instance
    :type my_datetime: Union[str, datetime]
    :return: the date time as '<M8[ms]'
    :rtype: numpy.datetime64
    :raises TypeError: if `my_datetime` is neither a str nor a datetime
    """
    if isinstance(my_datetime, datetime):
        parsed_datetime = my_datetime
    elif isinstance(my_datetime, str):
        parsed_datetime = datetime.fromisoformat(my_datetime)
    else:
        raise TypeError(
            f"my_datetime is expected to be a str or an instance of datetime. Not {type(my_datetime)}"
        )

    local_naive_datetime = parsed_datetime.astimezone(None).replace(tzinfo=None)
    return numpy.datetime64(local_naive_datetime).astype("<M8[ms]")
def strip_extension(filename, logger=None):
    """
    Remove a trailing '.nx', '.h5' or '.hdf5' (case sensitive) from
    `filename`.

    :param str filename: file name to process
    :param logger: optional logger, warned when `filename` has none of the
                   known extensions
    :return: `filename` without its extension, or `filename` itself when the
             extension is not a known one
    :rtype: str
    """
    for known_suffix in (".nx", ".h5", ".hdf5"):
        if filename.endswith(known_suffix):
            return filename[: -len(known_suffix)]

    if logger is not None:
        logger.warning(f"Unusual file name {filename} has no known postfix")
    return filename