# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2020 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""
Module to convert from EDF acquisitions to (NXtomo-compliant) .nx NeXus files.
"""
__authors__ = [
"H. Payno",
]
__license__ = "MIT"
__date__ = "27/11/2020"
import logging
import os
from collections import namedtuple
from typing import Optional
import fabio
import h5py
import numpy
from fabio.edfimage import EdfImage
from silx.io.dictdump import dicttoh5
from silx.utils.deprecation import deprecated
from tomoscan.esrf.scan.edfscan import EDFTomoScan
from tomoscan.esrf.scan.nxtomoscan import NXtomoScan
from tomoscan.esrf.scan.utils import get_parameters_frm_par_or_info
from tomoscan.io import HDF5File
from nxtomo.paths.nxtomo import get_paths as get_nexus_paths
from nxtomo.nxobject.nxdetector import FieldOfView, ImageKey
from nxtomo.nxobject.nxsource import SourceType
from pyunitsystem.electriccurrentsystem import ElectricCurrentSystem
from pyunitsystem.energysystem import EnergySI
from nxtomomill import utils
from nxtomomill.converter.version import LATEST_VERSION
from nxtomomill.converter.version import version as converter_version
from nxtomomill.io.config.edfconfig import TomoEDFConfig
from nxtomomill.io.utils import PathType
from nxtomomill.utils.nexus import create_nx_data_group, link_nxbeam_to_root
from nxtomomill.utils.progress import Progress
from nxtomomill.settings import Tomo
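# depending on the fabio version, read_header_block is exposed as either a public
# or a private method; keep a reference that works with both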
try:
read_header_block = EdfImage.read_header_block
except AttributeError:
read_header_block = EdfImage._read_header_block
EDF_MOTOR_POS = Tomo.EDF.MOTOR_POS
EDF_MOTOR_MNE = Tomo.EDF.MOTOR_MNE
EDF_REFS_NAMES = Tomo.EDF.REFS_NAMES
EDF_TO_IGNORE = Tomo.EDF.TO_IGNORE
EDF_ROT_ANGLE = Tomo.EDF.ROT_ANGLE
EDF_DARK_NAMES = Tomo.EDF.DARK_NAMES
EDF_X_TRANS = Tomo.EDF.X_TRANS
EDF_Y_TRANS = Tomo.EDF.Y_TRANS
EDF_Z_TRANS = Tomo.EDF.Z_TRANS
EDF_MACHINE_ELECTRIC_CURRENT = Tomo.EDF.MACHINE_ELECTRIC_CURRENT
_logger = logging.getLogger(__name__)
EDFFileKeys = namedtuple(
"EDFFileKeys",
[
"motor_pos_keys",
"motor_mne_keys",
"rot_angle_keys",
"x_trans_keys",
"y_trans_keys",
"z_trans_keys",
"to_ignore",
"dark_names",
"ref_names",
"machine_elec_current_keys",
],
)
DEFAULT_EDF_KEYS = EDFFileKeys(
EDF_MOTOR_POS,
EDF_MOTOR_MNE,
EDF_ROT_ANGLE,
EDF_X_TRANS,
EDF_Y_TRANS,
EDF_Z_TRANS,
EDF_TO_IGNORE,
EDF_DARK_NAMES,
EDF_REFS_NAMES,
EDF_MACHINE_ELECTRIC_CURRENT,
)
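# A sketch of overriding a subset of the default keys, relying on namedtuple's
# _replace (the key names below are hypothetical):
#   MY_KEYS = DEFAULT_EDF_KEYS._replace(rot_angle_keys=("srot", "mrsrot"))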
@deprecated(replacement="from_edf_to_nx", since_version="0.9.0")
def edf_to_nx(
scan: EDFTomoScan,
output_file: str,
file_extension: str,
file_keys: EDFFileKeys = DEFAULT_EDF_KEYS,
progress=None,
sample_name: Optional[str] = None,
title: Optional[str] = None,
instrument_name: Optional[str] = None,
source_name: Optional[str] = None,
source_type: Optional[SourceType] = None,
) -> tuple:
"""
    Convert an EDF acquisition to a NeXus (NXtomo) file.
    For now the data is duplicated.

    :param scan: EDFTomoScan pointing to the acquisition to convert
    :param output_file: path to the output NeXus file
    :param file_extension: extension of the output file (like ".nx")
    :param file_keys: EDF header keys used to retrieve the metadata
    :param progress: if provided, will be updated with the conversion progress
    :param sample_name: name of the sample
    :param title: dataset title
    :param instrument_name: name of the instrument used
    :param source_name: name of the source (most likely ESRF)
    :param source_type: type of the source (most likely "Synchrotron X-ray Source")
    :return: (nexus_file, entry)
"""
if not isinstance(scan, EDFTomoScan):
raise TypeError("scan is expected to be an instance of EDFTomoScan")
config = TomoEDFConfig()
config.input_folder = scan.path
config.dataset_basename = scan.dataset_basename
config.output_file = output_file
config.file_extension = file_extension
config.sample_name = sample_name
config.title = title
config.instrument_name = instrument_name
config.source_name = source_name
config.source_type = source_type
# handle file_keys
config.motor_position_keys = file_keys.motor_pos_keys
config.motor_mne_keys = file_keys.motor_mne_keys
config.rotation_angle_keys = file_keys.rot_angle_keys
config.x_trans_keys = file_keys.x_trans_keys
config.y_trans_keys = file_keys.y_trans_keys
config.z_trans_keys = file_keys.z_trans_keys
config.machine_electric_current_keys = file_keys.machine_elec_current_keys
config.ignore_file_patterns = file_keys.to_ignore
config.dark_names = file_keys.dark_names
config.flat_names = file_keys.ref_names
return from_edf_to_nx(config, progress=progress)
def from_edf_to_nx(configuration: TomoEDFConfig, progress=None) -> tuple:
"""
    Convert an EDF acquisition to a NeXus (NXtomo) file.
    For now the data is duplicated.

    :param configuration: configuration to use to process the data
    :param progress: if provided, will be updated with the conversion progress
    :return: (nexus_file, entry)
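
    A minimal usage sketch (the paths are hypothetical and the import path is
    assumed from the module layout):

    .. code-block:: python

        from nxtomomill.io.config.edfconfig import TomoEDFConfig
        from nxtomomill.converter.edf.edfconverter import from_edf_to_nx

        config = TomoEDFConfig()
        config.input_folder = "/path/to/edf_scan"  # folder containing the .edf files
        config.output_file = "/path/to/scan.nx"    # NXtomo file to be created
        nx_file, entry = from_edf_to_nx(config)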
"""
if configuration.input_folder is None:
raise ValueError("input_folder should be provided")
if not os.path.isdir(configuration.input_folder):
raise OSError(f"{configuration.input_folder} is not a valid folder path")
if configuration.output_file is None:
raise ValueError("output_file should be provided")
# if we don't duplicate data then we can't delete sources EDF files
if not configuration.duplicate_data and configuration.delete_edf_source_files:
        raise ValueError(
            "You asked to avoid data duplication and to delete the edf source files. "
            "Those two options are not compatible: avoiding data duplication "
            "creates HDF5 virtual datasets pointing to the edf source files"
        )
fileout_h5 = utils.get_file_name(
file_name=configuration.output_file,
extension=configuration.file_extension,
check=True,
)
with HDF5File(fileout_h5, "w") as h5d:
return __process(
configuration=configuration,
output_grp=h5d,
fileout_h5=fileout_h5,
progress=progress,
)
def post_processing_check(configuration: TomoEDFConfig):
"""
    Check that the converted file contains the same information as the source folder, and optionally delete the edf files.
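
    A minimal sketch (the paths are hypothetical and the check name string is an
    assumption):

    .. code-block:: python

        config = TomoEDFConfig()
        config.input_folder = "/path/to/edf_scan"
        config.output_file = "/path/to/scan.nx"     # NXtomo previously written
        config.output_checks = ("compare-volume",)  # assumed OUPUT_CHECK value
        post_processing_check(config)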
"""
if configuration.input_folder is None:
raise ValueError("input_folder should be provided")
if not os.path.isdir(configuration.input_folder):
raise OSError(f"{configuration.input_folder} is not a valid folder path")
if configuration.output_file is None:
raise ValueError("output_file should be provided")
if (
configuration.delete_edf_source_files is True
and len(configuration.output_checks) == 0
):
        raise ValueError(
            "Requested to remove the edf files without writing an NXtomo and without doing any check on an existing NXtomo. This makes no sense."
        )
# if we don't duplicate data then we can't delete sources EDF files
if not configuration.duplicate_data and configuration.delete_edf_source_files:
        raise ValueError(
            "You asked to avoid data duplication and to delete the edf source files. "
            "Those two options are not compatible: avoiding data duplication "
            "creates HDF5 virtual datasets pointing to the edf source files"
        )
fileout_h5 = utils.get_file_name(
file_name=configuration.output_file,
extension=configuration.file_extension,
check=True,
)
return __process(
configuration=configuration,
fileout_h5=fileout_h5,
output_grp=None,
progress=None,
)
def __process(
configuration: TomoEDFConfig,
output_grp: Optional[h5py.Group],
fileout_h5: str,
progress,
):
"""
    To make sure the exact same processing is used both for creating the NXtomo and for
    checking it against the source edf files, the two code paths are merged here.
    Not very nice, but the simplest way to make some legacy code evolve...
"""
if progress and hasattr(progress, "set_name"):
progress.set_name(
"preprocessing - retrieve all metadata (can take a few seconds - cannot display real advancement)"
)
progress.reset()
if hasattr(progress, "progress"):
progress.progress = 50.0
else:
progress = 50.0
if configuration.dataset_info_file is not None:
if not os.path.isfile(configuration.dataset_info_file):
raise ValueError(f"{configuration.dataset_info_file} is not a file")
else:
scan_info = get_parameters_frm_par_or_info(configuration.dataset_info_file)
else:
scan_info = None
scan = EDFTomoScan(
scan=configuration.input_folder,
dataset_basename=configuration.dataset_basename,
scan_info=scan_info,
# TODO: add n frames ?
)
_logger.info(f"Output file will be {fileout_h5}")
default_current = scan.retrieve_information(
scan=scan.path,
dataset_basename=scan.dataset_basename,
ref_file=None,
key="SrCurrent",
key_aliases=["SRCUR", "machineCurrentStart"],
type_=float,
scan_info=scan.scan_info,
)
if default_current is None:
_logger.warning("Unable to find machine current default current. Take 0.0")
default_current = 0.0
output_data_path = "entry"
    # with today's design we can simply remove the files referenced by the urls. We don't
    # expect those files to contain other types of data
edf_source_files = dict()
    # for each file register a tuple (n_frames, set(urls)) so at the end we can make
    # sure all the frames have been used before removing the file
detector_data_urls = []
    # store the urls in the order they are used, together with a flag telling whether
    # the frame has been modified or not (for the output checks).
def update_edf_source_files(url, n_frames):
if configuration.delete_edf_source_files:
# edf_source files is only used for deleting edf files
edf_file_path = url.file_path()
file_source_info = edf_source_files.get(edf_file_path, (n_frames, set()))
urls_used = file_source_info[1]
urls_used.add(url.path())
edf_source_files[edf_file_path] = (
file_source_info[0],
urls_used,
)
DARK_ACCUM_FACT = True
metadata = []
proj_urls = scan.get_proj_urls(
scan=scan.path, dataset_basename=scan.dataset_basename
)
for dark_to_find in configuration.dark_names:
dk_urls = scan.get_darks_url(scan_path=scan.path, prefix=dark_to_find)
if len(dk_urls) > 0:
if dark_to_find == "dark":
DARK_ACCUM_FACT = False
break
if configuration.ignore_file_patterns is None:
_edf_to_ignore = list()
else:
_edf_to_ignore = list(configuration.ignore_file_patterns)
for refs_to_find in configuration.flat_names:
        if refs_to_find == "ref":
            # ignore the "refHST" files when looking for raw "ref" files
            _edf_to_ignore.append("HST")
        elif "HST" in _edf_to_ignore:
            _edf_to_ignore.remove("HST")
refs_urls = scan.get_flats_url(
scan_path=scan.path,
prefix=refs_to_find,
ignore=_edf_to_ignore,
dataset_basename=scan.dataset_basename,
)
if len(refs_urls) > 0:
break
n_frames = len(proj_urls) + len(refs_urls) + len(dk_urls)
n_darks = len(dk_urls)
(
frame_type,
rot_angle_index,
x_trans_index,
y_trans_index,
z_trans_index,
srcur_index,
) = _getExtraInfo(scan=scan, configuration=configuration)
if rot_angle_index == -1 and configuration.force_angle_calculation is False:
        _logger.warning(
            f"Unable to find any of the defined keys for the rotation angle in the header ({configuration.rotation_angle_keys}). Will force angle calculation"
        )
configuration.force_angle_calculation = True
if not output_grp:
# in the case we don't intend to write the NXtomo but only check it
output_grp = None
ext_datasets_grp = None
dark_dataset = None
data_dataset = None
keys_dataset = None
keys_control_dataset = None
x_dataset = y_dataset = z_dataset = None
rotation_dataset = None
machine_electric_current_dataset = None
else:
if configuration.duplicate_data is True:
data_dataset = output_grp.create_dataset(
"/entry/instrument/detector/data",
shape=(n_frames, scan.dim_2, scan.dim_1),
dtype=frame_type,
)
ext_datasets_grp = None
dark_dataset = None
else:
_logger.warning(
"No data duplication requested. Will fail if your dataset contains compressed data"
)
# if necessary create external datasets groups
            # note: for now we are forced to create one dataset per frame because
            # the byte order can change from one frame to the next.
ext_datasets_grp = output_grp.create_group("external_datasets")
data_dataset = None
            # we must duplicate darks no matter what because the exposure time is not
            # handled on the nabu side for the moment
dark_dataset = ext_datasets_grp.create_dataset(
"darks",
shape=(n_darks, scan.dim_2, scan.dim_1),
dtype=frame_type,
)
keys_dataset = output_grp.create_dataset(
"/entry/instrument/detector/image_key", shape=(n_frames,), dtype=numpy.int32
)
keys_control_dataset = output_grp.create_dataset(
"/entry/instrument/detector/image_key_control",
shape=(n_frames,),
dtype=numpy.int32,
)
title = configuration.title
if title is None:
title = os.path.basename(scan.path)
output_grp["/entry/title"] = title
sample_name = configuration.sample_name
if configuration.sample_name is None:
# try to deduce sample name from scan path.
try:
sample_name = os.path.abspath(scan.path).split(os.sep)[-3:]
sample_name = os.sep.join(sample_name)
except Exception:
sample_name = "unknow"
output_grp["/entry/sample/name"] = sample_name
if configuration.instrument_name is not None:
instrument_grp = output_grp["/entry"].require_group("instrument")
instrument_grp["name"] = configuration.instrument_name
if configuration.source_name is not None:
source_grp = output_grp["/entry/instrument"].require_group("source")
source_grp["name"] = configuration.source_name
if configuration.source_type is not None:
source_grp = output_grp["/entry/instrument"].require_group("source")
source_grp["type"] = configuration.source_type.value
if configuration.source_probe is not None:
source_grp = output_grp["/entry/instrument"].require_group("source")
source_grp["probe"] = configuration.source_probe.value
if scan.scan_range is None or scan.tomo_n is None:
raise ValueError(
f"Cannot find scan_range ({scan.scan_range}) and / or tomo_n ({scan.tomo_n}). Is the .info file here and / or valid ?"
)
distance = scan.retrieve_information(
scan=os.path.abspath(scan.path),
dataset_basename=scan.dataset_basename,
ref_file=None,
key="Distance",
type_=float,
key_aliases=["distance"],
scan_info=scan.scan_info,
)
if distance is not None:
output_grp["/entry/instrument/detector/distance"] = (
distance * configuration.distance_unit.value
)
output_grp["/entry/instrument/detector/distance"].attrs["units"] = "m"
pixel_size = scan.retrieve_information(
scan=os.path.abspath(scan.path),
dataset_basename=scan.dataset_basename,
ref_file=None,
key="PixelSize",
type_=float,
key_aliases=["pixelSize"],
scan_info=scan.scan_info,
)
output_grp["/entry/instrument/detector/x_pixel_size"] = (
pixel_size * configuration.pixel_size_unit.value
)
output_grp["/entry/instrument/detector/x_pixel_size"].attrs["units"] = "m"
output_grp["/entry/instrument/detector/y_pixel_size"] = (
pixel_size * configuration.pixel_size_unit.value
)
output_grp["/entry/instrument/detector/y_pixel_size"].attrs["units"] = "m"
energy = scan.retrieve_information(
scan=os.path.abspath(scan.path),
dataset_basename=scan.dataset_basename,
ref_file=None,
key="Energy",
type_=float,
key_aliases=["energy"],
scan_info=scan.scan_info,
)
if energy is not None:
if configuration.energy_unit != EnergySI.KILOELECTRONVOLT:
                energy = energy * configuration.energy_unit.value / EnergySI.KILOELECTRONVOLT.value
output_grp["/entry/instrument/beam/incident_energy"] = energy
output_grp["/entry/instrument/beam/incident_energy"].attrs["units"] = "keV"
# rotations values
rotation_dataset = output_grp.create_dataset(
"/entry/sample/rotation_angle", shape=(n_frames,), dtype=numpy.float32
)
output_grp["/entry/sample/rotation_angle"].attrs["units"] = "degree"
# machine electric current
nexus_paths = get_nexus_paths(LATEST_VERSION)
machine_elec_current_path = "/".join(
["entry", nexus_paths.ELECTRIC_CURRENT_PATH]
)
machine_electric_current_dataset = output_grp.create_dataset(
machine_elec_current_path,
shape=(n_frames,),
dtype=numpy.float32,
)
output_grp[machine_elec_current_path].attrs["units"] = str(
ElectricCurrentSystem.AMPERE
)
# provision for centering motors
x_dataset = output_grp.create_dataset(
"/entry/sample/x_translation", shape=(n_frames,), dtype=numpy.float32
)
output_grp["/entry/sample/x_translation"].attrs["units"] = "m"
y_dataset = output_grp.create_dataset(
"/entry/sample/y_translation", shape=(n_frames,), dtype=numpy.float32
)
output_grp["/entry/sample/y_translation"].attrs["units"] = "m"
z_dataset = output_grp.create_dataset(
"/entry/sample/z_translation", shape=(n_frames,), dtype=numpy.float32
)
output_grp["/entry/sample/z_translation"].attrs["units"] = "m"
# ---------> and now fill all datasets!
nf = 0
external_datasets = [] # collect all urls created
progress_v = 0
if progress is not None:
if hasattr(progress, "set_name"):
progress.set_name("write dark")
if hasattr(progress, "reset"):
progress.reset()
else:
progress = 0
def ignore(file_name):
for forbid in _edf_to_ignore:
if forbid in file_name:
return True
return False
# darks
    # darks in accumulation mode?
norm_dark = 1.0
if scan.dark_n > 0 and DARK_ACCUM_FACT is True:
norm_dark = len(dk_urls) / scan.dark_n
dk_indexes = sorted(dk_urls.keys())
if progress is not None:
if hasattr(progress, "reset"):
progress.reset()
else:
progress = 0
for dk_index in dk_indexes:
dk_url = dk_urls[dk_index]
if ignore(os.path.basename(dk_url.file_path())):
_logger.info("ignore " + dk_url.file_path())
continue
data, header, external_dataset, n_frames_in_file = _read_url(
url=dk_url,
i_frame=dk_index,
h5group_to_dump=None, # never create external dataset for dark
frame_prefix="dark",
configuration=configuration,
)
assert header is not None
metadata.append(header)
update_edf_source_files(url=dk_url, n_frames=n_frames_in_file)
detector_data_urls.append((dk_url, True))
if output_grp:
if configuration.duplicate_data:
data_dataset[nf, :, :] = data * norm_dark
else:
dark_dataset[nf, :, :] = data * norm_dark
keys_dataset[nf] = ImageKey.DARK_FIELD.value
keys_control_dataset[nf] = ImageKey.DARK_FIELD.value
motor_pos_key = _get_valid_key(header, configuration.motor_position_keys)
if motor_pos_key:
str_mot_val = header[motor_pos_key].split(" ")
if rot_angle_index == -1 or configuration.force_angle_calculation:
rotation_dataset[nf] = 0.0
else:
rotation_dataset[nf] = float(str_mot_val[rot_angle_index])
if x_trans_index == -1:
x_dataset[nf] = 0.0
else:
x_dataset[nf] = (
float(str_mot_val[x_trans_index])
* configuration.x_trans_unit.value
)
if y_trans_index == -1:
y_dataset[nf] = 0.0
else:
y_dataset[nf] = (
float(str_mot_val[y_trans_index])
* configuration.y_trans_unit.value
)
if z_trans_index == -1:
z_dataset[nf] = 0.0
else:
z_dataset[nf] = (
float(str_mot_val[z_trans_index])
* configuration.z_trans_unit.value
)
if srcur_index == -1:
machine_electric_current_dataset[nf] = (
default_current * configuration.machine_elec_current_unit.value
)
else:
try:
str_counter_val = header.get("counter_pos", "").split(" ")
machine_electric_current_dataset[nf] = (
float(str_counter_val[srcur_index])
* configuration.machine_elec_current_unit.value
)
except IndexError:
machine_electric_current_dataset[nf] = (
default_current * configuration.machine_elec_current_unit.value
)
nf += 1
if progress is not None:
progress_v += 1 / (n_frames - 1)
if hasattr(progress, "progress"):
progress.progress = progress_v * 100.0
else:
progress = progress_v * 100.0
ref_indexes = sorted(refs_urls.keys())
ref_projs = []
for irf in ref_indexes:
pjnum = int(irf)
if pjnum not in ref_projs:
ref_projs.append(pjnum)
# refs
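    # helper writing the flat field frames whose serial number matches the given
    # projection number; returns the updated global frame counter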
def store_refs(
refIndexes,
projnum,
refUrls,
nF,
dataDataset,
keysDataset,
keysCDataset,
xDataset,
yDataset,
zDataset,
rotationDataset,
raix,
xtix,
ytix,
ztix,
):
nfr = nF
for ref_index in refIndexes:
int_rf = int(ref_index)
if int_rf == projnum:
refUrl = refUrls[ref_index]
if ignore(os.path.basename(refUrl.file_path())):
_logger.info("ignore " + refUrl.file_path())
continue
data, header, external_data, n_frames_in_file = _read_url(
url=refUrl,
i_frame=ref_index,
h5group_to_dump=ext_datasets_grp,
frame_prefix="flat",
configuration=configuration,
)
update_edf_source_files(url=refUrl, n_frames=n_frames_in_file)
if external_data is not None:
external_datasets.append(external_data)
metadata.append(header)
detector_data_urls.append((refUrl, False))
if output_grp is None:
continue
if configuration.duplicate_data and dataDataset:
dataDataset[nfr, :, :] = data
keysDataset[nfr] = ImageKey.FLAT_FIELD.value
keysCDataset[nfr] = ImageKey.FLAT_FIELD.value
motor_pos_key = _get_valid_key(
header, configuration.motor_position_keys
)
if motor_pos_key in header:
str_mot_val = header[motor_pos_key].split(" ")
if raix == -1 or configuration.force_angle_calculation:
rotationDataset[nfr] = 0.0
else:
rotationDataset[nfr] = float(str_mot_val[raix])
if xtix == -1:
xDataset[nfr] = 0.0
else:
xDataset[nfr] = (
float(str_mot_val[xtix]) * configuration.x_trans_unit.value
)
if ytix == -1:
yDataset[nfr] = 0.0
else:
yDataset[nfr] = (
float(str_mot_val[ytix]) * configuration.y_trans_unit.value
)
if ztix == -1:
zDataset[nfr] = 0.0
else:
zDataset[nfr] = (
float(str_mot_val[ztix]) * configuration.z_trans_unit.value
)
if srcur_index == -1:
machine_electric_current_dataset[nfr] = (
default_current * configuration.machine_elec_current_unit.value
)
else:
str_counter_val = header.get("counter_pos", "").split(" ")
try:
machine_electric_current_dataset[nfr] = (
float(str_counter_val[srcur_index])
* configuration.machine_elec_current_unit.value
)
except IndexError:
machine_electric_current_dataset[nfr] = (
default_current
* configuration.machine_elec_current_unit.value
)
nfr += 1
return nfr
# projections
proj_indexes = sorted(proj_urls.keys())
if progress is not None:
if hasattr(progress, "set_name"):
progress.set_name("write projections and flats")
nproj = 0
iref_pj = 0
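    # when the angle calculation is forced, distribute the projection angles linearly
    # over the scan range; for negative scan ranges the angles may be generated from 0
    # to scan_range and reverted afterwards (see angle_calculation_rev_neg_scan_range)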
if configuration.force_angle_calculation:
if configuration.angle_calculation_rev_neg_scan_range and scan.scan_range < 0:
proj_angles = numpy.linspace(
0,
scan.scan_range,
scan.tomo_n,
endpoint=configuration.force_angle_calculation_endpoint,
)
else:
proj_angles = numpy.linspace(
min(0, scan.scan_range),
max(0, scan.scan_range),
scan.tomo_n,
endpoint=configuration.force_angle_calculation_endpoint,
)
revert_angles_in_nx = (
configuration.angle_calculation_rev_neg_scan_range
and (scan.scan_range < 0)
)
if revert_angles_in_nx:
proj_angles = proj_angles[::-1]
alignment_indices = []
for i_proj, proj_index in enumerate(proj_indexes):
proj_url = proj_urls[proj_index]
if ignore(os.path.basename(proj_url.file_path())):
_logger.info("ignore " + proj_url.file_path())
continue
        # store refs if the ref serial number equals the projection number
if iref_pj < len(ref_projs) and ref_projs[iref_pj] == nproj:
nf = store_refs(
ref_indexes,
ref_projs[iref_pj],
refs_urls,
nf,
data_dataset,
keys_dataset,
keys_control_dataset,
x_dataset,
y_dataset,
z_dataset,
rotation_dataset,
rot_angle_index,
x_trans_index,
y_trans_index,
z_trans_index,
)
iref_pj += 1
data, header, external_data, n_frames_in_file = _read_url(
proj_url,
i_frame=proj_index,
h5group_to_dump=ext_datasets_grp,
frame_prefix="proj",
configuration=configuration,
)
update_edf_source_files(url=proj_url, n_frames=n_frames_in_file)
detector_data_urls.append((proj_url, False))
if output_grp:
if external_data is not None:
external_datasets.append(external_data)
metadata.append(header)
if configuration.duplicate_data:
data_dataset[nf, :, :] = data
keys_dataset[nf] = ImageKey.PROJECTION.value
keys_control_dataset[nf] = ImageKey.PROJECTION.value
            if nproj >= scan.tomo_n:
                # extra 'return' projection used for alignment: tag it and remember
                # its index so its angle can be recomputed afterwards
                keys_control_dataset[nf] = ImageKey.ALIGNMENT.value
                alignment_indices.append(nf)
motor_pos_key = _get_valid_key(header, configuration.motor_position_keys)
if configuration.force_angle_calculation:
if i_proj < len(proj_angles):
rotation_dataset[nf] = proj_angles[i_proj]
else:
# case of return / control projection
rotation_dataset[nf] = 0.0
if motor_pos_key in header:
str_mot_val = header[motor_pos_key].split(" ")
                # rotation angle read from the header (unless angle calculation is forced)
if not configuration.force_angle_calculation:
rotation_dataset[nf] = float(str_mot_val[rot_angle_index])
if x_trans_index == -1:
x_dataset[nf] = 0.0
else:
x_dataset[nf] = (
float(str_mot_val[x_trans_index])
* configuration.x_trans_unit.value
)
if y_trans_index == -1:
y_dataset[nf] = 0.0
else:
y_dataset[nf] = (
float(str_mot_val[y_trans_index])
* configuration.y_trans_unit.value
)
if z_trans_index == -1:
z_dataset[nf] = 0.0
else:
z_dataset[nf] = (
float(str_mot_val[z_trans_index])
* configuration.z_trans_unit.value
)
if srcur_index == -1:
machine_electric_current_dataset[nf] = (
default_current * configuration.machine_elec_current_unit.value
)
else:
try:
str_counter_val = header.get("counter_pos", "").split(" ")
machine_electric_current_dataset[nf] = (
float(str_counter_val[srcur_index])
* configuration.machine_elec_current_unit.value
)
except IndexError:
machine_electric_current_dataset[nf] = (
default_current * configuration.machine_elec_current_unit.value
)
nf += 1
nproj += 1
if progress is not None:
progress_v += 1 / (n_frames - 1)
if hasattr(progress, "progress"):
progress.progress = progress_v * 100.0
else:
progress = 100.0
    # we need to update the alignment angle values. I wanted to avoid redoing all the pre-existing processing.
n_alignment_angles = len(alignment_indices)
if n_alignment_angles == 3:
alignments_angles = numpy.linspace(
scan.scan_range,
0,
n_alignment_angles,
endpoint=(n_alignment_angles % 2) == 0,
)
for index, angle in zip(alignment_indices, alignments_angles):
rotation_dataset[index] = angle
# store last flat if any remaining in the list
if iref_pj < len(ref_projs) and output_grp:
nf = store_refs(
ref_indexes,
ref_projs[iref_pj],
refs_urls,
nf,
data_dataset,
keys_dataset,
keys_control_dataset,
x_dataset,
y_dataset,
z_dataset,
rotation_dataset,
rot_angle_index,
x_trans_index,
y_trans_index,
z_trans_index,
)
if output_grp:
# if we avoided data duplication: create the virtual dataset on top of the external datasets
if not configuration.duplicate_data:
virtual_layout = h5py.VirtualLayout(
shape=(n_frames, scan.dim_2, scan.dim_1),
dtype=frame_type,
)
virtual_layout[0:n_darks] = h5py.VirtualSource(dark_dataset)
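            # external datasets were registered in acquisition order, so they fill the
            # layout slots coming after the darks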
for i_ext, external_dataset in enumerate(external_datasets):
assert isinstance(external_dataset, h5py.Dataset)
virtual_layout[i_ext + n_darks] = h5py.VirtualSource(external_dataset)
output_grp.create_virtual_dataset(
"/entry/instrument/detector/data",
virtual_layout,
)
# we can add some more NeXus look and feel
output_grp["/entry"].attrs["NX_class"] = "NXentry"
output_grp["/entry"].attrs["definition"] = "NXtomo"
output_grp["/entry"].attrs["version"] = converter_version()
output_grp["/entry/instrument"].attrs["NX_class"] = "NXinstrument"
output_grp["/entry/instrument/detector"].attrs["NX_class"] = "NXdetector"
output_grp["/entry/instrument/detector/data"].attrs["interpretation"] = "image"
        if configuration.field_of_view is not None:
            field_of_view = configuration.field_of_view
        elif abs(scan.scan_range) == 180:
            field_of_view = "Full"
        elif abs(scan.scan_range) == 360:
            field_of_view = "Half"
        else:
            field_of_view = None
if field_of_view is not None:
field_of_view = FieldOfView.from_value(field_of_view)
output_grp["/entry/instrument/detector/field_of_view"] = field_of_view.value
output_grp["/entry/sample"].attrs["NX_class"] = "NXsample"
output_grp["/entry/definition"] = "NXtomo"
source_grp = output_grp["/entry/instrument"].get("source", None)
if source_grp is not None and "NX_class" not in source_grp.attrs:
source_grp.attrs["NX_class"] = "NXsource"
output_grp.flush()
for i_meta, meta in enumerate(metadata):
# save metadata
dicttoh5(
meta,
fileout_h5,
h5path=f"/entry/instrument/positioners/{i_meta}",
update_mode="replace",
mode="a",
)
try:
create_nx_data_group(
file_path=fileout_h5,
entry_path=output_data_path,
axis_scale=["linear", "linear"],
)
except Exception as e:
_logger.error(f"Fail to create NXdata group. Reason is {e}")
# create beam group at root for compatibility
try:
link_nxbeam_to_root(file_path=fileout_h5, entry_path=output_data_path)
except Exception:
pass
if progress is not None and isinstance(progress, Progress):
print("\nconversion finished\n")
if output_grp is not None:
# if the output group is provided then close it so we can safely read it back
# it should be done outside this function but this is legacy code that won't evolve
output_grp.close()
# do some check on the conversion
issues_discovered = []
for output_check in configuration.output_checks:
print("\nstart output checks\n")
from nxtomomill.converter.edf import checks as _checks_utils
output_check = _checks_utils.OUPUT_CHECK.from_value(output_check)
if output_check is _checks_utils.OUPUT_CHECK.COMPARE_VOLUME:
issues = _checks_utils.compare_volumes(
edf_volume_as_urls=detector_data_urls,
hdf5_scan=NXtomoScan(fileout_h5, output_data_path),
)
else:
raise ValueError(f"{output_check} is not handled")
issues_discovered.extend(issues)
if len(issues_discovered) > 0:
raise ValueError("Seems the conversion failed. Detected issues are {errs}")
elif len(configuration.output_checks) > 0:
print("\noutput checks done, no issues detected\n")
if configuration.delete_edf_source_files:
print("start edf source files removal")
        assert (
            configuration.duplicate_data
        ), "if the data is not duplicated it makes no sense to remove the edf files"
for file_path, (n_frames, used_urls) in edf_source_files.items():
if n_frames > len(used_urls):
                _logger.warning(
                    f"Will not delete {file_path}. Only {len(used_urls)} of the {n_frames} frames it contains were used"
                )
else:
try:
os.remove(file_path)
except OSError as e:
_logger.error(f"Issue when removing {file_path}. Error is {e}")
return fileout_h5, output_data_path
def _get_valid_key(header: dict, keys: tuple) -> Optional[str]:
"""Return the first existing key in header"""
for key in keys:
if key in header:
return key
else:
return None
def _get_valid_key_index(motors: list, keys: tuple) -> Optional[int]:
for key in keys:
if key in motors:
return motors.index(key)
else:
return -1
def _read_url(
url,
h5group_to_dump: h5py.Group,
i_frame: int,
frame_prefix: str,
configuration: TomoEDFConfig,
) -> tuple:
"""
    If h5group_to_dump is provided then a dataset pointing to the external data in the
    EDF file is created under the name `{frame_prefix}_{i_frame}`.
    This way we will later be able to create a virtual dataset on top of it.
"""
data_slice = url.data_slice()
if data_slice is None:
data_slice = (0,)
if data_slice is None or len(data_slice) != 1:
raise ValueError(f"Fabio slice expect a single frame but {data_slice} found")
index = data_slice[0]
if not isinstance(index, int):
raise ValueError(f"Fabio slice expect a single integer but {data_slice} found")
try:
fabio_file = fabio.open(url.file_path())
except Exception:
_logger.debug(
f"Error while opening {url.file_path()} with fabio", exc_info=True
)
raise IOError(
f"Error while opening {url.path()} with fabio (use debug for more information)"
)
try:
n_frames_in_file = fabio_file.nframes
if n_frames_in_file == 1:
if index != 0:
raise ValueError(
f"Only a single frame available. Slice {index} out of range"
)
it = fabio.edfimage.EdfImage.lazy_iterator(url.file_path())
frame = next(it)
else:
frame = fabio_file.getframe(index)
data = frame.data
header = frame.header
frame_start_offset = frame.start
blobsize = frame.blobsize
byte_order = get_byte_order(frame.header)
except Exception as e:
_logger.error(e)
data = None
header = None
frame_start_offset = None
blobsize = None
byte_order = None
finally:
fabio_file.close()
fabio_file = None
if data is not None and byte_order is None:
raise ValueError("Unable to get ByteOrder for file_path")
if h5group_to_dump is not None and data is not None:
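        # the frame payload sits at the end of its binary blob: derive the byte offset
        # of the raw data from the blob size and the payload size (data.size * itemsize)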
frame_start = frame_start_offset + blobsize - (data.size * data.dtype.itemsize)
frame_end = frame_start + blobsize
dataset_name = f"{frame_prefix}_{i_frame}"
data_type = numpy.dtype(byte_order + data.dtype.char)
if configuration.external_path_type is PathType.ABSOLUTE:
file_path = os.path.abspath(url.file_path())
elif configuration.external_path_type is PathType.RELATIVE:
file_path = os.path.abspath(url.file_path())
file_path = os.path.relpath(
os.path.abspath(file_path),
os.path.abspath(os.path.dirname(configuration.output_file)),
)
file_path = "./" + file_path
else:
raise NotImplementedError
external_dataset = h5group_to_dump.create_dataset(
name=dataset_name,
shape=data.shape,
dtype=data_type,
external=[
(file_path, frame_start, frame_end),
],
)
else:
external_dataset = None
return data, header, external_dataset, n_frames_in_file
def _getExtraInfo(scan, configuration):
assert isinstance(scan, EDFTomoScan)
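    # parse the header of the first projection to locate, within the motor and counter
    # name lists, the indices of the rotation angle, translations and machine current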
projections_urls = scan.projections
if len(projections_urls) == 0:
raise ValueError(
f"No projections found in {scan.path} with dataset basename: {configuration.dataset_basename if configuration.dataset_basename is not None else 'Default'} and dataset info file: {configuration.dataset_info_file if configuration.dataset_info_file is not None else 'Default'}. "
)
indexes = sorted(projections_urls.keys())
first_proj_file = projections_urls[indexes[0]]
fid = fabio.open(first_proj_file.file_path())
rotangle_index = -1
xtrans_index = -1
ytrans_index = -1
ztrans_index = -1
srcur_index = -1
frame_type = None
try:
if hasattr(fid, "header"):
hd = fid.header
else:
hd = fid.getHeader()
motor_mne_key = _get_valid_key(hd, configuration.motor_mne_keys)
motors = hd.get(motor_mne_key, "").split(" ")
counters = hd.get("counter_mne", "").split(" ")
rotangle_index = _get_valid_key_index(motors, configuration.rotation_angle_keys)
xtrans_index = _get_valid_key_index(motors, configuration.x_trans_keys)
ytrans_index = _get_valid_key_index(motors, configuration.y_trans_keys)
ztrans_index = _get_valid_key_index(motors, configuration.z_trans_keys)
srcur_index = _get_valid_key_index(
counters, configuration.machine_electric_current_keys
)
if hasattr(fid, "bytecode"):
frame_type = fid.bytecode
else:
frame_type = fid.getByteCode()
finally:
fid.close()
fid = None
return (
frame_type,
rotangle_index,
xtrans_index,
ytrans_index,
ztrans_index,
srcur_index,
)
def get_byte_order(header):
"""
    Return the byte order (as a character compatible with numpy dtype strings) read from the EDF header.
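
    For instance (a sketch): ``get_byte_order({"ByteOrder": "LowByteFirst"})``
    returns ``"<"``, which can then be combined with a dtype character, as in
    ``numpy.dtype("<" + "f")``. A missing ByteOrder key yields ``None``.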
"""
byte_order = header.get("ByteOrder", None)
if byte_order is None:
pass
elif byte_order.lower() == "highbytefirst":
byte_order = ">"
elif byte_order.lower() == "lowbytefirst":
byte_order = "<"
return byte_order