Source code for ewoksid13.tasks.average
import json
from ewokscore import Task
import h5py
import numpy
from pyFAI import calibrant
from pyFAI.units import RADIAL_UNITS
[docs]class AverageIntegration(
Task,
input_names=["nxdata_url"],
optional_input_names=["reference", "output_filename"],
output_names=["nxdata_url"],
):
"""Average a series of 1d integrated patterns"""
[docs] def run(self):
nxdata_url: str = self.inputs.nxdata_url
input_filename, h5_path = nxdata_url.split("::")
h5_path_parts = h5_path.split("/")
h5_root = h5_path_parts[1]
camera_name = h5_path_parts[2].replace("_integrate", "")
output_filename: str = self.inputs.output_filename
output_process = f"{camera_name}_average"
reference_name: str = self.get_input_value("reference", "hydrocerussite")
if output_filename:
mode = "r"
else:
mode = "a"
output_filename = input_filename
with h5py.File(input_filename, mode=mode) as h5infile:
integrated = h5infile[h5_path]
config_path = h5_path.replace("integrated", "configuration")
integration_config = json.loads(h5infile[config_path]["data"][()])
integration_unit = integration_config["unit"]
x_axis_name = "q" if integration_unit.startswith("q") else "2th"
intensity_data = integrated["intensity"][()]
avg_intensity = numpy.average(intensity_data, axis=0)
with h5py.File(output_filename, mode="a") as h5outfile:
out_entry = h5outfile.require_group(f"/{h5_root}")
out_entry.attrs.setdefault("NX_class", "NXentry")
out_process = out_entry.create_group(output_process)
out_process.attrs["default"] = "average"
out_process.attrs["NX_class"] = "NXprocess"
out_data = out_process.create_group("average")
out_data.attrs["NX_class"] = "NXdata"
out_data.attrs["signal"] = "average_intensity"
out_data.attrs["auxiliary_signals"] = [f"reference_{reference_name}"]
out_data.attrs["axes"] = [x_axis_name]
x_axis = integrated[x_axis_name]
out_data.create_dataset("average_intensity", data=avg_intensity)
virtual_source = h5py.VirtualSource(x_axis)
virtual_layout = h5py.VirtualLayout(
shape=x_axis.shape, dtype=x_axis.dtype
)
virtual_layout[:] = virtual_source[:]
out_data.create_virtual_dataset(x_axis_name, virtual_layout)
reference = calibrant.CALIBRANT_FACTORY(reference_name)
reference.wavelength = integration_config["wavelength"]
ref_peaks = reference.get_peaks(RADIAL_UNITS[integration_unit])
current_peak_index = 0
ref_bins = []
for i, radial_pos in enumerate(x_axis):
current_peak_pos = ref_peaks[current_peak_index]
if radial_pos > current_peak_pos:
ref_bins.append(i)
current_peak_index = current_peak_index + 1
ref_dataset = numpy.zeros(len(x_axis))
ref_dataset[ref_bins] = numpy.max(avg_intensity)
out_data.create_dataset(
f"reference_{reference_name}", data=ref_dataset, dtype=int
)