from tomoscan.io import HDF5File
from .. import version
from ..utils import list_match_queries
from ..pipeline.config import parse_nabu_config_file
from ..pipeline.config_validators import convert_to_int
from .cli_configs import ReconstructConfig
from .utils import parse_params_values
[docs]
def update_reconstruction_start_end(conf_dict, user_indices):
if len(user_indices) == 0:
return
rec_cfg = conf_dict["reconstruction"]
err = None
val_int, conv_err = convert_to_int(user_indices)
if conv_err is None:
start_z = user_indices
end_z = user_indices
else:
if user_indices in ["first", "middle", "last"]:
start_z = user_indices
end_z = user_indices
elif user_indices == "all":
start_z = 0
end_z = -1
elif "-" in user_indices:
try:
start_z, end_z = user_indices.split("-")
start_z = int(start_z)
end_z = int(end_z)
except Exception as exc:
err = "Could not interpret slice indices '%s': %s" % (user_indices, str(exc))
else:
err = "Could not interpret slice indices: %s" % user_indices
if err is not None:
print(err)
exit(1)
rec_cfg["start_z"] = start_z
rec_cfg["end_z"] = end_z
[docs]
def get_log_file(arg_logfile, legacy_arg_logfile, forbidden=None):
default_arg_val = ""
# Compat. log_file --> logfile
if legacy_arg_logfile != default_arg_val:
logfile = legacy_arg_logfile
else:
logfile = arg_logfile
#
if forbidden is None:
forbidden = []
for forbidden_val in forbidden:
if logfile == forbidden_val:
print("Error: --logfile argument cannot have the value %s" % forbidden_val)
exit(1)
if logfile == "":
logfile = True
return logfile
[docs]
def get_reconstructor(args, overwrite_options=None):
# Imports are done here, otherwise "nabu --version" takes forever
from ..pipeline.fullfield.processconfig import ProcessConfig
from ..pipeline.fullfield.reconstruction import FullFieldReconstructor
#
logfile = get_log_file(args["logfile"], args["log_file"], forbidden=[args["input_file"]])
conf_dict = parse_nabu_config_file(args["input_file"])
update_reconstruction_start_end(conf_dict, args["slice"].strip())
if overwrite_options is not None:
for option_key, option_val in overwrite_options.items():
opt_section, opt_name = option_key.split("/")
conf_dict[opt_section][opt_name] = option_val
proc = ProcessConfig(conf_dict=conf_dict, create_logger=logfile)
logger = proc.logger
logger.info("Going to reconstruct slices (%d, %d)" % (proc.rec_region["start_z"], proc.rec_region["end_z"]))
# Get extra options
extra_options = {
"gpu_mem_fraction": args["gpu_mem_fraction"],
"cpu_mem_fraction": args["cpu_mem_fraction"],
"chunk_size": args["max_chunk_size"] if args["max_chunk_size"] > 0 else None,
"margin": args["phase_margin"],
"force_grouped_mode": bool(args["force_use_grouped_pipeline"]),
}
reconstructor = FullFieldReconstructor(proc, logger=logger, extra_options=extra_options)
return reconstructor
[docs]
def list_hdf5_entries(fname):
with HDF5File(fname, "r") as f:
entries = list(f.keys())
return entries
[docs]
def main():
args = parse_params_values(
ReconstructConfig,
parser_description=f"Perform a tomographic reconstruction.",
program_version="nabu " + version,
)
# Get extra options
extra_options = {
"gpu_mem_fraction": args["gpu_mem_fraction"],
"cpu_mem_fraction": args["cpu_mem_fraction"],
"chunk_size": args["max_chunk_size"] if args["max_chunk_size"] > 0 else None,
"margin": args["phase_margin"],
"force_grouped_mode": bool(args["force_use_grouped_pipeline"]),
}
#
logfile = get_log_file(args["logfile"], args["log_file"], forbidden=[args["input_file"]])
conf_dict = parse_nabu_config_file(args["input_file"])
update_reconstruction_start_end(conf_dict, args["slice"].strip())
# Imports are done here, otherwise "nabu --version" takes forever
from ..pipeline.fullfield.processconfig import ProcessConfig
from ..pipeline.fullfield.reconstruction import FullFieldReconstructor
#
hdf5_entries = conf_dict["dataset"].get("hdf5_entry", "").strip(",")
# spit by coma and remove empty spaces
hdf5_entries = [e.strip() for e in hdf5_entries.split(",")]
# clear '/' at beginning of the entry (so both entry like 'entry0000' and '/entry0000' are handled)
hdf5_entries = [e.lstrip("/") for e in hdf5_entries]
if hdf5_entries != [""]:
file_hdf5_entries = list_hdf5_entries(conf_dict["dataset"]["location"])
hdf5_entries = list_match_queries(file_hdf5_entries, hdf5_entries)
if hdf5_entries == []:
raise ValueError("No entry found matching pattern '%s'" % conf_dict["dataset"]["hdf5_entry"])
for hdf5_entry in hdf5_entries:
if len(hdf5_entries) > 1:
print("-" * 80)
print("Processing entry: %s" % hdf5_entry)
print("-" * 80)
conf_dict["dataset"]["hdf5_entry"] = hdf5_entry
proc = ProcessConfig(conf_dict=conf_dict, create_logger=logfile) # logger is in append mode
logger = proc.logger
logger.info("Going to reconstruct slices (%d, %d)" % (proc.rec_region["start_z"], proc.rec_region["end_z"]))
R = FullFieldReconstructor(proc, logger=logger, extra_options=extra_options)
proc = R.process_config
R.reconstruct()
R.finalize_files_saving()
return 0
if __name__ == "__main__":
main()