Source code for PartSegCore.analysis.measurement_calculation

import warnings
from collections import OrderedDict
from collections.abc import Generator, Iterator, MutableMapping, Sequence
from contextlib import suppress
from enum import Enum
from functools import reduce
from math import pi
from typing import (
    Any,
    Callable,
    NamedTuple,
    Optional,
    Union,
)

import numpy as np
import pandas as pd
import SimpleITK
from local_migrator import register_class, rename_key
from mahotas.features import haralick
from pydantic import Field
from scipy.spatial.distance import cdist
from sympy import Rational, symbols

from PartSegCore import autofit as af
from PartSegCore.algorithm_describe_base import Register, ROIExtractionProfile
from PartSegCore.analysis.calculate_pipeline import calculate_segmentation_step
from PartSegCore.analysis.measurement_base import (
    AreaType,
    Leaf,
    MeasurementEntry,
    MeasurementMethodBase,
    Node,
    PerComponent,
    has_mask_components,
    has_roi_components,
)
from PartSegCore.mask_partition_utils import BorderRim, MaskDistanceSplit
from PartSegCore.roi_info import ROIInfo
from PartSegCore.segmentation.restartable_segmentation_algorithms import LowerThresholdAlgorithm
from PartSegCore.universal_const import UNIT_SCALE, Units
from PartSegCore.utils import BaseModel
from PartSegImage import Channel, Image

# TODO change image to channel in signature of measurement calculate_property

NO_COMPONENT = -1


