Source code for PartSegCore.segmentation.restartable_segmentation_algorithms

import dataclasses
import operator
import typing
from abc import ABC, abstractmethod
from collections import defaultdict
from copy import deepcopy

import numpy as np
import SimpleITK
from local_migrator import REGISTER, class_to_str, register_class, rename_key
from pydantic import Field, validator

from PartSegCore.algorithm_describe_base import ROIExtractionProfile
from PartSegCore.mask_partition_utils import BorderRim as BorderRimBase
from PartSegCore.mask_partition_utils import MaskDistanceSplit as MaskDistanceSplitBase
from PartSegCore.project_info import AdditionalLayerDescription
from PartSegCore.segmentation.algorithm_base import (
    ROIExtractionAlgorithm,
    ROIExtractionResult,
    SegmentationLimitException,
)
from PartSegCore.segmentation.mu_mid_point import BaseMuMid, MuMidSelection
from PartSegCore.segmentation.noise_filtering import NoiseFilterSelection
from PartSegCore.segmentation.threshold import (
    BaseThreshold,
    DoubleThreshold,
    DoubleThresholdParams,
    DoubleThresholdSelection,
    ManualThreshold,
    RangeThresholdSelection,
    SingleThresholdParams,
    ThresholdSelection,
)
from PartSegCore.segmentation.watershed import BaseWatershed, WatershedSelection, calculate_distances_array, get_neigh
from PartSegCore.universal_const import Units
from PartSegCore.utils import BaseModel, bisect
from PartSegCore_compiled_backend.multiscale_opening import PyMSO, calculate_mu_mid
from PartSegImage import Channel

# Message returned by the mask-based algorithms in this module: as the parameter
# header and as the info text when no mask is set.
REQUIRE_MASK_STR = "Need mask"


def blank_operator(_x, _y):
    """Placeholder two-argument operator that always fails.

    :param _x: ignored
    :param _y: ignored
    :raises NotImplementedError: on every call
    """
    raise NotImplementedError()


class RestartableAlgorithm(ROIExtractionAlgorithm, ABC):
    """
    Base class for restartable segmentation algorithm. The idea is to store two copies
    of algorithm parameters and base on difference check from which point restart the calculation.

    :ivar dict ~.parameters: variable for store last run parameters
    :ivar dict ~.new_parameters: variable for store parameters for next run
    """

    def __init__(self, **kwargs):
        """Create the algorithm with an empty parameter cache; ``kwargs`` are accepted but not used here."""
        super().__init__()
        # Missing keys read as ``None`` so "never calculated" compares unequal to any real value.
        self.parameters: typing.Dict[str, typing.Optional[typing.Any]] = defaultdict(lambda: None)
        self.new_parameters = self.__argument_class__() if self.__new_style__ else {}  # pylint: disable=not-callable

    def set_image(self, image):
        """Set a new image and forget the parameters of the previous run."""
        self.parameters = defaultdict(lambda: None)
        super().set_image(image)

    def set_mask(self, mask):
        """Set a new mask and drop the cached ``"threshold"`` entry so it no longer matches."""
        super().set_mask(mask)
        self.parameters["threshold"] = None

    def get_info_text(self):
        """Return the fallback text used when a subclass does not provide its own info."""
        return "No info [Report this as error]"

    def get_segmentation_profile(self) -> ROIExtractionProfile:
        """Return an unnamed profile holding a deep copy of ``new_parameters``."""
        return ROIExtractionProfile(name="", algorithm=self.get_name(), values=deepcopy(self.new_parameters))

    @classmethod
    def support_time(cls):
        """Return ``False``."""
        return False

    @classmethod
    def support_z(cls):
        """Return ``True``."""
        return True

    @abstractmethod
    def calculation_run(self, report_fun: typing.Callable[[str, int], None]) -> typing.Optional[ROIExtractionResult]:
        """Restartable calculation may return None if there is no need to recalculate"""
        raise NotImplementedError
# Parameter model for the "Border Rim" algorithm: the base BorderRim arguments
# plus a header note.
class BorderRimParameters(BorderRimBase.__argument_class__):
    @staticmethod
    def header():
        """Return the note saying that a mask is needed."""
        return REQUIRE_MASK_STR
