import warnings
from collections import OrderedDict
from contextlib import suppress
from enum import Enum
from functools import reduce
from math import pi
from typing import (
Any,
Callable,
Dict,
Generator,
Iterator,
List,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import numpy as np
import pandas as pd
import SimpleITK
from local_migrator import register_class, rename_key
from mahotas.features import haralick
from pydantic import Field
from scipy.spatial.distance import cdist
from sympy import Rational, symbols
from PartSegCore import autofit as af
from PartSegCore.algorithm_describe_base import Register, ROIExtractionProfile
from PartSegCore.analysis.calculate_pipeline import calculate_segmentation_step
from PartSegCore.analysis.measurement_base import (
AreaType,
Leaf,
MeasurementEntry,
MeasurementMethodBase,
Node,
PerComponent,
has_mask_components,
has_roi_components,
)
from PartSegCore.mask_partition_utils import BorderRim, MaskDistanceSplit
from PartSegCore.roi_info import ROIInfo
from PartSegCore.segmentation.restartable_segmentation_algorithms import LowerThresholdAlgorithm
from PartSegCore.universal_const import UNIT_SCALE, Units
from PartSegCore.utils import BaseModel
from PartSegImage import Channel, Image
# TODO change image to channel in signature of measurement calculate_property
NO_COMPONENT = -1
[docs]class CorrelationEnum(str, Enum):
pearson = "Pearson correlation coefficient"
manders = "Mander's overlap coefficient"
intensity = "Intensity correlation quotient"
spearman = "Spearman rank correlation"
def __str__(self):
return self.value
[docs]class ProhibitedDivision(Exception):
pass
[docs]class SettingsValue(NamedTuple):
function: Callable
help_message: str
arguments: Optional[dict]
is_component: bool
default_area: Optional[AreaType] = None
[docs]class ComponentsInfo(NamedTuple):
"""
Class for storage information about relation between roi components and mask components
:ivar numpy.ndarray roi_components: list of roi components
:ivar numpy.ndarray mask_components: list of mask components
:ivar Dict[int, List[int]] components_translation: mapping
from roi components to mask components base on intersections
"""
roi_components: np.ndarray
mask_components: np.ndarray
components_translation: Dict[int, List[int]]
def has_components(self):
return all(len(x) for x in self.components_translation.values())
[docs]def empty_fun(_a0=None, _a1=None):
"""This function is being used as dummy reporting function."""
MeasurementValueType = Union[float, List[float], str]
MeasurementResultType = Tuple[MeasurementValueType, str]
MeasurementResultInputType = Tuple[MeasurementValueType, str, Tuple[PerComponent, AreaType]]
FILE_NAME_STR = "File name"
[docs]class MeasurementResult(MutableMapping[str, MeasurementResultType]):
"""
Class for storage measurements info.
"""
def __init__(self, components_info: ComponentsInfo):
self.components_info = components_info
self._data_dict = OrderedDict()
self._units_dict: Dict[str, str] = {}
self._type_dict: Dict[str, Tuple[PerComponent, AreaType]] = {}
self._units_dict["Mask component"] = ""
self._units_dict["Segmentation component"] = ""
def __str__(self): # pragma: no cover
return "".join(
f"{key}: {val}; type {self._type_dict[key]}, units {self._units_dict[key]}\n"
for key, val in self._data_dict.items()
)
def __setitem__(self, k: str, v: MeasurementResultInputType) -> None:
self._data_dict[k] = v[0]
self._units_dict[k] = v[1]
self._type_dict[k] = v[2]
def __delitem__(self, v: str) -> None:
del self._data_dict[v]
del self._units_dict[v]
del self._type_dict[v]
def __getitem__(self, k: str) -> MeasurementResultType:
return self._data_dict[k], self._units_dict[k]
def __len__(self) -> int:
return len(self._data_dict)
def __iter__(self) -> Iterator[str]:
return iter(self._data_dict)
def to_dataframe(self, all_components=False) -> pd.DataFrame:
data = self.get_separated(all_components)
columns = [
f"{label} ({units})" if units else label
for label, units in zip(self.get_labels(all_components=all_components), self.get_units(all_components))
]
df = pd.DataFrame(data, columns=columns, index=self.components_info.roi_components)
if "Segmentation component" in df.columns:
df = df.astype({"Segmentation component": int}).set_index("Segmentation component")
return df
[docs] def set_filename(self, path_fo_file: str):
"""
Set name of file to be presented as first position.
"""
self._data_dict[FILE_NAME_STR] = path_fo_file
self._type_dict[FILE_NAME_STR] = PerComponent.No, AreaType.ROI
self._units_dict[FILE_NAME_STR] = ""
self._data_dict.move_to_end(FILE_NAME_STR, False)
[docs] def get_component_info(self, all_components: bool = False) -> Tuple[bool, bool]:
"""
Get information which type of components are in storage.
:return: has_mask_components, has_segmentation_components
"""
if all_components and self.components_info.has_components():
return True, True
return has_mask_components(self._type_dict.values()), has_roi_components(self._type_dict.values())
[docs] def get_labels(self, expand=True, all_components=False) -> List[str]:
"""
If expand is false return list of keys of this storage.
Otherwise return labels for measurement. Base are keys of this storage.
If has mask components, or has segmentation_components then add this labels
"""
if not expand:
return list(self.keys())
mask_components, roi_components = self.get_component_info(all_components)
labels = list(self._data_dict.keys())
index = 1 if FILE_NAME_STR in self._data_dict else 0
if mask_components:
labels.insert(index, "Mask component")
if roi_components:
labels.insert(index, "Segmentation component")
return labels
def get_units(self, all_components=False) -> List[str]:
return [self._units_dict[x] for x in self.get_labels(all_components=all_components)]
[docs] def get_global_names(self):
"""Get names for only parameters which are not 'PerComponent.Yes'"""
labels = list(self._data_dict.keys())
return [x for x in labels if self._type_dict[x][0] != PerComponent.Yes]
[docs] def get_global_parameters(self):
"""Get only parameters which are not 'PerComponent.Yes'"""
if FILE_NAME_STR in self._data_dict:
name = self._data_dict[FILE_NAME_STR]
res = [name]
iterator = iter(self._data_dict.keys())
with suppress(StopIteration):
next(iterator) # skipcq: PTC-W0063`
else:
res = []
iterator = iter(self._data_dict.keys())
for el in iterator:
per_comp = self._type_dict[el][0]
val = self._data_dict[el]
if per_comp not in {PerComponent.Yes, PerComponent.Per_Mask_component}:
res.append(val)
return res
def _get_component_info(self, mask_components, roi_components):
if mask_components:
if roi_components:
translation = self.components_info.components_translation
return [(x, y) for x in translation for y in translation[x]]
return [(0, x) for x in self.components_info.mask_components]
return [(x, 0) for x in self.components_info.roi_components]
def _prepare_res_iterator(self, counts):
if FILE_NAME_STR in self._data_dict:
name = self._data_dict[FILE_NAME_STR]
res = [[name] for _ in range(counts)]
iterator = iter(self._data_dict.keys())
with suppress(StopIteration):
next(iterator) # skipcq: PTC-W0063`
else:
res = [[] for _ in range(counts)]
iterator = iter(self._data_dict.keys())
return res, iterator
[docs] def get_separated(self, all_components=False) -> List[List[MeasurementValueType]]:
"""Get measurements separated for each component"""
mask_components, roi_components = self.get_component_info(all_components)
if not mask_components and not roi_components:
return [list(self._data_dict.values())]
component_info = self._get_component_info(mask_components, roi_components)
res, iterator = self._prepare_res_iterator(len(component_info))
for i, num in enumerate(component_info):
if roi_components:
res[i].append(num[0])
if mask_components:
res[i].append(num[1])
mask_to_pos = {val: i for i, val in enumerate(self.components_info.mask_components)}
segmentation_to_pos = {val: i for i, val in enumerate(self.components_info.roi_components)}
for el in iterator:
per_comp, area_type = self._type_dict[el]
val = self._data_dict[el]
for i, (seg, mask) in enumerate(component_info):
if per_comp not in {PerComponent.Yes, PerComponent.Per_Mask_component}:
res[i].append(val)
elif area_type == AreaType.ROI:
res[i].append(val[segmentation_to_pos[seg]])
else:
res[i].append(val[mask_to_pos[mask]])
return res
class MeasurementProfile(BaseModel):
name: str
chosen_fields: List[MeasurementEntry]
name_prefix: str = ""
@property
def _need_mask(self):
return any(cf_val.calculation_tree.need_mask() for cf_val in self.chosen_fields)
def to_dict(self): # pragma: no cover
warnings.warn(
f"{self.__class__.__name__}.to_dict is deprecated. Use as_dict instead",
category=FutureWarning,
stacklevel=2,
)
return dict(self)
def _need_mask_without_segmentation(self, tree):
if isinstance(tree, Leaf):
return tree.area == AreaType.Mask_without_ROI
return self._need_mask_without_segmentation(tree.left) or self._need_mask_without_segmentation(tree.right)
def _get_par_component_and_area_type(self, tree: Union[Node, Leaf]) -> Tuple[PerComponent, AreaType]:
if isinstance(tree, Leaf):
method = MEASUREMENT_DICT[tree.name]
area_type = method.area_type(tree.area)
if tree.per_component == PerComponent.Mean:
return PerComponent.No, area_type
if tree.per_component == PerComponent.Per_Mask_component:
return tree.per_component, AreaType.Mask
return tree.per_component, area_type
left_par, left_area = self._get_par_component_and_area_type(tree.left)
right_par, right_area = self._get_par_component_and_area_type(tree.left)
if PerComponent.Yes in [left_par, right_par]:
res_par = PerComponent.Yes
else:
res_par = PerComponent.No
area_set = {left_area, right_area}
if len(area_set) == 1:
res_area = area_set.pop()
elif AreaType.ROI in area_set:
res_area = AreaType.ROI
else:
res_area = AreaType.Mask_without_ROI
return res_par, res_area
def get_channels_num(self) -> Set[Channel]:
resp = set()
for el in self.chosen_fields:
resp.update(el.get_channel_num(MEASUREMENT_DICT))
return resp
def __str__(self):
text = f"Set name: {self.name}\n"
if self.name_prefix:
text += f"Name prefix: {self.name_prefix}\n"
text += "Measurements list:\n"
for el in self.chosen_fields:
text += f"{el.name}\n"
return text
def get_component_info(self, unit: Units):
"""
:return: list[((str, str), bool)]
"""
# Fixme remove binding to 3 dimensions
return [
(
(self.name_prefix + el.name, el.get_unit(unit, 3)),
self._is_component_measurement(el.calculation_tree),
)
for el in self.chosen_fields
]
def is_any_mask_measurement(self):
return any(el.calculation_tree.need_mask() for el in self.chosen_fields)
def _is_component_measurement(self, node):
if isinstance(node, Leaf):
return node.per_component in {PerComponent.Yes, PerComponent.Per_Mask_component}
return self._is_component_measurement(node.left) or self._is_component_measurement(node.right)
@staticmethod
def _prepare_leaf_kw(node, kwargs, method, area_type):
area_type_dict = {
AreaType.Mask: "mask",
AreaType.Mask_without_ROI: "mask_without_segmentation",
AreaType.ROI: "segmentation",
}
kw = dict(kwargs)
kw.update(dict(node.parameters))
if node.channel is not None:
kw["channel"] = kw[f"channel_{node.channel}"]
kw["channel_num"] = node.channel
else:
kw["channel_num"] = -1
kw["_area"] = node.area
kw["_per_component"] = node.per_component
kw["_cache"] = True # TODO remove cache argument
kw["area_array"] = kw[area_type_dict[area_type]]
kw["_component_num"] = NO_COMPONENT
return kw
def _clip_arrays(self, kw, node: Leaf, method: MeasurementMethodBase, component_index: int):
if node.area != AreaType.ROI or method.need_full_data():
bounds = tuple(slice(None, None) for _ in kw["area_array"].shape)
elif node.per_component == PerComponent.Per_Mask_component:
bounds = tuple(kw["mask_bound_info"][component_index].get_slices(margin=1))
else:
bounds = tuple(kw["bounds_info"][component_index].get_slices(margin=1))
kw2 = kw.copy()
component_mark_area = (
kw2["area_array"] if node.per_component != PerComponent.Per_Mask_component else kw2["mask"]
)
kw2["_component_num"] = component_index
area_array = kw2["area_array"][bounds].copy()
area_array[component_mark_area[bounds] != component_index] = 0
kw2["area_array"] = area_array
im_bounds = list(bounds)
image: Image = kw["image"]
im_bounds.insert(image.time_pos, slice(None))
kw2["image"] = image.cut_image(tuple(im_bounds))
for name in ["channel", "segmentation", "roi", "mask"] + [f"channel_{num}" for num in self.get_channels_num()]:
if kw[name] is not None:
kw2[name] = kw[name][bounds]
kw2["roi_alternative"] = kw2["roi_alternative"].copy()
for name, array in kw2["roi_alternative"].items():
kw2["roi_alternative"][name] = array[bounds]
return kw2
def _calculate_leaf_value(
self, node: Union[Node, Leaf], segmentation_mask_map: ComponentsInfo, kwargs: dict
) -> Union[float, np.ndarray]:
method: MeasurementMethodBase = MEASUREMENT_DICT[node.name]
kw = self._prepare_leaf_kw(node, kwargs, method, method.area_type(node.area))
if node.per_component == PerComponent.No:
return method.calculate_property(**kw)
# TODO use cache for per component calculate
# kw["_cache"] = False
val = []
if method.area_type(node.area) == AreaType.ROI and node.per_component != PerComponent.Per_Mask_component:
components = segmentation_mask_map.roi_components
else:
components = segmentation_mask_map.mask_components
for i in components:
kw2 = self._clip_arrays(kw, node, method, i)
val.append(method.calculate_property(**kw2))
val = np.array(val)
if node.per_component == PerComponent.Mean:
val = np.mean(val) if val.size else 0
return val
def _calculate_leaf(
self, node: Leaf, segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict
) -> Tuple[Union[float, np.ndarray], symbols, AreaType]:
method: MeasurementMethodBase = MEASUREMENT_DICT[node.name]
hash_str = hash_fun_call_name(
method, node.parameters, node.area, node.per_component, node.channel, NO_COMPONENT
)
area_type = method.area_type(node.area)
if node.per_component == PerComponent.Per_Mask_component:
area_type = AreaType.Mask
if hash_str in help_dict:
val = help_dict[hash_str]
else:
kwargs["help_dict"] = help_dict
val = self._calculate_leaf_value(node, segmentation_mask_map, kwargs)
help_dict[hash_str] = val
unit: symbols = method.get_units(3) if kwargs["image"].is_stack else method.get_units(2)
if node.power != 1:
return pow(val, node.power), pow(unit, Rational(node.power)), area_type
return val, unit, area_type
def _calculate_node(
self, node: Node, segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict
) -> Tuple[Union[float, np.ndarray], symbols, AreaType]:
if node.op != "/":
raise ValueError(f"Wrong measurement: {node}")
left_res, left_unit, left_area = self.calculate_tree(node.left, segmentation_mask_map, help_dict, kwargs)
right_res, right_unit, right_area = self.calculate_tree(node.right, segmentation_mask_map, help_dict, kwargs)
if not (isinstance(left_res, np.ndarray) and isinstance(right_res, np.ndarray) and left_area != right_area):
return left_res / right_res, left_unit / right_unit, left_area
area_set = {left_area, right_area}
if area_set == {AreaType.ROI, AreaType.Mask_without_ROI}: # pragma: no cover
raise ProhibitedDivision("This division is prohibited")
if area_set == {AreaType.Mask, AreaType.Mask_without_ROI}:
return left_res / right_res, left_unit / right_unit, AreaType.Mask_without_ROI
# if area_set == {AreaType.ROI, AreaType.Mask}:
res = []
if left_area == AreaType.Mask:
roi_res, mask_res = right_res, left_res
else:
roi_res, mask_res = left_res, right_res
for val, num in zip(roi_res, segmentation_mask_map.roi_components):
mask_components = segmentation_mask_map.components_translation[num]
if len(mask_components) != 1: # pragma: no cover
raise ProhibitedDivision("Cannot calculate when object do not belongs to one mask area")
if left_area == AreaType.ROI:
res.append(val / mask_res[mask_components[0] - 1])
else:
res.append(mask_res[mask_components[0] - 1] / val)
res = np.array(res)
return res, left_unit / right_unit, AreaType.ROI
# TODO check this
def calculate_tree(
self, node: Union[Node, Leaf], segmentation_mask_map: ComponentsInfo, help_dict: dict, kwargs: dict
) -> Tuple[Union[float, np.ndarray], symbols, AreaType]:
"""
Main function for calculation tree of measurements. It is executed recursively
:param node: measurement to calculate
:param segmentation_mask_map: map from mask segmentation components to mask components. Needed for division
:param help_dict: dict to cache calculation result. It reduce recalculations of same measurements.
:param kwargs: additional info needed by measurements
:return: measurement value
"""
if isinstance(node, Leaf):
return self._calculate_leaf(node, segmentation_mask_map, help_dict, kwargs)
if isinstance(node, Node):
return self._calculate_node(node, segmentation_mask_map, help_dict, kwargs)
raise ValueError(f"Node {node} need to be instance of Leaf or Node")
@staticmethod
def get_segmentation_to_mask_component(segmentation: np.ndarray, mask: Optional[np.ndarray]) -> ComponentsInfo:
"""
Calculate map from segmentation component num to mask component num
:param segmentation: numpy array with segmentation labeled as positive integers
:param mask: numpy array with mask labeled as positive integer
:return: map
"""
components = np.unique(segmentation)
if components[0] == 0 or components[0] is None:
components = components[1:]
mask_components = np.unique(mask)
if mask_components[0] == 0 or mask_components[0] is None:
mask_components = mask_components[1:]
res = OrderedDict()
if mask is None:
res = {i: [] for i in components}
elif np.max(mask) == 1:
res = {i: [1] for i in components}
else:
for num in components:
res[num] = list(np.unique(mask[segmentation == num]))
if res[num][0] == 0:
res[num] = res[num][1:]
return ComponentsInfo(components, mask_components, res)
def get_component_and_area_info(self) -> List[Tuple[PerComponent, AreaType]]:
"""For each measurement check if is per component and in which types"""
res = []
for el in self.chosen_fields:
tree = el.calculation_tree
res.append(self._get_par_component_and_area_type(tree))
return res
def get_segmentation_mask_map(self, image: Image, roi: Union[np.ndarray, ROIInfo], time: int = 0) -> ComponentsInfo:
def get_time(array: np.ndarray):
if array is not None and array.ndim == 4:
return array.take(time, axis=image.time_pos)
return array
return self.get_segmentation_to_mask_component(
get_time(roi if isinstance(roi, np.ndarray) else roi.roi), get_time(image.mask)
)
def calculate(
self,
image: Image,
channel_num: int,
roi: Union[np.ndarray, ROIInfo],
result_units: Units,
range_changed: Callable[[int, int], Any] = empty_fun,
step_changed: Callable[[int], Any] = empty_fun,
time: int = 0,
) -> MeasurementResult:
"""
Calculate measurements on given set of parameters
:param image: image on which measurements should be calculated
:param channel_num: channel number on which measurements should be calculated
:param roi: array with segmentation labeled as positive integers
:param result_units: units which should be used to present results.
:param range_changed: callback function to set information about steps range
:param step_changed: callback function fo set information about steps done
:param time: which data point should be measured
:return: measurements
"""
segmentation_mask_map = self.get_segmentation_mask_map(image, roi, time)
result = MeasurementResult(segmentation_mask_map)
range_changed(0, len(self.chosen_fields))
for i, (name, data) in enumerate(
self.calculate_yield(
image=image,
channel_num=channel_num,
roi=roi,
result_units=result_units,
segmentation_mask_map=segmentation_mask_map,
time=time,
),
start=1,
):
result[name] = data
step_changed(i)
return result
def calculate_yield(
self,
image: Image,
channel_num: int,
roi: Union[np.ndarray, ROIInfo],
result_units: Units,
segmentation_mask_map: ComponentsInfo,
time: int = 0,
) -> Generator[MeasurementResultInputType, None, None]:
"""
Calculate measurements on given set of parameters
:param image: image on which measurements should be calculated
:param roi: array with segmentation labeled as positive integers
:param result_units: units which should be used to present results.
:param segmentation_mask_map: information which component of roi belongs to which mask component.
:param time: which data point should be measured
:return: measurements
"""
def get_time(array: np.ndarray):
if array is not None and array.ndim == 4:
return array.take(time, axis=image.time_pos)
return array
if self._need_mask and image.mask is None:
raise ValueError("measurement need mask")
channel = image.get_channel(channel_num).astype(float)
cache_dict = {}
result_scalar = UNIT_SCALE[result_units.value]
if isinstance(roi, np.ndarray):
roi = ROIInfo(roi).fit_to_image(image)
mask_bound_info = None
if isinstance(image.mask, np.ndarray):
mask_bound_info = {
k: v.del_dim(image.time_pos) if len(v.lower) == 4 else v
for k, v in ROIInfo(image.mask).fit_to_image(image).bound_info.items()
}
roi_alternative = {}
for name, array in roi.alternative.items():
roi_alternative[name] = get_time(array)
kw = {
"image": image,
"channel": get_time(channel),
"segmentation": get_time(roi.roi),
"roi": get_time(roi.roi),
"bounds_info": {
k: v.del_dim(image.time_pos) if len(v.lower) == 4 else v for k, v in roi.bound_info.items()
},
"mask_bound_info": mask_bound_info,
"mask": get_time(image.mask),
"voxel_size": image.spacing,
"result_scalar": result_scalar,
"roi_alternative": roi_alternative,
"roi_annotation": roi.annotations,
}
for num in self.get_channels_num():
kw[f"channel_{num}"] = get_time(image.get_channel(num))
if any(self._need_mask_without_segmentation(el.calculation_tree) for el in self.chosen_fields):
mm = kw["mask"].copy()
mm[kw["segmentation"] > 0] = 0
kw["mask_without_segmentation"] = mm
for entry in self.chosen_fields:
name = self.name_prefix + entry.name
yield name, self._calc_single_field(entry, segmentation_mask_map, cache_dict, kw, result_units)
def _calc_single_field(
self,
entry: MeasurementEntry,
segmentation_mask_map: ComponentsInfo,
cache_dict: dict,
additional_args: dict,
result_units,
):
tree = entry.calculation_tree
component_and_area = self._get_par_component_and_area_type(tree)
try:
val, unit, _area = self.calculate_tree(tree, segmentation_mask_map, cache_dict, additional_args)
if isinstance(val, np.ndarray):
val = list(val)
return val, str(unit).format(str(result_units)), component_and_area
except ZeroDivisionError: # pragma: no cover
return "Div by zero", "", component_and_area
except TypeError as e: # pragma: no cover
if e.args[0].startswith("unsupported operand type(s) for /:"):
return "None div", "", component_and_area
raise e
except AttributeError: # pragma: no cover
return "No attribute", "", component_and_area
except ProhibitedDivision as e: # pragma: no cover
return e.args[0], "", component_and_area
def calculate_main_axis(area_array: np.ndarray, channel: np.ndarray, voxel_size):
# TODO check if it produces good values
if len(channel.shape) == 4:
if channel.shape[0] != 1:
raise ValueError("This measurements do not support time data")
channel = channel[0]
cut_img = np.copy(channel)
cut_img[area_array == 0] = 0
if np.all(cut_img == 0):
return (0,) * len(voxel_size)
orientation_matrix, _ = af.find_density_orientation(cut_img, voxel_size, 1)
center_of_mass = af.density_mass_center(cut_img, voxel_size)
positions = np.array(np.nonzero(cut_img), dtype=np.float64)
for i, v in enumerate(reversed(voxel_size), start=1):
positions[-i] *= v
positions[-i] -= center_of_mass[i - 1]
centered = np.dot(orientation_matrix.T, positions)
return np.max(centered, axis=1) - np.min(centered, axis=1)
def get_main_axis_length(
index: int, area_array: np.ndarray, channel: np.ndarray, voxel_size, result_scalar, _cache=False, **kwargs
):
if _cache and "_area" in kwargs and "_per_component" in kwargs and "channel_num" in kwargs:
help_dict: Dict = kwargs["help_dict"]
_area: AreaType = kwargs["_area"]
_per_component: PerComponent = kwargs["_per_component"]
hash_name = hash_fun_call_name(
calculate_main_axis, {}, _area, _per_component, kwargs["channel_num"], kwargs["_component_num"]
)
if hash_name not in help_dict:
help_dict[hash_name] = calculate_main_axis(area_array, channel, [x * result_scalar for x in voxel_size])
return help_dict[hash_name][index]
return calculate_main_axis(area_array, channel, [x * result_scalar for x in voxel_size])[index]
[docs]def hash_fun_call_name(
fun: Union[Callable, MeasurementMethodBase],
arguments: Dict,
area: AreaType,
per_component: PerComponent,
channel: Channel,
components_num: int,
) -> str:
"""
Calculate string for properly cache measurements result.
:param fun: method for which hash string should be calculated
:param arguments: its additional arguments
:param area: type of rea
:param per_component: If it is per component
:param channel: channel number on which calculation is performed
:return: unique string for such set of arguments
"""
if hasattr(fun, "__module__"):
fun_name = f"{fun.__module__}.{fun.__name__}"
else:
fun_name = fun.__name__
return f"{fun_name}: {arguments} # {area} & {per_component} * {channel} ^ {components_num}"
[docs]class Volume(MeasurementMethodBase):
text_info = "Volume", "Calculate volume of current segmentation"
[docs] @classmethod
def calculate_property(cls, area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ
return np.count_nonzero(area_array) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}") ** ndim
[docs]class Voxels(MeasurementMethodBase):
text_info = "Voxels", "Calculate number of voxels of current segmentation"
[docs] @classmethod
def calculate_property(cls, area_array, **_): # pylint: disable=arguments-differ
return np.count_nonzero(area_array)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("1")
# From Malandain, G., & Boissonnat, J. (2002). Computing the diameter of a point set,
# 12(6), 489-509. https://doi.org/10.1142/S0218195902001006
[docs]def double_normal(point_index: int, point_positions: np.ndarray, points_array: np.ndarray):
"""
:param point_index: index of starting points
:param point_positions: points array of size (points_num, number of dimensions)
:param points_array: bool matrix with information about which points are in set
:return:
"""
delta = 0
dn = 0, 0
while True:
new_delta = delta
points_array[point_index] = 0
dist_array = np.sum(np.array((point_positions - point_positions[point_index]) ** 2), 1)
dist_array[points_array == 0] = 0
point2_index = np.argmax(dist_array)
if dist_array[point2_index] > new_delta:
delta = dist_array[point2_index]
dn = point_index, point2_index
point_index = point2_index
if new_delta == delta:
return dn, delta
[docs]def iterative_double_normal(points_positions: np.ndarray):
"""
:param points_positions: points array of size (points_num, number of dimensions)
:return: square power of diameter, 2-tuple of points index gave information which points ar chosen
"""
delta = 0
dn = 0, 0
point_index = 0
points_array = np.ones(points_positions.shape[0], dtype=bool)
while True:
dn_r, delta_r = double_normal(point_index, points_positions, points_array)
if delta_r > delta:
delta = delta_r
dn = dn_r
mid_point = (points_positions[dn[0]] + points_positions[dn[1]]) / 2
dist_array = np.sum(np.array((points_positions - mid_point) ** 2), 1)
dist_array[~points_array] = 0
if np.any(dist_array >= delta / 4):
point_index = np.argmax(dist_array)
else:
break
else:
break
return delta, dn
[docs]class Diameter(MeasurementMethodBase):
"""
Class for calculate diameter of ROI in fast way.
From Malandain, G., & Boissonnat, J. (2002). Computing the diameter of a point set,
12(6), 489-509. https://doi.org/10.1142/S0218195902001006
"""
text_info = "Diameter", "Diameter of area"
[docs] @staticmethod
def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ
pos = np.transpose(np.nonzero(get_border(area_array))).astype(float)
if pos.size == 0:
return 0
for i, val in enumerate((x * result_scalar for x in reversed(voxel_size)), start=1):
pos[:, -i] *= val
diam_sq = iterative_double_normal(pos)[0]
return np.sqrt(diam_sq)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs]class DiameterOld(MeasurementMethodBase): # pragma: no cover
"""
n**2 calculate diameter of ROI
"""
text_info = "Diameter old", "Diameter of area (Very slow)"
[docs] @staticmethod
def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ
return calc_diam(get_border(area_array), [x * result_scalar for x in voxel_size])
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs]class PixelBrightnessSum(MeasurementMethodBase):
text_info = "Pixel brightness sum", "Sum of pixel brightness for current segmentation"
[docs] @staticmethod
def calculate_property(area_array: np.ndarray, channel: np.ndarray, **_): # pylint: disable=arguments-differ
"""
:param area_array: mask for area
:param channel: data. same shape like area_type
:return: Pixels brightness sum on given area
"""
if area_array.shape != channel.shape:
if area_array.size == channel.size:
channel = channel.reshape(area_array.shape)
else: # pragma: no cover
raise ValueError(f"channel ({channel.shape}) and mask ({area_array.shape}) do not fit each other")
return np.sum(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class ComponentsNumber(MeasurementMethodBase):
text_info = "Components number", "Calculate number of connected components on segmentation"
[docs] @staticmethod
def calculate_property(area_array, **_): # pylint: disable=arguments-differ
return np.unique(area_array).size - 1
[docs] @classmethod
def get_units(cls, ndim):
return symbols("count")
[docs]class MaximumPixelBrightness(MeasurementMethodBase):
text_info = "Maximum pixel brightness", "Calculate maximum pixel brightness for current area"
[docs] @staticmethod
def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ
if area_array.shape != channel.shape: # pragma: no cover
raise ValueError(f"channel ({channel.shape}) and mask ({area_array.shape}) do not fit each other")
return np.max(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class MinimumPixelBrightness(MeasurementMethodBase):
text_info = "Minimum pixel brightness", "Calculate minimum pixel brightness for current area"
[docs] @staticmethod
def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ
if area_array.shape != channel.shape: # pragma: no cover
raise ValueError("channel and mask do not fit each other")
return np.min(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class MeanPixelBrightness(MeasurementMethodBase):
text_info = "Mean pixel brightness", "Calculate mean pixel brightness for current area"
[docs] @staticmethod
def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ
if area_array.shape != channel.shape: # pragma: no cover
raise ValueError("channel and mask do not fit each other")
return np.mean(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class StandardDeviationOfPixelBrightness(MeasurementMethodBase):
text_info = (
"Standard deviation of pixel brightness",
"Calculate standard deviation of pixel brightness for current area",
)
[docs] @staticmethod
def calculate_property(area_array, channel, **_): # pylint: disable=arguments-differ
if area_array.shape != channel.shape: # pragma: no cover
raise ValueError("channel and mask do not fit each other")
return np.std(channel[area_array > 0]) if np.any(area_array) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class Moment(MeasurementMethodBase):
text_info = "Moment", "Calculate moment of segmented structure"
[docs] @staticmethod
def calculate_property(area_array, channel, voxel_size, **_): # pylint: disable=arguments-differ
if len(channel.shape) == 4:
if channel.shape[0] != 1: # pragma: no cover
raise ValueError("This measurements do not support time data")
channel = channel[0]
img = np.copy(channel)
img[area_array == 0] = 0
if np.all(img == 0):
return 0
return af.calculate_density_momentum(img, voxel_size)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}") ** 2 * symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class FirstPrincipalAxisLength(MeasurementMethodBase):
text_info = "First principal axis length", "Length of first principal axis"
[docs] @staticmethod
def calculate_property(**kwargs): # pylint: disable=arguments-differ
return get_main_axis_length(0, **kwargs)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class SecondPrincipalAxisLength(MeasurementMethodBase):
text_info = "Second principal axis length", "Length of second principal axis"
[docs] @staticmethod
def calculate_property(**kwargs): # pylint: disable=arguments-differ
return get_main_axis_length(1, **kwargs)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class ThirdPrincipalAxisLength(MeasurementMethodBase):
text_info = "Third principal axis length", "Length of third principal axis"
[docs] @staticmethod
def calculate_property(**kwargs): # pylint: disable=arguments-differ
return get_main_axis_length(2, **kwargs)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs] @classmethod
def need_channel(cls):
return True
[docs]class Compactness(MeasurementMethodBase):
text_info = "Compactness", "Calculate compactness off segmentation (Surface^1.5/volume)"
[docs] @staticmethod
def calculate_property(**kwargs): # pylint: disable=arguments-differ
if kwargs.get("_cache", False) and "help_dict" in kwargs and "_area" in kwargs and "_per_component" in kwargs:
help_dict = kwargs["help_dict"]
border_hash_str = hash_fun_call_name(
Surface, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"]
)
if border_hash_str not in help_dict:
border_surface = Surface.calculate_property(**kwargs)
help_dict[border_hash_str] = border_surface
else:
border_surface = help_dict[border_hash_str]
volume_hash_str = hash_fun_call_name(
Volume, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"]
)
if volume_hash_str not in help_dict:
volume = Volume.calculate_property(**kwargs)
help_dict[volume_hash_str] = volume
else:
volume = help_dict[volume_hash_str]
else:
border_surface = Surface.calculate_property(**kwargs)
volume = Volume.calculate_property(**kwargs)
return border_surface**1.5 / volume
[docs] @classmethod
def get_units(cls, ndim):
return Surface.get_units(ndim) / Volume.get_units(ndim)
[docs]class Sphericity(MeasurementMethodBase):
text_info = "Sphericity", "volume/(4/3 * π * radius **3) for 3d data and volume/(π * radius **2) for 2d data"
[docs] @staticmethod
def calculate_property(**kwargs): # pylint: disable=arguments-differ
if all(key in kwargs for key in ["help_dict", "_area", "_per_component"]) and (
"_cache" not in kwargs or kwargs["_cache"]
):
help_dict = kwargs["help_dict"]
else:
help_dict = {}
kwargs.update({"_area": AreaType.ROI, "_per_component": PerComponent.No, "_component_num": NO_COMPONENT})
volume_hash_str = hash_fun_call_name(
Volume, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"]
)
if volume_hash_str not in help_dict:
volume = Volume.calculate_property(**kwargs)
help_dict[volume_hash_str] = volume
else:
volume = help_dict[volume_hash_str]
diameter_hash_str = hash_fun_call_name(
Diameter, {}, kwargs["_area"], kwargs["_per_component"], Channel(-1), kwargs["_component_num"]
)
if diameter_hash_str not in help_dict:
diameter_val = Diameter.calculate_property(**kwargs)
help_dict[diameter_hash_str] = diameter_val
else:
diameter_val = help_dict[diameter_hash_str]
radius = diameter_val / 2
if kwargs["area_array"].shape[0] > 1:
return volume / (4 / 3 * pi * (radius**3))
return volume / (pi * (radius**2))
[docs] @classmethod
def get_units(cls, ndim):
return Volume.get_units(ndim) / Diameter.get_units(ndim) ** ndim
[docs]class Surface(MeasurementMethodBase):
text_info = "Surface", "Calculating surface of current segmentation"
[docs] @staticmethod
def calculate_property(area_array, voxel_size, result_scalar, **_): # pylint: disable=arguments-differ
return calculate_volume_surface(area_array, [x * result_scalar for x in voxel_size])
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}") ** 2
[docs]class RimVolume(MeasurementMethodBase):
text_info = "rim volume", "Calculate volumes for elements in radius (in physical units) from mask"
__argument_class__ = BorderRim.__argument_class__
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod
def calculate_property(area_array, voxel_size, result_scalar, **kwargs): # pylint: disable=arguments-differ
border_mask_array = BorderRim.border_mask(voxel_size=voxel_size, result_scalar=result_scalar, **kwargs)
if border_mask_array is None:
return None
final_mask = np.array((border_mask_array > 0) * (area_array > 0))
return np.count_nonzero(final_mask) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}") ** ndim
[docs] @staticmethod
def area_type(area: AreaType):
return AreaType.ROI
[docs]class RimPixelBrightnessSum(MeasurementMethodBase):
text_info = (
"rim pixel brightness sum",
"Calculate mass for components located within rim (in physical units) from mask",
)
__argument_class__ = BorderRim.__argument_class__
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod
def calculate_property(channel, area_array, **kwargs): # pylint: disable=arguments-differ
if len(channel.shape) == 4:
if channel.shape[0] != 1: # pragma: no cover
raise ValueError("This measurements do not support time data")
channel = channel[0]
border_mask_array = BorderRim.border_mask(**kwargs)
if border_mask_array is None:
return None
final_mask = np.array((border_mask_array > 0) * (area_array > 0))
return np.sum(channel[final_mask]) if np.any(final_mask) else 0
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def need_channel(cls):
return True
[docs] @staticmethod
def area_type(area: AreaType):
return AreaType.ROI
[docs]@register_class(old_paths=["PartSeg.utils.analysis.statistics_calculation.DistancePoint"])
class DistancePoint(Enum):
Border = 1
Mass_center = 2
Geometrical_center = 3
def __str__(self):
return self.name.replace("_", " ")
@register_class(version="0.0.1", migrations=[("0.0.1", rename_key("distance_to_segmentation", "distance_to_roi"))])
class DistanceMaskROIParameters(BaseModel):
distance_from_mask: DistancePoint = DistancePoint.Border
distance_to_roi: DistancePoint = Field(DistancePoint.Border, title="Distance to ROI")
[docs]class DistanceMaskROI(MeasurementMethodBase):
text_info = "ROI distance", "Calculate distance between ROI and mask"
__argument_class__ = DistanceMaskROIParameters
@staticmethod
def calculate_points(channel, area_array, voxel_size, result_scalar, point_type: DistancePoint) -> np.ndarray:
if point_type == DistancePoint.Border:
area_pos = np.transpose(np.nonzero(get_border(area_array))).astype(float)
area_pos += 0.5
for i, val in enumerate((x * result_scalar for x in reversed(voxel_size)), start=1):
area_pos[:, -i] *= val
elif point_type == DistancePoint.Mass_center:
im = np.copy(channel)
im[area_array == 0] = 0
area_pos = np.array([af.density_mass_center(im, voxel_size) * result_scalar])
else:
area_pos = np.array([af.density_mass_center(area_array > 0, voxel_size) * result_scalar])
return area_pos
[docs] @classmethod
def calculate_property( # pylint: disable=arguments-differ
cls,
channel,
area_array,
mask,
voxel_size,
result_scalar,
distance_from_mask: DistancePoint,
distance_to_roi: DistancePoint,
*args,
**kwargs,
): # pylint: disable=arguments-differ
if len(channel.shape) == 4:
if channel.shape[0] != 1:
raise ValueError("This measurements do not support time data")
channel = channel[0]
if not (np.any(mask) and np.any(area_array)):
return 0
mask_pos = cls.calculate_points(channel, mask, voxel_size, result_scalar, distance_from_mask)
seg_pos = cls.calculate_points(channel, area_array, voxel_size, result_scalar, distance_to_roi)
if 1 in {mask_pos.shape[0], seg_pos.shape[0]}:
return np.min(cdist(mask_pos, seg_pos))
min_val = np.inf
for i in range(seg_pos.shape[0]):
min_val = min(min_val, np.min(cdist(mask_pos, np.array([seg_pos[i]]))))
return min_val
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}")
[docs] @classmethod
def need_channel(cls):
return True
[docs] @staticmethod
def area_type(area: AreaType):
return AreaType.ROI
class DistanceROIROIParameters(BaseModel):
profile: ROIExtractionProfile = Field(
ROIExtractionProfile(
name="default",
algorithm=LowerThresholdAlgorithm.get_name(),
values=LowerThresholdAlgorithm.get_default_values(),
),
title="ROI extraction profile",
)
distance_from_new_roi: DistancePoint = Field(DistancePoint.Border, title="Distance new ROI")
distance_to_roi: DistancePoint = Field(DistancePoint.Border, title="Distance to ROI")
[docs]class DistanceROIROI(DistanceMaskROI):
text_info = "to new ROI distance", "Calculate distance between ROI and new ROI"
__argument_class__ = DistanceROIROIParameters
[docs] @classmethod
def get_starting_leaf(cls):
return Leaf(name=cls.text_info[0], area=AreaType.ROI)
# noinspection PyMethodOverriding
[docs] @classmethod
def calculate_property(
cls,
channel: np.ndarray,
image: Image,
area_array: np.ndarray,
profile: ROIExtractionProfile,
mask: Optional[np.ndarray],
voxel_size: Sequence[float],
result_scalar: float,
distance_from_new_roi: DistancePoint,
distance_to_roi: DistancePoint,
**kwargs,
): # pylint: disable=arguments-differ
if len(channel.shape) == 4:
if channel.shape[0] != 1:
raise ValueError("This measurements do not support time data")
channel = channel[0]
try:
hash_name = hash_fun_call_name(
calculate_segmentation_step,
profile,
kwargs["_area"],
kwargs["_per_component"],
Channel(-1),
kwargs["_component_num"],
)
if hash_name in kwargs["help_dict"]:
result = kwargs["help_dict"][hash_name]
else:
result, _ = calculate_segmentation_step(profile, image, mask)
kwargs["help_dict"][hash_name] = result
except KeyError:
result, _ = calculate_segmentation_step(profile, image, mask)
if np.any(result.roi[area_array > 0]):
return 0
return super().calculate_property(
channel,
area_array,
result.roi,
tuple(voxel_size),
result_scalar,
distance_from_mask=distance_from_new_roi,
distance_to_roi=distance_to_roi,
)
@staticmethod
def need_full_data():
return True
class ROINeighbourhoodROIParameters(BaseModel):
profile: ROIExtractionProfile = Field(
ROIExtractionProfile(
name="default",
algorithm=LowerThresholdAlgorithm.get_name(),
values=LowerThresholdAlgorithm.get_default_values(),
),
title="ROI extraction profile",
)
distance: float = Field(500, ge=0, le=10000, title="Distance")
units: Units = Units.nm
[docs]class ROINeighbourhoodROI(DistanceMaskROI):
text_info = "Neighbourhood new ROI presence", "Count how many of new roi are present in neighbourhood of new ROI"
__argument_class__ = ROINeighbourhoodROIParameters
[docs] @classmethod
def get_starting_leaf(cls):
return Leaf(name=cls.text_info[0], area=AreaType.ROI)
# noinspection PyMethodOverriding
[docs] @classmethod
def calculate_property(
cls,
image: Image,
area_array: np.ndarray,
profile: ROIExtractionProfile,
mask: Optional[np.ndarray],
voxel_size,
distance: float,
units: Units,
**kwargs,
): # pylint: disable=arguments-differ
try:
hash_name = hash_fun_call_name(
calculate_segmentation_step,
profile,
kwargs["_area"],
kwargs["_per_component"],
Channel(-1),
kwargs["_component_num"],
)
if hash_name in kwargs["help_dict"]:
result = kwargs["help_dict"][hash_name]
else:
result, _ = calculate_segmentation_step(profile, image, mask)
kwargs["help_dict"][hash_name] = result
except KeyError:
result, _ = calculate_segmentation_step(profile, image, mask)
area_array = image.fit_array_to_image(area_array)
units_scalar = UNIT_SCALE[units.value]
final_radius = [int((distance / units_scalar) / x) for x in reversed(voxel_size)]
dilated = SimpleITK.GetArrayFromImage(
SimpleITK.BinaryDilate(
SimpleITK.GetImageFromArray((area_array > 0).astype(np.uint8).squeeze()), final_radius
)
)
dilated = dilated.reshape(area_array.shape)
roi = image.fit_array_to_image(result.roi)
components = set(np.unique(roi[dilated > 0]))
if 0 in components:
components.remove(0)
return len(components)
@staticmethod
def need_full_data():
return True
class SplitOnPartParameters(MaskDistanceSplit.__argument_class__):
part_selection: int = Field(2, title="Which part (from border)", ge=1, le=1024)
[docs]class SplitOnPartVolume(MeasurementMethodBase):
text_info = (
"distance splitting volume",
"Split mask on parts and then calculate volume of cross of segmentation and mask part",
)
__argument_class__ = SplitOnPartParameters
[docs] @staticmethod
def calculate_property(
part_selection, area_array, voxel_size, result_scalar, **kwargs
): # pylint: disable=arguments-differ
masked = MaskDistanceSplit.split(voxel_size=voxel_size, **kwargs)
mask = masked == part_selection
return np.count_nonzero(mask * area_array) * pixel_volume(voxel_size, result_scalar)
[docs] @classmethod
def get_units(cls, ndim):
return symbols("{}") ** ndim
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod
def area_type(area: AreaType):
return AreaType.ROI
[docs]class SplitOnPartPixelBrightnessSum(MeasurementMethodBase):
text_info = (
"distance splitting pixel brightness sum",
"Split mask on parts and then calculate pixel brightness sum of cross of segmentation and mask part",
)
__argument_class__ = SplitOnPartParameters
[docs] @staticmethod
def calculate_property(part_selection, channel, area_array, **kwargs): # pylint: disable=arguments-differ
masked = MaskDistanceSplit.split(**kwargs)
mask = np.array(masked == part_selection)
if channel.ndim - mask.ndim == 1:
channel = channel[0]
return np.sum(channel[mask * area_array > 0])
[docs] @classmethod
def get_units(cls, ndim):
return symbols("Pixel_brightness")
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.Mask)
[docs] @staticmethod
def area_type(area: AreaType):
return AreaType.ROI
[docs] @classmethod
def need_channel(cls):
return True
HARALIC_FEATURES = """AngularSecondMoment Contrast Correlation Variance
InverseDifferenceMoment SumAverage SumVariance SumEntropy Entropy
DifferenceVariance DifferenceEntropy InfoMeas1 InfoMeas2""".split()
[docs]class HaralickEnum(Enum):
AngularSecondMoment = "AngularSecondMoment"
Contrast = "Contrast"
Correlation = "Correlation"
Variance = "Variance"
InverseDifferenceMoment = "InverseDifferenceMoment"
SumAverage = "SumAverage"
SumVariance = "SumVariance"
SumEntropy = "SumEntropy"
Entropy = "Entropy"
DifferenceVariance = "DifferenceVariance"
DifferenceEntropy = "DifferenceEntropy"
InfoMeas1 = "InfoMeas1"
InfoMeas2 = "InfoMeas2"
def index(self) -> int:
return list(self.__class__).index(self)
def _rescale_image(data: np.ndarray):
if data.dtype == np.uint8:
return data
min_val = data.min()
max_val = data.max()
return ((data - min_val) / ((max_val - min_val) / 255)).astype(np.uint8)
class HaralickParameters(BaseModel):
feature: HaralickEnum = HaralickEnum.AngularSecondMoment
distance: int = Field(1, ge=1, le=10)
[docs]class Haralick(MeasurementMethodBase):
__argument_class__ = HaralickParameters
[docs] @classmethod
def get_units(cls, ndim) -> symbols:
return "1"
text_info = "Haralick", "Calculate Haralick features"
[docs] @classmethod
def need_channel(cls):
return True
[docs] @classmethod
def calculate_property( # pylint: disable=arguments-differ
cls, area_array, channel, distance, feature, _cache=False, **kwargs
): # pylint: disable=arguments-differ
if isinstance(feature, str):
feature = HaralickEnum(feature)
if _cache := _cache and "_area" in kwargs and "_per_component" in kwargs:
help_dict: Dict = kwargs["help_dict"]
_area: AreaType = kwargs["_area"]
_per_component: PerComponent = kwargs["_per_component"]
hash_name = hash_fun_call_name(
Haralick, {"distance": distance}, _area, _per_component, kwargs["channel_num"], kwargs["_component_num"]
)
if hash_name not in help_dict:
help_dict[hash_name] = cls.calculate_haralick(channel, area_array, distance)
return help_dict[hash_name][feature.index()]
res = cls.calculate_haralick(channel, area_array, distance)
return res[feature.index()]
@staticmethod
def calculate_haralick(channel, area_array, distance):
data = channel.copy()
data[area_array == 0] = 0
data = _rescale_image(data.squeeze())
return haralick(data, distance=distance, ignore_zeros=True, return_mean=True)
[docs]class ComponentBoundingBox(MeasurementMethodBase):
text_info = "Component Bounding Box", "bounding box as string"
[docs] @classmethod
def get_units(cls, ndim):
return "str"
[docs] @staticmethod
def calculate_property(bounds_info, _component_num, **kwargs): # pylint: disable=arguments-differ
return str(bounds_info[_component_num])
[docs] @classmethod
def get_starting_leaf(cls):
return super().get_starting_leaf().replace_(area=AreaType.ROI, per_component=PerComponent.Yes)
class GetROIAnnotationTypeParameters(BaseModel):
name: str = ""
[docs]class GetROIAnnotationType(MeasurementMethodBase):
text_info = "annotation by name", "Get roi annotation by name"
__argument_class__ = GetROIAnnotationTypeParameters
[docs] @classmethod
def get_starting_leaf(cls):
return Leaf(name=cls.text_info[0], area=AreaType.ROI, per_component=PerComponent.Yes)
[docs] @staticmethod
def calculate_property(roi_annotation, name, _component_num, **kwargs): # pylint: disable=arguments-differ
return str(roi_annotation.get(_component_num, {}).get(name, ""))
[docs] @classmethod
def get_units(cls, ndim):
return "str"
class ColocalizationMeasurementParameters(BaseModel):
channel_fst: Channel = Field(0, title="Channel 1")
channel_scd: Channel = Field(1, title="Channel 2")
colocalization: CorrelationEnum = CorrelationEnum.pearson
randomize: bool = Field(
False, description="If randomize orders of pixels in one channel", title="Randomize channel"
)
randomize_repeat: int = Field(10, description="Number of repetitions for mean_calculate", title="Randomize num")
[docs]class ColocalizationMeasurement(MeasurementMethodBase):
text_info = "Colocalization", "Measurement of colocalization of two channels."
__argument_class__ = ColocalizationMeasurementParameters
@staticmethod
def _calculate_masked(data_1, data_2, colocalization):
if colocalization == CorrelationEnum.spearman:
data_1 = data_1.argsort().argsort().astype(float)
data_2 = data_2.argsort().argsort().astype(float)
colocalization = CorrelationEnum.pearson
if colocalization == CorrelationEnum.pearson:
data_1_mean = np.mean(data_1)
data_2_mean = np.mean(data_2)
nominator = np.sum((data_1 - data_1_mean) * (data_2 - data_2_mean))
numerator = np.sqrt(np.sum((data_1 - data_1_mean) ** 2) * np.sum((data_2 - data_2_mean) ** 2))
return nominator / numerator
if colocalization == CorrelationEnum.manders:
nominator = np.sum(data_1 * data_2)
numerator = np.sqrt(np.sum(data_1**2) * np.sum(data_2**2))
return nominator / numerator
if colocalization == CorrelationEnum.intensity:
data_1_mean = np.mean(data_1)
data_2_mean = np.mean(data_2)
return np.sum((data_1 > data_1_mean) == (data_2 > data_2_mean)) / data_1.size - 0.5
raise RuntimeError(f"Not supported colocalization method {colocalization}") # pragma: no cover
[docs] @classmethod
def calculate_property( # pylint: disable=arguments-differ
cls, area_array, colocalization, randomize=False, randomize_repeat=10, channel_fst=0, channel_scd=1, **kwargs
): # pylint: disable=arguments-differ
mask_binary = area_array > 0
data_1 = kwargs[f"channel_{channel_fst}"][mask_binary].astype(float)
data_2 = kwargs[f"channel_{channel_scd}"][mask_binary].astype(float)
if not randomize:
return cls._calculate_masked(data_1, data_2, colocalization)
res_list = []
for _ in range(randomize_repeat):
rand_data2 = np.random.default_rng().permutation(data_2)
res_list.append(cls._calculate_masked(data_1, rand_data2, colocalization))
return np.mean(res_list)
[docs] @classmethod
def get_units(cls, ndim) -> symbols:
return 1
def pixel_volume(spacing, result_scalar):
return reduce((lambda x, y: x * y), [x * result_scalar for x in spacing])
def calculate_volume_surface(volume_mask, voxel_size):
surf_im: np.ndarray = np.array(volume_mask).astype(np.uint8).squeeze()
return sum(
(
np.count_nonzero(
np.logical_xor(
surf_im.take(np.arange(surf_im.shape[ax] - 1), axis=ax),
surf_im.take(np.arange(surf_im.shape[ax] - 1) + 1, axis=ax),
)
)
* reduce(
lambda x, y: x * y,
[voxel_size[x] for x in range(surf_im.ndim) if x != ax],
)
)
for ax in range(surf_im.ndim)
)
def get_border(array):
if array.dtype == bool:
array = array.astype(np.uint8)
return SimpleITK.GetArrayFromImage(SimpleITK.LabelContour(SimpleITK.GetImageFromArray(array)))
def calc_diam(array, voxel_size): # pragma: no cover
pos = np.transpose(np.nonzero(array)).astype(float)
for i, val in enumerate(voxel_size):
pos[:, i] *= val
diam = 0
for i, p in enumerate(zip(pos[:-1])):
tmp = np.array((pos[i + 1 :] - p) ** 2)
diam = max(diam, np.max(np.sum(tmp, 1)))
return np.sqrt(diam)
[docs]class VoxelSize(MeasurementMethodBase):
text_info = "Voxel size", "Voxel size"
[docs] @classmethod
def calculate_property(cls, voxel_size, result_scalar, **kwargs): # pylint: disable=arguments-differ
return " x ".join([str(x * result_scalar) for x in voxel_size])
[docs] @classmethod
def get_units(cls, ndim) -> symbols:
return symbols("str {}")
MEASUREMENT_DICT = Register(suggested_base_class=MeasurementMethodBase)
"""Register with all measurements algorithms"""
MEASUREMENT_DICT.register(Volume)
MEASUREMENT_DICT.register(Diameter)
MEASUREMENT_DICT.register(PixelBrightnessSum, old_names=["Pixel Brightness Sum"])
MEASUREMENT_DICT.register(ComponentBoundingBox)
MEASUREMENT_DICT.register(GetROIAnnotationType)
MEASUREMENT_DICT.register(ComponentsNumber, old_names=["Components Number"])
MEASUREMENT_DICT.register(MaximumPixelBrightness)
MEASUREMENT_DICT.register(MinimumPixelBrightness)
MEASUREMENT_DICT.register(MeanPixelBrightness)
MEASUREMENT_DICT.register(MedianPixelBrightness)
MEASUREMENT_DICT.register(StandardDeviationOfPixelBrightness)
MEASUREMENT_DICT.register(ColocalizationMeasurement)
MEASUREMENT_DICT.register(Moment, old_names=["Moment of inertia"])
MEASUREMENT_DICT.register(FirstPrincipalAxisLength, old_names=["Longest main axis length"])
MEASUREMENT_DICT.register(SecondPrincipalAxisLength, old_names=["Middle main axis length"])
MEASUREMENT_DICT.register(ThirdPrincipalAxisLength, old_names=["Shortest main axis length"])
MEASUREMENT_DICT.register(Compactness)
MEASUREMENT_DICT.register(Sphericity)
MEASUREMENT_DICT.register(Surface)
MEASUREMENT_DICT.register(RimVolume, old_names=["Rim Volume"])
MEASUREMENT_DICT.register(RimPixelBrightnessSum, old_names=["Rim Pixel Brightness Sum"])
MEASUREMENT_DICT.register(ROINeighbourhoodROI)
MEASUREMENT_DICT.register(DistanceMaskROI, old_names=["segmentation distance"])
MEASUREMENT_DICT.register(DistanceROIROI, old_names=["to ROI distance"])
MEASUREMENT_DICT.register(SplitOnPartVolume, old_names=["split on part volume"])
MEASUREMENT_DICT.register(SplitOnPartPixelBrightnessSum, old_names=["split on part pixel brightness sum"])
MEASUREMENT_DICT.register(Voxels)
MEASUREMENT_DICT.register(Haralick)
MEASUREMENT_DICT.register(VoxelSize)