Source code for ewoksid13.tasks.average

import json

from ewokscore import Task
import h5py
import numpy
from pyFAI import calibrant
from pyFAI.units import RADIAL_UNITS


[docs]class AverageIntegration( Task, input_names=["nxdata_url"], optional_input_names=["reference", "output_filename"], output_names=["nxdata_url"], ): """Average a series of 1d integrated patterns"""
[docs] def run(self): nxdata_url: str = self.inputs.nxdata_url input_filename, h5_path = nxdata_url.split("::") h5_path_parts = h5_path.split("/") h5_root = h5_path_parts[1] camera_name = h5_path_parts[2].replace("_integrate", "") output_filename: str = self.inputs.output_filename output_process = f"{camera_name}_average" reference_name: str = self.get_input_value("reference", "hydrocerussite") if output_filename: mode = "r" else: mode = "a" output_filename = input_filename with h5py.File(input_filename, mode=mode) as h5infile: integrated = h5infile[h5_path] config_path = h5_path.replace("integrated", "configuration") integration_config = json.loads(h5infile[config_path]["data"][()]) integration_unit = integration_config["unit"] x_axis_name = "q" if integration_unit.startswith("q") else "2th" intensity_data = integrated["intensity"][()] avg_intensity = numpy.average(intensity_data, axis=0) with h5py.File(output_filename, mode="a") as h5outfile: out_entry = h5outfile.require_group(f"/{h5_root}") out_entry.attrs.setdefault("NX_class", "NXentry") out_process = out_entry.create_group(output_process) out_process.attrs["default"] = "average" out_process.attrs["NX_class"] = "NXprocess" out_data = out_process.create_group("average") out_data.attrs["NX_class"] = "NXdata" out_data.attrs["signal"] = "average_intensity" out_data.attrs["auxiliary_signals"] = [f"reference_{reference_name}"] out_data.attrs["axes"] = [x_axis_name] x_axis = integrated[x_axis_name] out_data.create_dataset("average_intensity", data=avg_intensity) virtual_source = h5py.VirtualSource(x_axis) virtual_layout = h5py.VirtualLayout( shape=x_axis.shape, dtype=x_axis.dtype ) virtual_layout[:] = virtual_source[:] out_data.create_virtual_dataset(x_axis_name, virtual_layout) reference = calibrant.CALIBRANT_FACTORY(reference_name) reference.wavelength = integration_config["wavelength"] ref_peaks = reference.get_peaks(RADIAL_UNITS[integration_unit]) current_peak_index = 0 ref_bins = [] for i, radial_pos in enumerate(x_axis): current_peak_pos = ref_peaks[current_peak_index] if radial_pos > current_peak_pos: ref_bins.append(i) current_peak_index = current_peak_index + 1 ref_dataset = numpy.zeros(len(x_axis)) ref_dataset[ref_bins] = numpy.max(avg_intensity) out_data.create_dataset( f"reference_{reference_name}", data=ref_dataset, dtype=int )