class BorderRim(RestartableAlgorithm):
    """
    Expose :py:class:`PartSegCore.mask_partition_utils.BorderRim` through the
    segmentation algorithm interface, so the rim produced by a given set of
    parameters can be inspected.
    """

    __argument_class__ = BorderRimParameters

    def __init__(self):
        super().__init__()
        self.distance = 0
        self.units = Units.nm

    @classmethod
    def get_name(cls):
        return "Border Rim"

    def get_info_text(self):
        """Return the "need mask" note when no mask is set, otherwise an empty string."""
        if self.mask is None:
            return REQUIRE_MASK_STR
        return ""

    def calculation_run(self, _report_fun) -> ROIExtractionResult:
        """
        Calculate the rim of the current mask.

        :param _report_fun: progress callback, not used
        :raises SegmentationLimitException: when no mask is set
        """
        if self.mask is None:
            raise SegmentationLimitException("Border Rim needs mask")
        rim = BorderRimBase.border_mask(
            mask=self.mask, voxel_size=self.image.spacing, **self.new_parameters.dict()
        )
        return ROIExtractionResult(roi=rim, parameters=self.get_segmentation_profile())
# Parameter model for the "Mask Distance Splitting" algorithm: the base
# MaskDistanceSplit arguments plus a header note.
class MaskDistanceSplitParameters(MaskDistanceSplitBase.__argument_class__):
    @staticmethod
    def header():
        """Return the note saying that a mask is needed."""
        return REQUIRE_MASK_STR
class MaskDistanceSplit(RestartableAlgorithm):
    """
    Expose :py:class:`PartSegCore.mask_partition_utils.SplitMaskOnPart` through the
    segmentation algorithm interface, so the split produced by a given set of
    parameters can be inspected.
    """

    __argument_class__ = MaskDistanceSplitParameters

    @classmethod
    def get_name(cls) -> str:
        return "Mask Distance Splitting"

    def calculation_run(self, report_fun: typing.Callable[[str, int], None]) -> ROIExtractionResult:
        """
        Split the current mask into parts.

        :param report_fun: progress callback, not used
        :raises SegmentationLimitException: when no mask is set
        """
        if self.mask is None:
            raise SegmentationLimitException("Mask Distance Split needs mask")
        parts = MaskDistanceSplitBase.split(
            mask=self.mask, voxel_size=self.image.voxel_size, **self.new_parameters.dict()
        )
        return ROIExtractionResult(roi=parts, parameters=self.get_segmentation_profile())
# Parameters shared by the threshold based algorithms. The registered migration
# renames the legacy "noise_removal" key to "noise_filtering".
@register_class(version="0.0.1", migrations=[("0.0.1", rename_key("noise_removal", "noise_filtering", optional=True))])
class ThresholdBaseAlgorithmParameters(BaseModel):
    channel: Channel = Channel(0)
    noise_filtering: NoiseFilterSelection = Field(NoiseFilterSelection.get_default(), title="Filter")
    minimum_size: int = Field(8000, title="Minimum size (px)", ge=0, le=10**6)
    side_connection: bool = Field(
        False,
        title="Connect only sides",
        description="During calculation of connected components includes only side by side connected pixels",
    )

    @validator("noise_filtering")
    def _noise_filter_validate(cls, v):  # pylint: disable=no-self-use
        """
        Convert a plain ``dict`` describing a new-style noise filter into its argument class.

        Values that are not dicts, or that name a filter which is not new-style or
        whose argument class has no fields, are returned unchanged.
        """
        if isinstance(v, dict):
            algorithm = NoiseFilterSelection[v["name"]]
            # ``__argument_class__`` is read only after the new-style check, as in the original condition.
            if algorithm.__new_style__ and algorithm.__argument_class__.__fields__:
                argument_class = algorithm.__argument_class__
                migrated = REGISTER.migrate_data(class_to_str(argument_class), {}, v)
                return argument_class(**migrated)
        return v


# Typing helper: the base parameters plus the ``threshold`` field that the
# concrete parameter classes declare.
class ThresholdBaseAlgorithmParametersAnnot(ThresholdBaseAlgorithmParameters):
    threshold: typing.Any = None