[docs] class CorrelationEnum(str, Enum): pearson = "Pearson correlation coefficient" manders = "Mander's overlap coefficient" intensity = "Intensity correlation quotient" spearman = "Spearman rank correlation" def __str__(self): return self.value
[docs] class ProhibitedDivision(Exception): pass
[docs] class SettingsValue(NamedTuple): function: Callable help_message: str arguments: Optional[dict] is_component: bool default_area: Optional[AreaType] = None
[docs] class ComponentsInfo(NamedTuple): """ Class for storage information about relation between roi components and mask components :ivar numpy.ndarray roi_components: list of roi components :ivar numpy.ndarray mask_components: list of mask components :ivar Dict[int, List[int]] components_translation: mapping from roi components to mask components base on intersections """ roi_components: np.ndarray mask_components: np.ndarray components_translation: dict[int, list[int]] def has_components(self): return all(len(x) for x in self.components_translation.values())
[docs] def empty_fun(_a0=None, _a1=None): """This function is being used as dummy reporting function."""
MeasurementValueType = Union[float, list[float], str] MeasurementResultType = tuple[MeasurementValueType, str] MeasurementResultInputType = tuple[MeasurementValueType, str, tuple[PerComponent, AreaType]] FILE_NAME_STR = "File name"
[docs] class MeasurementResult(MutableMapping[str, MeasurementResultType]): """ Class for storage measurements info. """ def __init__(self, components_info: ComponentsInfo): self.components_info = components_info self._data_dict = OrderedDict() self._units_dict: dict[str, str] = {} self._type_dict: dict[str, tuple[PerComponent, AreaType]] = {} self._units_dict["Mask component"] = "" self._units_dict["Segmentation component"] = "" def __str__(self): # pragma: no cover return "".join( f"{key}: {val}; type {self._type_dict[key]}, units {self._units_dict[key]}\n" for key, val in self._data_dict.items() ) def __setitem__(self, k: str, v: MeasurementResultInputType) -> None: self._data_dict[k] = v[0] self._units_dict[k] = v[1] self._type_dict[k] = v[2] def __delitem__(self, v: str) -> None: del self._data_dict[v] del self._units_dict[v] del self._type_dict[v] def __getitem__(self, k: str) -> MeasurementResultType: return self._data_dict[k], self._units_dict[k] def __len__(self) -> int: return len(self._data_dict) def __iter__(self) -> Iterator[str]: return iter(self._data_dict) def to_dataframe(self, all_components=False) -> pd.DataFrame: data = self.get_separated(all_components) columns = [ f"{label} ({units})" if units else label for label, units in zip(self.get_labels(all_components=all_components), self.get_units(all_components)) ] df = pd.DataFrame(data, columns=columns, index=self.components_info.roi_components) if "Segmentation component" in df.columns: df = df.astype({"Segmentation component": int}).set_index("Segmentation component") return df
[docs] def set_filename(self, path_fo_file: str): """ Set name of file to be presented as first position. """ self._data_dict[FILE_NAME_STR] = path_fo_file self._type_dict[FILE_NAME_STR] = PerComponent.No, AreaType.ROI self._units_dict[FILE_NAME_STR] = "" self._data_dict.move_to_end(FILE_NAME_STR, False)
[docs] def get_component_info(self, all_components: bool = False) -> tuple[bool, bool]: """ Get information which type of components are in storage. :return: has_mask_components, has_segmentation_components """ if all_components and self.components_info.has_components(): return True, True return has_mask_components(self._type_dict.values()), has_roi_components(self._type_dict.values())
[docs] def get_labels(self, expand=True, all_components=False) -> list[str]: """ If expand is false return list of keys of this storage. Otherwise return labels for measurement. Base are keys of this storage. If has mask components, or has segmentation_components then add this labels """ if not expand: return list(self.keys()) mask_components, roi_components = self.get_component_info(all_components) labels = list(self._data_dict.keys()) index = 1 if FILE_NAME_STR in self._data_dict else 0 if mask_components: labels.insert(index, "Mask component") if roi_components: labels.insert(index, "Segmentation component") return labels
def get_units(self, all_components=False) -> list[str]: return [self._units_dict[x] for x in self.get_labels(all_components=all_components)]
[docs] def get_global_names(self): """Get names for only parameters which are not 'PerComponent.Yes'""" labels = list(self._data_dict.keys()) return [x for x in labels if self._type_dict[x][0] != PerComponent.Yes]
[docs] def get_global_parameters(self): """Get only parameters which are not 'PerComponent.Yes'""" if FILE_NAME_STR in self._data_dict: name = self._data_dict[FILE_NAME_STR] res = [name] iterator = iter(self._data_dict.keys()) with suppress(StopIteration): next(iterator) # skipcq: PTC-W0063 else: res = [] iterator = iter(self._data_dict.keys()) for el in iterator: per_comp = self._type_dict[el][0] val = self._data_dict[el] if per_comp not in {PerComponent.Yes, PerComponent.Per_Mask_component}: res.append(val) return res
def _get_component_info(self, mask_components, roi_components): if mask_components: if roi_components: translation = self.components_info.components_translation return [(x, y) for x in translation for y in translation[x]] return [(0, x) for x in self.components_info.mask_components] return [(x, 0) for x in self.components_info.roi_components] def _prepare_res_iterator(self, counts): if FILE_NAME_STR in self._data_dict: name = self._data_dict[FILE_NAME_STR] res = [[name] for _ in range(counts)] iterator = iter(self._data_dict.keys()) with suppress(StopIteration): next(iterator) # skipcq: PTC-W0063 else: res = [[] for _ in range(counts)] iterator = iter(self._data_dict.keys()) return res, iterator
[docs] def get_separated(self, all_components=False) -> list[list[MeasurementValueType]]: """Get measurements separated for each component""" mask_components, roi_components = self.get_component_info(all_components) if not mask_components and not roi_components: return [list(self._data_dict.values())] component_info = self._get_component_info(mask_components, roi_components) res, iterator = self._prepare_res_iterator(len(component_info)) for i, num in enumerate(component_info): if roi_components: res[i].append(num[0]) if mask_components: res[i].append(num[1]) mask_to_pos = {val: i for i, val in enumerate(self.components_info.mask_components)} segmentation_to_pos = {val: i for i, val in enumerate(self.components_info.roi_components)} for el in iterator: per_comp, area_type = self._type_dict[el] val = self._data_dict[el] for i, (seg, mask) in enumerate(component_info): if per_comp not in {PerComponent.Yes, PerComponent.Per_Mask_component}: res[i].append(val) elif area_type == AreaType.ROI: res[i].append(val[segmentation_to_pos[seg]]) else: res[i].append(val[mask_to_pos[mask]]) return res
[docs] class MeasurementProfile(BaseModel): name: str chosen_fields: list[MeasurementEntry] name_prefix: str = "" @property def _need_mask(self): return any(cf_val.calculation_tree.need_mask() for cf_val in self.chosen_fields)
[docs] def to_dict(self): # pragma: no cover warnings.warn( f"{self.__class__.__name__}.to_dict is deprecated. Use as_dict instead", category=FutureWarning, stacklevel=2, ) return dict(self)
def _need_mask_without_segmentation(self, tree): if isinstance(tree, Leaf): return tree.area == AreaType.Mask_without_ROI return self._need_mask_without_segmentation(tree.left) or self._need_mask_without_segmentation(tree.right) def _get_par_component_and_area_type(self, tree: Union[Node, Leaf]) -> tuple[PerComponent, AreaType]: if isinstance(tree, Leaf): method = MEASUREMENT_DICT[tree.name] area_type = method.area_type(tree.area) if tree.per_component == PerComponent.Mean: return PerComponent.No, area_type if tree.per_component == PerComponent.Per_Mask_component: return tree.per_component, AreaType.Mask return tree.per_component, area_type left_par, left_area = self._get_par_component_and_area_type(tree.left) right_par, right_area = self._get_par_component_and_area_type(tree.left) if PerComponent.Yes in [left_par, right_par]: res_par = PerComponent.Yes else: res_par = PerComponent.No area_set = {left_area, right_area} if len(area_set) == 1: res_area = area_set.pop() elif AreaType.ROI in area_set: res_area = AreaType.ROI else: res_area = AreaType.Mask_without_ROI return res_par, res_area
[docs] def get_channels_num(self) -> set[Channel]: resp = set() for el in self.chosen_fields: resp.update(el.get_channel_num(MEASUREMENT_DICT)) return resp
def __str__(self): text = f"Set name: {self.name}\n" if self.name_prefix: text += f"Name prefix: {self.name_prefix}\n" text += "Measurements list:\n" for el in self.chosen_fields: text += f"{el.name}\n" return text
[docs] def get_component_info(self, unit: Units): """ :return: list[((str, str), bool)] """ # Fixme remove binding to 3 dimensions return [ ( (self.name_prefix + el.name, el.get_unit(unit, 3)), self._is_component_measurement(el.calculation_tree), ) for el in self.chosen_fields ]
[docs] def is_any_mask_measurement(self): return any(el.calculation_tree.need_mask() for el in self.chosen_fields)
def _is_component_measurement(self, node): if isinstance(node, Leaf): return node.per_component in {PerComponent.Yes, PerComponent.Per_Mask_component} return self._is_component_measurement(node.left) or self._is_component_measurement(node.right) @staticmethod def _prepare_leaf_kw(node, kwargs, method, area_type): area_type_dict = { AreaType.Mask: "mask", AreaType.Mask_without_ROI: "mask_without_segmentation", AreaType.ROI: "segmentation", } kw = dict(kwargs) kw.update(dict(node.parameters)) if node.channel is not None: kw["channel"] = kw[f"channel_{node.channel}"] kw["channel_num"] = node.channel else: kw["channel_num"] = -1 kw["_area"] = node.area kw["_per_component"] = node.per_component kw["_cache"] = True # TODO remove cache argument kw["area_array"] = kw[area_type_dict[area_type]] kw["_component_num"] = NO_COMPONENT return kw def _clip_arrays(self, kw, node: Leaf, method: MeasurementMethodBase, component_index: int): if node.area != AreaType.ROI or method.need_full_data(): bounds = tuple(slice(None, None) for _ in kw["area_array"].shape) elif node.per_component == PerComponent.Per_Mask_component: bounds = tuple(kw["mask_bound_info"][component_index].get_slices(margin=1)) else: bounds = tuple(kw["bounds_info"][component_index].get_slices(margin=1)) kw2 = kw.copy() component_mark_area = ( kw2["area_array"] if node.per_component != PerComponent.Per_Mask_component else kw2["mask"] ) kw2["_component_num"] = component_index area_array = kw2["area_array"][bounds].copy() area_array[component_mark_area[bounds] != component_index] = 0 kw2["area_array"] = area_array im_bounds = list(bounds) image: Image = kw["image"] im_bounds.insert(image.time_pos, slice(None)) kw2["image"] = image.cut_image(tuple(im_bounds)) for name in ["channel", "segmentation", "roi", "mask"] + [f"channel_{num}" for num in self.get_channels_num()]: if kw[name] is not None: kw2[name] = kw[name][bounds] kw2["roi_alternative"] = kw2["roi_alternative"].copy() for name, array in kw2["roi_alternative"].items(): kw2["roi_alternative"][name] = array[bounds] return kw2 def _calculate_leaf_value( self, node: Union[Node, Leaf], segmentation_mask_map: ComponentsInfo, kwargs: dict ) -> Union[float, np.ndarray]: method: MeasurementMethodBase = MEASUREMENT_DICT[node.name] kw = self._prepare_leaf_kw(node, kwargs, method, method.area_type(node.area)) if node.per_component == PerComponent.No: return method.calculate_property(**kw) # TODO use cache for per component calculate val = [] if method.area_type(node.area) == AreaType.ROI and node.per_component != PerComponent.Per_Mask_component: components = segmentation_mask_map.roi_components else: components = segmentation_mask_map.mask_components for i in components: kw2 = self._clip_arrays(kw, node, method, i) val.append(method.calculate_property(**kw2)) val = np.array(val) if node.per_component == PerComponent.Mean: val = np.mean(val) if val.size else 0 return val def _calculate_leaf( self, node: Leaf, segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict ) -> tuple[Union[float, np.ndarray], symbols, AreaType]: method: MeasurementMethodBase = MEASUREMENT_DICT[node.name] hash_str = hash_fun_call_name( method, node.parameters, node.area, node.per_component, node.channel, NO_COMPONENT ) area_type = method.area_type(node.area) if node.per_component == PerComponent.Per_Mask_component: area_type = AreaType.Mask if hash_str in help_dict: val = help_dict[hash_str] else: kwargs["help_dict"] = help_dict val = self._calculate_leaf_value(node, segmentation_mask_map, kwargs) help_dict[hash_str] = val unit: symbols = method.get_units(3) if kwargs["image"].is_stack else method.get_units(2) if node.power != 1: return pow(val, node.power), pow(unit, Rational(node.power)), area_type return val, unit, area_type def _calculate_node( self, node: Node, segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict ) -> tuple[Union[float, np.ndarray], symbols, AreaType]: if node.op != "/": raise ValueError(f"Wrong measurement: {node}") left_res, left_unit, left_area = self.calculate_tree(node.left, segmentation_mask_map, help_dict, kwargs) right_res, right_unit, right_area = self.calculate_tree(node.right, segmentation_mask_map, help_dict, kwargs) if not (isinstance(left_res, np.ndarray) and isinstance(right_res, np.ndarray) and left_area != right_area): return left_res / right_res, left_unit / right_unit, left_area area_set = {left_area, right_area} if area_set == {AreaType.ROI, AreaType.Mask_without_ROI}: # pragma: no cover raise ProhibitedDivision("This division is prohibited") if area_set == {AreaType.Mask, AreaType.Mask_without_ROI}: return left_res / right_res, left_unit / right_unit, AreaType.Mask_without_ROI # if area_set == {AreaType.ROI, AreaType.Mask}: res = [] if left_area == AreaType.Mask: roi_res, mask_res = right_res, left_res else: roi_res, mask_res = left_res, right_res for val, num in zip(roi_res, segmentation_mask_map.roi_components): mask_components = segmentation_mask_map.components_translation[num] if len(mask_components) != 1: # pragma: no cover raise ProhibitedDivision("Cannot calculate when object do not belongs to one mask area") if left_area == AreaType.ROI: res.append(val / mask_res[mask_components[0] - 1]) else: res.append(mask_res[mask_components[0] - 1] / val) res = np.array(res) return res, left_unit / right_unit, AreaType.ROI # TODO check this
[docs] def calculate_tree( self, node: Union[Node, Leaf], segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict ) -> tuple[Union[float, np.ndarray], symbols, AreaType]: """ Main function for calculation tree of measurements. It is executed recursively :param node: measurement to calculate :param segmentation_mask_map: map from mask segmentation components to mask components. Needed for division :param help_dict: dict to cache calculation result. It reduce recalculations of same measurements. :param kwargs: additional info needed by measurements :return: measurement value """ if isinstance(node, Leaf): return self._calculate_leaf(node, segmentation_mask_map, help_dict, kwargs) if isinstance(node, Node): return self._calculate_node(node, segmentation_mask_map, help_dict, kwargs) raise ValueError(f"Node {node} need to be instance of Leaf or Node")
[docs] @staticmethod def get_segmentation_to_mask_component(segmentation: np.ndarray, mask: Optional[np.ndarray]) -> ComponentsInfo: """ Calculate map from segmentation component num to mask component num :param segmentation: numpy array with segmentation labeled as positive integers :param mask: numpy array with mask labeled as positive integer :return: map """ components = np.unique(segmentation) if components[0] == 0 or components[0] is None: components = components[1:] mask_components = np.unique(mask) if mask_components[0] == 0 or mask_components[0] is None: mask_components = mask_components[1:] res = OrderedDict() if mask is None: res = {i: [] for i in components} elif np.max(mask) == 1: res = {i: [1] for i in components} else: for num in components: res[num] = list(np.unique(mask[segmentation == num])) if res[num][0] == 0: res[num] = res[num][1:] return ComponentsInfo(components, mask_components, res)
[docs] def get_component_and_area_info(self) -> list[tuple[PerComponent, AreaType]]: """For each measurement check if is per component and in which types""" res = [] for el in self.chosen_fields: tree = el.calculation_tree res.append(self._get_par_component_and_area_type(tree)) return res
[docs] def get_segmentation_mask_map(self, image: Image, roi: Union[np.ndarray, ROIInfo], time: int = 0) -> ComponentsInfo: def get_time(array: np.ndarray): if array is not None and array.ndim == 4: return array.take(time, axis=image.time_pos) return array return self.get_segmentation_to_mask_component( get_time(roi if isinstance(roi, np.ndarray) else roi.roi), get_time(image.mask) )
[docs] def calculate( self, image: Image, channel_num: int, roi: Union[np.ndarray, ROIInfo], result_units: Units, range_changed: Callable[[int, int], Any] = empty_fun, step_changed: Callable[[int], Any] = empty_fun, time: int = 0, ) -> MeasurementResult: """ Calculate measurements on given set of parameters :param image: image on which measurements should be calculated :param channel_num: channel number on which measurements should be calculated :param roi: array with segmentation labeled as positive integers :param result_units: units which should be used to present results. :param range_changed: callback function to set information about steps range :param step_changed: callback function for set information about steps done :param time: which data point should be measured :return: measurements """ segmentation_mask_map = self.get_segmentation_mask_map(image, roi, time) result = MeasurementResult(segmentation_mask_map) range_changed(0, len(self.chosen_fields)) for i, (name, data) in enumerate( self.calculate_yield( image=image, channel_num=channel_num, roi=roi, result_units=result_units, segmentation_mask_map=segmentation_mask_map, time=time, ), start=1, ): result[name] = data step_changed(i) return result
[docs] def calculate_yield( self, image: Image, channel_num: int, roi: Union[np.ndarray, ROIInfo], result_units: Units, segmentation_mask_map: ComponentsInfo, time: int = 0, ) -> Generator[MeasurementResultInputType, None, None]: """ Calculate measurements on given set of parameters :param image: image on which measurements should be calculated :param roi: array with segmentation labeled as positive integers :param result_units: units which should be used to present results. :param segmentation_mask_map: information which component of roi belongs to which mask component. :param time: which data point should be measured :return: measurements """ def get_time(array: np.ndarray): if array is not None and array.ndim == 4: return array.take(time, axis=image.time_pos) return array if self._need_mask and image.mask is None: raise ValueError("measurement need mask") channel = image.get_channel(channel_num).astype(float) cache_dict = {} result_scalar = UNIT_SCALE[result_units.value] if isinstance(roi, np.ndarray): roi = ROIInfo(roi).fit_to_image(image) mask_bound_info = None if isinstance(image.mask, np.ndarray): mask_bound_info = { k: v.del_dim(image.time_pos) if len(v.lower) == 4 else v for k, v in ROIInfo(image.mask).fit_to_image(image).bound_info.items() } roi_alternative = {} for name, array in roi.alternative.items(): roi_alternative[name] = get_time(array) kw = { "image": image, "channel": get_time(channel), "segmentation": get_time(roi.roi), "roi": get_time(roi.roi), "bounds_info": { k: v.del_dim(image.time_pos) if len(v.lower) == 4 else v for k, v in roi.bound_info.items() }, "mask_bound_info": mask_bound_info, "mask": get_time(image.mask), "voxel_size": image.spacing, "result_scalar": result_scalar, "roi_alternative": roi_alternative, "roi_annotation": roi.annotations, } for num in self.get_channels_num(): kw[f"channel_{num}"] = get_time(image.get_channel(num)) if any(self._need_mask_without_segmentation(el.calculation_tree) for el in self.chosen_fields): mm = kw["mask"].copy() mm[kw["segmentation"] > 0] = 0 kw["mask_without_segmentation"] = mm for entry in self.chosen_fields: name = self.name_prefix + entry.name yield name, self._calc_single_field(entry, segmentation_mask_map, cache_dict, kw, result_units)
def _calc_single_field( self, entry: MeasurementEntry, segmentation_mask_map: ComponentsInfo, cache_dict: dict, additional_args: dict, result_units, ): tree = entry.calculation_tree component_and_area = self._get_par_component_and_area_type(tree) try: val, unit, _area = self.calculate_tree(tree, segmentation_mask_map, cache_dict, additional_args) if isinstance(val, np.ndarray): val = list(val) return val, str(unit).format(str(result_units)), component_and_area except ZeroDivisionError: # pragma: no cover return "Div by zero", "", component_and_area except TypeError as e: # pragma: no cover if e.args[0].startswith("unsupported operand type(s) for /:"): return "None div", "", component_and_area raise e except AttributeError: # pragma: no cover return "No attribute", "", component_and_area except ProhibitedDivision as e: # pragma: no cover return e.args[0], "", component_and_area
def calculate_main_axis(area_array: np.ndarray, channel: np.ndarray, voxel_size): # TODO check if it produces good values if len(channel.shape) == 4: if channel.shape[0] != 1: raise ValueError("This measurements do not support time data") channel = channel[0] cut_img = np.copy(channel) cut_img[area_array == 0] = 0 if np.all(cut_img == 0): return (0,) * len(voxel_size) orientation_matrix, _ = af.find_density_orientation(cut_img, voxel_size, 1) center_of_mass = af.density_mass_center(cut_img, voxel_size) positions = np.array(np.nonzero(cut_img), dtype=np.float64) for i, v in enumerate(reversed(voxel_size), start=1): positions[-i] *= v positions[-i] -= center_of_mass[i - 1] centered = np.dot(orientation_matrix.T, positions) return np.max(centered, axis=1) - np.min(centered, axis=1) def get_main_axis_length( index: int, area_array: np.ndarray, channel: np.ndarray, voxel_size, result_scalar, _cache=False, **kwargs ): if _cache and "_area" in kwargs and "_per_component" in kwargs and "channel_num" in kwargs: help_dict: dict = kwargs["help_dict"] _area: AreaType = kwargs["_area"] _per_component: PerComponent = kwargs["_per_component"] hash_name = hash_fun_call_name( calculate_main_axis, {}, _area, _per_component, kwargs["channel_num"], kwargs["_component_num"] ) if hash_name not in help_dict: help_dict[hash_name] = calculate_main_axis(area_array, channel, [x * result_scalar for x in voxel_size]) return help_dict[hash_name][index] return calculate_main_axis(area_array, channel, [x * result_scalar for x in voxel_size])[index]
[docs] def hash_fun_call_name( fun: Union[Callable, MeasurementMethodBase], arguments: dict, area: AreaType, per_component: PerComponent, channel: Channel, components_num: int, ) -> str: """ Calculate string for properly cache measurements result. :param fun: method for which hash string should be calculated :param arguments: its additional arguments :param area: type of rea :param per_component: If it is per component :param channel: channel number on which calculation is performed :return: unique string for such set of arguments """ if hasattr(fun, "__module__"): fun_name = f"{fun.__module__}.{fun.__name__}" else: fun_name = fun.__name__ return f"{fun_name}: {arguments} # {area} & {per_component} * {channel} ^ {components_num}"
[docs] class Volume(MeasurementMethodBase): text_info = "Volume", "Calculate volume of current segmentation"
[docs] @classmethod def calculate_property(cls, area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ return np.count_nonzero(area_array) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}") ** ndim
[docs] class Voxels(MeasurementMethodBase): text_info = "Voxels", "Calculate number of voxels of current segmentation"
[docs] @classmethod def calculate_property(cls, area_array, **_): # pylint: disable=arguments-differ return np.count_nonzero(area_array)
[docs] @classmethod def get_units(cls, ndim): return symbols("1")
# From Malandain, G., & Boissonnat, J. (2002). Computing the diameter of a point set, # 12(6), 489-509. https://doi.org/10.1142/S0218195902001006
[docs] def double_normal(point_index: int, point_positions: np.ndarray, points_array: np.ndarray): """ :param point_index: index of starting points :param point_positions: points array of size (points_num, number of dimensions) :param points_array: bool matrix with information about which points are in set :return: """ delta = 0 dn = 0, 0 while True: new_delta = delta points_array[point_index] = 0 dist_array = np.sum(np.array((point_positions - point_positions[point_index]) ** 2), 1) dist_array[points_array == 0] = 0 point2_index = np.argmax(dist_array) if dist_array[point2_index] > new_delta: delta = dist_array[point2_index] dn = point_index, point2_index point_index = point2_index if new_delta == delta: return dn, delta
[docs] def iterative_double_normal(points_positions: np.ndarray): """ :param points_positions: points array of size (points_num, number of dimensions) :return: square power of diameter, 2-tuple of points index gave information which points ar chosen """ delta = 0 dn = 0, 0 point_index = 0 points_array = np.ones(points_positions.shape[0], dtype=bool) while True: dn_r, delta_r = double_normal(point_index, points_positions, points_array) if delta_r > delta: delta = delta_r dn = dn_r mid_point = (points_positions[dn[0]] + points_positions[dn[1]]) / 2 dist_array = np.sum(np.array((points_positions - mid_point) ** 2), 1) dist_array[~points_array] = 0 if np.any(dist_array >= delta / 4): point_index = np.argmax(dist_array) else: break else: break return delta, dn
[docs] class Diameter(MeasurementMethodBase): """ Class for calculate diameter of ROI in fast way. From Malandain, G., & Boissonnat, J. (2002). Computing the diameter of a point set, 12(6), 489-509. https://doi.org/10.1142/S0218195902001006 """ text_info = "Diameter", "Diameter of area"
[docs] @staticmethod def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ pos = np.transpose(np.nonzero(get_border(area_array))).astype(float) if pos.size == 0: return 0 for i, val in enumerate((x * result_scalar for x in reversed(voxel_size)), start=1): pos[:, -i] *= val diam_sq = iterative_double_normal(pos)[0] return np.sqrt(diam_sq)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] class DiameterOld(MeasurementMethodBase): # pragma: no cover """ n**2 calculate diameter of ROI """ text_info = "Diameter old", "Diameter of area (Very slow)"
[docs] @staticmethod def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ return calc_diam(get_border(area_array), [x * result_scalar for x in voxel_size])
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] class PixelBrightnessSum(MeasurementMethodBase): text_info = "Pixel brightness sum", "Sum of pixel brightness for current segmentation"
[docs] @staticmethod def calculate_property(area_array: np.ndarray, channel: np.ndarray, **_): # pylint: disable=arguments-differ """ :param area_array: mask for area :param channel: data. same shape like area_type :return: Pixels brightness sum on given area """ if area_array.shape != channel.shape: if area_array.size == channel.size: channel = channel.reshape(area_array.shape) else: # pragma: no cover raise ValueError(f"channel ({channel.shape}) and mask ({area_array.shape}) do not fit each other") return np.sum(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class ComponentsNumber(MeasurementMethodBase): text_info = "Components number", "Calculate number of connected components on segmentation"
[docs] @staticmethod def calculate_property(area_array, **_): # pylint: disable=arguments-differ return np.unique(area_array).size - 1
[docs] @classmethod def get_units(cls, ndim): return symbols("count")
[docs] class MaximumPixelBrightness(MeasurementMethodBase): text_info = "Maximum pixel brightness", "Calculate maximum pixel brightness for current area"
[docs] @staticmethod def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ if area_array.shape != channel.shape: # pragma: no cover raise ValueError(f"channel ({channel.shape}) and mask ({area_array.shape}) do not fit each other") return np.max(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class MinimumPixelBrightness(MeasurementMethodBase): text_info = "Minimum pixel brightness", "Calculate minimum pixel brightness for current area"
[docs] @staticmethod def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ if area_array.shape != channel.shape: # pragma: no cover raise ValueError("channel and mask do not fit each other") return np.min(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class MeanPixelBrightness(MeasurementMethodBase): text_info = "Mean pixel brightness", "Calculate mean pixel brightness for current area"
[docs] @staticmethod def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ if area_array.shape != channel.shape: # pragma: no cover raise ValueError("channel and mask do not fit each other") return np.mean(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class MedianPixelBrightness(MeasurementMethodBase): text_info = "Median pixel brightness", "Calculate median pixel brightness for current area"
[docs] @staticmethod def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ if area_array.shape != channel.shape: # pragma: no cover raise ValueError("channel and mask do not fit each other") return np.median(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class StandardDeviationOfPixelBrightness(MeasurementMethodBase): text_info = ( "Standard deviation of pixel brightness", "Calculate standard deviation of pixel brightness for current area", )
[docs] @staticmethod def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ if area_array.shape != channel.shape: # pragma: no cover raise ValueError("channel and mask do not fit each other") return np.std(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class Moment(MeasurementMethodBase): text_info = "Moment", "Calculate moment of segmented structure"
[docs] @staticmethod def calculate_property(area_array, channel, voxel_size, **_): # pylint: disable=arguments-differ if len(channel.shape) == 4: if channel.shape[0] != 1: # pragma: no cover raise ValueError("This measurements do not support time data") channel = channel[0] img = np.copy(channel) img[area_array == 0] = 0 if np.all(img == 0): return 0 return af.calculate_density_momentum(img, voxel_size)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}") ** 2 * symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] class FirstPrincipalAxisLength(MeasurementMethodBase): text_info = "First principal axis length", "Length of first principal axis"
[docs] @staticmethod def calculate_property(**kwargs): # pylint: disable=arguments-differ return get_main_axis_length(0, **kwargs)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] @classmethod def need_channel(cls): return True
[docs] class SecondPrincipalAxisLength(MeasurementMethodBase): text_info = "Second principal axis length", "Length of second principal axis"
[docs] @staticmethod def calculate_property(**kwargs): # pylint: disable=arguments-differ return get_main_axis_length(1, **kwargs)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] @classmethod def need_channel(cls): return True
[docs] class ThirdPrincipalAxisLength(MeasurementMethodBase): text_info = "Third principal axis length", "Length of third principal axis"
[docs] @staticmethod def calculate_property(**kwargs): # pylint: disable=arguments-differ return get_main_axis_length(2, **kwargs)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] @classmethod def need_channel(cls): return True
[docs] class Compactness(MeasurementMethodBase): text_info = "Compactness", "Calculate compactness off segmentation (Surface^1.5/volume)"
[docs] @staticmethod def calculate_property(**kwargs): # pylint: disable=arguments-differ if kwargs.get("_cache", False) and "help_dict" in kwargs and "_area" in kwargs and "_per_component" in kwargs: help_dict = kwargs["help_dict"] border_hash_str = hash_fun_call_name( Surface, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"] ) if border_hash_str not in help_dict: border_surface = Surface.calculate_property(**kwargs) help_dict[border_hash_str] = border_surface else: border_surface = help_dict[border_hash_str] volume_hash_str = hash_fun_call_name( Volume, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"] ) if volume_hash_str not in help_dict: volume = Volume.calculate_property(**kwargs) help_dict[volume_hash_str] = volume else: volume = help_dict[volume_hash_str] else: border_surface = Surface.calculate_property(**kwargs) volume = Volume.calculate_property(**kwargs) return border_surface**1.5 / volume
[docs] @classmethod def get_units(cls, ndim): return Surface.get_units(ndim) / Volume.get_units(ndim)
[docs] class Sphericity(MeasurementMethodBase): text_info = "Sphericity", "volume/(4/3 * π * radius **3) for 3d data and volume/(π * radius **2) for 2d data"
[docs] @staticmethod def calculate_property(**kwargs): # pylint: disable=arguments-differ if all(key in kwargs for key in ["help_dict", "_area", "_per_component"]) and ( "_cache" not in kwargs or kwargs["_cache"] ): help_dict = kwargs["help_dict"] else: help_dict = {} kwargs.update({"_area": AreaType.ROI, "_per_component": PerComponent.No, "_component_num": NO_COMPONENT}) volume_hash_str = hash_fun_call_name( Volume, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"] ) if volume_hash_str not in help_dict: volume = Volume.calculate_property(**kwargs) help_dict[volume_hash_str] = volume else: volume = help_dict[volume_hash_str] diameter_hash_str = hash_fun_call_name( Diameter, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"] ) if diameter_hash_str not in help_dict: diameter_val = Diameter.calculate_property(**kwargs) help_dict[diameter_hash_str] = diameter_val else: diameter_val = help_dict[diameter_hash_str] radius = diameter_val / 2 if kwargs["area_array"].shape[0] > 1: return volume / (4 / 3 * pi * (radius**3)) return volume / (pi * (radius**2))
[docs] @classmethod def get_units(cls, ndim): return Volume.get_units(ndim) / Diameter.get_units(ndim) ** ndim
[docs] class Surface(MeasurementMethodBase): text_info = "Surface", "Calculating surface of current segmentation"
[docs] @staticmethod def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ return calculate_volume_surface(area_array, [x * result_scalar for x in voxel_size])
[docs] @classmethod def get_units(cls, ndim): return symbols("{}") ** 2
[docs] class RimVolume(MeasurementMethodBase): text_info = "rim volume", "Calculate volumes for elements in radius (in physical units) from mask" __argument_class__ = BorderRim.__argument_class__
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod def calculate_property(area_array, voxel_size, result_scalar, **kwargs): # pylint: disable=arguments-differ border_mask_array = BorderRim.border_mask(voxel_size=voxel_size, result_scalar=result_scalar, **kwargs) if border_mask_array is None: return None final_mask = np.array((border_mask_array > 0) * (area_array > 0)) return np.count_nonzero(final_mask) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}") ** ndim
[docs] @staticmethod def area_type(area: AreaType): return AreaType.ROI
[docs] class RimPixelBrightnessSum(MeasurementMethodBase): text_info = ( "rim pixel brightness sum", "Calculate mass for components located within rim (in physical units) from mask", ) __argument_class__ = BorderRim.__argument_class__
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod def calculate_property(channel, area_array, **kwargs): # pylint: disable=arguments-differ if len(channel.shape) == 4: if channel.shape[0] != 1: # pragma: no cover raise ValueError("This measurements do not support time data") channel = channel[0] border_mask_array = BorderRim.border_mask(**kwargs) if border_mask_array is None: return None final_mask = np.array((border_mask_array > 0) * (area_array > 0)) return np.sum(channel[final_mask]) if np.any(final_mask) else 0
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def need_channel(cls): return True
[docs] @staticmethod def area_type(area: AreaType): return AreaType.ROI
[docs] @register_class(old_paths=["PartSeg.utils.analysis.statistics_calculation.DistancePoint"]) class DistancePoint(Enum): Border = 1 Mass_center = 2 Geometrical_center = 3 def __str__(self): return self.name.replace("_", " ")
[docs] @register_class(version="0.0.1", migrations=[("0.0.1", rename_key("distance_to_segmentation", "distance_to_roi"))]) class DistanceMaskROIParameters(BaseModel): distance_from_mask: DistancePoint = DistancePoint.Border distance_to_roi: DistancePoint = Field(DistancePoint.Border, title="Distance to ROI")
[docs] class DistanceMaskROI(MeasurementMethodBase): text_info = "ROI distance", "Calculate distance between ROI and mask" __argument_class__ = DistanceMaskROIParameters @staticmethod def calculate_points(channel, area_array, voxel_size, result_scalar, point_type: DistancePoint) -> np.ndarray: if point_type == DistancePoint.Border: area_pos = np.transpose(np.nonzero(get_border(area_array))).astype(float) area_pos += 0.5 for i, val in enumerate((x * result_scalar for x in reversed(voxel_size)), start=1): area_pos[:, -i] *= val elif point_type == DistancePoint.Mass_center: im = np.copy(channel) im[area_array == 0] = 0 area_pos = np.array([af.density_mass_center(im, voxel_size) * result_scalar]) else: area_pos = np.array([af.density_mass_center(area_array > 0, voxel_size) * result_scalar]) return area_pos
[docs] @classmethod def calculate_property( # pylint: disable=arguments-differ cls, channel, area_array, mask, voxel_size, result_scalar, distance_from_mask: DistancePoint, distance_to_roi: DistancePoint, *args, **kwargs, ): # pylint: disable=arguments-differ if len(channel.shape) == 4: if channel.shape[0] != 1: raise ValueError("This measurements do not support time data") channel = channel[0] if not (np.any(mask) and np.any(area_array)): return 0 mask_pos = cls.calculate_points(channel, mask, voxel_size, result_scalar, distance_from_mask) seg_pos = cls.calculate_points(channel, area_array, voxel_size, result_scalar, distance_to_roi) if 1 in {mask_pos.shape[0], seg_pos.shape[0]}: return np.min(cdist(mask_pos, seg_pos)) min_val = np.inf for i in range(seg_pos.shape[0]): min_val = min(min_val, np.min(cdist(mask_pos, np.array([seg_pos[i]])))) return min_val
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}")
[docs] @classmethod def need_channel(cls): return True
[docs] @staticmethod def area_type(area: AreaType): return AreaType.ROI
[docs] class DistanceROIROIParameters(BaseModel): profile: ROIExtractionProfile = Field( ROIExtractionProfile( name="default", algorithm=LowerThresholdAlgorithm.get_name(), values=LowerThresholdAlgorithm.get_default_values(), ), title="ROI extraction profile", ) distance_from_new_roi: DistancePoint = Field(DistancePoint.Border, title="Distance new ROI") distance_to_roi: DistancePoint = Field(DistancePoint.Border, title="Distance to ROI")
[docs] class DistanceROIROI(DistanceMaskROI): text_info = "to new ROI distance", "Calculate distance between ROI and new ROI" __argument_class__ = DistanceROIROIParameters
[docs] @classmethod def get_starting_leaf(cls): return Leaf(name=cls.text_info[0], area=AreaType.ROI)
# noinspection PyMethodOverriding
[docs] @classmethod def calculate_property( cls, channel: np.ndarray, image: Image, area_array: np.ndarray, profile: ROIExtractionProfile, mask: Optional[np.ndarray], voxel_size: Sequence[float], result_scalar: float, distance_from_new_roi: DistancePoint, distance_to_roi: DistancePoint, **kwargs, ): # pylint: disable=arguments-differ if len(channel.shape) == 4: if channel.shape[0] != 1: raise ValueError("This measurements do not support time data") channel = channel[0] try: hash_name = hash_fun_call_name( calculate_segmentation_step, profile, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"], ) if hash_name in kwargs["help_dict"]: result = kwargs["help_dict"][hash_name] else: result, _ = calculate_segmentation_step(profile, image, mask) kwargs["help_dict"][hash_name] = result except KeyError: result, _ = calculate_segmentation_step(profile, image, mask) if np.any(result.roi[area_array > 0]): return 0 return super().calculate_property( channel, area_array, result.roi, tuple(voxel_size), result_scalar, distance_from_mask=distance_from_new_roi, distance_to_roi=distance_to_roi, )
@staticmethod def need_full_data(): return True
[docs] class ROINeighbourhoodROIParameters(BaseModel): profile: ROIExtractionProfile = Field( ROIExtractionProfile( name="default", algorithm=LowerThresholdAlgorithm.get_name(), values=LowerThresholdAlgorithm.get_default_values(), ), title="ROI extraction profile", ) distance: float = Field(500, ge=0, le=10000, title="Distance") units: Units = Units.nm
[docs] class ROINeighbourhoodROI(DistanceMaskROI): text_info = "Neighbourhood new ROI presence", "Count how many of new roi are present in neighbourhood of new ROI" __argument_class__ = ROINeighbourhoodROIParameters
[docs] @classmethod def get_starting_leaf(cls): return Leaf(name=cls.text_info[0], area=AreaType.ROI)
# noinspection PyMethodOverriding
[docs] @classmethod def calculate_property( cls, image: Image, area_array: np.ndarray, profile: ROIExtractionProfile, mask: Optional[np.ndarray], voxel_size, distance: float, units: Units, **kwargs, ): # pylint: disable=arguments-differ try: hash_name = hash_fun_call_name( calculate_segmentation_step, profile, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"], ) if hash_name in kwargs["help_dict"]: result = kwargs["help_dict"][hash_name] else: result, _ = calculate_segmentation_step(profile, image, mask) kwargs["help_dict"][hash_name] = result except KeyError: result, _ = calculate_segmentation_step(profile, image, mask) area_array = image.fit_array_to_image(area_array) units_scalar = UNIT_SCALE[units.value] final_radius = [int((distance / units_scalar) / x) for x in reversed(voxel_size)] dilated = SimpleITK.GetArrayFromImage( SimpleITK.BinaryDilate( SimpleITK.GetImageFromArray((area_array > 0).astype(np.uint8).squeeze()), final_radius ) ) dilated = dilated.reshape(area_array.shape) roi = image.fit_array_to_image(result.roi) components = set(np.unique(roi[dilated > 0])) if 0 in components: components.remove(0) return len(components)
@staticmethod def need_full_data(): return True
[docs] class SplitOnPartParameters(MaskDistanceSplit.__argument_class__): part_selection: int = Field(2, title="Which part (from border)", ge=1, le=1024)
[docs] class SplitOnPartVolume(MeasurementMethodBase): text_info = ( "distance splitting volume", "Split mask on parts and then calculate volume of cross of segmentation and mask part", ) __argument_class__ = SplitOnPartParameters
[docs] @staticmethod def calculate_property( part_selection, area_array, voxel_size, result_scalar, **kwargs ): # pylint: disable=arguments-differ masked = MaskDistanceSplit.split(voxel_size=voxel_size, **kwargs) mask = masked == part_selection return np.count_nonzero(mask * area_array) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod def get_units(cls, ndim): return symbols("{}") ** ndim
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod def area_type(area: AreaType): return AreaType.ROI
[docs] class SplitOnPartPixelBrightnessSum(MeasurementMethodBase): text_info = ( "distance splitting pixel brightness sum", "Split mask on parts and then calculate pixel brightness sum of cross of segmentation and mask part", ) __argument_class__ = SplitOnPartParameters
[docs] @staticmethod def calculate_property(part_selection, channel, area_array, **kwargs): # pylint: disable=arguments-differ masked = MaskDistanceSplit.split(**kwargs) mask = np.array(masked == part_selection) if channel.ndim - mask.ndim == 1: channel = channel[0] return np.sum(channel[mask * area_array > 0])
[docs] @classmethod def get_units(cls, ndim): return symbols("Pixel_brightness")
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod def area_type(area: AreaType): return AreaType.ROI
[docs] @classmethod def need_channel(cls): return True
HARALIC_FEATURES = [ "AngularSecondMoment", "Contrast", "Correlation", "Variance", "InverseDifferenceMoment", "SumAverage", "SumVariance", "SumEntropy", "Entropy", "DifferenceVariance", "DifferenceEntropy", "InfoMeas1", "InfoMeas2", ]
[docs] class HaralickEnum(Enum): AngularSecondMoment = "AngularSecondMoment" Contrast = "Contrast" Correlation = "Correlation" Variance = "Variance" InverseDifferenceMoment = "InverseDifferenceMoment" SumAverage = "SumAverage" SumVariance = "SumVariance" SumEntropy = "SumEntropy" Entropy = "Entropy" DifferenceVariance = "DifferenceVariance" DifferenceEntropy = "DifferenceEntropy" InfoMeas1 = "InfoMeas1" InfoMeas2 = "InfoMeas2" def index(self) -> int: return list(self.__class__).index(self)
def _rescale_image(data: np.ndarray): if data.dtype == np.uint8: return data if np.issubdtype(data.dtype, np.integer) and data.min() >= 0 and data.max() < 255: return data.astype(np.uint8) min_val = data.min() max_val = data.max() return ((data - min_val) / ((max_val - min_val) / 254)).astype(np.uint8)
[docs] class HaralickParameters(BaseModel): feature: HaralickEnum = HaralickEnum.AngularSecondMoment distance: int = Field(1, ge=1, le=10)
[docs] class Haralick(MeasurementMethodBase): __argument_class__ = HaralickParameters
[docs] @classmethod def get_units(cls, ndim) -> symbols: return "1"
text_info = "Haralick", "Calculate Haralick features"
[docs] @classmethod def need_channel(cls): return True
[docs] @classmethod def calculate_property( # pylint: disable=arguments-differ cls, area_array, channel, distance, feature, _cache=False, **kwargs ): # pylint: disable=arguments-differ if isinstance(feature, str): feature = HaralickEnum(feature) if _cache := _cache and "_area" in kwargs and "_per_component" in kwargs: help_dict: dict = kwargs["help_dict"] _area: AreaType = kwargs["_area"] _per_component: PerComponent = kwargs["_per_component"] hash_name = hash_fun_call_name( Haralick, {"distance": distance}, _area, _per_component, kwargs["channel_num"], kwargs["_component_num"] ) if hash_name not in help_dict: help_dict[hash_name] = cls.calculate_haralick(channel, area_array, distance) return help_dict[hash_name][feature.index()] res = cls.calculate_haralick(channel, area_array, distance) return res[feature.index()]
@staticmethod def calculate_haralick(channel, area_array, distance): data = channel.copy() data[area_array == 0] = 0 data = _rescale_image(data.squeeze()) return haralick(data, distance=distance, ignore_zeros=True, return_mean=True)
[docs] class ComponentBoundingBox(MeasurementMethodBase): text_info = "Component Bounding Box", "bounding box as string"
[docs] @classmethod def get_units(cls, ndim): return "str"
[docs] @staticmethod def calculate_property(bounds_info, _component_num, **kwargs): # pylint: disable=arguments-differ return str(bounds_info[_component_num])
[docs] @classmethod def get_starting_leaf(cls): return super().get_starting_leaf().replace_(area=AreaType.ROI, per_component=PerComponent.Yes)
[docs] class GetROIAnnotationTypeParameters(BaseModel): name: str = ""
[docs] class GetROIAnnotationType(MeasurementMethodBase): text_info = "annotation by name", "Get roi annotation by name" __argument_class__ = GetROIAnnotationTypeParameters
[docs] @classmethod def get_starting_leaf(cls): return Leaf(name=cls.text_info[0], area=AreaType.ROI, per_component=PerComponent.Yes)
[docs] @staticmethod def calculate_property(roi_annotation, name, _component_num, **kwargs): # pylint: disable=arguments-differ return str(roi_annotation.get(_component_num, {}).get(name, ""))
[docs] @classmethod def get_units(cls, ndim): return "str"
[docs] class ColocalizationMeasurementParameters(BaseModel): channel_fst: Channel = Field(0, title="Channel 1") channel_scd: Channel = Field(1, title="Channel 2") colocalization: CorrelationEnum = CorrelationEnum.pearson randomize: bool = Field( False, description="If randomize orders of pixels in one channel", title="Randomize channel" ) randomize_repeat: int = Field(10, description="Number of repetitions for mean_calculate", title="Randomize num")
[docs] class ColocalizationMeasurement(MeasurementMethodBase): text_info = "Colocalization", "Measurement of colocalization of two channels." __argument_class__ = ColocalizationMeasurementParameters @staticmethod def _calculate_masked(data_1, data_2, colocalization): if colocalization == CorrelationEnum.spearman: data_1 = data_1.argsort().argsort().astype(float) data_2 = data_2.argsort().argsort().astype(float) colocalization = CorrelationEnum.pearson if colocalization == CorrelationEnum.pearson: data_1_mean = np.mean(data_1) data_2_mean = np.mean(data_2) nominator = np.sum((data_1 - data_1_mean) * (data_2 - data_2_mean)) numerator = np.sqrt(np.sum((data_1 - data_1_mean) ** 2) * np.sum((data_2 - data_2_mean) ** 2)) return nominator / numerator if colocalization == CorrelationEnum.manders: nominator = np.sum(data_1 * data_2) numerator = np.sqrt(np.sum(data_1**2) * np.sum(data_2**2)) return nominator / numerator if colocalization == CorrelationEnum.intensity: data_1_mean = np.mean(data_1) data_2_mean = np.mean(data_2) return np.sum((data_1 > data_1_mean) == (data_2 > data_2_mean)) / data_1.size - 0.5 raise RuntimeError(f"Not supported colocalization method {colocalization}") # pragma: no cover
[docs] @classmethod def calculate_property( # pylint: disable=arguments-differ cls, area_array, colocalization, randomize=False, randomize_repeat=10, channel_fst=0, channel_scd=1, **kwargs ): # pylint: disable=arguments-differ mask_binary = area_array > 0 data_1 = kwargs[f"channel_{channel_fst}"][mask_binary].astype(float) data_2 = kwargs[f"channel_{channel_scd}"][mask_binary].astype(float) if not randomize: return cls._calculate_masked(data_1, data_2, colocalization) res_list = [] for _ in range(randomize_repeat): rand_data2 = np.random.default_rng().permutation(data_2) res_list.append(cls._calculate_masked(data_1, rand_data2, colocalization)) return np.mean(res_list)
[docs] @classmethod def get_units(cls, ndim) -> symbols: return 1
def pixel_volume(spacing, result_scalar): return reduce((lambda x, y: x * y), [x * result_scalar for x in spacing]) def calculate_volume_surface(volume_mask, voxel_size): surf_im: np.ndarray = np.array(volume_mask).astype(np.uint8).squeeze() return sum( ( np.count_nonzero( np.logical_xor( surf_im.take(np.arange(surf_im.shape[ax] - 1), axis=ax), surf_im.take(np.arange(surf_im.shape[ax] - 1) + 1, axis=ax), ) ) * reduce( lambda x, y: x * y, [voxel_size[x] for x in range(surf_im.ndim) if x != ax], ) ) for ax in range(surf_im.ndim) ) def get_border(array): if array.dtype == bool: array = array.astype(np.uint8) return SimpleITK.GetArrayFromImage(SimpleITK.LabelContour(SimpleITK.GetImageFromArray(array))) def calc_diam(array, voxel_size): # pragma: no cover pos = np.transpose(np.nonzero(array)).astype(float) for i, val in enumerate(voxel_size): pos[:, i] *= val diam = 0 for i, p in enumerate(zip(pos[:-1])): tmp = np.array((pos[i + 1 :] - p) ** 2) diam = max(diam, np.max(np.sum(tmp, 1))) return np.sqrt(diam)
[docs] class VoxelSize(MeasurementMethodBase): text_info = "Voxel size", "Voxel size"
[docs] @classmethod def calculate_property(cls, voxel_size, result_scalar, **kwargs): # pylint: disable=arguments-differ return " x ".join([str(x * result_scalar) for x in voxel_size])
[docs] @classmethod def get_units(cls, ndim) -> symbols: return symbols("str {}")
MEASUREMENT_DICT = Register(suggested_base_class=MeasurementMethodBase) """Register with all measurements algorithms""" MEASUREMENT_DICT.register(Volume) MEASUREMENT_DICT.register(Diameter) MEASUREMENT_DICT.register(PixelBrightnessSum, old_names=["Pixel Brightness Sum"]) MEASUREMENT_DICT.register(ComponentBoundingBox) MEASUREMENT_DICT.register(GetROIAnnotationType) MEASUREMENT_DICT.register(ComponentsNumber, old_names=["Components Number"]) MEASUREMENT_DICT.register(MaximumPixelBrightness) MEASUREMENT_DICT.register(MinimumPixelBrightness) MEASUREMENT_DICT.register(MeanPixelBrightness) MEASUREMENT_DICT.register(MedianPixelBrightness) MEASUREMENT_DICT.register(StandardDeviationOfPixelBrightness) MEASUREMENT_DICT.register(ColocalizationMeasurement) MEASUREMENT_DICT.register(Moment, old_names=["Moment of inertia"]) MEASUREMENT_DICT.register(FirstPrincipalAxisLength, old_names=["Longest main axis length"]) MEASUREMENT_DICT.register(SecondPrincipalAxisLength, old_names=["Middle main axis length"]) MEASUREMENT_DICT.register(ThirdPrincipalAxisLength, old_names=["Shortest main axis length"]) MEASUREMENT_DICT.register(Compactness) MEASUREMENT_DICT.register(Sphericity) MEASUREMENT_DICT.register(Surface) MEASUREMENT_DICT.register(RimVolume, old_names=["Rim Volume"]) MEASUREMENT_DICT.register(RimPixelBrightnessSum, old_names=["Rim Pixel Brightness Sum"]) MEASUREMENT_DICT.register(ROINeighbourhoodROI) MEASUREMENT_DICT.register(DistanceMaskROI, old_names=["segmentation distance"]) MEASUREMENT_DICT.register(DistanceROIROI, old_names=["to ROI distance"]) MEASUREMENT_DICT.register(SplitOnPartVolume, old_names=["split on part volume"]) MEASUREMENT_DICT.register(SplitOnPartPixelBrightnessSum, old_names=["split on part pixel brightness sum"]) MEASUREMENT_DICT.register(Voxels) MEASUREMENT_DICT.register(Haralick) MEASUREMENT_DICT.register(VoxelSize)