Source code for ewoksid13.tasks.diffmap

from ewokscore import Task
from silx.io import h5py_utils
import h5py
import logging
import numpy as np
import json
from pyFAI.diffmap import DiffMap
from pyFAI.units import to_unit
from pyFAI.worker import Worker
from pyFAI.version import MAJOR, MINOR, version

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class CreateDiffMapFile(
    Task,
    input_names=[
        "nxdata_url",
        "filename",
        "scan",
        "output_filename",
        "integration_options",
        "lima_name",
    ],
    optional_input_names=[
        "save_diffmap_separately",
    ],
    output_names=[
        "diffmap_file",
        "diffmap_dataset_path",
        "diffmap_nxprocess_path",
        "nxdata_url",
    ],
):
    """Create a DiffMap NXprocess compliant with the pyFAI-diffmap/view API
    from an already integrated dataset.

    Inputs:
        nxdata_url: ``"<file>::<hdf5 path>"`` of the group holding the
            integrated ``intensity`` dataset; the last entry of its ``axes``
            attribute names the radial dataset.
        filename: file containing the scan entry ``"<scan>.1"``.
        scan: scan number.
        output_filename: file name from which the separate diffmap file name
            is derived.
        integration_options: pyFAI worker configuration; the keys
            ``mask_file``, ``do_mask`` and ``unit`` are read here.
        lima_name: name of the detector dataset under ``measurement``.
        save_diffmap_separately (optional, default ``False``): write the
            diffmap into its own file instead of into the file of
            ``nxdata_url``.

    Outputs:
        diffmap_file: file holding the diffmap (``None`` if the scan has no
            ``kmap_parameters``).
        diffmap_dataset_path: HDF5 path of the detector dataset.
        diffmap_nxprocess_path: HDF5 path of the created NXprocess
            (``None`` if the scan has no ``kmap_parameters``).
        nxdata_url: ``"<file>::<path>"`` of the reshaped result group
            (``None`` if the scan has no ``kmap_parameters``).
    """

    @staticmethod
    def _resolve_mask_file(integration_options: dict) -> str:
        """Return the configured mask file, or "" when there is none or masking is disabled."""
        mask_file = integration_options.get("mask_file", None)
        if mask_file and integration_options.get("do_mask", True):
            return mask_file
        return ""

    @staticmethod
    def _build_diffmap_filename(output_filename: str, scan_number: int) -> str:
        """Return ``<root>_<scan:04>_diffmap<ext>`` derived from ``output_filename``.

        Only the final extension is touched, so directory names containing
        ".h5" are left alone, and a file name that does not end in ".h5"
        still yields a name distinct from ``output_filename``.
        """
        import os

        root, ext = os.path.splitext(output_filename)
        return f"{root}_{scan_number:04}_diffmap{ext or '.h5'}"

    def run(self):
        """Reshape the integrated data to the kmap grid and store it as a diffmap."""
        nxdata_url: str = self.inputs.nxdata_url
        filename: str = self.inputs.filename
        scan_number: int = self.inputs.scan
        output_filename: str = self.inputs.output_filename
        integration_options: dict = self.inputs.integration_options
        lima_name: str = self.inputs.lima_name

        # Open the PROCESSED_DATA nxdata url to retrieve the integrated array
        # (nb_frames x nbpt_rad), the name of the radial units and the radial vector.
        # Split on the first "::" only, so the HDF5 path may itself contain "::".
        file_url, nxdata_path = nxdata_url.split("::", 1)
        with h5py_utils.open_item(
            filename=file_url, name=nxdata_path, mode="r"
        ) as nxdata:
            intensity_dataset = nxdata["intensity"][:]
            radial_name = nxdata.attrs["axes"][-1]
            radial_dataset = nxdata[radial_name][:]
        nbpt_rad = radial_dataset.shape[0]

        # It will only proceed if the Scan object is kmap (ID13)
        with h5py_utils.open_item(
            filename=filename, name=f"{scan_number}.1"
        ) as scan_entry:
            instrument = scan_entry["instrument"]
            if "kmap_parameters" not in instrument:
                self.outputs.diffmap_file = None
                self.outputs.diffmap_dataset_path = (
                    f"{scan_number}.1/measurement/{lima_name}"
                )
                self.outputs.diffmap_nxprocess_path = None
                self.outputs.nxdata_url = None
                return
            kmap_parameters = instrument["kmap_parameters"]
            x_nb_points = kmap_parameters["x_nb_points"][()]
            y_nb_points = kmap_parameters["y_nb_points"][()]

        save_diffmap_separately = self.get_input_value("save_diffmap_separately", False)
        if not save_diffmap_separately and (MAJOR, MINOR) < (2024, 7):
            # Older pyFAI: fall back to a standalone diffmap file.
            logger.error(
                f"pyFAI version has to be at least 2024.7. This version is {version}. The diffmap process will be saved as a separated file."
            )
            save_diffmap_separately = True

        mask_file = self._resolve_mask_file(integration_options)

        if save_diffmap_separately:
            # Let pyFAI's DiffMap create the output file layout.
            diffmap = DiffMap(
                nbpt_rad=nbpt_rad,
                nbpt_azim=1,
                nbpt_fast=x_nb_points,
                nbpt_slow=y_nb_points,
            )
            worker = Worker()
            worker.set_config(integration_options)
            diffmap.worker = worker
            diffmap_filename = self._build_diffmap_filename(
                output_filename, scan_number
            )
            diffmap.hdf5 = diffmap_filename
            diffmap.makeHDF5()

            with h5py_utils.open_item(
                filename=filename, name=f"{scan_number}.1", mode="r"
            ) as scan_entry:
                diffmap.set_hdf5_input_dataset(
                    dataset=scan_entry["measurement"][lima_name]
                )

            # Paste the intensity dataset into the output file with the new shape
            # and the radial vector
            with h5py_utils.open_item(
                filename=diffmap_filename, name="entry_0000", mode="r+"
            ) as entry:
                intensity_dts = entry["pyFAI/result/intensity"]
                reshape_array = np.reshape(
                    intensity_dataset, (y_nb_points, x_nb_points, nbpt_rad)
                )
                intensity_dts[:, :, :] = reshape_array
                entry["pyFAI/result"].create_dataset(
                    name=radial_name,
                    data=radial_dataset,
                )
                entry["pyFAI/mask_file"] = mask_file

            self.outputs.diffmap_file = diffmap_filename
            self.outputs.diffmap_dataset_path = "/entry_0000/measurement/images_0001"
            self.outputs.diffmap_nxprocess_path = "/entry_0000/pyFAI"
            self.outputs.nxdata_url = f"{diffmap_filename}::/entry_0000/pyFAI/result"
        else:
            # Write the NXprocess by hand into the scan entry of the file
            # referenced by nxdata_url.
            with h5py_utils.open_item(
                filename=file_url, name=f"/{scan_number}.1", mode="r+"
            ) as scan_entry:
                nxprocess = scan_entry.create_group(f"{lima_name}_diffmap")
                nxprocess.attrs["NX_class"] = "NXprocess"
                nxprocess["program"] = "pyFAI"
                nxprocess["dim0"] = y_nb_points
                nxprocess["dim0"].attrs["axis"] = "motory"
                nxprocess["dim1"] = x_nb_points
                nxprocess["dim1"].attrs["axis"] = "motorx"
                nxprocess["dim2"] = nbpt_rad
                nxprocess["dim2"].attrs["axis"] = "diffraction"
                nxprocess["offset"] = 0
                nxprocess["mask_file"] = mask_file

                # NxNote: configuration
                config = nxprocess.require_group("configuration")
                config.attrs["NX_class"] = "NXnote"
                worker = Worker()
                worker.set_config(integration_options)
                config["data"] = json.dumps(
                    worker.get_config(), indent=2, separators=(",\r\n", ": ")
                )
                config["type"] = "text/json"

                # NxData: result
                nxdata = nxprocess.require_group("result")
                nxdata.attrs["NX_class"] = "NXdata"
                intensity_dts = nxdata.create_dataset(
                    name="intensity",
                    data=np.reshape(
                        intensity_dataset, (y_nb_points, x_nb_points, nbpt_rad)
                    ),
                    dtype="float32",
                    chunks=(1, x_nb_points, nbpt_rad),
                    maxshape=(None, None, nbpt_rad),
                    fillvalue=np.nan,
                )
                intensity_dts.attrs["interpretation"] = "spectrum"
                nxdata.attrs["axes"] = [".", ".", radial_name]
                nxdata.attrs["signal"] = "intensity"

                nxdata[radial_name] = radial_dataset
                nxdata[radial_name].attrs["axes"] = 3
                nxdata[radial_name].attrs["unit"] = integration_options["unit"]
                nxdata[radial_name].attrs["long_name"] = to_unit(
                    integration_options["unit"]
                ).label
                nxdata[radial_name].attrs["interpretation"] = "scalar"

                # Virtual dataset "map" exposes the same data with the radial
                # axis first: map[:, i, j] is intensity[i, j].
                layout = h5py.VirtualLayout(
                    shape=(nbpt_rad, y_nb_points, x_nb_points),
                    dtype=intensity_dts.dtype,
                )
                source = h5py.VirtualSource(intensity_dts)
                for i in range(y_nb_points):
                    for j in range(x_nb_points):
                        layout[:, i, j] = source[i, j]
                nxdata.create_virtual_dataset("map", layout, fillvalue=np.nan).attrs[
                    "interpretation"
                ] = "image"

                # Link the detector dataset of `filename` into this entry
                # when it is not already present.
                if lima_name not in scan_entry["measurement"]:
                    scan_entry["measurement"][lima_name] = h5py.ExternalLink(
                        filename=filename,
                        path=f"{scan_number}.1/measurement/{lima_name}",
                    )

            self.outputs.diffmap_file = file_url
            self.outputs.diffmap_dataset_path = (
                f"{scan_number}.1/measurement/{lima_name}"
            )
            self.outputs.diffmap_nxprocess_path = f"{scan_number}.1/{lima_name}_diffmap"
            self.outputs.nxdata_url = (
                f"{file_url}::{scan_number}.1/{lima_name}_diffmap/result"
            )