class ThresholdBaseAlgorithm(RestartableAlgorithm, ABC):
    """
    Base class for most threshold Algorithm implemented in PartSeg analysis.
    Created for reduce code repetition.

    The calculation is a chain of stages (channel, noise filtering, threshold,
    connected components, size filtering). Each stage is repeated only if an
    earlier stage was repeated or its own parameter differs from the value
    stored in ``self.parameters`` by the previous run.
    """

    __argument_class__ = ThresholdBaseAlgorithmParameters
    new_parameters: ThresholdBaseAlgorithmParametersAnnot
    # Subclasses replace this with the comparison used for thresholding.
    threshold_operator = staticmethod(blank_operator)

    def __init__(self, **kwargs):
        super().__init__()
        self.cleaned_image = None
        self.threshold_image = None
        self._sizes_array = []
        self.components_num = 0
        self.threshold_info = None
        self.old_threshold_info = None

    def get_additional_layers(
        self, full_segmentation: typing.Optional[np.ndarray] = None
    ) -> typing.Dict[str, AdditionalLayerDescription]:
        """
        Create dict with standard additional layers.

        :param full_segmentation: data for the "no size filtering" layer;
            ``self.segmentation`` is used when it is ``None``
        :return: dict with the "denoised image" and "no size filtering" layers
        """
        if full_segmentation is None:
            full_segmentation = self.segmentation
        return {
            "denoised image": AdditionalLayerDescription(data=self.cleaned_image, layer_type="image"),
            "no size filtering": AdditionalLayerDescription(data=full_segmentation, layer_type="labels"),
        }

    def prepare_result(self, roi: np.ndarray) -> ROIExtractionResult:
        """
        Collect data for result.

        :param roi: array with segmentation
        :return: algorithm result description, annotated with the voxel count
            of every non-empty component
        """
        sizes = np.bincount(roi.flat)
        annotation = {i: {"component": i, "voxels": size} for i, size in enumerate(sizes[1:], 1) if size > 0}
        return ROIExtractionResult(
            roi=roi,
            parameters=self.get_segmentation_profile(),
            additional_layers=self.get_additional_layers(),
            roi_annotation=annotation,
        )

    def set_image(self, image):
        """Set a new image and clear the stored threshold information."""
        super().set_image(image)
        self.threshold_info = None

    def get_info_text(self):
        """Return the threshold information and the sizes of the components that passed size filtering."""
        return f"Threshold: {self.threshold_info}\nSizes: " + ", ".join(
            map(str, self._sizes_array[1 : self.components_num + 1])
        )

    def _lack_of_components(self):
        """Build a result from the raw threshold image with a hint that the threshold is probably wrong."""
        res = self.prepare_result(self.threshold_image.astype(np.uint8))
        info_text = (
            "Something wrong with chosen threshold. Please check it. "
            "May be too low or too high. The channel brightness range is "
            f"{self.cleaned_image.min()}-{self.cleaned_image.max()} "
            f"and chosen threshold is {self.threshold_info}"
        )
        return dataclasses.replace(res, info_text=info_text)

    def _get_channel(self) -> bool:
        """Get channel from image if number of channel is changed from previous run, or image is changed"""
        if self.channel is None or self.parameters["channel"] != self.new_parameters.channel:
            self.parameters["channel"] = self.new_parameters.channel
            self.channel = self.get_channel(self.new_parameters.channel)
            return True
        return False

    def _update_cleaned_image(self, restarted: bool) -> bool:
        """Update cleaned image if selected channel or noise filter is changed"""
        if restarted or self.parameters["noise_filtering"] != self.new_parameters.noise_filtering:
            self.parameters["noise_filtering"] = deepcopy(self.new_parameters.noise_filtering)
            noise_filtering_parameters = self.new_parameters.noise_filtering
            self.cleaned_image = NoiseFilterSelection[noise_filtering_parameters.name].noise_filter(
                self.channel, self.image.spacing, noise_filtering_parameters.values
            )
            return True
        return False

    def _calculate_threshold(self, restarted: bool):
        """Calculate threshold if cleaned image is changed"""
        if restarted or self.new_parameters.threshold != self.parameters["threshold"]:
            self.parameters["threshold"] = deepcopy(self.new_parameters.threshold)
            self.threshold_image = self._threshold(self.cleaned_image)
            return True
        return False

    def _calculate_components(self, restarted: bool):
        """Calculate components if threshold image is changed"""
        if restarted or self.new_parameters.side_connection != self.parameters["side_connection"]:
            self.parameters["side_connection"] = self.new_parameters.side_connection
            connect = SimpleITK.ConnectedComponent(
                SimpleITK.GetImageFromArray(self.threshold_image), not self.new_parameters.side_connection
            )
            self.segmentation = SimpleITK.GetArrayFromImage(SimpleITK.RelabelComponent(connect))
            self._sizes_array = np.bincount(self.segmentation.flat)
            return True
        return False

    def _filter_by_size(self, restarted: bool) -> typing.Optional[np.ndarray]:
        """
        Filter components by size if size filter is changed

        :return: filtered copy of ``self.segmentation``, or ``None`` when neither
            an earlier stage nor ``minimum_size`` changed
        """
        # Compare against the key that is written below. The former "size_filter"
        # key was never stored, so it always read as None and forced a rerun.
        if restarted or self.new_parameters.minimum_size != self.parameters["minimum_size"]:
            self.parameters["minimum_size"] = self.new_parameters.minimum_size
            minimum_size = self.new_parameters.minimum_size
            ind = bisect(self._sizes_array[1:], minimum_size, operator.gt)
            finally_segment = np.copy(self.segmentation)
            # Labels above ``ind`` are removed; ``ind`` is the count of kept components.
            finally_segment[finally_segment > ind] = 0
            self.components_num = ind
            return finally_segment
        return None

    def calculation_run(
        self, report_fun: typing.Callable[[str, int], typing.Any]
    ) -> typing.Optional[ROIExtractionResult]:
        """
        main calculation function

        :param report_fun: function used to trace progress
        :return: new result, or ``None`` when no stage needed recalculation
        """
        # TODO Refactor
        self.old_threshold_info = self.threshold_info
        restarted = self._get_channel()
        restarted = self._update_cleaned_image(restarted)
        restarted = self._calculate_threshold(restarted)
        if self.threshold_image.max() == 0:
            return self._lack_of_components()
        restarted = self._calculate_components(restarted)
        if len(self._sizes_array) < 2:
            return self._lack_of_components()
        finally_segment = self._filter_by_size(restarted)
        if finally_segment is not None:
            if self.components_num == 0:
                info_text = (
                    f"Please check the minimum size parameter. The biggest element has size {self._sizes_array[1]}"
                )
            else:
                info_text = ""
            res = self.prepare_result(finally_segment)
            return dataclasses.replace(res, info_text=info_text)
        return None

    def clean(self):
        """Release cached data: stored parameters, cleaned image and mask."""
        super().clean()
        self.parameters: typing.Dict[str, typing.Optional[typing.Any]] = defaultdict(lambda: None)
        self.cleaned_image = None
        self.mask = None

    def _threshold(self, image, thr=None):
        """
        Calculate the threshold mask of ``image`` and store the threshold value in ``threshold_info``.

        :param image: image to threshold
        :param thr: threshold implementation; taken from ``new_parameters.threshold`` when ``None``
        :return: mask returned by the threshold implementation
        """
        if thr is None:
            thr: BaseThreshold = ThresholdSelection[self.new_parameters.threshold.name]
        mask, thr_val = thr.calculate_mask(
            image, self.mask, self.new_parameters.threshold.values, self.threshold_operator
        )
        self.threshold_info = thr_val
        return mask
