From 3944006a939d6123fca21ffa3ec86bf342780391 Mon Sep 17 00:00:00 2001 From: mkuehbach Date: Thu, 29 Aug 2024 16:13:05 +0200 Subject: [PATCH] Reactivated and fixed IPF plotting, next apex --- src/pynxtools_em/methods/ebsd.py | 337 ++++++++-------------- src/pynxtools_em/parsers/hfive_base.py | 6 +- src/pynxtools_em/parsers/hfive_oxford.py | 5 +- src/pynxtools_em/utils/hfive_web_utils.py | 14 +- 4 files changed, 128 insertions(+), 234 deletions(-) diff --git a/src/pynxtools_em/methods/ebsd.py b/src/pynxtools_em/methods/ebsd.py index cd71599..ea0000a 100644 --- a/src/pynxtools_em/methods/ebsd.py +++ b/src/pynxtools_em/methods/ebsd.py @@ -19,7 +19,7 @@ import mmap import os -from typing import Any, Dict +from typing import Any, Dict, List import matplotlib.pyplot as plt # in the hope that this closes figures with orix plot import numpy as np @@ -113,24 +113,9 @@ def has_hfive_magic_header(file_path: str) -> bool: return False -def get_named_axis(inp: dict, dim: str) -> np.ndarray: - """Return scaled but not offset-calibrated scan point coordinates along dim.""" - # TODO::deprecate at some point - if square_grid(inp) or hexagonal_grid(inp): - # TODO::this code does not work for scaled and origin-offset scan point positions! - # TODO::below formula is only the same for sqr and hex grid if - # s_{dim_name} already accounts for the fact that typically s_y = sqrt(3)/2 s_x ! - return np.asarray( - np.linspace(0, inp[f"n_{dim}"] - 1, num=inp[f"n_{dim}"], endpoint=True) - * inp[f"s_{dim}"], - np.float32, - ) - return None - - def regrid_onto_equisized_scan_points( src_grid: EbsdPointCloud, max_edge_discr: int -) -> dict: +) -> EbsdPointCloud: """Discretize point cloud in R^d (d=1, 2, 3) and mark data to grid with equisized bins.""" if src_grid.dimensionality not in [1, 2]: @@ -209,19 +194,41 @@ def regrid_onto_equisized_scan_points( # coordinate is always 0., 0. ! if src_grid.dimensionality == 1: trg_pos = np.column_stack( - np.linspace(0, trg_n["x"] - 1, num=trg_n["x"], endpoint=True) * trg_s["x"], + np.linspace( + 0, + trg_n["x"] - 1, + num=trg_n["x"], + endpoint=True, + retstep=False, + dtype=np.float32, + ) + * trg_s["x"], 1, ) elif src_grid.dimensionality == 2: trg_pos = np.column_stack( ( np.tile( - np.linspace(0, trg_n["x"] - 1, num=trg_n["x"], endpoint=True) + np.linspace( + 0, + trg_n["x"] - 1, + num=trg_n["x"], + endpoint=True, + retstep=False, + dtype=np.float32, + ) * trg_s["x"], trg_n["y"], ), np.repeat( - np.linspace(0, trg_n["y"] - 1, num=trg_n["y"], endpoint=True) + np.linspace( + 0, + trg_n["y"] - 1, + num=trg_n["y"], + endpoint=True, + retstep=False, + dtype=np.float32, + ) * trg_s["y"], trg_n["x"], ), @@ -325,14 +332,15 @@ def ebsd_roi_overview(inp: EbsdPointCloud, id_mgn: dict, template: dict) -> dict template[f"{trg}/AXISNAME[axis_{dim}]"] = { "compress": np.asarray( np.linspace(0, trg_grid.n[dim] - 1, num=trg_grid.n[dim], endpoint=True) - * trg_grid.s[dim] + * trg_grid.s[dim].magnitude ), "strength": 1, } + template[f"{trg}/AXISNAME[axis_{dim}]/@units"] = f"{trg_grid.s[dim].units}" template[f"{trg}/AXISNAME[axis_{dim}]/@long_name"] = ( - f"Point coordinate along {dim}-axis ({trg_grid.pos[dim].units})" + f"Point coordinate along {dim}-axis ({trg_grid.s[dim].units})" ) - template[f"{trg}/AXISNAME[axis_{dim}]/@units"] = f"{trg_grid.pos[dim].units}" + template[f"{trg}/@axes"] = [] for dim in dims[::-1]: template[f"{trg}/@axes"].append(f"axis_{dim}") @@ -363,9 +371,8 @@ def ebsd_roi_overview(inp: EbsdPointCloud, id_mgn: dict, template: dict) -> dict return template -def ebsd_roi_phase_ipf(inp: dict, id_mgn: dict, template: dict) -> dict: - dimensionality = inp["dimensionality"] - print(f"Parse crystal_structure_models aka phases {dimensionality}D version...") +def ebsd_roi_phase_ipf(inp: EbsdPointCloud, id_mgn: dict, template: dict) -> dict: + """Create for each phase three inverse pole figures (IPF) maps projected along X, Y using for each map only the respective scan points that were indexed for this phase.""" nxem_phase_id = 0 prfx = f"/ENTRY[entry{id_mgn['entry_id']}]/ROI[roi{id_mgn['roi_id']}]/ebsd/indexing" # bookkeeping for how many scan points solutions were found is always for src_grid @@ -373,42 +380,46 @@ def ebsd_roi_phase_ipf(inp: dict, id_mgn: dict, template: dict) -> dict: # for the purpose of showing users a readily consumable default plot # to judge for each possible dataset in the same way if the # dataset is worthwhile and potentially valuable for ones on research - n_pts = 0 - if dimensionality == 2: - n_pts = inp["n_x"] * inp["n_y"] - elif dimensionality == 3: - n_pts = inp["n_x"] * inp["n_y"] * inp["n_z"] - else: - raise ValueError(f"Hitting unimplemented case {dimensionality} !") - if n_pts == 0: - print(f"No grid points !") + n_pts = 1 + dims = ["x", "y", "z"][0 : inp.dimensionality] + for dim in dims: + n_pts *= inp.n[dim] + if n_pts == 1: + print(f"Spot measurements are currently not supported !") return template + if n_pts >= np.iinfo(np.uint32).max: + print( + f"EBSD maps with more than {np.iinfo(np.uint32).max} scan points are currently not supported !" + ) + return template + + trg_grid: EbsdPointCloud = regrid_onto_equisized_scan_points( + inp, HFIVE_WEB_MAXIMUM_RGB + ) - n_pts_indexed = np.sum(inp["phase_id"] != 0) + n_pts_indexed = np.sum(inp.phase_id != 0) print(f"n_pts {n_pts}, n_pts_indexed {n_pts_indexed}") template[f"{prfx}/number_of_scan_points"] = np.uint32(n_pts) template[f"{prfx}/indexing_rate"] = np.float64(n_pts_indexed / n_pts) grp_name = f"{prfx}/phaseID[phase{nxem_phase_id}]" - template[f"{grp_name}/number_of_scan_points"] = np.uint32( - np.sum(inp["phase_id"] == 0) - ) + template[f"{grp_name}/number_of_scan_points"] = np.uint32(np.sum(inp.phase_id == 0)) template[f"{grp_name}/phase_identifier"] = np.uint32(nxem_phase_id) template[f"{grp_name}/phase_name"] = f"notIndexed" - print(f"----unique inp phase_id--->{np.unique(inp['phase_id'])}") - for nxem_phase_id in np.arange(1, np.max(np.unique(inp["phase_id"])) + 1): + print(f"----unique inp phase_id--->{np.unique(inp.phase_id)}") + for nxem_phase_id in np.arange(1, np.max(np.unique(inp.phase_id)) + 1): # starting here at ID 1 because the specific parsers have already normalized the # tech-partner specific phase_id conventions to follow the NXem NeXus convention # that is 0 is notIndexed, all other phase contiguously, start count from 1 - print(f"inp[phases].keys(): {inp['phases'].keys()}") - if nxem_phase_id not in inp["phases"]: - raise ValueError(f"{nxem_phase_id} is not a key in inp['phases'] !") + print(f"inp[phases].keys(): {inp.phases.keys()}") + if nxem_phase_id not in inp.phases: + raise KeyError(f"{nxem_phase_id} is not a key in inp['phases'] !") trg = f"{prfx}/phaseID[phase{nxem_phase_id}]" template[f"{trg}/number_of_scan_points"] = np.uint32( - np.sum(inp["phase_id"] == nxem_phase_id) + np.sum(inp.phase_id == nxem_phase_id) ) template[f"{trg}/phase_identifier"] = np.uint32(nxem_phase_id) - template[f"{trg}/phase_name"] = f"{inp['phases'][nxem_phase_id]['phase_name']}" + template[f"{trg}/phase_name"] = f"{inp.phases[nxem_phase_id]['phase_name']}" # internally the following function may discretize a coarser IPF # if the input grid inp is too large for h5web to display # this remove fine details in the EBSD maps but keep in mind @@ -416,47 +427,40 @@ def ebsd_roi_phase_ipf(inp: dict, id_mgn: dict, template: dict) -> dict: # of the potential usefulness of the dataset when searching in # a RDMS like NOMAD OASIS, the purpose is NOT to take the coarse-grained # discretization and use this for scientific data analysis! - process_roi_phase_ipfs_twod( - inp, - id_mgn["roi_id"], + process_roi_phase_ipf( + trg_grid, + id_mgn, nxem_phase_id, - inp["phases"][nxem_phase_id]["phase_name"], - inp["phases"][nxem_phase_id]["space_group"], + trg_grid.phases[nxem_phase_id]["phase_name"], + trg_grid.phases[nxem_phase_id]["space_group"], template, ) return template -def process_roi_phase_ipfs_twod( - inp: dict, - id_mgn: int, +def process_roi_phase_ipf( + trg_grid: EbsdPointCloud, + id_mgn: dict, nxem_phase_id: int, phase_name: str, space_group: int, template: dict, ) -> dict: - dimensionality = inp["dimensionality"] - if dimensionality == 2: - trg_grid = regrid_onto_equisized_scan_points(inp, HFIVE_WEB_MAXIMUM_RGB) - trg_shape = (trg_grid["n_y"], trg_grid["n_x"], 3) - trg_dims = (trg_grid["n_y"] * trg_grid["n_x"], 3) - elif dimensionality == 3: - trg_grid = inp - # grid would be required but havent see the plot limits exhausted because of the - # measurement time it would take collect such large maps especially in the - # serial-sectioning direction - trg_shape = (trg_grid["n_z"], trg_grid["n_y"], trg_grid["n_x"], 3) - trg_dims = (trg_grid["n_z"] * trg_grid["n_y"] * trg_grid["n_x"], 3) - else: - raise ValueError(f"Hitting unimplemented case {dimensionality} !") - print(f"Generate {dimensionality}D IPF maps for {nxem_phase_id}, {phase_name}...") + dims = ["x", "y", "z"] + n_pts: int = 1 + n_shape: List[int] = [] + for dim in dims[0 : trg_grid.dimensionality]: + n_pts *= trg_grid.n[dim] + for dim in dims[0 : trg_grid.dimensionality][::-1]: + n_shape.append(trg_grid.n[dim]) + n_shape.append(3) rotations = Rotation.from_euler( - euler=trg_grid["euler"][trg_grid["phase_id"] == nxem_phase_id], + euler=trg_grid.euler[trg_grid.phase_id == nxem_phase_id].magnitude, direction="lab2crystal", degrees=False, ) - print(f"shape rotations -----> {np.shape(rotations)}") + # print(f"shape rotations -----> {np.shape(rotations)}") for idx in np.arange(0, len(PROJECTION_VECTORS)): point_group = get_point_group(space_group, proper=False) @@ -469,21 +473,20 @@ def process_roi_phase_ipfs_twod( np.asarray(ipf_key.orientation2color(rotations) * 255.0, np.uint32), np.uint8, ) - print(f"shape rgb_px_with_phase_id -----> {np.shape(rgb_px_with_phase_id)}") + # print(f"shape rgb_px_with_phase_id -----> {np.shape(rgb_px_with_phase_id)}") - ipf_rgb_map = np.asarray( - np.asarray(np.zeros(trg_dims) * 255.0, np.uint32), np.uint8 - ) + ipf_rgb_map = np.zeros((n_pts, 3), np.uint8) # background is black instead of white (which would be more pleasing) # but IPF color maps have a whitepoint which encodes in fact an orientation # and because of that we may have a map from a single crystal characterization # whose orientation could be close to the whitepoint which becomes a fully white # seemingly "empty" image, therefore we use black as empty, i.e. white reports an # orientation - ipf_rgb_map[trg_grid["phase_id"] == nxem_phase_id, :] = rgb_px_with_phase_id - ipf_rgb_map = np.reshape(ipf_rgb_map, trg_shape, order="C") - # 0 is y, 1 is x, only valid for REGULAR_TILING and FLIGHT_PLAN ! - + ipf_rgb_map[trg_grid.phase_id == nxem_phase_id, :] = rgb_px_with_phase_id + ipf_rgb_map = np.reshape(ipf_rgb_map, n_shape, order="C") + # 3D, 0 > z, 1 > y, 2 > x + # 2D, 0 > y, 1 > x + # 1D, 0 > x trg = ( f"/ENTRY[entry{id_mgn['entry_id']}]/ROI[roi{id_mgn['roi_id']}]/ebsd/indexing" f"/phaseID[phase{nxem_phase_id}]/ipfID[ipf{idx + 1}]" @@ -492,40 +495,40 @@ def process_roi_phase_ipfs_twod( PROJECTION_VECTORS[idx].data.flatten(), np.float32 ) - # add the IPF color map + # add the IPF color map fundamental unit SO3 obeying crystal symmetry mpp = f"{trg}/map" template[f"{mpp}/title"] = ( f"Inverse pole figure {PROJECTION_DIRECTIONS[idx][0]} {phase_name}" ) template[f"{mpp}/@signal"] = "data" - dims = ["x", "y"] - if dimensionality == 3: - dims.append("z") template[f"{mpp}/@axes"] = [] - for dim in dims[::-1]: + for dim in dims[0 : trg_grid.dimensionality][::-1]: template[f"{mpp}/@axes"].append(f"axis_{dim}") - for enum, dim in enumerate(dims): - template[f"{mpp}/@AXISNAME_indices[axis_{dim}_indices]"] = np.uint32(enum) + for dim_idx, dim in enumerate(dims[0 : trg_grid.dimensionality]): + template[f"{mpp}/@AXISNAME_indices[axis_{dim}_indices]"] = np.uint32( + dim_idx + ) template[f"{mpp}/DATA[data]"] = {"compress": ipf_rgb_map, "strength": 1} hfive_web_decorate_nxdata(f"{mpp}/DATA[data]", template) - scan_unit = trg_grid["s_unit"] # TODO::this is not necessarily correct - # could be a scale-invariant synthetic microstructure whose simulation - # would work on multiple length-scales as atoms are not resolved directly! - if scan_unit == "um": - scan_unit = "µm" - for dim in dims: + # mind that EBSD map could be scale-invariant e.g. from synthetic microstructure + # simulation that could work on multiple length-scales as atoms are not resolved + # directly! + for dim in dims[0 : trg_grid.dimensionality]: template[f"{mpp}/AXISNAME[axis_{dim}]"] = { - "compress": get_named_axis(trg_grid, f"{dim}"), + "compress": np.asarray( + np.linspace( + 0, trg_grid.n[dim] - 1, num=trg_grid.n[dim], endpoint=True + ) + * trg_grid.s[dim].magnitude, + np.float32, + ), "strength": 1, } + template[f"{mpp}/AXISNAME[axis_{dim}]/@units"] = f"{trg_grid.s[dim].units}" template[f"{mpp}/AXISNAME[axis_{dim}]/@long_name"] = ( - f"Coordinate along {dim}-axis ({trg_grid['s_{dim}'].units})" - ) - template[f"{mpp}/AXISNAME[axis_{dim}]/@units"] = ( - f"{trg_grid['s_{dim}'].units}" + f"Point coordinate along {dim}-axis ({trg_grid.s[dim].units})" ) - # add the IPF color map legend/key lgd = f"{trg}/legend" template[f"{lgd}/title"] = ( @@ -534,141 +537,31 @@ def process_roi_phase_ipfs_twod( # template[f"{trg}/title"] = f"Inverse pole figure color key with SST" template[f"{lgd}/@signal"] = "data" template[f"{lgd}/@axes"] = [] - dims = ["x", "y"] + dims = ["x", "y"] # no longer the EBSD map just an RGB image of the legend! for dim in dims[::-1]: template[f"{lgd}/@axes"].append(f"axis_{dim}") enum = 0 - for dim in dims: - template[f"{lgd}/@AXISNAME_indices[axis_{dim}_indices]"] = np.uint32(enum) - enum += 1 + for dim_idx, dim in enumerate(dims): + template[f"{lgd}/@AXISNAME_indices[axis_{dim}_indices]"] = np.uint32( + dim_idx + ) template[f"{lgd}/data"] = {"compress": img, "strength": 1} hfive_web_decorate_nxdata(f"{lgd}/data", template) dims_idxs = {"x": 1, "y": 0} - for dim, enum in dims_idxs.items(): + for dim, dim_idx in dims_idxs.items(): template[f"{lgd}/AXISNAME[axis_{dim}]"] = { - "compress": np.asarray( - np.linspace( - 0, - np.shape(img)[enum] - 1, - num=np.shape(img)[enum], - endpoint=True, - ), - np.uint32, + "compress": np.linspace( + 0, + np.shape(img)[dim_idx] - 1, + num=np.shape(img)[dim_idx], + endpoint=True, + retstep=False, + dtype=np.uint32, ), "strength": 1, } template[f"{lgd}/AXISNAME[axis_{dim}]/@long_name"] = ( - f"Pixel coordinate along {dim[0]}-axis" + f"Pixel coordinate along {dim}-axis" ) return template - - -# considered as deprecated - - -def get_scan_point_axis_values(inp: dict, dim_name: str): - is_threed = False - if "dimensionality" in inp.keys(): - if inp["dimensionality"] == 3: - is_threed = True - req_keys = ["grid_type", f"n_{dim_name}", f"s_{dim_name}"] - for key in req_keys: - if key not in inp.keys(): - raise ValueError(f"Unable to find required key {key} in inp !") - - if inp["grid_type"] in [HEXAGONAL_FLAT_TOP_TILING, SQUARE_TILING]: - return np.asarray( - np.linspace( - 0, inp[f"n_{dim_name}"] - 1, num=inp[f"n_{dim_name}"], endpoint=True - ) - * inp[f"s_{dim_name}"], - np.float32, - ) - else: - return None - - -def threed(inp: dict): - """Identify if 3D triboolean.""" - if "dimensionality" in inp.keys(): - if inp["dimensionality"] == 3: - return True - return False - return None - - -def square_grid(inp: dict): - """Identify if square grid with specific assumptions.""" - if ( - inp["grid_type"] == SQUARE_TILING - and inp["tiling"] == REGULAR_TILING - and inp["flight_plan"] == FLIGHT_PLAN - ): - return True - return False - - -def hexagonal_grid(inp: dict): - """Identify if square grid with specific assumptions.""" - if ( - inp["grid_type"] == HEXAGONAL_FLAT_TOP_TILING - and inp["tiling"] == REGULAR_TILING - and inp["flight_plan"] == FLIGHT_PLAN - ): - return True - return False - - -def get_scan_point_coords(inp: dict) -> dict: - """Add scan_point_dim array assuming top-left to bottom-right snake style scanning.""" - is_threed = threed(inp) - req_keys = ["grid_type", "tiling", "flight_plan"] - dims = ["x", "y"] - if is_threed is True: - dims.append("z") - for dim in dims: - req_keys.append(f"n_{dim}") - req_keys.append(f"s_{dim}") - - for key in req_keys: - if key not in inp.keys(): - raise ValueError(f"Unable to find required key {key} in inp !") - - if is_threed is False: - if square_grid(inp) is True: - for dim in dims: - if "scan_point_{dim}" in inp.keys(): - print("WARNING::Overwriting scan_point_{dim} !") - inp["scan_point_x"] = np.tile( - np.linspace(0, inp["n_x"] - 1, num=inp["n_x"], endpoint=True) - * inp["s_x"], - inp["n_y"], - ) - inp["scan_point_y"] = np.repeat( - np.linspace(0, inp["n_y"] - 1, num=inp["n_y"], endpoint=True) - * inp["s_y"], - inp["n_x"], - ) - elif hexagonal_grid(inp) is True: - for dim in dims: - if "scan_point_{dim}" in inp.keys(): - print("WARNING::Overwriting scan_point_{dim} !") - # the following code is only the same as for the sqrgrid because - # typically the tech partners already take into account and export scan step - # values such that for a hexagonal grid one s_{dim} (typically s_y) is sqrt(3)/2*s_{other_dim} ! - inp["scan_point_x"] = np.tile( - np.linspace(0, inp["n_x"] - 1, num=inp["n_x"], endpoint=True) - * inp["s_x"], - inp["n_y"], - ) - inp["scan_point_y"] = np.repeat( - np.linspace(0, inp["n_y"] - 1, num=inp["n_y"], endpoint=True) - * inp["s_y"], - inp["n_x"], - ) - else: - print("WARNING::{__name__} facing an unknown scan strategy !") - else: - print("WARNING::{__name__} not implemented for 3D case !") - return inp diff --git a/src/pynxtools_em/parsers/hfive_base.py b/src/pynxtools_em/parsers/hfive_base.py index 2eab983..ef5dc01 100644 --- a/src/pynxtools_em/parsers/hfive_base.py +++ b/src/pynxtools_em/parsers/hfive_base.py @@ -56,10 +56,10 @@ def __init__(self, file_path: str = ""): # was written e.g. Oxford Instruments AZTec software in some version may generate # an instance of a file whose schema belongs to the H5OINA family of HDF5 container formats # specifically using version 5 - self.prfx = None + self.prfx: str = "" self.tmp: Dict = {} - self.source = None - self.file_path = None + self.source: str = "" + self.file_path: str = "" # collection of instance path self.groups: Dict = {} self.datasets: Dict = {} diff --git a/src/pynxtools_em/parsers/hfive_oxford.py b/src/pynxtools_em/parsers/hfive_oxford.py index 32be0a6..936dcea 100644 --- a/src/pynxtools_em/parsers/hfive_oxford.py +++ b/src/pynxtools_em/parsers/hfive_oxford.py @@ -43,7 +43,7 @@ def __init__(self, file_path: str = "", entry_id: int = 1, verbose: bool = False super().__init__(file_path) self.id_mgn: Dict[str, int] = {"entry_id": entry_id, "roi_id": 1} self.verbose = verbose - self.prfx = None # template path handling + self.prfx = "" # template path handling self.version: Dict = { # Dict[str, Dict[str, List[str]]] "trg": { "tech_partner": ["Oxford Instruments"], @@ -128,7 +128,8 @@ def parse(self, template: dict) -> dict: self.parse_and_normalize_slice_ebsd_phases(h5r) self.parse_and_normalize_slice_ebsd_data(h5r) ebsd_roi_overview(self.ebsd, self.id_mgn, template) - # ebsd_roi_phase_ipf(self.ebsd, self.id_mgn, template) + ebsd_roi_phase_ipf(self.ebsd, self.id_mgn, template) + self.id_mgn["roi_id"] += 1 # TODO:Vitesh example return template diff --git a/src/pynxtools_em/utils/hfive_web_utils.py b/src/pynxtools_em/utils/hfive_web_utils.py index 1821c40..4c7393d 100644 --- a/src/pynxtools_em/utils/hfive_web_utils.py +++ b/src/pynxtools_em/utils/hfive_web_utils.py @@ -20,10 +20,10 @@ import numpy as np -def hfive_web_decorate_nxdata(path: str, inp: dict) -> dict: - if f"{path}" in inp.keys(): - inp[f"{path}/@CLASS"] = f"IMAGE" # required by H5Web to plot RGB maps - inp[f"{path}/@IMAGE_VERSION"] = f"1.2" - inp[f"{path}/@SUBCLASS_VERSION"] = np.int64(15) - inp[f"{path}/@long_name"] = f"Signal" - return inp +def hfive_web_decorate_nxdata(path: str, template: dict) -> dict: + if f"{path}" in template: + template[f"{path}/@CLASS"] = f"IMAGE" # required by H5Web to plot RGB maps + template[f"{path}/@IMAGE_VERSION"] = f"1.2" + template[f"{path}/@SUBCLASS_VERSION"] = np.int64(15) + template[f"{path}/@long_name"] = f"Signal" + return template