Source code for arpes.endstations.plugin.ANTARES

"""Implements data loading for ANTARES at SOLEIL."""
import warnings

import h5py
import numpy as np

import xarray as xr
from arpes.endstations import HemisphericalEndstation, SingleFileEndstation, SynchrotronEndstation
from arpes.endstations.nexus_utils import read_data_attributes_from
from arpes.preparation import disambiguate_coordinates

__all__ = ("ANTARESEndstation",)


mono = [
    ["ANTARES", "Monochromator"],
    ["exitSlitAperture", "resolution", "currentGratingName", "currentSlotName", "energy"],
]
user_info = [["User"], ["email", "address", "affiliation", "name", "telephone_number"]]
misc = [[], ["comment_conditions", "experimental_frame", "start_time"]]


general_paths = [mono, user_info, misc]
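
# Each entry above pairs a path of nested group names with the dataset/attribute
# names to read once that group is reached; an empty path (as in ``misc``) reads
# from the group that is passed in directly. ``mono``, for instance, collects the
# photon energy and grating settings from the "ANTARES/Monochromator" subgroup.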

# not generally needed
mbs_general_location = [[], ["PI-X", "PI-Y", "PI-Z", "Phi", "Theta"]]
mbs_general_paths = [mbs_general_location]

# To be run inside ANTARES/MBSAcquisition_{idx}
mbs_acquisition_spectrometer = [
    [],
    [
        "Frames",
        "LensMode",
        "PASSENERGY",
        "DeflX",
        "DeflY",
        "CenterKE",
        "StepSize",
        "StartX",
        "StartY",
        "EndX",
        "EndY",
        "StartKE",
        "NoSlices",
        "NoScans",
    ],
]
mbs_acquisition_paths = [mbs_acquisition_spectrometer]
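
# The raw spectrometer keys collected here are normalized downstream through
# ANTARESEndstation.RENAME_KEYS (e.g. "PASSENERGY" -> "pass_energy",
# "LensMode" -> "lens_mode").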


def parse_axis_name_from_long_name(name):
    """Extracts the trailing axis name from an HDF5 ``long_name`` attribute, stripping stray quotes."""
    return name.split("/")[-1].replace("'", "")
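
# For a long_name attribute like b"/SCAN/actuator/DeflX" (a hypothetical path),
# ``str`` renders the bytes as "b'/SCAN/actuator/DeflX'"; this helper then
# recovers the bare axis name "DeflX".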


def infer_scan_type_from_data(group):
    """Determines the scan type for NeXuS format data.

    Because ANTARES stores every possible data type in the NeXuS file format, zeroing information that is
    not used, we have to determine which data folder to use on the basis of what kind of scan was done.
    """
    scan_name = str(group["scan_config"]["name"][()])

    if "DeflX" in scan_name:
        # Fermi Surface, might need to be more robust
        return "data_09"

    if "Scan2D_MBS" in scan_name:
        # two piezo or two DOF image scan
        return "data_12"

    raise NotImplementedError(scan_name)


class ANTARESEndstation(HemisphericalEndstation, SynchrotronEndstation, SingleFileEndstation):
    """Implements data loading for ANTARES at SOLEIL.

    There's not too much metadata here except what comes with the analyzer settings.
    """

    PRINCIPAL_NAME = "ANTARES"
    ALIASES = []

    _TOLERATED_EXTENSIONS = {".nxs"}

    RENAME_KEYS = {
        "DeflX": "psi",
        "deflx": "psi",
        "energy": "hv",
        "PASSENERGY": "pass_energy",
        "LensMode": "lens_mode",
    }

    def load_top_level_scan(self, group, scan_desc: dict = None, spectrum_index=None):
        """Reads a spectrum from the top level group in a NeXus scan format."""
        dr = self.read_scan_data(group)
        attrs = read_data_attributes_from(group, general_paths)

        try:
            # each scan carries at most one MBS analyzer acquisition group
            mbs_key = [k for k in list(group["ANTARES"].keys()) if "MBSAcquisition" in k][0]
            attrs.update(
                read_data_attributes_from(group["ANTARES"][mbs_key], mbs_acquisition_paths)
            )
        except IndexError:
            pass

        dr = dr.assign_attrs(attrs)

        return xr.Dataset({"spectrum-{}".format(spectrum_index): dr})

    def get_coords(self, group, scan_name, shape):
        """Extracts coordinates from the actuator header information.

        In the future, this should be modified for data which lacks either a phi
        or energy axis.
        """
        dims = list(shape)
        data = group["scan_data"]

        # handle actuators: match each actuator record against the data shape to
        # figure out which dimension it drives
        relaxed_shape = list(shape)
        actuator_list = [k for k in list(data.keys()) if "actuator" in k]
        actuator_names = [
            parse_axis_name_from_long_name(str(data[act].attrs["long_name"]))
            for act in actuator_list
        ]
        actuator_list = [data[act][:] for act in actuator_list]

        actuator_dim_order = []
        for act in actuator_list:
            found = relaxed_shape.index(act.shape[-1])
            actuator_dim_order.append(found)
            relaxed_shape[found] = None  # consumed, do not match it twice

        coords = {}

        def take_last(vs):
            # reduce a repeated actuator record to a 1D coordinate array by
            # taking the first slice along every extra dimension
            while len(vs.shape) > 1:
                vs = vs[0]

            return vs

        for dim_order, name, values in zip(actuator_dim_order, actuator_names, actuator_list):
            name = self.RENAME_KEYS.get(name, name)
            dims[dim_order] = name
            coords[name] = take_last(values)

        # handle standard spectrometer axes, keeping in mind things get stored
        # in different places sometimes for no apparent reason; each tuple is
        # ordered (low, high, step) to match build_axis below
        energy_keys = {
            "data_09": ("data_01", "data_03", "data_02"),
            "data_12": ("data_04", "data_06", "data_05"),
        }
        angle_keys = {
            "data_09": ("data_04", "data_06", "data_05"),
            "data_12": ("data_07", "data_09", "data_08"),
        }
        e_keys = energy_keys[scan_name]
        ang_keys = angle_keys[scan_name]
        energy = data[e_keys[0]][0], data[e_keys[1]][0], data[e_keys[2]][0]
        angle = data[ang_keys[0]][0], data[ang_keys[1]][0], data[ang_keys[2]][0]

        def get_first(item):
            if isinstance(item, np.ndarray):
                return item.ravel()[0]

            return item

        def build_axis(low, high, step_size):
            # this might not work out to be the right thing to do, we will see
            low, high, step_size = get_first(low), get_first(high), get_first(step_size)

            # estimate the axis length from the endpoints and step, then assign
            # the axis to the dimension whose length is closest to that estimate
            est_n = int((high - low) / step_size)

            closest, diff, idx = None, np.inf, None
            for i, s in enumerate(shape):
                if closest is None or np.abs(s - est_n) < diff:
                    idx = i
                    diff = np.abs(s - est_n)
                    closest = s

            if diff != 0:
                warnings.warn("Could not identify axis by length.")

            return np.linspace(low, high, closest, endpoint=False), idx

        energy, energy_idx = build_axis(*energy)
        angle, angle_idx = build_axis(*angle)

        dims[energy_idx] = "eV"
        dims[angle_idx] = "phi"
        coords["eV"] = energy
        coords["phi"] = angle * np.pi / 180  # degrees to radians

        return dims, coords

    def read_scan_data(self, group):
        """Reads the scan data stored in /scan_data/data_{idx} for the appropriate filetype."""
        data_key = infer_scan_type_from_data(group)
        data_group = group["scan_data"][data_key]
        data = data_group[:]
        dims, coords = self.get_coords(group, data_key, shape=data.shape)

        return xr.DataArray(data, coords=coords, dims=dims)

    def load_single_frame(self, frame_path: str = None, scan_desc: dict = None, **kwargs):
        """Loads a single ANTARES scan.

        Additionally, we try to deduplicate coordinates for multi-region scans here.
        """
        f = h5py.File(frame_path, "r")
        top_level = list(f.keys())

        loaded = [
            self.load_top_level_scan(f[key], scan_desc, spectrum_index=i)
            for i, key in enumerate(top_level)
        ]

        # merge multi-region scans; a single region passes through unchanged
        if len(loaded) > 1:
            loaded = disambiguate_coordinates(loaded, ["phi", "eV"])
            loaded = xr.merge(loaded)
        else:
            loaded = loaded[0]

        # ``rename`` returns a new object, so the result must be reassigned; guard
        # the key since single-region scans only produce "spectrum-0"
        if "spectrum-1" in loaded:
            loaded = loaded.rename({"spectrum-1": "spectrum"})

        loaded = loaded.assign_attrs(
            **{self.RENAME_KEYS.get(k, k): v for k, v in loaded.attrs.items()}
        )

        return loaded

    def postprocess_final(self, data: xr.Dataset, scan_desc: dict = None):
        """Performs final scan postprocessing.

        This mostly consists of unwrapping bytestring attributes, and inserting
        missing default coordinates if they are not provided.
        """

        def check_attrs(s):
            for k in ["psi", "hv", "lens_mode", "pass_energy"]:
                try:
                    if isinstance(s.attrs[k], (np.ndarray, list, tuple)):
                        s.attrs[k] = s.attrs[k][0]
                    elif isinstance(s.attrs[k], bytes):
                        s.attrs[k] = s.attrs[k].decode()
                except (TypeError, KeyError):
                    pass

        for s in [data] + data.S.spectra:
            check_attrs(s)

        # TODO fix this
        defaults = {
            "z": 0,
            "x": 0,
            "y": 0,
            "alpha": 0,
            "chi": 0,
            "theta": 0,
            "beta": 0,
            "hv": 100,
            "psi": 0,
        }
        for k, v in defaults.items():
            data.attrs[k] = data.attrs.get(k, v)
            for s in data.S.spectra:
                s.attrs[k] = s.attrs.get(k, v)

        return super().postprocess_final(data, scan_desc)
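

# A minimal usage sketch, assuming a standard PyARPES install and a local .nxs
# file (the path below is illustrative): ``arpes.io.load_data`` dispatches to
# this plugin when the ANTARES location is requested.
#
#     from arpes.io import load_data
#
#     data = load_data("path/to/scan.nxs", location="ANTARES")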