# Base threshold parameters extended with a single threshold selection.
class OneThresholdAlgorithmParameters(ThresholdBaseAlgorithmParameters):
    threshold: ThresholdSelection = Field(ThresholdSelection.get_default(), position=2)
class OneThresholdAlgorithm(ThresholdBaseAlgorithm, ABC):
    """Common base of the PartSeg analysis algorithms that apply one threshold. Exists to avoid code repetition."""

    __argument_class__ = OneThresholdAlgorithmParameters
class LowerThresholdAlgorithm(OneThresholdAlgorithm):
    """
    Lower threshold algorithm. Follows the flow of :py:class:`ThresholdBaseAlgorithm`.
    The area of interest is made of voxels of the filtered channel whose value is
    above the given threshold.
    """

    threshold_operator = staticmethod(operator.gt)

    @classmethod
    def get_name(cls):
        return "Lower threshold"
class UpperThresholdAlgorithm(OneThresholdAlgorithm):
    """
    Upper threshold algorithm. Follows the flow of :py:class:`ThresholdBaseAlgorithm`.
    The area of interest is made of voxels of the filtered channel whose value is
    below the given threshold.
    """

    threshold_operator = staticmethod(operator.lt)

    @classmethod
    def get_name(cls):
        return "Upper threshold"
class TwoThreshold(BaseModel):
    # keep for backward compatibility
    lower_threshold: float = Field(1000, ge=0, le=10**6)
    upper_threshold: float = Field(10000, ge=0, le=10**6)


def _to_two_thresholds(dkt):
    """Migration step: move the flat ``lower_threshold``/``upper_threshold`` keys into a ``TwoThreshold``."""
    lower = dkt.pop("lower_threshold")
    upper = dkt.pop("upper_threshold")
    dkt["threshold"] = TwoThreshold(lower_threshold=lower, upper_threshold=upper)
    return dkt


def _to_double_threshold(dkt):
    """Migration step: turn the ``TwoThreshold`` value into a double threshold built from two manual thresholds."""
    manual_name = ManualThreshold.get_name

    core = ThresholdSelection(
        name=manual_name(),
        values=SingleThresholdParams(threshold=dkt["threshold"].lower_threshold),
    )
    base = ThresholdSelection(
        name=manual_name(),
        values=SingleThresholdParams(threshold=dkt["threshold"].upper_threshold),
    )
    dkt["threshold"] = DoubleThresholdSelection(
        name=DoubleThreshold.get_name(),
        values=DoubleThresholdParams(core_threshold=core, base_threshold=base),
    )
    return dkt


