# Source code for ewoksid13.tasks.average

import json
from ewoksxrpd.tasks.utils.data_utils import create_hdf5_link
from silx.io import h5py_utils
from ewokscore import Task
import h5py
import numpy
from pyFAI import calibrant
from pyFAI.units import RADIAL_UNITS
import logging

logger = logging.getLogger(__name__)


class AverageIntegration(
    Task,
    input_names=[
        "nxdata_url",
    ],
    optional_input_names=[
        "lima_name",
        "scan_nb",
        "external_output_filename",
        "output_filename",
        "nxprocess_name",
        "reference",
        "do_average",
    ],
    output_names=[
        "nxdata_url",
    ],
):
    """Average a series of 1D integrated patterns.

    Input names:
        nxdata_url: URL of the stored integrated data. Normally it is stored
            in the PROCESSED_DATA scan file, for example
            ``scan_file.h5::16.1/eiger_integrate/integrated``.

    Optional input names:
        lima_name: Name of the detector device, for example ``"eiger"``.
            When not given, it is derived from the NXprocess name inside
            ``nxdata_url`` (``"<lima_name>_integrate"``).
        scan_nb: Number of the scan.
        external_output_filename: External file where to write the averaged
            NeXus process (usually the PROCESSED_DATA scan file).
        output_filename: File where to write a link to the averaged NeXus
            process (usually the PROCESSED_DATA master file of the
            collection).
        nxprocess_name: Name of the NeXus process. When not provided, it is
            set to ``"<lima_name>_average"``.
        reference: Name of the standard sample to compare against, for
            example ``"hydrocerussite"``.
        do_average: When False, skip this process entirely.

    Output names:
        nxdata_url: ``"<external_output_filename>::<averaged NXdata path>"``,
            or ``None`` when ``do_average`` is False.
    """

    def run(self):
        # Optional early exit: the task can be disabled from the workflow.
        if not self.get_input_value("do_average", True):
            self.outputs.nxdata_url = None
            return

        nxdata_url: str = self.inputs.nxdata_url
        lima_name: str = self.get_input_value("lima_name", None)
        input_filename, nxdata_integrated_path = nxdata_url.split("::")
        h5_path_parts = nxdata_integrated_path.split("/")
        h5_root = h5_path_parts[1]
        if not lima_name:
            # Derive the detector name from the "<lima_name>_integrate" group.
            lima_name = h5_path_parts[2].replace("_integrate", "")
        scan_number: int = self.get_input_value("scan_nb", None)
        external_output_filename: str = self.get_input_value(
            "external_output_filename", None
        )
        output_filename: str = self.get_input_value("output_filename", None)
        reference_name: str = self.get_input_value("reference", None)

        # Resolve output destinations. When neither is given, everything goes
        # back into the input file: open the read handle in "r" mode because
        # the writing happens through a second handle on the same file.
        mode = "a"
        if not external_output_filename and not output_filename:
            external_output_filename = input_filename
            output_filename = input_filename
            mode = "r"
        elif not external_output_filename:
            external_output_filename = output_filename
        elif not output_filename:
            output_filename = external_output_filename

        # Write the result into external_output_filename.
        # Fix: compare against None so a scan number of 0 is not silently
        # replaced by "entry_0000".
        entry_name = f"{scan_number}.1" if scan_number is not None else "entry_0000"
        nxprocess_name = self.get_input_value("nxprocess_name", f"{lima_name}_average")
        nxdata_name = "average"

        with h5py_utils.open_item(
            filename=input_filename, name="/", mode=mode
        ) as file_input:
            nxdata_integrated = file_input[nxdata_integrated_path]
            config_path = nxdata_integrated_path.replace("integrated", "configuration")
            integration_config = json.loads(file_input[config_path]["data"][()])
            integration_unit = integration_config["unit"]
            intensity_data = nxdata_integrated["intensity"][()]
            # Average over the scan points (first axis).
            avg_intensity = numpy.average(intensity_data, axis=0)

            with h5py_utils.open_item(
                filename=external_output_filename, name="/", mode="r+"
            ) as file_output:
                out_entry = file_output.require_group(f"/{h5_root}")
                out_entry.attrs.setdefault("NX_class", "NXentry")
                nxprocess_average = out_entry.create_group(nxprocess_name)
                nxprocess_average["name"] = "Averaged integration"
                nxprocess_average.attrs.update(
                    {
                        "NX_class": "NXprocess",
                        "default": nxdata_name,
                    }
                )
                nxdata_average = nxprocess_average.create_group(nxdata_name)
                nxdata_average_path = nxdata_average.name
                nxdata_average.create_dataset("average_intensity", data=avg_intensity)

                # Copy the axis information: every axis except the averaged-out
                # first one is exposed as a virtual dataset pointing at the
                # integrated data (no data duplication).
                nxdata_axes = nxdata_integrated.attrs.get("axes", [""])
                for ax_name in nxdata_axes[1:]:
                    nxdata_integrated_axis = nxdata_integrated[ax_name]
                    virtual_source = h5py.VirtualSource(nxdata_integrated_axis)
                    virtual_layout = h5py.VirtualLayout(
                        shape=nxdata_integrated_axis.shape,
                        dtype=nxdata_integrated_axis.dtype,
                    )
                    virtual_layout[:] = virtual_source[:]
                    nxdata_average.create_virtual_dataset(ax_name, virtual_layout)
                nxdata_average.attrs.update(
                    {
                        "NX_class": "NXdata",
                        "signal": "average_intensity",
                        "axes": nxdata_axes[1:],
                    }
                )

                if reference_name and avg_intensity.ndim == 1:
                    radial_name = nxdata_axes[-1]
                    radial_array = nxdata_integrated[radial_name]
                    if reference_name not in calibrant.ALL_CALIBRANTS:
                        # Fix: the message f-string was broken by a raw newline;
                        # use lazy %-style logging arguments instead.
                        logger.error(
                            "Unknown calibrant: %s. Available calibrants: %s",
                            reference_name,
                            calibrant.ALL_CALIBRANTS,
                        )
                    else:
                        nxdata_average.attrs["auxiliary_signals"] = [
                            f"reference_{reference_name}"
                        ]
                        reference = calibrant.CALIBRANT_FACTORY(reference_name)
                        # Fix: a missing "version" key used to raise TypeError
                        # (None < 4). Treat an absent version as a legacy
                        # (pre-v4) configuration with a top-level wavelength.
                        if integration_config.get("version", 0) < 4:
                            wavelength = integration_config.get("wavelength")
                        else:
                            # Fix: guard against a missing "poni" sub-dict.
                            poni_config = integration_config.get("poni") or {}
                            wavelength = poni_config.get("wavelength")
                        if wavelength:
                            reference.wavelength = wavelength
                        ref_peaks = reference.get_peaks(
                            RADIAL_UNITS[integration_unit]
                        )
                        # Mark the first radial bin past each reference peak
                        # (peaks are assumed sorted by increasing position).
                        current_peak_index = 0
                        ref_bins = []
                        for i, radial_pos in enumerate(radial_array):
                            if current_peak_index >= len(ref_peaks):
                                break
                            if radial_pos > ref_peaks[current_peak_index]:
                                ref_bins.append(i)
                                current_peak_index += 1
                        # Stick-pattern dataset scaled to the data maximum so
                        # it is visible when plotted over the averaged signal.
                        ref_dataset = numpy.zeros(len(radial_array))
                        ref_dataset[ref_bins] = numpy.max(avg_intensity)
                        nxdata_average.create_dataset(
                            f"reference_{reference_name}",
                            data=ref_dataset,
                            dtype=int,
                        )

                if output_filename != external_output_filename:
                    # Link the NXprocess from the scan entry of the master
                    # file while the target group is still open.
                    with h5py_utils.open_item(
                        filename=output_filename,
                        name=entry_name,
                        mode="r+",
                    ) as scan_entry:
                        create_hdf5_link(
                            parent=scan_entry,
                            link_name=nxprocess_name,
                            target=nxprocess_average,
                            relative=True,
                        )

        self.outputs.nxdata_url = f"{external_output_filename}::{nxdata_average_path}"