Source code for PartSegCore.analysis.calculation_plan

import logging
import os
import textwrap
import typing
import uuid
from abc import abstractmethod
from copy import copy, deepcopy
from enum import Enum

import local_migrator
from local_migrator import register_class, rename_key
from pydantic import BaseModel as PydanticBaseModel

from PartSegCore.algorithm_describe_base import ROIExtractionProfile
from PartSegCore.analysis import AnalysisAlgorithmSelection
from PartSegCore.analysis.measurement_calculation import MeasurementProfile
from PartSegCore.mask_create import MaskProperty
from PartSegCore.universal_const import Units
from PartSegCore.utils import BaseModel


[docs] class MaskBase(BaseModel): """ Base class for mask in calculation plan. :ivar str ~.name: name of mask """ name: str
[docs] class RootType(Enum): """Defines root type which changes of data available on begin of calculation""" Image = 0 #: raw image Project = 1 #: PartSeg project with defined segmentation Mask_project = 2 #: Project from mask segmentation. It contains multiple elements. def __str__(self): return self.name.replace("_", " ")
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskCreate"]) class MaskCreate(MaskBase): """ Description of mask creation in calculation plan. :ivar str ~.name: name of mask :ivar str ~.mask_property: instance of :py:class:`.MaskProperty` """ mask_property: MaskProperty def __str__(self): return f"Mask create: {self.name}\n" + str(self.mask_property).split("\n", 1)[1]
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskUse"]) class MaskUse(MaskBase): """ Reuse of already defined mask Will be deprecated in short time """
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskSum"]) class MaskSum(MaskBase): """ Description of OR operation on mask :ivar str ~.name: name of mask :ivar str ~.mask1: first mask name :ivar str ~.mask2: second mask name """ mask1: str mask2: str
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskIntersection"]) class MaskIntersection(MaskBase): """ Description of AND operation on mask :ivar str ~.name: name of mask :ivar str ~.mask1: first mask name :ivar str ~.mask2: second mask name """ mask1: str mask2: str
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.Save"]) class Save(BaseModel): """ Save operation description :ivar str ~.suffix: suffix for saved file :ivar str ~.directory: name of subdirectory to save :ivar str ~.algorithm: name of save method :ivar str ~.short_name: short name of save method :ivar dict ~.values: parameters specific for save method """ suffix: str directory: str algorithm: str short_name: str values: dict
[docs] @register_class( version="0.0.1", migrations=[("0.0.1", rename_key("statistic_profile", "measurement_profile", optional=True))], old_paths=["PartSeg.utils.analysis.calculation_plan.StatisticCalculate"], ) class MeasurementCalculate(BaseModel): """ Measurement calculation description :ivar int ~.channel: on which channel measurements should be calculated :ivar Units ~.units: Type of units in which results of measurements should be represented :ivar MeasurementProfile ~.statistic_profile: description of measurements :ivar str name_prefix: prefix of column names """ channel: int units: Units measurement_profile: MeasurementProfile name_prefix: str @property def name(self): """name of used MeasurementProfile""" return self.measurement_profile.name def __str__(self): channel = "Like segmentation" if self.channel == -1 else str(self.channel) desc = str(self.measurement_profile).split("\n", 1)[1] return f"MeasurementCalculate \nChannel: {channel}\nUnits: {self.units}\n{desc}\n"
[docs] def get_save_path(op: Save, calculation: "FileCalculation") -> str: """ Calculate save path base on proceeded file path and save operation parameters. It assume that save algorithm is registered in :py:data:`PartSegCore.analysis.save_functions.save_dict` :param op: operation to do :param calculation: information about calculation :return: save path """ from PartSegCore.analysis.save_functions import save_dict extension = save_dict[op.algorithm].get_default_extension() rel_path = os.path.relpath(calculation.file_path, calculation.base_prefix) rel_path = os.path.splitext(rel_path)[0] if op.directory: file_name = os.path.basename(rel_path) base_rel_path = os.path.dirname(rel_path) return os.path.join(calculation.result_prefix, base_rel_path, op.directory, file_name + op.suffix + extension) return os.path.join(calculation.result_prefix, rel_path + op.suffix + extension)
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskMapper"]) class MaskMapper(BaseModel): """ Base class for obtaining mask from computer disc :ivar ~.name: mask name """ name: str
[docs] @abstractmethod def get_mask_path(self, file_path: str) -> str: """ Calculate mask path based od file_path :param file_path: path to proceeded file """
[docs] @abstractmethod def get_parameters(self): """Parameters for serialize"""
[docs] @staticmethod def is_ready() -> bool: """Check if this mask mapper can be used""" return True
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskSuffix"]) class MaskSuffix(MaskMapper): """ Description of mask form file obtained by adding suffix to image file path :ivar str ~.name: mask name :ivar str ~.suffix: mask file path suffix """ suffix: str
[docs] def get_mask_path(self, file_path: str) -> str: base, ext = os.path.splitext(file_path) return base + self.suffix + ext
[docs] def get_parameters(self): return {"name": self.name, "suffix": self.suffix}
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskSub"]) class MaskSub(MaskMapper): """ Description of mask form file obtained by substitution :ivar str ~.name: mask name :ivar str ~.base: string to be searched :ivar str ~.repr: string to be put instead of ``base`` """ base: str rep: str
[docs] def get_mask_path(self, file_path: str) -> str: dir_name, filename = os.path.split(file_path) filename = filename.replace(self.base, self.rep) return os.path.join(dir_name, filename)
[docs] def get_parameters(self): return {"name": self.name, "base": self.base, "rep": self.rep}
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.MaskFile"]) class MaskFile(MaskMapper): # TODO Check implementation path_to_file: str name_dict: typing.Optional[dict] = None
[docs] def is_ready(self) -> bool: return os.path.exists(self.path_to_file)
[docs] def get_mask_path(self, file_path: str) -> str: if self.name_dict is None: self.parse_map() try: return self.name_dict[os.path.normpath(file_path)] except (KeyError, AttributeError): return ""
[docs] def get_parameters(self): return {"name": self.name, "path_to_file": self.path_to_file}
[docs] def set_map_path(self, value): self.path_to_file = value
[docs] def parse_map(self, sep=";"): if not os.path.exists(self.path_to_file): # pragma: no cover logging.error("File does not exists: %s", self.path_to_file) raise ValueError(f"File for mapping mask does not exists: {self.path_to_file}") with open(self.path_to_file, encoding="utf-8") as map_file: dir_name = os.path.dirname(self.path_to_file) for i, line in enumerate(map_file): try: file_name, mask_name = line.split(sep) except ValueError: # pragma: no cover logging.error("Error in parsing map file\nline %s\n%s\nfrom file %s", i, line, self.path_to_file) continue file_name = file_name.strip() mask_name = mask_name.strip() if not os.path.abspath(file_name): file_name = os.path.normpath(os.path.join(dir_name, file_name)) if not os.path.abspath(mask_name): mask_name = os.path.normpath(os.path.join(dir_name, mask_name)) self.name_dict[file_name] = mask_name
[docs] class Operations(Enum): """Global operations""" reset_to_base = 1
[docs] class PlanChanges(Enum): """History elements""" add_node = 1 #: remove_node = 2 #: replace_node = 3 #:
[docs] @register_class(old_paths=["PartSeg.utils.analysis.calculation_plan.CalculationTree"], allow_errors_in_values=True) class CalculationTree: """ Structure for describe calculation structure """ def __init__( self, operation: typing.Union[PydanticBaseModel, ROIExtractionProfile, MeasurementCalculate, RootType], children: typing.List["CalculationTree"], ): if operation == "root": operation = RootType.Image self.operation = operation self.children = children def __str__(self): return f"{self.operation}:\n[{'n'.join([str(x) for x in self.children])}]" def __repr__(self): return f"CalculationTree(operation={self.operation!r}, children={self.children})" def as_dict(self): return {"operation": self.operation, "children": self.children} def is_bad(self): return any(el.is_bad() for el in self.children) or isinstance(self.operation, dict) def get_error_source(self): res = [] for el in self.children: res.extend(el.get_error_source()) if isinstance(self.operation, dict): res.extend(self.get_source_error_dict(self.operation)) return res @classmethod def get_source_error_dict(cls, dkt): if not isinstance(dkt, dict) or "__error__" not in dkt: return [] if "not found in register" in dkt["__error__"]: return [dkt["__error__"]] if "__values__" not in dkt: return [] fields = local_migrator.check_for_errors_in_dkt_values(dkt["__values__"]) res = [] for field in fields: res.extend(cls.get_source_error_dict(dkt["__values__"][field])) return res
[docs] class NodeType(Enum): """Type of node in calculation""" segment = 1 #: segmentation mask = 2 #: mask creation measurement = 3 #: measurement calculation root = 4 #: root of calculation save = 5 #: save operation none = 6 #: other, like description file_mask = 7 #: mask load
[docs] class BaseCalculation: """ Base description of calculation needed for single file :ivar str ~.base_prefix: path prefix which should be used to calculate relative path of processed files :ivar str ~.result_prefix: path prefix for saving structure :ivar str ~.measurement_file_path: path to file in which result of measurement should be saved :ivar str ~.sheet_name: name of sheet in excel file :ivar CalculationPlan ~.calculation_plan: plan of calculation :ivar str uuid: ~.uuid of whole calculation :ivar ~.voxel_size: default voxel size (for files which do not contains this information in metadata """ def __init__( self, base_prefix: str, result_prefix: str, measurement_file_path: str, sheet_name: str, calculation_plan: "CalculationPlan", voxel_size: typing.Sequence[float], overwrite_voxel_size: bool = False, ): self.base_prefix = base_prefix self.result_prefix = result_prefix self.measurement_file_path = measurement_file_path self.sheet_name = sheet_name self.calculation_plan = calculation_plan self.uuid = uuid.uuid4() self.voxel_size = voxel_size self.overwrite_voxel_size = overwrite_voxel_size def __repr__(self): return ( f"{self.__class__.__name__}(calculation_plan={self.calculation_plan}, voxel_size={self.voxel_size}, " f"overwrite_voxel_size={self.overwrite_voxel_size}), " f"base_prefix={self.base_prefix}, result_prefix={self.base_prefix}, " f"measurement_file_path{self.measurement_file_path}, sheet_name={self.sheet_name})" )
[docs] class Calculation(BaseCalculation): """ Description of whole calculation. Extended with list of all files to proceed :ivar str ~.base_prefix: path prefix which should be used to calculate relative path of processed files :ivar str ~.result_prefix: path prefix for saving structure :ivar str ~.measurement_file_path: path to file in which result of measurement should be saved :ivar str ~.sheet_name: name of sheet in excel file :ivar CalculationPlan ~.calculation_plan: plan of calculation :ivar str uuid: ~.uuid of whole calculation :ivar ~.voxel_size: default voxel size (for files which do not contains this information in metadata :ivar typing.List[str] ~.file_list: list of files to be proceed """ def __init__( self, file_list, base_prefix, result_prefix, measurement_file_path, sheet_name, calculation_plan, voxel_size, overwrite_voxel_size=False, ): super().__init__( base_prefix, result_prefix, measurement_file_path, sheet_name, calculation_plan, voxel_size, overwrite_voxel_size, ) self.file_list: typing.List[str] = file_list
[docs] def get_base_calculation(self) -> BaseCalculation: """Extract py:class:`BaseCalculation` from instance.""" base = BaseCalculation( self.base_prefix, self.result_prefix, self.measurement_file_path, self.sheet_name, self.calculation_plan, self.voxel_size, self.overwrite_voxel_size, ) base.uuid = self.uuid return base
@property def measurement(self): return self.calculation_plan.get_measurements()
[docs] class FileCalculation: """ Description of single file calculation """ def __init__(self, file_path: str, calculation: BaseCalculation): self.file_path = file_path self.calculation = calculation @property def base_prefix(self): """path prefix which should be used to calculate relative path of processed files""" return self.calculation.base_prefix @property def result_prefix(self): """path prefix for saving structure""" return self.calculation.result_prefix @property def calculation_plan(self): """plan of calculation""" return self.calculation.calculation_plan @property def uuid(self): """uuid of whole calculation""" return self.calculation.uuid @property def voxel_size(self): """default voxel size (for files which do not contains this information in metadata""" return self.calculation.voxel_size @property def overwrite_voxel_size(self): """overwrite voxel size""" return self.calculation.overwrite_voxel_size def __repr__(self): return f"FileCalculation(file_path={self.file_path}, calculation={self.calculation})"
[docs] @register_class(allow_errors_in_values=True) class CalculationPlan: """ Clean description Calculation plan. :type current_pos: list[int] :type name: str :type segmentation_count: int :type execution_tree: CalculationTree """ correct_name: typing.ClassVar[typing.Dict[str, typing.Union[BaseModel, Enum]]] = { MaskCreate.__name__: MaskCreate, MaskUse.__name__: MaskUse, Save.__name__: Save, MeasurementCalculate.__name__: MeasurementCalculate, ROIExtractionProfile.__name__: ROIExtractionProfile, MaskSuffix.__name__: MaskSuffix, MaskSub.__name__: MaskSub, MaskFile.__name__: MaskFile, Operations.__name__: Operations, MaskIntersection.__name__: MaskIntersection, MaskSum.__name__: MaskSum, RootType.__name__: RootType, } def __init__(self, tree: typing.Optional[CalculationTree] = None, name: str = ""): if tree is None: self.execution_tree = CalculationTree(RootType.Image, []) else: self.execution_tree = tree self.segmentation_count = 0 self.name = name self.current_pos = [] self.changes = [] self.current_node = None def is_bad(self): return self.execution_tree.is_bad() def get_error_source(self): return ", ".join(self.execution_tree.get_error_source()) def as_dict(self): return {"tree": self.execution_tree, "name": self.name} def get_root_type(self): return self.execution_tree.operation def set_root_type(self, root_type: RootType): self.execution_tree.operation = root_type def __str__(self): return f"CalculationPlan<{self.name}>\n{self.execution_tree}" def __repr__(self): return f"CalculationPlan(name={self.name!r}, execution_tree={self.execution_tree!r})"
[docs] def get_measurements(self, node: typing.Optional[CalculationTree] = None) -> typing.List[MeasurementCalculate]: """ Get all measurement Calculation below given node :param node: Node for start, if absent then start from plan root :return: list of measurements """ if node is None: node = self.execution_tree if isinstance(node.operation, MeasurementCalculate): return [node.operation] res = [] for el in node.children: res.extend(self.get_measurements(el)) return res
def get_changes(self): ret = self.changes self.changes = [] return ret def position(self): return self.current_pos def set_position(self, value): self.current_pos = value self.current_node = None def clean(self): self.execution_tree = CalculationTree(RootType.Image, []) self.current_pos = [] def __copy__(self): return CalculationPlan(name=self.name, tree=deepcopy(self.execution_tree)) def __deepcopy__(self, memo): return CalculationPlan(name=self.name, tree=deepcopy(self.execution_tree))
[docs] def get_node(self, search_pos: typing.Optional[typing.List[int]] = None, parent=False) -> CalculationTree: """ :param search_pos: :return: CalculationTree """ node = self.execution_tree if search_pos is None: if self.current_node is not None and not parent: return self.current_node search_pos = self.current_pos if parent: search_pos = search_pos[:-1] for pos in search_pos: node = node.children[pos] return node
[docs] def get_mask_names(self, node=None): """ :type node: CalculationTree :param node: :return: set[str] """ if node is None: node = self.get_node() res = set() if isinstance(node.operation, (MaskCreate, MaskMapper)): res.add(node.operation.name) for el in node.children: res |= self.get_mask_names(el) return res
def get_file_mask_names(self): node = self.get_node() used_mask = self.get_reused_mask() tree_mask_names = self.get_mask_names(node) return used_mask & tree_mask_names, tree_mask_names @staticmethod def _get_reused_mask(node): """ :type node: CalculationTree :param node: :return: """ used_mask = set() for el in node.children: if isinstance(el.operation, MaskUse): used_mask.add(el.operation.name) elif isinstance(el.operation, (MaskSum, MaskIntersection)): used_mask.add(el.operation.mask1) used_mask.add(el.operation.mask2) return used_mask def get_reused_mask(self) -> set: return self._get_reused_mask(self.execution_tree) def get_node_type(self, parent=False) -> NodeType: if self.current_pos is None: return NodeType.none if not self.current_pos: return NodeType.root node = self.get_node(parent=parent) for klass, node_type in [ (RootType, NodeType.root), ((MaskMapper, MaskIntersection, MaskSum), NodeType.file_mask), (MaskCreate, NodeType.mask), (MeasurementCalculate, NodeType.measurement), (ROIExtractionProfile, NodeType.segment), (Save, NodeType.save), (MaskUse, NodeType.file_mask), ]: if isinstance(node.operation, klass): return node_type if isinstance(node.operation, Operations) and node.operation == Operations.reset_to_base: return NodeType.mask raise ValueError(f"[get_node_type] unknown node type {node.operation}") def add_step(self, step): if self.current_pos is None: return node = self.get_node() node.children.append(CalculationTree(step, [])) if isinstance(step, ROIExtractionProfile): self.segmentation_count += 1 self.changes.append((self.current_pos, node.children[-1], PlanChanges.add_node)) def replace_step(self, step): if self.current_pos is None: return node = self.get_node() node.operation = step self.changes.append((self.current_pos, node, PlanChanges.replace_node)) def replace_name(self, name): if self.current_pos is None: return node = self.get_node() node.operation = node.operation.copy(update={"name": name}) self.changes.append((self.current_pos, node, PlanChanges.replace_node)) def has_children(self): node = self.get_node() return len(node.children) > 0 def remove_step(self): path = copy(self.current_pos) if not path: return pos = path[-1] parent_node = self.get_node(path[:-1]) del parent_node.children[pos] self.changes.append((self.current_pos, None, PlanChanges.remove_node)) self.current_pos = self.current_pos[:-1] def is_segmentation(self): return self.segmentation_count > 0 def set_name(self, text): self.name = text def get_execution_tree(self): return self.execution_tree def _get_save_list(self, node): """ :type node: CalculationTree :param node: :return: """ if isinstance(node.operation, Save): return [node.operation] res = [] for chl in node.children: res.extend(self._get_save_list(chl)) return res def get_save_list(self): return self._get_save_list(self.execution_tree)
[docs] def get_list_file_mask(self): """ :return: list[MaskMapper] """ return [el.operation for el in self.execution_tree.children if isinstance(el.operation, MaskMapper)]
def set_path_to_mapping_file(self, num, path): for el in self.execution_tree.children: if num == 0: el.operation.path_to_file = path return el.operation if isinstance(el.operation, MaskFile): num -= 1 return None @classmethod def dict_load(cls, data_dict): res_plan = cls() name = data_dict["name"] res_plan.set_name(name) execution_tree = data_dict["execution_tree"] for pos, el, _ in execution_tree: res_plan.current_pos = pos[:-1] try: res_plan.add_step(CalculationPlan.correct_name[el["type"]](**el["values"])) except TypeError: logging.warning(el["type"]) raise res_plan.changes = [] return res_plan
[docs] @staticmethod def get_el_name(el): # noqa: C901, PLR0911, PLR0912 """ :param el: Plan element :return: str """ if el.__class__.__name__ not in CalculationPlan.correct_name: raise ValueError(f"Unknown type {el.__class__.__name__}") if isinstance(el, RootType): return f"Root: {el}" if isinstance(el, Operations) and el == Operations.reset_to_base: return "reset project to base image with mask" if isinstance(el, ROIExtractionProfile): return f"Segmentation: {el.name}" if isinstance(el, MeasurementCalculate): if not el.name_prefix: return f"Measurement: {el.name}" return f"Measurement: {el.name} with prefix: {el.name_prefix}" if isinstance(el, MaskCreate): return f"Create mask: {el.name}" if el.name else "Create mask:" if isinstance(el, MaskUse): return f"Use mask: {el.name}" if isinstance(el, MaskSuffix): return f"File mask: {el.name} with suffix {el.suffix}" if isinstance(el, MaskSub): return f"File mask: {el.name} substitution {el.base} on {el.rep}" if isinstance(el, MaskFile): return f"File mapping mask: {el.name}, {el.path_to_file}" if isinstance(el, Save): base = el.short_name if el.directory: return f"Save {base} in directory with name {el.suffix}" return f"Save {base} with suffix {el.suffix}" if el.suffix else f"Save {base}" if isinstance(el, MaskIntersection): if not el.name: return f"Mask intersection of mask {el.mask1} and {el.mask2}" return f"Mask {el.name} intersection of mask {el.mask1} and {el.mask2}" if isinstance(el, MaskSum): if not el.name: return f"Mask sum of mask {el.mask1} and {el.mask2}" return f"Mask {el.name} sum of mask {el.mask1} and {el.mask2}" raise ValueError(f"Unknown type {type(el)}")
def pretty_print(self) -> str: return f"Calculation Plan: {self.name}\n{self._pretty_print(self.execution_tree, 0)}" def _pretty_print(self, elem: CalculationTree, indent) -> str: if isinstance(elem.operation, str): name = elem.operation else: name = self.get_el_name(elem.operation) if isinstance(elem.operation, (MeasurementCalculate, ROIExtractionProfile, MaskCreate)): if isinstance(elem.operation, ROIExtractionProfile): txt = elem.operation.pretty_print(AnalysisAlgorithmSelection) else: txt = str(elem.operation) txt = "\n".join(txt.split("\n")[1:]) name += "\n" + textwrap.indent(txt, " " * (indent + 4)) if elem.children: suffix = "\n" + "\n".join(self._pretty_print(x, indent + 2) for x in elem.children) else: suffix = "" return " " * indent + name + suffix