def _rename_algorithm(dkt):
    """Migration step: store the threshold as ``RangeThresholdSelection``, renaming "Base/Core" to "Range"."""
    previous = dkt["threshold"]
    values = previous.values
    name = "Range" if previous.name == "Base/Core" else previous.name
    dkt["threshold"] = RangeThresholdSelection(name=name, values=values)
    return dkt


# Parameters of the range threshold algorithm; the migrations above are applied
# in version order when older data is loaded.
@register_class(
    version="0.0.3",
    migrations=[("0.0.1", _to_two_thresholds), ("0.0.2", _to_double_threshold), ("0.0.3", _rename_algorithm)],
)
class RangeThresholdAlgorithmParameters(ThresholdBaseAlgorithmParameters):
    threshold: RangeThresholdSelection = Field(default_factory=RangeThresholdSelection.get_default, position=2)
class RangeThresholdAlgorithm(ThresholdBaseAlgorithm):
    """
    Range threshold algorithm. Follows the flow of :py:class:`ThresholdBaseAlgorithm`.
    The area of interest is made of voxels of the filtered channel whose value is
    between the lower and upper threshold.
    """

    __argument_class__ = RangeThresholdAlgorithmParameters

    @classmethod
    def get_name(cls):
        return "Range threshold"

    def _threshold(self, image, thr=None):
        """
        Calculate the range mask of ``image``.

        :param image: image to threshold
        :param thr: threshold implementation; taken from ``new_parameters.threshold`` when ``None``
        :return: mask with the entries equal to 2 cleared
        """
        selected = RangeThresholdSelection[self.new_parameters.threshold.name] if thr is None else thr
        mask, thr_val = selected.calculate_mask(image, self.mask, self.new_parameters.threshold.values, operator.ge)
        # Entries marked 2 by the double threshold are dropped from the mask.
        mask[mask == 2] = 0
        # The threshold values are stored in reversed order.
        self.threshold_info = thr_val[::-1]
        return mask
# Base for algorithms driven by a double threshold: entries equal to 2 in the
# threshold mask form the returned mask, entries >= 1 form ``sprawl_area``.
class TwoLevelThresholdBaseAlgorithm(ThresholdBaseAlgorithm, ABC):
    def __init__(self):
        super().__init__()
        self.sprawl_area = None
        self._original_output = None

    def _threshold(self, image, thr=None):
        """
        Calculate the double threshold mask of ``image``.

        Stores the threshold values in ``threshold_info``, the raw mask in
        ``_original_output`` and the ``mask >= 1`` area in ``sprawl_area``.

        :param image: image to threshold
        :param thr: threshold implementation; taken from ``new_parameters.threshold`` when ``None``
        :return: ``uint8`` array marking where the raw mask equals 2
        """
        selected = DoubleThresholdSelection[self.new_parameters.threshold.name] if thr is None else thr
        mask, thr_val = selected.calculate_mask(
            image, self.mask, self.new_parameters.threshold.values, self.threshold_operator
        )
        self.threshold_info = thr_val
        self._original_output = mask
        self.sprawl_area = (mask >= 1).astype(np.uint8)
        core = mask == 2
        return core.astype(np.uint8)
# Parameters of the threshold + watershed algorithms. The registered migration
# renames the legacy "sprawl_type" key to "flow_type".
@register_class(version="0.0.1", migrations=[("0.0.1", rename_key("sprawl_type", "flow_type"))])
class BaseThresholdFlowAlgorithmParameters(ThresholdBaseAlgorithmParameters):
    threshold: DoubleThresholdSelection = Field(DoubleThresholdSelection.get_default(), position=2)
    flow_type: WatershedSelection = Field(WatershedSelection.get_default(), position=3)
    minimum_size: int = Field(8000, title="Minimum core\nsize (px)", ge=0, le=10**6)
    remove_object_touching_border: bool = Field(
        False, title="Remove objects\ntouching border", description="Remove objects touching border"
    )


def remove_object_touching_border(new_segment):
    """
    Return a copy of a label array with every label that touches the array border set to 0.

    Only axes longer than one element are checked; for each of them the first
    and the last hyperplane are inspected. The input array is not modified.

    :param new_segment: array of labels, 0 is treated as background
    :return: copy of ``new_segment`` without the border-touching labels
    """
    border_labels = set()
    for dim, size in enumerate(new_segment.shape):
        if size <= 1:
            continue
        for position in (0, size - 1):
            index = [slice(None)] * new_segment.ndim
            index[dim] = position
            border_labels.update(np.unique(new_segment[tuple(index)]).tolist())
    # Background is never removed (and zeroing it would be a no-op anyway).
    border_labels.discard(0)
    res = np.copy(new_segment)
    if border_labels:
        # One membership pass over the array instead of one full scan per label.
        res[np.isin(res, list(border_labels))] = 0
    return res
