Source code for arpes.utilities.conversion.forward

"""Contains routines for converting directly from angle to momentum.

This cannot be done straightforwardly for volumetric data because the forward
transform does not produce an evenly spaced momentum grid. As a result, the
utilities here are typically used to look at the forward projection of a single
point or a collection of points/coordinates under the angle -> momentum transform.

Additionally, we have exact inverses for the volumetric transforms which are 
useful for aligning cuts which use those transforms. 
See `convert_coordinate_forward`.
"""
from arpes.utilities.conversion.core import convert_to_kspace
from typing import Callable, Dict
from arpes.trace import traceable
import numpy as np
import warnings
import xarray as xr

from arpes.utilities import normalize_to_spectrum
from arpes.provenance import update_provenance
from arpes.analysis.filters import gaussian_filter_arr
from arpes.utilities.conversion.bounds_calculations import (
    euler_to_kx,
    euler_to_ky,
    euler_to_kz,
    full_angles_to_k,
)
from arpes.typing import DataType

__all__ = (
    "convert_coordinates_to_kspace_forward",
    "convert_coordinates",
    "convert_coordinate_forward",
    "convert_through_angular_pair",
    "convert_through_angular_point",
)


@traceable
def convert_coordinate_forward(
    data: DataType, coords: Dict[str, float], trace: Callable = None, **k_coords
):
    """The inverse/forward transform for the small angle volumetric k-conversion code.

    This differs from the other forward transforms here, which are exact up to the correct
    assignment of offset constants.

    This makes the routine very practical for determining the location of cuts to be taken
    around a point or direction of interest in k-space. If you use the exact methods to
    determine the location of interest in k-space, then in general there will be some
    misalignment, because the small angle volumetric transform is not the inverse of the
    exact forward transforms.

    The way that we accomplish this is that the data is copied and a "test charge" is placed
    in the data which distinguishes the location of interest in angle-space. The data is
    converted with the volumetric interpolation methods, and the location of the "test charge"
    is determined in k-space. With the approximate location in k determined, this process is
    repeated once more with a finer k-grid to determine the forward transform location more
    precisely.

    A nice property of this approach is that it is automatic: it determines the result
    numerically using the volumetric transform code, so any changes to the volumetric code
    are automatically reflected here. However, it comes with a few downsides:

    #. The "test charge" is placed in a cell in the original data. This means that the
       resolution is limited by the resolution of the dataset in angle-space. This could be
       circumvented by regridding the data to have a higher resolution.
    #. The procedure can only be performed for a single point at a time.
    #. The procedure is relatively expensive.

    Another approach would be to write down the exact small angle approximated transforms.

    Args:
        data: The data defining the coordinate offsets and experiment geometry.
        coords: The coordinates of a point in angle-space to be converted.
        trace: Used for performance tracing and debugging.
        k_coords: Hints for the momentum grid used in the first, coarse conversion.

    Returns:
        The location of the desired coordinate in momentum.
    """
    data = normalize_to_spectrum(data)

    if "eV" in coords:
        coords = dict(coords)
        energy_coord = coords.pop("eV")
        data = data.sel(eV=energy_coord, method="nearest")
    elif "eV" in data.dims:
        warnings.warn(
            """
            You didn't specify an energy coordinate for the high symmetry point but the
            dataset you provided has an energy dimension. This will likely be very
            slow. Where possible, provide an energy coordinate.
            """
        )

    if not k_coords:
        k_coords = {
            "kx": np.linspace(-4, 4, 300),
            "ky": np.linspace(-4, 4, 300),
        }

    # copying after taking a constant energy plane is much, much cheaper
    trace("Copying")
    data = data.copy(deep=True)

    # place the "test charge": make the cell nearest the requested angular coordinates
    # overwhelmingly brighter than the rest of the data
    data.loc[data.G.round_coordinates(coords)] = data.values.max() * 100000
    trace("Filtering")
    data = gaussian_filter_arr(data, default_size=3)

    trace("Converting once")
    kdata = convert_to_kspace(data, **k_coords, trace=trace)

    trace("argmax")
    near_target = kdata.G.argmax_coords()

    trace("Converting twice")
    kdata_close = convert_to_kspace(
        data,
        trace=trace,
        **{k: np.linspace(v - 0.08, v + 0.08, 100) for k, v in near_target.items()},
    )

    # inconsistently, the energy coordinate is sometimes returned here,
    # so we remove it just in case
    trace("argmax")
    coords = kdata_close.G.argmax_coords()
    if "eV" in coords:
        del coords["eV"]

    return coords
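
# Illustrative usage sketch (not part of the library API): locating the momentum
# coordinates of a feature identified in angle-space. The variable `fermi_map` and
# the angular values below are hypothetical.
#
#   k_gamma = convert_coordinate_forward(fermi_map, {"phi": 0.02, "theta": -0.13, "eV": 0.0})
#   # k_gamma is e.g. {"kx": 0.01, "ky": -0.02}, suitable for centering a k-space cut
#
# Supplying an "eV" entry selects a constant energy slice first, which keeps the two
# volumetric conversions performed inside this routine fast.
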
@traceable
def convert_through_angular_pair(
    data: DataType,
    first_point: Dict[str, float],
    second_point: Dict[str, float],
    cut_specification: Dict[str, np.ndarray],
    transverse_specification: Dict[str, np.ndarray],
    relative_coords: bool = True,
    trace: Callable = None,
    **k_coords,
):
    """Converts the lower dimensional ARPES cut passing through `first_point` and `second_point`.

    This is a sibling method to `convert_through_angular_point`. A point and a `chi` angle fix
    a plane in momentum, as do two points. This method implements the two-point version,
    whereas `convert_through_angular_point` implements the point-and-angle version.

    A difference between this function and `convert_through_angular_point` is that the momentum
    range requested by `cut_specification` is considered a margin outside the interval defined
    by `first_point` and `second_point`. I.e. passing `np.linspace(0, 0, 400)` would produce a
    400 point interpolation between `first_point` and `second_point`, whereas passing
    `np.linspace(-1, 0, 400)` would provide a left margin of 1 inverse angstrom past
    `first_point`.

    This endpoint relative behavior can be disabled with the `relative_coords` flag.

    Args:
        data: The angle space data to be converted.
        first_point: The angle space coordinates of the first point the cut should pass through.
        second_point: The angle space coordinates of the second point the cut should pass
          through. This point will have a larger momentum coordinate in the output data than
          `first_point`, i.e. it will appear on the "right side" of the data if the data is
          plotted as a cut.
        cut_specification: A dictionary specifying the momentum varying axes on the output
          data. Interpreted as defining boundaries relative to the endpoints.
        transverse_specification: A dictionary specifying the transverse (summed) axis on the
          momentum converted data.
        relative_coords: Whether to interpret `cut_specification` as margins relative to the
          momentum converted locations of `first_point` and `second_point`.
        trace: Flag controlling execution tracing.
        k_coords: Passed as hints through to `convert_coordinate_forward`.

    Returns:
        The momentum cut passing first through `first_point` and then through `second_point`.
    """
    k_first_point = convert_coordinate_forward(data, first_point, trace=trace, **k_coords)
    k_second_point = convert_coordinate_forward(data, second_point, trace=trace, **k_coords)

    k_dims = set(k_first_point.keys())
    if k_dims != {"kx", "ky"}:
        raise NotImplementedError(f"Two point {k_dims} momentum conversion is not supported yet.")

    assert k_dims == set(cut_specification.keys()).union(transverse_specification.keys())
    # you must use ky as the transverse coordinate for now
    assert "ky" in transverse_specification.keys()
    assert len(cut_specification) == 1

    # rotate the momentum frame so that the two requested points lie along kx
    offset_ang = np.arctan2(
        k_second_point["ky"] - k_first_point["ky"],
        k_second_point["kx"] - k_first_point["kx"],
    )
    trace(f"Determined offset angle {-offset_ang}")

    with data.S.with_rotation_offset(-offset_ang):
        trace("Finding first momentum coordinate.")
        k_first_point = convert_coordinate_forward(data, first_point, trace=trace, **k_coords)
        trace("Finding second momentum coordinate.")
        k_second_point = convert_coordinate_forward(data, second_point, trace=trace, **k_coords)

        # adjust output coordinate ranges
        transverse_specification = {
            k: v + k_first_point[k] for k, v in transverse_specification.items()
        }

        # here, we assume we were passed an array for simplicity's sake;
        # we could also allow a slice in the future
        parallel_axis = list(cut_specification.values())[0]
        parallel_dim = list(cut_specification.keys())[0]
        if relative_coords:
            delta = parallel_axis[1] - parallel_axis[0]
            left_margin, right_margin = parallel_axis[0], parallel_axis[-1] + delta

            left_point = k_first_point["kx"] + left_margin
            right_point = k_second_point["kx"] + right_margin
            parallel_axis = np.linspace(left_point, right_point, len(parallel_axis))

        # perform the conversion
        trace("Performing final momentum conversion.")
        converted_data = convert_to_kspace(
            data, **transverse_specification, kx=parallel_axis, trace=trace
        ).mean(list(transverse_specification.keys()))

        trace("Annotating the requested point momentum values.")
        converted_data = converted_data.assign_attrs(
            {
                "first_point_kx": k_first_point[parallel_dim],
                "second_point_kx": k_second_point[parallel_dim],
                "offset_angle": -offset_ang,
            }
        )

        return converted_data
@traceable
def convert_through_angular_point(
    data: DataType,
    coords: Dict[str, float],
    cut_specification: Dict[str, np.ndarray],
    transverse_specification: Dict[str, np.ndarray],
    relative_coords: bool = True,
    trace: Callable = None,
    **k_coords,
) -> xr.DataArray:
    """Converts the lower dimensional ARPES cut passing through given angular `coords`.

    The fixed momentum axis is given by `cut_specification` in coordinates relative to the
    momentum converted copy of `coords`. Absolute coordinates can be enabled with the
    `relative_coords` flag.

    Args:
        data: The angle space data to be converted.
        coords: The angle space coordinates of the point the cut should pass through.
        cut_specification: A dictionary specifying the momentum varying axes on the output data.
        transverse_specification: A dictionary specifying the transverse (summed) axis on the
          momentum converted data.
        relative_coords: Whether to give `cut_specification` relative to the momentum converted
          location specified in `coords`.
        trace: Flag controlling execution tracing.
        k_coords: Passed as hints through to `convert_coordinate_forward`.

    Returns:
        A momentum cut passing through the point `coords`.
    """
    k_coords = convert_coordinate_forward(data, coords, trace=trace, **k_coords)
    all_momentum_dims = set(k_coords.keys())
    assert all_momentum_dims == set(cut_specification.keys()).union(transverse_specification.keys())

    # adjust output coordinate ranges
    transverse_specification = {k: v + k_coords[k] for k, v in transverse_specification.items()}
    if relative_coords:
        cut_specification = {k: v + k_coords[k] for k, v in cut_specification.items()}

    # perform the conversion
    converted_data = convert_to_kspace(
        data, **transverse_specification, **cut_specification, trace=trace
    ).mean(list(transverse_specification.keys()), keep_attrs=True)

    for k, v in k_coords.items():
        converted_data.attrs[f"highsymm_{k}"] = v

    return converted_data
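
# Illustrative usage sketch (hypothetical names): a high symmetry cut through a single
# angle-space point, averaging over a narrow transverse momentum window.
#
#   cut = convert_through_angular_point(
#       fermi_map,
#       {"phi": 0.05, "theta": 0.0, "eV": 0.0},
#       cut_specification={"kx": np.linspace(-1.0, 1.0, 400)},
#       transverse_specification={"ky": np.linspace(-0.02, 0.02, 10)},
#   )
#
# The resulting DataArray carries "highsymm_kx"/"highsymm_ky" attributes recording where the
# requested point landed in momentum.
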
@update_provenance("Forward convert coordinates")
def convert_coordinates(arr: DataType, collapse_parallel=False, **kwargs):
    """Converts coordinates forward in momentum, without interpolating the data.

    Returns an `xr.Dataset` of momentum values ("kx"/"ky"/"kz", or "kp"/"kz" when
    `collapse_parallel` applies) defined over the original coordinates of `arr`.
    """

    def unwrap_coord(c):
        try:
            return c.values
        except (TypeError, AttributeError):
            try:
                return c.item()
            except (TypeError, AttributeError):
                return c

    coord_names = ["phi", "psi", "alpha", "theta", "beta", "chi", "hv"]
    raw_coords = {k: unwrap_coord(arr.S.lookup_offset_coord(k)) for k in (coord_names + ["eV"])}
    raw_angles = {k: v for k, v in raw_coords.items() if k not in {"eV", "hv"}}

    # the in-plane momenta can only be collapsed onto a single kp axis if more than one
    # angular coordinate actually varies
    parallel_collapsible = (
        len([k for k in raw_angles.keys() if isinstance(raw_angles[k], np.ndarray)]) > 1
    )

    sort_by = ["eV", "hv", "phi", "psi", "alpha", "theta", "beta", "chi"]
    old_dims = sorted(
        [k for k in arr.dims if k in (coord_names + ["eV"])], key=lambda item: sort_by.index(item)
    )

    will_collapse = parallel_collapsible and collapse_parallel

    def expand_to(cname, c):
        if not isinstance(c, np.ndarray):
            return c

        index_list = [np.newaxis] * len(old_dims)
        index_list[old_dims.index(cname)] = slice(None, None)
        return c[tuple(index_list)]

    # build the full kinetic energy array over relevant dimensions
    kinetic_energy = (
        expand_to("eV", raw_coords["eV"]) + expand_to("hv", raw_coords["hv"]) - arr.S.work_function
    )

    kx, ky, kz = full_angles_to_k(
        kinetic_energy,
        inner_potential=arr.S.inner_potential,
        **{k: expand_to(k, v) for k, v in raw_angles.items()},
    )

    if will_collapse:
        if np.sum(kx ** 2) > np.sum(ky ** 2):
            sign = kx / np.sqrt(kx ** 2 + 1e-8)
        else:
            sign = ky / np.sqrt(ky ** 2 + 1e-8)

        kp = sign * np.sqrt(kx ** 2 + ky ** 2)
        data_vars = {"kp": (old_dims, np.squeeze(kp)), "kz": (old_dims, np.squeeze(kz))}
    else:
        data_vars = {
            "kx": (old_dims, np.squeeze(kx)),
            "ky": (old_dims, np.squeeze(ky)),
            "kz": (old_dims, np.squeeze(kz)),
        }

    return xr.Dataset(data_vars, coords=arr.indexes)
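
# Illustrative usage sketch (hypothetical names): forward converting the coordinates of a
# photon-energy scan and collapsing the in-plane momenta onto a signed kp axis.
#
#   k_coords = convert_coordinates(hv_scan, collapse_parallel=True)
#   # k_coords is an xr.Dataset with "kp" and "kz" variables defined over the original
#   # angle/hv indexes, e.g. for overlaying momentum contours on the raw data.
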
@update_provenance("Forward convert coordinates to momentum")
def convert_coordinates_to_kspace_forward(arr: DataType, **kwargs):
    """Forward converts all the individual coordinates of the data array."""
    arr = arr.copy(deep=True)

    skip = {"eV", "cycle", "delay", "T"}
    keep = {
        "eV",
    }

    all = {k: v for k, v in arr.indexes.items() if k not in skip}
    kept = {k: v for k, v in arr.indexes.items() if k in keep}

    old_dims = list(all.keys())
    old_dims.sort()

    if not old_dims:
        return None

    dest_coords = {
        ("phi",): ["kp", "kz"],
        ("theta",): ["kp", "kz"],
        ("beta",): ["kp", "kz"],
        ("phi", "theta"): ["kx", "ky", "kz"],
        ("beta", "phi"): ["kx", "ky", "kz"],
        ("hv", "phi"): ["kx", "ky", "kz"],
        ("hv",): ["kp", "kz"],
        ("beta", "hv", "phi"): ["kx", "ky", "kz"],
        ("hv", "phi", "theta"): ["kx", "ky", "kz"],
        ("hv", "phi", "psi"): ["kx", "ky", "kz"],
        ("chi", "hv", "phi"): ["kx", "ky", "kz"],
    }.get(tuple(old_dims))

    full_old_dims = old_dims + list(kept.keys())
    projection_vectors = np.ndarray(
        shape=tuple(len(arr.coords[d]) for d in full_old_dims), dtype=object
    )

    # these are a little special: depending on the scan type we might not have a phi coordinate;
    # that aspect of this is broken for now, but we need not worry
    def broadcast_by_dim_location(data, target_shape, dim_location=None):
        if isinstance(data, xr.DataArray):
            if not data.dims:
                data = data.item()

        if isinstance(data, (int, float)):
            return np.ones(target_shape) * data

        # else we are dealing with an actual array
        the_slice = [None] * len(target_shape)
        the_slice[dim_location] = slice(None, None, None)

        return np.asarray(data)[tuple(the_slice)]

    raw_coords = {
        "phi": arr.coords["phi"].values - arr.S.phi_offset,
        "beta": (0 if arr.coords["beta"] is None else arr.coords["beta"].values)
        - arr.S.beta_offset,
        "theta": (0 if arr.coords["theta"] is None else arr.coords["theta"].values)
        - arr.S.theta_offset,
        "hv": arr.coords["hv"],
    }

    raw_coords = {
        k: broadcast_by_dim_location(
            v, projection_vectors.shape, full_old_dims.index(k) if k in full_old_dims else None
        )
        for k, v in raw_coords.items()
    }

    # fill in the vectors
    binding_energy = broadcast_by_dim_location(
        arr.coords["eV"] - arr.S.work_function,
        projection_vectors.shape,
        full_old_dims.index("eV") if "eV" in full_old_dims else None,
    )
    photon_energy = broadcast_by_dim_location(
        arr.coords["hv"],
        projection_vectors.shape,
        full_old_dims.index("hv") if "hv" in full_old_dims else None,
    )
    kinetic_energy = binding_energy + photon_energy

    inner_potential = arr.S.inner_potential

    # some notes on angle conversion:
    #
    # BL4 conventions; angle conventions are standard:
    #   phi = analyzer acceptance
    #   polar = perpendicular scan angle
    #   theta = parallel to analyzer slit rotation angle
    #
    # [ 1  0           0          ]   [  cos(polar)  0  sin(polar) ]   [ 0          ]
    # [ 0  cos(theta)  sin(theta) ] * [  0           1  0          ] * [ k sin(phi) ]
    # [ 0 -sin(theta)  cos(theta) ]   [ -sin(polar)  0  cos(polar) ]   [ k cos(phi) ]
    #
    # =
    #
    # [ 1  0           0          ]     [ sin(polar) * cos(phi) ]
    # [ 0  cos(theta)  sin(theta) ] * k [ sin(phi)              ]
    # [ 0 -sin(theta)  cos(theta) ]     [ cos(polar) * cos(phi) ]
    #
    # =
    #
    # k ( sin(polar) * cos(phi),
    #     cos(theta) * sin(phi) + cos(polar) * cos(phi) * sin(theta),
    #     -sin(theta) * sin(phi) + cos(theta) * cos(polar) * cos(phi),
    #   )
    #
    # main chamber conventions, with no analyzer rotation (referred to as the alpha angle in
    # the Igor code); angle conventions are standard:
    #   phi = analyzer acceptance
    #   polar = perpendicular scan angle
    #   theta = parallel to analyzer slit rotation angle
    #
    # [ 1  0           0          ]     [ sin(phi + theta) ]
    # [ 0  cos(polar)  sin(polar) ] * k [ 0                ]
    # [ 0 -sin(polar)  cos(polar) ]     [ cos(phi + theta) ]
    #
    # =
    #
    # k ( sin(phi + theta),
    #     cos(phi + theta) * sin(polar),
    #     cos(phi + theta) * cos(polar),
    #   )
    #
    # for now we are setting the theta angle to zero; this only has an effect for vertical
    # slit analyzers, and then only when the tilt angle is very large

    # TODO check me
    raw_translated = {
        "kx": euler_to_kx(
            kinetic_energy,
            raw_coords["phi"],
            raw_coords["beta"],
            theta=0,
            slit_is_vertical=arr.S.is_slit_vertical,
        ),
        "ky": euler_to_ky(
            kinetic_energy,
            raw_coords["phi"],
            raw_coords["beta"],
            theta=0,
            slit_is_vertical=arr.S.is_slit_vertical,
        ),
        "kz": euler_to_kz(
            kinetic_energy,
            raw_coords["phi"],
            raw_coords["beta"],
            theta=0,
            slit_is_vertical=arr.S.is_slit_vertical,
            inner_potential=inner_potential,
        ),
    }

    if "kp" in dest_coords:
        if np.sum(raw_translated["kx"] ** 2) > np.sum(raw_translated["ky"] ** 2):
            sign = raw_translated["kx"] / np.sqrt(raw_translated["kx"] ** 2 + 1e-8)
        else:
            sign = raw_translated["ky"] / np.sqrt(raw_translated["ky"] ** 2 + 1e-8)

        raw_translated["kp"] = np.sqrt(raw_translated["kx"] ** 2 + raw_translated["ky"] ** 2) * sign

    data_vars = {}
    for dest_coord in dest_coords:
        data_vars[dest_coord] = (full_old_dims, np.squeeze(raw_translated[dest_coord]))

    return xr.Dataset(data_vars, coords=arr.indexes)
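
# Illustrative usage sketch (hypothetical names): forward converting every coordinate of an
# angle-space map, e.g. to draw constant-kx/ky lines on top of the unconverted data.
#
#   forward = convert_coordinates_to_kspace_forward(fermi_map)
#   # forward["kx"], forward["ky"], forward["kz"] are defined over the original angular
#   # indexes; the function returns None if no convertible angular dimensions are present.
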