# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "10/10/2020"
import datetime
import os
import numpy
from typing import Optional
from tomoscan.io import HDF5File
[docs]class MockBlissAcquisition:
"""
:param n_sequence: number of sequence to create
:param n_scan_per_sequence: number of scans (projection serie) per sequence
:param n_projections_per_scan: number of projection frame in a scan
:param n_darks: number of dark frame in the serie. Only one serie at the
beginning
:param int n_flats: number of flats to create. In this case will only
create one serie of n flats after dark if any
:param str output_dir: will contain the proposal file and one folder per
sequence.
:param str acqui_type: acquisition type. Can be "basic", "zseries" or
"xrd-ct"
:param Iterable z_values: if acqui_type is zseries then users should
provide the serie of values for z (one per stage)
:param Optional[int] nb_loop: number of pcotomo loop for v1 of bliss pcotomo
:param Optional[int] nb_tomo: number of tomo per loop for v1 of bliss pcotomo
:param Optional[int] nb_turns: number of turns for v2 of bliss pcotomo ( <=> nb NXtomo to generate)
:param str file_name_prefix: bliss file prefix name
:param Optional[int] file_name_z_fill: optional z fill for the file name index. If None then file index will not be 'z filled'
:param bool create_tomo_config: if True create the 'tomo_config' group under instrument which contains
metadata describing the acquisition (which dataset to read for rotaiton, translation ...)
"""
[docs] def __init__(
self,
n_sample,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
output_dir,
with_nx_detector_attr=True,
detector_name="pcolinux",
acqui_type="basic",
z_values=None,
y_rot=False,
nb_loop=None,
nb_tomo=None,
nb_turns=None,
with_rotation_motor_info=True,
frame_data_type=numpy.uint16,
file_name_prefix="sample",
file_name_z_fill=None,
create_tomo_config: bool = True,
ebs_tomo_version: Optional[str] = None,
):
self._n_darks = n_darks
self._n_flats = n_flats
self._n_scan_per_sequence = n_scan_per_sequence
self.__folder = output_dir
if not os.path.exists(output_dir):
os.makedirs(output_dir)
self.__proposal_file = os.path.join(self.__folder, "ihproposal_file.h5")
if acqui_type not in ("pcotomo"):
if nb_loop is not None or nb_tomo is not None:
raise ValueError(
"nb_loop and nb_tomo are only handled by acqui_type: `pcotomo`"
)
else:
if not (
nb_turns is not None or (nb_loop is not None and nb_tomo is not None)
):
raise ValueError(
"nb_turns should be provided or nb_loop and nb_tomo must be provided"
)
# create sample
self.__samples = []
for sample_i in range(n_sample):
if file_name_z_fill is None:
dir_name = f"{file_name_prefix}_{sample_i}"
else:
dir_name = f"{file_name_prefix}_{str(sample_i).zfill(file_name_z_fill)}"
sample_dir = os.path.join(self.path, dir_name)
os.mkdir(sample_dir)
sample_file = os.path.join(sample_dir, dir_name + ".h5")
if acqui_type == "basic":
acqui_tomo = _BlissBasicTomo(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
elif acqui_type == "pcotomo":
acqui_tomo = _BlissPCOTomo(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
nb_loop=nb_loop,
nb_tomo=nb_tomo,
nb_turns=nb_turns,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
elif acqui_type == "zseries":
if z_values is None:
raise ValueError("for zseries z_values should be provided")
acqui_tomo = _BlissZseriesTomo(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
z_values=z_values,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
elif acqui_type == "xrd-ct":
acqui_tomo = _BlissXRD_CT(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
else:
raise NotImplementedError("")
self.__samples.append(acqui_tomo)
@property
def samples(self):
return self.__samples
@property
def proposal_file(self):
# for now a simple file
return self.__proposal_file
@property
def path(self):
return self.__folder
class _BlissSample:
"""
Simple mock of a bliss sample. For now we only create the hierarchy of
files.
"""
def __init__(
self,
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
with_nx_detector_attr=True,
y_rot=False,
with_rotation_motor_info=True,
frame_data_type=numpy.uint16,
create_tomo_config: bool = True,
ebs_tomo_version: Optional[str] = None,
):
self._with_nx_detector_attr = with_nx_detector_attr
self._sample_dir = sample_dir
self._sample_file = sample_file
self._n_sequence = n_sequence
self._n_scan_per_seq = n_scan_per_sequence
self._n_darks = n_darks
self._n_flats = n_flats
self._scan_folders = []
self._index = 1
self._detector_name = detector_name
self._det_width = 64
self._det_height = 64
self._tomo_n = 10
self._energy = 19.0
self._distance = 1.0
self._pixel_size = (0.0065, 0.0066)
self._y_rot = y_rot
self._with_rotation_motor_info = with_rotation_motor_info
self._frame_data_type = frame_data_type
self._create_tomo_config = create_tomo_config
self._ebs_tomo_version = ebs_tomo_version
for _ in range(n_sequence):
self.add_sequence()
@property
def frame_data_type(self):
return self._frame_data_type
def get_next_free_index(self):
idx = self._index
self._index += 1
return idx
def get_main_entry_title(self):
raise NotImplementedError("Base class")
@staticmethod
def get_title(scan_type):
if scan_type == "dark":
return "dark images"
elif scan_type == "flat":
return "reference images 1"
elif scan_type == "projection":
return "projections 1 - 2000"
else:
raise ValueError("Not implemented")
def create_entry_and_technique(self, seq_ini_index):
# add sequence init information
with HDF5File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(seq_ini_index) + ".1")
seq_node.attrs["NX_class"] = "NXentry"
seq_node["title"] = self.get_main_entry_title()
seq_node.require_group("instrument/positioners")
# write energy
seq_node["technique/scan/energy"] = self._energy
seq_node["technique/scan/tomo_n"] = self._tomo_n * self._n_scan_per_seq
seq_node["technique/scan/sample_detector_distance"] = self._distance
seq_node["technique/detector/pixel_size"] = numpy.asarray(self._pixel_size)
if self._y_rot:
seq_node["instrument/positioners/yrot"] = 0.13
seq_node["start_time"] = str(datetime.datetime.now())
seq_node["end_time"] = str(
datetime.datetime.now() + datetime.timedelta(minutes=10)
)
def register_scan_in_parent_seq(self, parent_index, scan_index):
with HDF5File(self.sample_file, mode="a") as h5f:
# write scan numbers
seq_node = h5f.require_group(str(parent_index) + ".1")
scan_number_node = seq_node.require_group("measurement/scan_numbers")
if "data" in scan_number_node:
res = list(scan_number_node["data"][()])
del scan_number_node["data"]
else:
res = []
res.append(scan_index)
scan_number_node["data"] = res
@staticmethod
def get_next_group_name(seq_ini_index, scan_idx):
return str(scan_idx) + ".1"
def add_scan(
self,
scan_type,
seq_ini_index,
z_value,
skip_title=False,
nb_loop=None,
nb_tomo=None,
nb_turns=1,
):
"""
:param int nb_loop: number of loop in pcotomo use case. Else must be 1
:param int nb_tomo: number of tomography done in pcotomo 'per iteration' use case. Else must be 1
"""
scan_idx = self.get_next_free_index()
scan_name = str(scan_idx).zfill(4)
scan_path = os.path.join(self.path, scan_name)
self._scan_folders.append(_BlissScan(folder=scan_path, scan_type=scan_type))
self.register_scan_in_parent_seq(
parent_index=seq_ini_index, scan_index=scan_idx
)
if nb_turns is not None:
nb_nxtomo = nb_turns
if nb_tomo is not None or nb_loop is not None:
raise ValueError(
"nb_tomo and nb_loop should be provided or nb_turns. Not both"
)
elif nb_loop is not None and nb_tomo is not None:
nb_nxtomo = nb_loop * nb_tomo
if nb_turns is not None:
raise ValueError(
"nb_tomo and nb_loop should be provided or nb_turns. Not both"
)
else:
raise ValueError(
"nb_tomo and nb_loop should be provided or nb_turns. None provided"
)
# register the scan information
with HDF5File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(scan_idx) + ".1")
# write title
title = self.get_title(scan_type=scan_type)
if not skip_title:
seq_node["title"] = title
# write data
data = (
numpy.random.random(
self._det_height * self._det_width * self._tomo_n * nb_nxtomo
)
* 256
)
n_frames = self._tomo_n * nb_nxtomo
data = data.reshape(n_frames, self._det_height, self._det_width)
data = data.astype(self.frame_data_type)
det_path_1 = "/".join(("instrument", self._detector_name))
det_grp = seq_node.require_group(det_path_1)
det_grp["data"] = data
if self._with_nx_detector_attr:
det_grp.attrs["NX_class"] = "NXdetector"
acq_grp = det_grp.require_group("acq_parameters")
acq_grp["acq_expo_time"] = 4
det_path_2 = "/".join(("technique", "scan", self._detector_name))
seq_node[det_path_2] = data
seq_node.attrs["NX_class"] = "NXentry"
# write rotation angle value and translations
instrument_group = seq_node.require_group("instrument")
positioners_grp = instrument_group.require_group("positioners")
positioners_grp["hrsrot"] = numpy.linspace(
start=0.0, stop=360, num=n_frames
)
positioners_grp["sx"] = numpy.array(numpy.random.random(size=n_frames))
positioners_grp["sy"] = numpy.random.random(size=n_frames)
positioners_grp["sz"] = numpy.asarray([z_value] * n_frames)
if self._with_rotation_motor_info:
scan_node = seq_node.require_group("technique/scan")
scan_node["motor"] = ("rotation", "hrsrot", "srot")
if self._create_tomo_config:
technique_group = seq_node.require_group("technique")
tomo_config_group = technique_group.require_group("tomo_config")
tomo_config_group["rotation"] = "hrsrot"
tomo_config_group["detector"] = self._detector_name
tomo_config_group["translation_x"] = "sx"
tomo_config_group["translation_y"] = "sy"
tomo_config_group["translation_z"] = "sz"
if self._ebs_tomo_version is not None:
technique_group = seq_node.require_group("technique")
technique_group.attrs["tomo_version"] = self._ebs_tomo_version
def add_sequence(self):
"""Add a sequence to the bliss file"""
raise NotImplementedError("Base class")
@property
def path(self):
return self._sample_dir
@property
def sample_directory(self):
return self._sample_dir
@property
def sample_file(self):
return self._sample_file
def scans_folders(self):
return self._scan_folders
@property
def n_darks(self):
return self._n_darks
@property
def with_rotation_motor_info(self):
return self._with_rotation_motor_info
class _BlissScan:
"""
mock of a bliss scan
"""
def __init__(self, folder, scan_type: str):
assert scan_type in ("dark", "flat", "projection")
self.__path = folder
def path(self):
return self.__path
class _BlissBasicTomo(_BlissSample):
def get_main_entry_title(self):
return "tomo:fullturn"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_technique(seq_ini_index=seq_ini_index)
if self.n_darks > 0:
self.add_scan(scan_type="dark", seq_ini_index=seq_ini_index, z_value=1)
if self._n_flats > 0:
self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)
for _ in range(self._n_scan_per_seq):
self.add_scan(
scan_type="projection", seq_ini_index=seq_ini_index, z_value=1
)
class _BlissPCOTomo(_BlissSample):
def __init__(
self,
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
with_nx_detector_attr=True,
y_rot=False,
with_rotation_motor_info=True,
nb_loop=None,
nb_tomo=None,
nb_turns=1,
frame_data_type=numpy.uint16,
create_tomo_config: bool = True,
ebs_tomo_version=None,
):
self.nb_loop = nb_loop
self.nb_tomo = nb_tomo
self.nb_turns = nb_turns
super().__init__(
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
with_nx_detector_attr,
y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
if nb_loop is not None and nb_tomo is not None:
if nb_turns is not None:
raise ValueError(
"All of nb_loop, nb_tomo and nb_turns provided. Unable to deduce the pcotomo version"
)
pcotomo_version = 1
elif nb_turns is not None:
pcotomo_version = 2
else:
pcotomo_version = None
if pcotomo_version is not None:
# write Bliss version in attrs
with HDF5File(self.sample_file, mode="a") as h5f:
if "creator_version" not in h5f.attrs:
if pcotomo_version == 1:
h5f.attrs["creator_version"] = "1.2.3"
if pcotomo_version == 2:
h5f.attrs["creator_version"] = "1.10.0"
def get_main_entry_title(self):
return "tomo:pcotomo"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_technique(seq_ini_index=seq_ini_index)
# start dark
if self.n_darks > 0:
self.add_scan(scan_type="dark", seq_ini_index=seq_ini_index, z_value=1)
# start flat
if self._n_flats > 0:
self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)
for _ in range(self._n_scan_per_seq):
self.add_scan(
scan_type="projection",
seq_ini_index=seq_ini_index,
z_value=1,
nb_loop=self.nb_loop,
nb_tomo=self.nb_tomo,
nb_turns=self.nb_turns,
)
# end flat
if self._n_flats > 0:
self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)
def add_scan(
self,
scan_type,
seq_ini_index,
z_value,
skip_title=False,
nb_loop=None,
nb_tomo=None,
nb_turns=1,
):
super().add_scan(
scan_type, seq_ini_index, z_value, skip_title, nb_loop, nb_tomo, nb_turns
)
if scan_type == "projection":
# register pcotomo specific informations (only for projections)
with HDF5File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(self._index - 1) + ".1")
scan_grp = seq_node.require_group("technique/proj")
if nb_loop is not None and "nb_loop" not in scan_grp:
scan_grp["nb_loop"] = nb_loop
if nb_tomo is not None and "nb_tomo" not in scan_grp:
scan_grp["nb_tomo"] = nb_tomo
if nb_turns is not None and "nb_turns" not in scan_grp:
scan_grp["nb_turns"] = nb_turns
if "tomo_n" not in scan_grp:
scan_grp["tomo_n"] = self._tomo_n
class _BlissZseriesTomo(_BlissSample):
def __init__(
self,
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
z_values,
with_nx_detector_attr=True,
y_rot=False,
with_rotation_motor_info=True,
frame_data_type=numpy.uint16,
create_tomo_config: bool = True,
ebs_tomo_version=None,
):
self._z_values = z_values
super().__init__(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
detector_name=detector_name,
with_nx_detector_attr=with_nx_detector_attr,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
def get_main_entry_title(self):
return "tomo:zseries"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_technique(seq_ini_index=seq_ini_index)
for z_value in self._z_values:
if self.n_darks > 0:
self.add_scan(
scan_type="dark", seq_ini_index=seq_ini_index, z_value=z_value
)
if self._n_flats > 0:
self.add_scan(
scan_type="flat", seq_ini_index=seq_ini_index, z_value=z_value
)
for i_proj_seq in range(self._n_scan_per_seq):
self.add_scan(
scan_type="projection", seq_ini_index=seq_ini_index, z_value=z_value
)
class _BlissXRD_CT(_BlissSample):
def __init__(
self,
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
with_nx_detector_attr=True,
y_rot=False,
with_rotation_motor_info=True,
frame_data_type=numpy.uint16,
create_tomo_config: bool = True,
ebs_tomo_version=None,
):
"""XRD-CT are scan with only projection. Projections are store in
the init group. Data can be store under 1.1 but also 1.2..."""
super().__init__(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
detector_name=detector_name,
with_nx_detector_attr=with_nx_detector_attr,
y_rot=y_rot,
with_rotation_motor_info=with_rotation_motor_info,
frame_data_type=frame_data_type,
create_tomo_config=create_tomo_config,
ebs_tomo_version=ebs_tomo_version,
)
def get_main_entry_title(self):
return "aeroystepscan hrrz hry"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_technique(seq_ini_index=seq_ini_index)
def create_entry_and_technique(self, seq_ini_index):
super().create_entry_and_technique(seq_ini_index=seq_ini_index)
self._index = 1
self.add_scan(
scan_type="projection",
seq_ini_index=seq_ini_index,
z_value=1,
skip_title=True,
)
self.add_diode(
seq_ini_index=seq_ini_index,
)
def register_scan_in_parent_seq(self, parent_index, scan_index):
pass
def add_diode(self, seq_ini_index):
scan_idx = str(seq_ini_index) + ".2"
scan_name = str(scan_idx)
with HDF5File(self.sample_file, mode="a") as h5f:
scan_node = h5f.require_group(scan_name)
scan_node.attrs["NX_class"] = "NXentry"
measrurement_node = h5f.require_group("/".join((scan_name, "measurement")))
data = numpy.random.random(self._tomo_n) * 256.2
measrurement_node["fpico3"] = data
h5f.require_group(scan_name)["title"] = self.get_main_entry_title()
instrument_node = h5f.require_group("/".join((scan_name, "instrument")))
fpico3_node = instrument_node.require_group("fpico3")
fpico3_node.attrs["NX_class"] = "NXdetector"
fpico3_node["data"] = data
@staticmethod
def get_next_group_name(seq_ini_index, scan_idx):
return f"{seq_ini_index}.{scan_idx}"