# Threshold algorithm whose size-filtered components are used as cores that are
# then spread over ``sprawl_area`` by the selected watershed ("flow") method.
class BaseThresholdFlowAlgorithm(TwoLevelThresholdBaseAlgorithm, ABC):
    __argument_class__ = BaseThresholdFlowAlgorithmParameters
    new_parameters: BaseThresholdFlowAlgorithmParameters

    def __init__(self):
        super().__init__()
        self.finally_segment = None
        self.final_sizes = []
        self.threshold_info = [None, None]

    def get_info_text(self):
        """Return the thresholds, the core ("mid") sizes and the final component sizes as text."""
        thresholds = ", ".join(map(str, self.threshold_info))
        mid_sizes = ", ".join(map(str, self._sizes_array[1 : self.components_num + 1]))
        final_sizes = ", ".join(map(str, self.final_sizes[1:]))
        return "Threshold: " + thresholds + "\nMid sizes: " + mid_sizes + "\nFinal sizes: " + final_sizes

    def clean(self):
        self.sprawl_area = None
        super().clean()

    def set_image(self, image):
        super().set_image(image)
        self.threshold_info = [None, None]

    def calculation_run(self, report_fun) -> typing.Optional[ROIExtractionResult]:
        """
        Run the base threshold calculation, then spread the cores with the selected flow method.

        :param report_fun: function used to trace progress
        :return: new result, or ``None`` when nothing needed recalculation
        """
        segment_data = super().calculation_run(report_fun)
        if segment_data is not None and self.components_num == 0:
            # The base result already carries the explanation; there is nothing to spread.
            self.final_sizes = []
            return segment_data

        restarted = segment_data is not None
        if restarted:
            self.finally_segment = segment_data.roi
            finally_segment = segment_data.roi
        else:
            # The base stages were not repeated: reuse the cores from the previous run.
            finally_segment = np.copy(self.finally_segment)

        needs_sprawl = (
            restarted
            or self.old_threshold_info[1] != self.threshold_info[1]
            or self.new_parameters.flow_type != self.parameters["flow_type"]
            or self.new_parameters.remove_object_touching_border != self.parameters["remove_object_touching_border"]
        )
        if not needs_sprawl:
            return None

        if self.threshold_operator(self.threshold_info[1], self.threshold_info[0]):
            self.final_sizes = np.bincount(finally_segment.flat)
            return self.prepare_result(self.finally_segment)

        flow_method = WatershedSelection[self.new_parameters.flow_type.name]
        self.parameters["flow_type"] = self.new_parameters.flow_type
        new_segment = flow_method.sprawl(
            self.sprawl_area,
            np.copy(finally_segment),  # TODO add tests for discover this problem
            self.channel,
            self.components_num,
            self.image.spacing,
            self.new_parameters.side_connection,
            self.threshold_operator,
            self.new_parameters.flow_type.values,
            self.threshold_info[1],
            self.threshold_info[0],
        )
        if self.new_parameters.remove_object_touching_border:
            new_segment = remove_object_touching_border(new_segment)
        self.parameters["remove_object_touching_border"] = self.new_parameters.remove_object_touching_border
        self.final_sizes = np.bincount(new_segment.flat)

        layers = {
            "original": AdditionalLayerDescription(data=self._original_output, layer_type="labels"),
            **self.get_additional_layers(full_segmentation=self.sprawl_area),
        }
        annotation = {
            i: {"component": i, "core voxels": self._sizes_array[i], "voxels": v}
            for i, v in enumerate(self.final_sizes[1:], 1)
        }
        return ROIExtractionResult(
            roi=new_segment,
            parameters=self.get_segmentation_profile(),
            additional_layers=layers,
            roi_annotation=annotation,
            alternative_representation={"core_objects": finally_segment},
        )
# Threshold + watershed variant that compares with ``operator.gt``.
class LowerThresholdFlowAlgorithm(BaseThresholdFlowAlgorithm):
    threshold_operator = staticmethod(operator.gt)

    @classmethod
    def get_name(cls):
        return "Lower threshold with watershed"
# Threshold + watershed variant that compares with ``operator.lt``.
class UpperThresholdFlowAlgorithm(BaseThresholdFlowAlgorithm):
    threshold_operator = staticmethod(operator.lt)

    @classmethod
    def get_name(cls):
        return "Upper threshold with watershed"
