Source code for nxtomomill.app.split_nxfile
"""
Application to split a file containing several NXtomo entries into several files, each containing a single NXtomo
.. code-block:: bash
usage: nxtomomill split-nxfile [-h] [--overwrite] [--duplicate-data [DUPLICATE_DATA]] input_file [output_file_pattern]
split a file containing several NXtomo (at root level) into multiple files containing each a single NXtomo
positional arguments:
input_file File containing NXtomo to be split into several files
output_file_pattern output file pattern. Must contain `{entry_name}` or `{index}` pattern to make sure it is unique
optional arguments:
-h, --help show this help message and exit
--overwrite Do not ask for user permission to overwrite output files
--duplicate-data [DUPLICATE_DATA]
Make all NXtomo free of any external link. As a result this will duplicate data
"""
import os
import argparse
import logging
import string
from nxtomo.application.nxtomo import NXtomo
from tomoscan.esrf.scan.nxtomoscan import NXtomoScan
from silx.io.utils import open as open_hdf5
# Configure the root logger at import time so INFO-level (and above) messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger; used below to report entries that cannot be processed.
_logger = logging.getLogger(__name__)
def _parse_bool_flag(value):
    """
    Convert the optional value given to ``--duplicate-data`` into a boolean.

    :param value: raw command-line token (or an already converted boolean)
    :return: the boolean the token stands for
    :rtype: bool
    :raises argparse.ArgumentTypeError: if the token is not a recognised boolean spelling
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"invalid boolean value: {value!r}")


def main(argv):
    """
    Command-line entry point: parse `argv` and split the input file.

    The `{input_file_name}` placeholder of the output pattern (present in the
    default pattern) is resolved here, from the input file path stripped of its
    extension. The `{entry_name}` and `{index}` placeholders are left untouched
    and forwarded to :func:`split`.

    :param argv: full argument vector; the first element (program name) is skipped
    """
    parser = argparse.ArgumentParser(
        description="split a file containing several NXtomo (at root level) into multiple files containing each a single NXtomo"
    )
    # 'input_file' is mandatory: without it there is nothing to split, so let
    # argparse report a usage error instead of forwarding None downstream.
    parser.add_argument(
        "input_file",
        help="File containing NXtomo to be split into several files",
    )
    parser.add_argument(
        "output_file_pattern",
        help="output file pattern. Must contain `{entry_name}` or `{index}` pattern to make sure it is unique",
        default="{input_file_name}_{entry_name}.nx",
        nargs="?",
    )
    parser.add_argument(
        "--overwrite",
        help="Do not ask for user permission to overwrite output files",
        action="store_true",
        default=False,
    )
    # A bare '--duplicate-data' means True; an explicit value ('true', 'false',
    # ...) is converted; an absent flag means False.
    parser.add_argument(
        "--duplicate-data",
        help="Make all NXtomo free of any external link. As a result this will duplicate data",
        nargs="?",
        const=True,
        default=False,
        type=_parse_bool_flag,
    )

    options = parser.parse_args(argv[1:])

    output_file_pattern = options.output_file_pattern
    if "{input_file_name}" in output_file_pattern:
        input_file_name = os.path.splitext(options.input_file)[0]
        # The pattern goes through str.format later on (in 'split'), so any
        # literal brace coming from the file path must be escaped.
        escaped_name = input_file_name.replace("{", "{{").replace("}", "}}")
        # Plain replacement rather than str.format: the other placeholders
        # ('{entry_name}', '{index}') must survive this step.
        output_file_pattern = output_file_pattern.replace(
            "{input_file_name}", escaped_name
        )

    split(
        input_file=options.input_file,
        output_file_pattern=output_file_pattern,
        overwrite=options.overwrite,
        duplicate_data=options.duplicate_data,
    )
def split(
    input_file: str,
    output_file_pattern: str,
    overwrite: bool = False,
    duplicate_data: bool = False,
) -> tuple:
    """
    Write each root-level entry of `input_file` that loads as an NXtomo to its own file.

    Entries that fail to load are logged as errors and skipped.

    :param str input_file: path to the file to be split
    :param str output_file_pattern: pattern of the files to create. Must contain either `{entry_name}` or `{index}`
    :param bool overwrite: forwarded to the NXtomo `save` call for each output file
    :param bool duplicate_data: if True the detector data is loaded "as_numpy_array", otherwise "as_data_url"
    :return: tuple with one NXtomoScan per file written
    :rtype: tuple
    :raises ValueError: if the pattern uses neither `{index}` nor `{entry_name}`
    """

    def get_output_file(index: int, entry_name: str, pattern: str) -> str:
        """
        Build the absolute output path for one entry from 'pattern'.
        """
        available = {
            "entry_name": entry_name,
            "index": index,
        }
        # names of the replacement fields actually present in the pattern
        requested = [
            parsed[1] for parsed in string.Formatter().parse(pattern) if parsed[1]
        ]
        # only provide the keywords the pattern asks for
        selected = {
            name: value for name, value in available.items() if name in requested
        }
        if not selected:
            raise ValueError(
                "pattern should at least contains keywords '{index}' or '{entry_name}' to be format. Else unable to create a unique file per NXtomo"
            )
        return os.path.abspath(pattern.format(**selected))

    detector_data_as = "as_numpy_array" if duplicate_data else "as_data_url"

    generated = []
    with open_hdf5(input_file) as h5f:
        for i_entry, entry in enumerate(h5f.keys()):
            try:
                nx_tomo = NXtomo("").load(
                    input_file, entry, detector_data_as=detector_data_as
                )
            except Exception as e:
                _logger.error(
                    f"Fail to treat entry {entry}. Error is {e}. Is this a valid Nxtomo ?"
                )
                continue

            output_file = get_output_file(
                index=i_entry, entry_name=entry, pattern=output_file_pattern
            )
            folder = os.path.dirname(output_file)
            if folder and not os.path.exists(folder):
                os.makedirs(folder)
            nx_tomo.save(file_path=output_file, data_path=entry, overwrite=overwrite)
            generated.append(NXtomoScan(output_file, entry))
    return tuple(generated)