- TODO: read the NeXus files directly for the calibration.
- TODO: do a calibration using all the images of a series.
- The detector arm moves, and this has to be taken into account.
- See what Marco Voltolini did.
[1]:
# list of imports
import os
import sys
import time
import fabio
import pyFAI
import numpy
import numpy.ma
from typing import Iterator, List, NamedTuple, Optional, Text, Tuple, Union
from collections import namedtuple
from functools import partial
from h5py import Dataset, File
from numpy import ndarray
from matplotlib import pyplot
from scipy.ndimage import generic_filter
from pyFAI.detectors import detector_factory
# A local import with SOLEIL common methods.
from soleil import *
[3]:
# define a bunch of constants
LAMBDA = 0.6887
CALIBRANT = "CeO2"
CEO2 = "XRD18keV_27.nxs"
ROOT = os.path.join("/nfs", "ruche-diffabs", "diffabs-soleil", "com-diffabs", "2016", "Run2")
PUBLISHED_DATA = os.path.join("/nfs", "ruche-diffabs", "diffabs-users", "99160066", "published-data")
# temporary paths until the ruche storage is back online
ROOT = os.path.join("/home", "experiences", "instrumentation", "picca", "data", "99160066", "2016", "Run2")
PUBLISHED_DATA = os.path.join("/home", "experiences", "instrumentation", "picca", "data", "99160066", "published-data")
[3]:
# define a bunch of generic hdf5 structures.
DatasetPathContains = NamedTuple("DatasetPathContains", [("path", Text)])
DatasetPathWithAttribute = NamedTuple("DatasetPathWithAttribute", [('attribute', Text),
('value', bytes)])
DatasetPath = Union[DatasetPathContains,
DatasetPathWithAttribute]
def _v_attrs(attribute: Text, value: bytes, _name: Text, obj) -> Dataset:
    """return the Dataset whose `attribute` attribute equals `value`"""
    if isinstance(obj, Dataset):
        if attribute in obj.attrs and obj.attrs[attribute] == value:
            return obj

def _v_item(key: Text, name: Text, obj: Dataset) -> Dataset:
    """return the Dataset whose path contains `key`"""
    if key in name:
        return obj
def get_dataset(h5file: File, path: DatasetPath) -> Dataset:
res = None
if isinstance(path, DatasetPathContains):
res = h5file.visititems(partial(_v_item, path.path))
elif isinstance(path, DatasetPathWithAttribute):
res = h5file.visititems(partial(_v_attrs,
path.attribute, path.value))
return res
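[ ]:
# A minimal sketch of how the DatasetPath helpers are meant to be used,
# assuming one of the dark files of the run below is available: locate
# the image stack through its "interpretation" attribute, then read it.
with File(os.path.join(ROOT, "2016-03-26", "dark_7.nxs"), mode='r') as f:
    images = get_dataset(f, DatasetPathWithAttribute("interpretation", b"image"))
    print(images.shape)  # (n_images, height, width)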
[4]:
# Compute the mask and the dark
# (the Python 3.6 class syntax for NamedTuple would simplify this, we target 3.5)
DarkMaskSources = NamedTuple('DarkMaskSources', [('filenames', List[Text]),  # names of the files
                                                 ('threshold', int),  # mask all pixels above this threshold
                                                 ('detector', Optional[Text]),
                                                 ('images_path', DatasetPath)])
def create_dark_and_mask(params: DarkMaskSources) -> Tuple[ndarray, ndarray]:
    """
    Generate an averaged dark and a mask from the measured darks. On an
    XPAD the dark should be zero (if the detector is well calibrated),
    so every pixel whose value exceeds the threshold goes into the mask.
    For the next experiment, think about measuring series of darks
    instead of a single one.
    """
    for i, filename in enumerate(params.filenames):
        with File(filename, mode='r') as f:
            img = get_dataset(f, params.images_path)[0]  # read the first image
            _mask = numpy.where(img > params.threshold, True, False)
            if i == 0:
                dark = img.copy()
                mask = _mask.copy()
            else:
                dark += img
                mask = numpy.logical_or(mask, _mask)
    if params.detector is not None:
        det = detector_factory(params.detector)
        mask = numpy.logical_or(mask, det.calc_mask())
    # a few rogue pixels were spotted by hand, mask them too
    mask[480:482, 480:482] = True
    dark = dark.astype(float)
    dark /= len(params.filenames)
    dark = dark.astype('uint32')
    return dark, mask
dark, mask = create_dark_and_mask(DarkMaskSources([os.path.join(ROOT, "2016-03-26", "dark_%d.nxs" % n) for n in range(7, 12)],
1, 'Xpad_flat',
DatasetPathWithAttribute("interpretation", b"image")))
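[ ]:
# Quick sanity check (sketch): on a well calibrated XPAD the averaged
# dark should be close to zero and the mask should flag only a small
# fraction of the detector.
print("mean dark level:", dark.mean())
print("masked pixels: %.2f%%" % (100.0 * mask.sum() / mask.size))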
[5]:
# compute the flat
FlatParams = NamedTuple('FlatParams', [('filename', Text),  # name of the file
                                       ('threshold', float),  # clamp all pixels whose |value| is below this threshold
                                       ('dark', Optional[ndarray]),
                                       ('images_path', DatasetPath)])
def get_flat(params: FlatParams) -> ndarray:
    """
    Generate a flat, corrected from the dark when params.dark is not
    None. Pixels whose absolute value is below the threshold are clamped
    to the threshold so the flat can safely be used as a divisor.
    """
    with File(params.filename, mode='r') as f:
        images = get_dataset(f, params.images_path)[:]
        flat = images.mean(axis=0)
        if params.dark is not None:
            flat -= params.dark
        flat = numpy.where(numpy.abs(flat) <= params.threshold, params.threshold, flat)
        return flat
flat = get_flat(FlatParams(os.path.join(ROOT, "2016-03-26", "flat_12.nxs"),
1, dark,
DatasetPathWithAttribute("interpretation", b"image")))
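[ ]:
# Visual check of the flat (sketch): display it with the mask applied so
# that the dead zones of the Xpad_flat do not dominate the colour scale.
pyplot.imshow(numpy.ma.masked_array(flat, mask=mask), interpolation='nearest')
pyplot.colorbar()
pyplot.show()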
[25]:
# Unfold the images
# defines the namedtuples used for the tomography
# (the Python 3.6 class syntax for NamedTuple would simplify this, we target 3.5)
TomoSources = NamedTuple('TomoSources', [('filename', Text),
('images_path', DatasetPath),
('rotations_path', DatasetPath),
('translations_path', DatasetPath)])
TomoFrame = NamedTuple('TomoFrame', [("image", ndarray),
("shape", Tuple[int, int]),
("index", Tuple[int, int]),
("rotation", float),
("translation", float)]
)
UnfoldFrame = NamedTuple('UnfoldFrame', [("tomoframe", TomoFrame),
("unfolded", ndarray)])
Unfold = NamedTuple('Unfold', [('sources', TomoSources),  # sources of the data
                               ('poni', Text),  # name of the poni file used for the integration
                               ('mask', Optional[ndarray]),  # mask used for the computation
                               ('dark', Optional[ndarray]),  # dark subtracted from each image
                               ('flat', Optional[ndarray]),  # flat used for the normalisation
                               ('npt_rad', int),  # number of radial bins of the unfolded image
                               ('npt_azim', int),  # number of azimuthal bins of the unfolded image
                               ]
                    )
TomoSave = NamedTuple('TomoSave', [('volume', ndarray),
('rotation', ndarray),
('translation', ndarray)])
def read_multi(params: TomoSources) -> Iterator[TomoFrame]:
    """
    :param params: description of the file to treat and of the datasets it contains.
    :type params: TomoSources
    :return: yield one TomoFrame per (rotation, translation) position.
    :rtype: Iterator[TomoFrame]
    """
with File(params.filename, mode='r') as f:
images = get_dataset(f, params.images_path)
rotations = get_dataset(f, params.rotations_path)
translations = get_dataset(f, params.translations_path)
shape = images.shape[0], images.shape[1]
for rotation_idx, rotation in enumerate(rotations):
for translation_idx, translation in enumerate(translations):
yield TomoFrame(images[rotation_idx, translation_idx,:],
shape,
(rotation_idx, translation_idx),
rotation, translation)
def unfold(params: Unfold) -> Iterator[UnfoldFrame]:
    """
    :return: yield one UnfoldFrame per TomoFrame; `unfolded` is the 2D
        regrouped image of shape (npt_azim, npt_rad).
    :rtype: Iterator[UnfoldFrame]
    """
    # load the Azimuthal Integration parameters
    ai = pyFAI.load(params.poni)
    for tomoframe in read_multi(params.sources):
        data = tomoframe.image
        # mask all the non counting pixels
        # TODO find how to optimise this computation of the dynamic mask, it is really slow
        # we use the dummy value (0) in order to define the dynamic mask
        data[data > 500] = 0
        # the static mask is now passed to pyFAI together with the dark and the flat
        unfolded, r, a = ai.integrate2d(data, params.npt_rad, params.npt_azim,
                                        filename=None, mask=params.mask,
                                        dark=params.dark, flat=params.flat,
                                        dummy=0)
        yield UnfoldFrame(tomoframe, unfolded)
def save_unfolded(filename: Text,
                  params: Unfold) -> None:
    # mode="a" creates the file when it does not exist yet
    with File(filename, mode="a") as f:
        # now fill with the values.
        for i, frame in enumerate(unfold(params)):
            if i == 0:
                # first create the output volumes at the right place
                group = f.require_group("tomography_unfolded")
                unfolded = group.require_dataset("unfolded",
                                                 shape=frame.tomoframe.shape + (params.npt_azim, params.npt_rad),
                                                 dtype=frame.unfolded.dtype)
                rotation = group.require_dataset("rotation", shape=(frame.tomoframe.shape[0],),
                                                 dtype='float32')
                translation = group.require_dataset("translation", shape=(frame.tomoframe.shape[1],),
                                                    dtype='float32')
                # require_dataset needs explicit shape and dtype, data alone is not enough
                if params.dark is not None:
                    group.require_dataset("dark", shape=params.dark.shape,
                                          dtype='uint32', data=params.dark)
                if params.flat is not None:
                    group.require_dataset("flat", shape=params.flat.shape,
                                          dtype='float32', data=params.flat)
                if params.mask is not None:
                    group.require_dataset("mask", shape=params.mask.shape,
                                          dtype='uint32', data=params.mask.astype('uint32'))
            unfolded[frame.tomoframe.index[0], frame.tomoframe.index[1], :] = frame.unfolded
            rotation[frame.tomoframe.index[0]] = frame.tomoframe.rotation
            translation[frame.tomoframe.index[1]] = frame.tomoframe.translation
# the first calibration (XRD18keV_27_0.poni) was superseded by the one below
# PONI = os.path.join(PUBLISHED_DATA, 'calibration', 'XRD18keV_27_0.poni')
PONI = os.path.join(PUBLISHED_DATA, 'calibration', 'XRD18keV_26.nxs_03.poni')
TOMOSOURCES = TomoSources(os.path.join(ROOT, "2016-03-27", 'P14_13_57.nxs'),
DatasetPathWithAttribute("interpretation", b"image"),
DatasetPathContains("scan_data/trajectory_2_1"),
DatasetPathContains("scan_data/trajectory_1_1"))
t0 = time.time()
params = Unfold(TOMOSOURCES, PONI, mask, dark, flat, 1000, 360)
save_unfolded(os.path.join(PUBLISHED_DATA, "P14_13_57_unfold.h5"),
params)
print("unfold time: ", time.time() - t0)
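[ ]:
# Read back one unfolded frame to check the output layout (sketch): the
# "unfolded" dataset is (n_rotations, n_translations, npt_azim, npt_rad).
with File(os.path.join(PUBLISHED_DATA, "P14_13_57_unfold.h5"), mode='r') as f:
    unfolded = f["tomography_unfolded/unfolded"]
    print(unfolded.shape)
    pyplot.imshow(unfolded[0, 0], interpolation='nearest')
    pyplot.colorbar()
    pyplot.show()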
[ ]:
# Here are a bunch of legacy helpers and variables used to locate the
# files on the SOLEIL file system. There is a good chance this will not
# work for you :p so take care to adapt these paths. (These helpers
# predate the NamedTuple-based code above and are kept for reference.)
def convert(filename, numbers=None, data_path='scan_data/data_15'):
    """
    :param filename: name of the nexus file
    :type filename: str
    :param numbers: indexes of the images to extract, if None extract all the images
    :type numbers: list(int)
    Convert the nxs images to edf. The edf image is then used with
    pyFAI-calib to generate the .poni file.
    """
imgs = read(filename, numbers, data_path)
for index, img in enumerate(imgs):
edf = fabio.edfimage.edfimage(img)
name = os.path.splitext(os.path.basename(filename))[0]
name += '_%d' % index + '.edf'
saved_name = os.path.join(TREATMENT_PATH, name)
edf.write(saved_name)
def integrate2(sources, poni, mask=None, dark=None, flat=None, N=600,
data_path=None,
rotation_path=None,
translation_path=None,
monitor_path=None):
"""
:param sources: list of nexus files to treat
:type sources: list(str)
:param poni: the name of the poni file used for the integration.
:type poni: str
:param mask: the mask used for the computation.
:type mask: None or numpy.ndarray
:param N: the number of bins used for the powder spectrum.
:type N: int
:param data_path: location of the images in the nexus file
:type data_path: str
:param rotation_path: location of the rotation coordinated in the nexus file
:type rotation_path: str
:param translation_path: location of the translation coordinates in the nexus file
:type translation_path: str
:return: the tomography datas
:rtype: numpy.ndarray
the return data is a 4 dimensions numpy array (rotation,
translation, 2, N). the third dimension contain the x and y
coordinates of the powder diffraction (0 for X and 1 for Y)
"""
# load the Azimuthal Integration parameters
ai = pyFAI.load(poni)
volume = numpy.array([])
rotation = numpy.array([])
translation = numpy.array([])
for i, frame in enumerate(read_multi(sources,
data_path,
rotation_path,
translation_path,
monitor_path)):
if i == 0:
volume = numpy.empty(frame.shape + (2, N))
rotation = numpy.empty((frame.shape[0],))
translation = numpy.empty((frame.shape[1],))
        data = frame.img
        # mask all the non counting pixels
        # TODO find how to optimise this computation of the dynamic mask, it is really slow
        # we use the dummy value (0) in order to define the dynamic mask
        data[data > 500] = 0
        t0 = time.time()
        spectrum_x_y = ai.integrate1d(data, N, filename=None, dark=dark, flat=flat,
                                      dummy=0, method="csr_ocl_0,0")
print(i, frame.rotation, frame.translation, time.time() - t0)
if frame.monitor:
volume[frame.rotation, frame.translation] = spectrum_x_y[0], spectrum_x_y[1] / frame.monitor
else:
volume[frame.rotation, frame.translation] = spectrum_x_y
rotation[frame.rotation] = frame.rotation_value
translation[frame.translation] = frame.translation_value
return volume, rotation, translation
def from_hdf5(filename):
    """
    :param filename: name of the file where the tomography was stored
    :type filename: str
    :return: volume, rotation, translation
    :rtype: tuple(numpy.ndarray, numpy.ndarray, numpy.ndarray)
    Read the data back from filename.
    """
    # read with h5py (already imported above) rather than the deprecated
    # tables.openFile API of PyTables, which was never imported here
    with File(filename, mode="r") as h5file:
        volume = h5file["/tomography/diffraction"][:]
        rotation = h5file["/tomography/rotation"][:]
        translation = h5file["/tomography/translation"][:]
    return volume, rotation, translation
def load_volume(filename, basedir):
return from_hdf5(os.path.join(basedir, filename + ".h5"))
# ugly but for now it is ok
SIZE = 3
def reject_outliers(data, m=5.):
    """
    :param data: the flattened SIZE x SIZE neighbourhood to filter
    :type data: numpy.ndarray
    :param m: the filter rejection parameter
    :type m: float
    """
    # generic_filter hands over the flattened neighbourhood, so the central
    # pixel sits at the middle index (integer division is needed in Python 3)
    center = data.size // 2
    median = numpy.median(data)
    d = numpy.abs(data - median)
    mdev = numpy.median(d)
    return data[center] if (d[center] / mdev) < m else median
def filter_sinogramme(data):
    """
    :param data: the sinogram to filter
    :type data: numpy.ndarray
    """
    return generic_filter(data, reject_outliers, size=SIZE)
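# A tiny demonstration (sketch) of the outlier rejection on synthetic
# data: a single hot pixel should come back as the local median while
# regular pixels are left untouched.
demo = numpy.arange(25, dtype=float).reshape(5, 5)
demo[2, 2] = 1000.
print(filter_sinogramme(demo)[2, 2])  # the hot pixel is replaced by ~13.0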
def all_conversions():
convert(SI_0_100, data_path='scan_data/data_03')
convert(SI_14, data_path='scan_data/data_15')
def padd_sino(sino, offset=0):
    # pad the sinogram up to 256 rows by replicating its first and last
    # rows (integer division is needed in Python 3)
    n = (256 - sino.shape[0]) // 2
    n1 = n + offset
    n2 = 256 - n1 - sino.shape[0]
    padd1 = numpy.tile(sino[0], (n1, 1))
    padd2 = numpy.tile(sino[-1], (n2, 1))
    tmp = numpy.vstack([padd1, sino, padd2])
    print(tmp.shape)
    return tmp
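# Example of use (sketch): a 200-row sinogram comes back with 256 rows,
# the first and last rows being replicated to fill the padding.
assert padd_sino(numpy.zeros((200, 64))).shape == (256, 64)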
#############
# P14_13_57 #
#############
def p14_13_57():
    def create_unfold():
        t0 = time.time()
        params = Unfold(TOMOSOURCES, PONI, mask, dark, flat, 1000, 360)
        # unfold() is lazy: save_unfolded() consumes it and writes the file
        save_unfolded(os.path.join(PUBLISHED_DATA, "P14_13_57_unfold.h5"),
                      params)
        print("unfold time: ", time.time() - t0)
        # TODO save the transpose so that the data are organised
        # correctly for the reconstructions and the filtering
    def create_volume():
        t0 = time.time()
        dark, mask = create_mask_and_dark(MASKS_15S, detector='Xpad_flat', data_path=DATAPATH)
        # with h5py.File(P7_19_12, mode='r') as f:
        #     dark += f["scan_13/scan_data/data_58"][35, 65]
        flat = get_flat(FLAT, dark=dark, data_path=DATAPATH, threshold=1)  # TODO average the flats (12 and 13)
        volume, rotation, translation = \
            integrate2(P14_13_57, PONI, mask=mask,
                       dark=dark, flat=flat,
                       data_path=DATAPATH,
                       rotation_path="scan_data/trajectory_2_1",
                       translation_path="scan_data/trajectory_1_1")
        # monitor_path="scan_data/data_01")
        print("integration time: ", time.time() - t0)
        # TODO save the transpose so that the data are organised
        # correctly for the reconstructions and the filtering
        save_volume("P14_13_57", volume[:, :, 1, :], rotation, translation, dark, flat, mask, basedir=PUBLISHED_DATA)
    def filter_volume():
        volume, rotation, translation = load_volume("P14_13_57", basedir=PUBLISHED_DATA)
        new_volume = volume.T.copy()
        for i, img in enumerate(new_volume):
            print("treating sinogram %i" % i)
            new_volume[i] = filter_sinogramme(img)
            # new_volume[i] = medfilt2d(img)
        save_volume("P14_13_57_filtered", new_volume, rotation, translation, basedir=PUBLISHED_DATA)
def _reconstruction(filename, transpose=False):
CHANNEL = 366
from skimage.transform import iradon
volume, rotation, translation = load_volume(filename, basedir=PUBLISHED_DATA)
if transpose:
volume = volume.T.copy()
print(rotation.shape)
# sino = volume.mean(axis=0)
# sino = volume[192:201].mean(axis=0)[:, :-1] # quartz
sino = volume[CHANNEL][:, :-1] # iron
# sino = volume[52:63].mean(axis=0)[:,:-1]
sino = padd_sino(sino, 3)
pyplot.subplot(121)
        pyplot.title(filename + " " + str(CHANNEL) + " channel")
pyplot.imshow(sino, interpolation='nearest')
pyplot.colorbar()
rec = iradon(sino, theta=rotation[:-1], filter='shepp-logan')
pyplot.subplot(122)
        pyplot.title(filename + " " + str(CHANNEL) + " channel")
pyplot.imshow(rec, interpolation='nearest')
def reconstruction():
_reconstruction("P14_13_57_filtered")
pyplot.figure()
_reconstruction("P14_13_57", transpose=True)
pyplot.show()
create_unfold()
#create_volume()
#filter_volume()
#reconstruction()
#p14_13_57()