# Parameters of the "Multiple Otsu" algorithm. The registered migration renames
# the legacy "noise_removal" key to "noise_filtering".
@register_class(version="0.0.1", migrations=[("0.0.1", rename_key("noise_removal", "noise_filtering", optional=True))])
class OtsuSegmentParameters(BaseModel):
    channel: Channel = 0
    noise_filtering: NoiseFilterSelection = Field(NoiseFilterSelection.get_default(), title="Noise Removal")
    components: int = Field(2, title="Number of Components", ge=0, lt=100)
    valley: bool = Field(True, title="Valley emphasis")
    hist_num: int = Field(128, title="Number of histogram bins", ge=8, le=2**16)
# Segmentation by SimpleITK multiple Otsu thresholding of the noise-filtered channel.
class OtsuSegment(RestartableAlgorithm):
    __argument_class__ = OtsuSegmentParameters
    new_parameters: OtsuSegmentParameters

    @classmethod
    def get_name(cls):
        return "Multiple Otsu"

    def __init__(self):
        super().__init__()
        self._sizes_array = []
        self.threshold_info = []

    def calculation_run(self, report_fun):
        """
        Filter the selected channel and split it with ``SimpleITK.OtsuMultipleThresholds``.

        For every label ``1..components`` the lowest value of the filtered image
        inside that label is recorded as its lower threshold (reusing the previous
        value, or 0, for an empty label). It also becomes the upper threshold of the
        preceding label. The last label gets the image maximum as upper threshold.

        :param report_fun: progress callback, not used
        :return: result with the label array, the denoised image layer and the per-label thresholds
        """
        channel = self.get_channel(self.new_parameters.channel)
        noise_filtering_parameters = self.new_parameters.noise_filtering
        cleaned_image = NoiseFilterSelection[noise_filtering_parameters.name].noise_filter(
            channel, self.image.spacing, noise_filtering_parameters.values
        )
        cleaned_image_sitk = SimpleITK.GetImageFromArray(cleaned_image)
        res = SimpleITK.OtsuMultipleThresholds(
            cleaned_image_sitk,
            self.new_parameters.components,
            0,
            self.new_parameters.hist_num,
            self.new_parameters.valley,
        )
        res = SimpleITK.GetArrayFromImage(res)
        self._sizes_array = np.bincount(res.flat)[1:]
        self.threshold_info = []
        annotations = {}
        for i in range(1, self.new_parameters.components + 1):
            val = cleaned_image[res == i]
            if val.size:
                self.threshold_info.append(np.min(val))
            elif self.threshold_info:
                self.threshold_info.append(self.threshold_info[-1])
            else:
                self.threshold_info.append(0)
            annotations[i] = {"lower threshold": self.threshold_info[-1]}
            if i > 1:
                annotations[i - 1]["upper threshold"] = self.threshold_info[-1]
        # ``components`` may be 0; then there is no last label to annotate and
        # indexing the empty dict would raise KeyError.
        if annotations:
            annotations[self.new_parameters.components]["upper threshold"] = np.max(cleaned_image)
        return ROIExtractionResult(
            roi=res,
            parameters=self.get_segmentation_profile(),
            additional_layers={"denoised_image": AdditionalLayerDescription(data=cleaned_image, layer_type="image")},
            roi_annotation=annotations,
        )

    def get_info_text(self):
        """Return the recorded thresholds and the label sizes as text."""
        return (
            "Threshold: "
            + ", ".join(map(str, self.threshold_info))
            + "\nSizes: "
            + ", ".join(map(str, self._sizes_array))
        )
# Parameters of the multiscale opening algorithms: the two-level threshold
# parameters plus the mu-mid selection and the step limit.
class BaseMultiScaleOpeningParameters(TwoLevelThresholdBaseAlgorithm.__argument_class__):
    threshold: DoubleThresholdSelection = Field(DoubleThresholdSelection.get_default())
    mu_mid: MuMidSelection = Field(MuMidSelection.get_default(), title="Mu mid value")
    step_limits: int = Field(100, title="Limits of Steps", ge=1, le=1000)
