import json
from ewoksxrpd.tasks.utils.data_utils import create_hdf5_link
from silx.io import h5py_utils
from ewokscore import Task
import h5py
import numpy
from pyFAI import calibrant
from pyFAI.units import RADIAL_UNITS
import logging
logger = logging.getLogger(__name__)
class AverageIntegration(
    Task,
    input_names=[
        "nxdata_url",
    ],
    optional_input_names=[
        "lima_name",
        "scan_nb",
        "external_output_filename",
        "output_filename",
        "nxprocess_name",
        "reference",
        "do_average",
    ],
    output_names=[
        "nxdata_url",
    ],
):
    """Average a series of 1D integrated patterns stored in a NXdata group.

    Inputs:
        nxdata_url: URL of the stored integrated data. Normally it's stored in
            the PROCESSED_DATA scan file, e.g.
            ``scan_file.h5::/16.1/eiger_integrate/integrated``.

    Optional inputs:
        lima_name: name of the detector device, e.g. ``"eiger"``. When not
            provided, it is derived from the NXprocess name in ``nxdata_url``.
        scan_nb: number of the scan (used to build the ``<nb>.1`` entry name).
        external_output_filename: external file where to write the averaged
            nexus process (usually the PROCESSED_DATA scan file).
        output_filename: file where to write a link to the averaged nexus
            process (usually the PROCESSED_DATA master file of the collection).
        nxprocess_name: name of the nexus process. If not provided, it is set
            to ``"<lima_name>_average"``.
        reference: name of the standard sample (pyFAI calibrant) to compare
            against, e.g. ``"hydrocerussite"``.
        do_average: if False, skip this process.

    Outputs:
        nxdata_url: URL of the created NXdata group holding the averaged
            pattern (``external_output_filename::<path>``), or None when the
            task was skipped.
    """

    def run(self):
        if not self.get_input_value("do_average", True):
            # Explicitly skipped: propagate "no result" downstream.
            self.outputs.nxdata_url = None
            return

        nxdata_url: str = self.inputs.nxdata_url
        input_filename, nxdata_integrated_path = nxdata_url.split("::")
        # Path looks like "/<entry>/<lima_name>_integrate/integrated".
        h5_path_parts = nxdata_integrated_path.split("/")
        h5_root = h5_path_parts[1]
        lima_name: str = self.get_input_value("lima_name", None)
        if not lima_name:
            lima_name = h5_path_parts[2].replace("_integrate", "")

        scan_number = self.get_input_value("scan_nb", None)
        reference_name: str = self.get_input_value("reference", None)
        external_output_filename, output_filename, mode = self._resolve_filenames(
            input_filename
        )

        # "is not None" so a (theoretical) scan number 0 does not silently
        # fall back to the default entry name.
        entry_name = f"{scan_number}.1" if scan_number is not None else "entry_0000"
        nxprocess_name = self.get_input_value("nxprocess_name", f"{lima_name}_average")
        nxdata_name = "average"

        with h5py_utils.open_item(
            filename=input_filename, name="/", mode=mode
        ) as file_input:
            nxdata_integrated = file_input[nxdata_integrated_path]
            config_path = nxdata_integrated_path.replace("integrated", "configuration")
            integration_config = json.loads(file_input[config_path]["data"][()])
            integration_unit = integration_config["unit"]
            intensity_data = nxdata_integrated["intensity"][()]
            # Average over the first axis (the series of patterns).
            avg_intensity = numpy.average(intensity_data, axis=0)

            with h5py_utils.open_item(
                filename=external_output_filename, name="/", mode="r+"
            ) as file_output:
                out_entry = file_output.require_group(f"/{h5_root}")
                out_entry.attrs.setdefault("NX_class", "NXentry")
                nxprocess_average = out_entry.create_group(nxprocess_name)
                nxprocess_average["name"] = "Averaged integration"
                nxprocess_average.attrs.update(
                    {
                        "NX_class": "NXprocess",
                        "default": nxdata_name,
                    }
                )
                nxdata_average = nxprocess_average.create_group(nxdata_name)
                nxdata_average_path = nxdata_average.name
                nxdata_average.create_dataset("average_intensity", data=avg_intensity)

                # Re-expose the remaining axes as virtual datasets pointing at
                # the original integrated data, skipping the first axis (the
                # one that was averaged out).
                nxdata_axes = nxdata_integrated.attrs.get("axes", [""])
                for ax_name in nxdata_axes[1:]:
                    src_dataset = nxdata_integrated[ax_name]
                    virtual_source = h5py.VirtualSource(src_dataset)
                    virtual_layout = h5py.VirtualLayout(
                        shape=src_dataset.shape,
                        dtype=src_dataset.dtype,
                    )
                    virtual_layout[:] = virtual_source[:]
                    nxdata_average.create_virtual_dataset(ax_name, virtual_layout)

                nxdata_average.attrs.update(
                    {
                        "NX_class": "NXdata",
                        "signal": "average_intensity",
                        "axes": nxdata_axes[1:],
                    }
                )

                if reference_name and avg_intensity.ndim == 1:
                    self._add_reference_signal(
                        nxdata_average,
                        reference_name,
                        nxdata_integrated[nxdata_axes[-1]],
                        avg_intensity,
                        integration_config,
                        integration_unit,
                    )

                if output_filename != external_output_filename:
                    # Link the NXprocess from the master file scan entry.
                    with h5py_utils.open_item(
                        filename=output_filename,
                        name=entry_name,
                        mode="r+",
                    ) as scan_entry:
                        create_hdf5_link(
                            parent=scan_entry,
                            link_name=nxprocess_name,
                            target=nxprocess_average,
                            relative=True,
                        )

        self.outputs.nxdata_url = f"{external_output_filename}::{nxdata_average_path}"

    def _resolve_filenames(self, input_filename: str):
        """Fill in missing output filenames and pick the input-file open mode.

        When no output file is given at all, results are written back into the
        input file itself; the input is then opened read-only ("r") because it
        will be re-opened in "r+" for writing.
        """
        external_output_filename: str = self.get_input_value(
            "external_output_filename", None
        )
        output_filename: str = self.get_input_value("output_filename", None)
        mode = "a"
        if not external_output_filename and not output_filename:
            external_output_filename = input_filename
            output_filename = input_filename
            mode = "r"
        elif not external_output_filename:
            external_output_filename = output_filename
        elif not output_filename:
            output_filename = external_output_filename
        return external_output_filename, output_filename, mode

    @staticmethod
    def _add_reference_signal(
        nxdata_average,
        reference_name: str,
        radial_array,
        avg_intensity,
        integration_config: dict,
        integration_unit: str,
    ):
        """Write a ``reference_<name>`` auxiliary signal of calibrant peaks.

        For each theoretical peak of the calibrant, the first radial bin past
        the peak position is set to the maximum averaged intensity, producing a
        stick pattern for visual comparison.
        """
        if reference_name not in calibrant.ALL_CALIBRANTS:
            logger.error(
                "Unknown calibrant: %s. Available calibrants: %s",
                reference_name,
                calibrant.ALL_CALIBRANTS,
            )
            return
        nxdata_average.attrs["auxiliary_signals"] = [f"reference_{reference_name}"]
        reference = calibrant.CALIBRANT_FACTORY(reference_name)
        # pyFAI worker config >= 4 nests the geometry under "poni"; older
        # versions keep "wavelength" at the top level. Default the version to
        # 0 so a config without a "version" key is treated as the legacy
        # layout instead of raising TypeError on "None < 4"; "or {}" guards
        # against a missing "poni" group.
        if integration_config.get("version", 0) < 4:
            wavelength = integration_config.get("wavelength")
        else:
            wavelength = (integration_config.get("poni") or {}).get("wavelength")
        if wavelength:
            reference.wavelength = wavelength
        ref_peaks = reference.get_peaks(RADIAL_UNITS[integration_unit])
        # Walk the radial axis once, marking the first bin beyond each peak.
        current_peak_index = 0
        ref_bins = []
        for i, radial_pos in enumerate(radial_array):
            if current_peak_index >= len(ref_peaks):
                break
            if radial_pos > ref_peaks[current_peak_index]:
                ref_bins.append(i)
                current_peak_index += 1
        ref_dataset = numpy.zeros(len(radial_array))
        ref_dataset[ref_bins] = numpy.max(avg_intensity)
        nxdata_average.create_dataset(
            f"reference_{reference_name}",
            data=ref_dataset,
            dtype=int,
        )