# Threshold algorithm whose size-filtered components are handed to the compiled
# ``PyMSO`` backend (multiscale opening) to produce the final segmentation.
class BaseMultiScaleOpening(TwoLevelThresholdBaseAlgorithm, ABC):  # pragma: no cover
    __argument_class__ = BaseMultiScaleOpeningParameters
    new_parameters: BaseMultiScaleOpeningParameters

    def __init__(self):
        super().__init__()
        self.finally_segment = None
        self.final_sizes = []
        self.threshold_info = [float("nan"), float("nan")]
        self.steps = 0
        self.mso = PyMSO()
        self.mso.set_use_background(True)

    def get_info_text(self):
        """Return the thresholds, the core ("mid") sizes, the final sizes and the number of MSO steps as text."""
        thresholds = ", ".join(map(str, self.threshold_info))
        mid_sizes = ", ".join(map(str, self._sizes_array[1 : self.components_num + 1]))
        final_sizes = ", ".join(map(str, self.final_sizes[1:]))
        return (
            "Threshold: "
            + thresholds
            + "\nMid sizes: "
            + mid_sizes
            + "\nFinal sizes: "
            + final_sizes
            + f"\nsteps: {self.steps}"
        )

    def clean(self):
        """Drop the sprawl area and replace the MSO backend with a fresh instance."""
        self.sprawl_area = None
        self.mso = PyMSO()
        self.mso.set_use_background(True)
        super().clean()

    def set_image(self, image):
        super().set_image(image)
        self.threshold_info = [float("nan"), float("nan")]

    def calculation_run(self, report_fun) -> typing.Optional[ROIExtractionResult]:
        """
        Run the base threshold calculation, then feed its components to the MSO backend.

        :param report_fun: function used to trace progress
        :return: new result, or ``None`` when nothing needed recalculation
        :raises SegmentationLimitException: when more than 250 components are found
        """
        # Must happen before the base run, which updates the stored "side_connection".
        if self.new_parameters.side_connection != self.parameters["side_connection"]:
            neigh, dist = calculate_distances_array(self.image.spacing, get_neigh(self.new_parameters.side_connection))
            self.mso.set_neighbourhood(neigh, dist)

        segment_data = super().calculation_run(report_fun)
        if segment_data is not None and self.components_num == 0:
            self.final_sizes = []
            return segment_data

        if segment_data is not None:
            self.finally_segment = segment_data.roi
            finally_segment = segment_data.roi
            if np.max(finally_segment) > 250:
                raise SegmentationLimitException(
                    "Current implementation of MSO do not support more than 250 components"
                )
            # Shift component labels up by one and mark everything outside the sprawl area with 1.
            components = finally_segment.astype(np.uint8)
            components[components > 0] += 1
            components[self.sprawl_area == 0] = 1
            self.mso.set_components(components, self.components_num)
            restarted = True
        else:
            restarted = False
            finally_segment = np.copy(self.finally_segment)

        mu_outdated = (
            restarted
            or self.old_threshold_info[1] != self.threshold_info[1]
            or self.new_parameters.mu_mid != self.parameters["mu_mid"]
        )
        if mu_outdated:
            if self.threshold_operator(self.threshold_info[1], self.threshold_info[0]):
                self.final_sizes = np.bincount(finally_segment.flat)
                return self.prepare_result(self.finally_segment)
            mu_calc = MuMidSelection[self.new_parameters.mu_mid.name]
            self.parameters["mu_mid"] = self.new_parameters.mu_mid
            free_area = (self.sprawl_area > 0).astype(np.uint8)
            free_area[finally_segment > 0] = 0
            mid_val = mu_calc.value(
                free_area,
                self.channel,
                self.threshold_info[0],
                self.threshold_info[1],
                self.new_parameters.mu_mid.values,
            )
            mu_array = calculate_mu_mid(self.channel, self.threshold_info[0], mid_val, self.threshold_info[1])
            self.mso.set_mu_array(mu_array)
            restarted = True

        if not (restarted or self.new_parameters.step_limits != self.parameters["step_limits"]):
            return None

        self.parameters["step_limits"] = self.new_parameters.step_limits
        count_steps_factor = 20 if self.image.is_2d else 3
        self.mso.run_MSO(self.new_parameters.step_limits, count_steps_factor)
        self.steps = self.mso.steps_done()
        new_segment = self.mso.get_result_catted()
        # Undo the label shift applied before the components were passed to the backend.
        new_segment[new_segment > 0] -= 1
        self.final_sizes = np.bincount(new_segment.flat)
        return self.prepare_result(new_segment)
# Multiscale opening variant that compares with ``operator.gt``.
class LowerThresholdMultiScaleOpening(BaseMultiScaleOpening):
    threshold_operator = staticmethod(operator.gt)

    @classmethod
    def get_name(cls):  # pragma: no cover
        return "Lower threshold MultiScale Opening"
# Multiscale opening variant that compares with ``operator.lt``.
class UpperThresholdMultiScaleOpening(BaseMultiScaleOpening):
    threshold_operator = staticmethod(operator.lt)

    @classmethod
    def get_name(cls):  # pragma: no cover
        return "Upper threshold MultiScale Opening"
# Concrete algorithms listed by this module; the two MultiScale Opening classes
# defined above are not included in this list.
final_algorithm_list = [ LowerThresholdAlgorithm, UpperThresholdAlgorithm, RangeThresholdAlgorithm, LowerThresholdFlowAlgorithm, UpperThresholdFlowAlgorithm, OtsuSegment, BorderRim, MaskDistanceSplit, ]