PartSegCore.analysis.batch_processing

This subpackage contains utilities to perform parallel batch processing

The batch_backend module contains utilities for performing the calculations of a batch plan

The parallel_backend module contains utilities for parallelism

PartSegCore.analysis.batch_processing.batch_backend

This module contains the PartSeg functions used for batch processing calculations

Calculation hierarchy:

digraph calc {
    rankdir="LR";
    "CalculationManager"[shape=rectangle style=filled];
    "BatchManager"[shape=rectangle];
    "BatchWorker"[shape=rectangle];
    "CalculationManager" -> "BatchManager" -> "BatchWorker"[arrowhead="crow"];
    "CalculationManager" -> "DataWriter"[arrowhead="inv"];
    "DataWriter"[shape=rectangle];
    "FileData"[shape=rectangle];
    "SheetData"[shape=rectangle];
    "DataWriter" -> "FileData" -> "SheetData"[arrowhead="crow"];
}
class PartSegCore.analysis.batch_processing.batch_backend.BatchResultDescription(errors: list[tuple[str, ErrorInfo]], global_counter: int, jobs_status: dict[uuid.UUID, int])[source]

Bases: NamedTuple

Tuple holding information about part of a calculation result.

errors: list[tuple[str, ErrorInfo]]

list of errors that occurred during calculation

global_counter: int

total number of calculated steps

jobs_status: dict[uuid.UUID, int]

progress information for each job
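
A minimal structural sketch of this tuple (a stand-in class, not imported from PartSeg):

```python
from typing import NamedTuple
from uuid import uuid4

# Structural stand-in for BatchResultDescription, showing how the
# three fields fit together.
class ResultDescription(NamedTuple):
    errors: list         # (description, error info) pairs
    global_counter: int  # total number of calculated steps
    jobs_status: dict    # per-job progress, keyed by job UUID

job_id = uuid4()
desc = ResultDescription(errors=[], global_counter=5, jobs_status={job_id: 5})
```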

class PartSegCore.analysis.batch_processing.batch_backend.CalculationManager[source]

Bases: object

This class manages batch processing in PartSeg.

add_calculation(calculation)[source]
Parameters:

calculation (Calculation) – calculation to add

get_results()[source]

Consume results from BatchWorker and transfer them to DataWriter

Returns:

information about calculation status

Return type:

BatchResultDescription

property has_work: bool

Whether any calculation or data writing is still in progress

is_valid_sheet_name(excel_path, sheet_name)[source]

Check if sheet name can be used

Parameters:
  • excel_path (str) – path identifying the Excel file

  • sheet_name (str) – name of the Excel sheet

Returns:

if the sheet name can be used

Return type:

bool

set_number_of_workers(val)[source]

Set number of workers to perform calculation.

Parameters:

val (int) – number of workers.

class PartSegCore.analysis.batch_processing.batch_backend.CalculationProcess[source]

Bases: object

Main class to execute a PartSeg calculation plan. To support other operations, override recursive_calculation() and call the super function to keep support for already defined operations.

do_calculation(calculation)[source]

Main function for calculation process

Parameters:

calculation (FileCalculation) – calculation to do.

Return type:

List[ResponseData]

Returns:

list of calculation results

iterate_over(node)[source]

Execute calculation on the node's children or on a list of nodes

Parameters:

node (CalculationTree | list[CalculationTree])

Returns:

recursive_calculation(node)[source]

Identify node type and then call proper step_* function

Parameters:

node (CalculationTree) – node to be processed
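
The dispatch pattern described here can be sketched with stand-in node and operation classes (names are illustrative, not the real PartSeg types):

```python
# Stand-in operation types; the real plan has many more (segmentation,
# mask operations, measurement, save, ...).
class MaskCreate:
    pass

class Save:
    pass

class Node:
    def __init__(self, operation, children=()):
        self.operation = operation
        self.children = list(children)

class Process:
    def __init__(self):
        self.log = []

    def recursive_calculation(self, node):
        # Identify the node's operation type and route to a step_* handler.
        if isinstance(node.operation, MaskCreate):
            self.step_mask_create(node.operation, node.children)
        elif isinstance(node.operation, Save):
            self.step_save(node.operation)

    def step_mask_create(self, operation, children):
        self.log.append("mask_create")
        for child in children:  # descend into children after the step
            self.recursive_calculation(child)

    def step_save(self, operation):
        self.log.append("save")  # leaf step, no children

proc = Process()
proc.recursive_calculation(Node(MaskCreate(), [Node(Save())]))
```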

step_load_mask(operation, children)[source]

Load mask using mask mapper (the mask can be defined with a suffix, a substitution, or a file with a saved mapping), then iterate over children nodes.

Parameters:
  • operation (MaskMapper) – operation to perform

  • children (List[CalculationTree]) – list of nodes to iterate over with applied mask
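
The suffix and substitution strategies for locating a mask file can be sketched as follows (helper names are hypothetical, not the MaskMapper API):

```python
import os

# Suffix strategy: the mask path is the image path with a suffix
# inserted before the extension.
def mask_path_suffix(image_path, suffix="_mask"):
    base, ext = os.path.splitext(image_path)
    return base + suffix + ext

# Substitution strategy: the mask path is the image path with a
# text fragment replaced.
def mask_path_substitution(image_path, old, new):
    return image_path.replace(old, new)
```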

step_mask_create(operation, children)[source]

Create mask from current segmentation state using definition

Parameters:
  • operation (MaskCreate) – mask create description.

  • children (List[CalculationTree]) – list of nodes to iterate over after creating the mask

step_mask_operation(operation, children)[source]

Generate a new mask as the sum or intersection of existing masks, then iterate over children nodes

Parameters:
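
The sum and intersection combinations can be sketched on plain boolean lists (the real code operates on image arrays; this is only an illustration):

```python
# Sum (union): a pixel belongs to the result if it is in either mask.
def mask_sum(mask_a, mask_b):
    return [a or b for a, b in zip(mask_a, mask_b)]

# Intersection: a pixel belongs to the result only if it is in both masks.
def mask_intersection(mask_a, mask_b):
    return [a and b for a, b in zip(mask_a, mask_b)]
```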
step_mask_use(operation, children)[source]

Use an already defined mask and iterate over children nodes

Parameters:
  • operation (MaskUse)

  • children (List[CalculationTree]) – list of nodes to iterate over after applying the mask

step_measurement(operation)[source]

Calculate measurement defined in current operation.

Parameters:

operation (MeasurementCalculate) – definition of measurement to calculate

step_save(operation)[source]

Perform save operation selected in plan.

Parameters:

operation (Save) – save definition

step_segmentation(operation, children)[source]

Perform segmentation and iterate over children nodes

Parameters:

class PartSegCore.analysis.batch_processing.batch_backend.DataWriter[source]

Bases: object

Handle information about saving results.

add_data_part(calculation)[source]

Add information about calculation

add_result(data, calculation, ind=None)[source]

Add calculation result to file writer

Raises:

ValueError – when calculation.measurement_file_path was not registered with add_data_part()

Return type:

list[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

calculation_finished(calculation)[source]

Force write data for given calculation.

Raises:

ValueError – when the measurement was not registered with add_data_part()

Return type:

list[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

Returns:

list of errors during write.

finish()[source]

close all files

is_empty_sheet(file_path, sheet_name)[source]

Check if given pair of file_path and sheet_name can be used.

Parameters:
  • file_path (str) – path to the file in which measurement results are stored

  • sheet_name (str) – name of the Excel sheet in which data will be stored

Returns:

Whether calling FileData.add_data_part() finishes without error.

Return type:

bool

writing_finished()[source]

check if all data are written to disc

Return type:

bool

class PartSegCore.analysis.batch_processing.batch_backend.FileData(calculation, write_threshold=40)[source]

Bases: object

Handle information about a single file.

This class runs a separate thread for writing purposes. This requires additional synchronisation, but avoids blocking the main thread.

Parameters:
  • calculation (BaseCalculation) – calculation information

  • write_threshold (int) – number of buffered result lines after which data is written to disk

Variables:

component_str – separator for per component sheet information
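
The write_threshold buffering idea can be sketched as follows (a stand-in class, not the real FileData implementation):

```python
# Rows accumulate in memory and are flushed to the output once the
# threshold is reached, so disk writes happen in batches.
class BufferedSheet:
    def __init__(self, write_threshold=40):
        self.write_threshold = write_threshold
        self.buffer = []   # rows not yet written
        self.written = []  # stand-in for rows already on disk

    def add_row(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.write_threshold:
            self.dump_data()

    def dump_data(self):
        # Fire the write and clear the in-memory buffer.
        self.written.extend(self.buffer)
        self.buffer.clear()

sheet = BufferedSheet(write_threshold=3)
for i in range(7):
    sheet.add_row(i)
```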

add_data_part(calculation)[source]

Add a new calculation whose result will be stored in the handled file.

Parameters:

calculation (BaseCalculation) – information about calculation

Raises:

ValueError – when measurement_file_path differs from the handled file or the sheet_name is already in use.

component_str = '_comp_'

separator for per component sheet information

dump_data()[source]

Trigger writing data to disc

finished()[source]

check if any data are waiting to be written to disc

get_errors()[source]

Get the list of errors that occurred during the last write

Return type:

list[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

good_sheet_name(name)[source]

Check if the sheet name can be used in the current file. Return False if:

  • the file is a text file

  • the name contains component_str

  • the name is already in use

Parameters:

name (str) – sheet name

Return type:

tuple[bool, str]

Returns:

whether the name can be used, and an error message
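
A stand-in sketch of the three checks listed above (the real method is bound to file state; this free function only illustrates the logic):

```python
COMPONENT_STR = "_comp_"  # mirrors the component_str class variable

def good_sheet_name(name, used_names, is_text_file=False):
    # Text outputs have no sheet concept at all.
    if is_text_file:
        return False, "Text files do not support sheets"
    # The component separator is reserved for per-component sheets.
    if COMPONENT_STR in name:
        return False, f"Sheet name cannot contain {COMPONENT_STR}"
    # Each sheet name may be used only once per file.
    if name in used_names:
        return False, "Sheet name already in use"
    return True, ""
```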

wrote_data(uuid_id, data, ind=None)[source]

Add information to be stored in output file

Parameters:
  • uuid_id (uuid.UUID) – calculation identifier

  • data (ResponseData) – calculation result

  • ind (Optional[int]) – element index

wrote_data_to_file()[source]

Main function to write data to hard drive. It is executed in separate thread.

class PartSegCore.analysis.batch_processing.batch_backend.FileType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

class PartSegCore.analysis.batch_processing.batch_backend.ResponseData(path_to_file, values)[source]

Bases: NamedTuple

path_to_file: str

Alias for field number 0

values: list[MeasurementResult]

Alias for field number 1

class PartSegCore.analysis.batch_processing.batch_backend.SheetData(name, columns, raw=False)[source]

Bases: object

Store information about a single sheet

get_data_to_write()[source]

Get the data to be written

Returns:

sheet name and data to write

Return type:

Tuple[str, pd.DataFrame]

PartSegCore.analysis.batch_processing.batch_backend.do_calculation(file_info, calculation)[source]

Main function used to run a calculation. It creates a CalculationProcess and calls its method CalculationProcess.do_calculation()

Parameters:
  • file_info (tuple[int, str]) – index and path of the file to be processed

  • calculation (BaseCalculation) – calculation description

Return type:

Tuple[int, List[Union[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]], ResponseData]]]

PartSegCore.analysis.batch_processing.batch_backend.get_data_loader(root_type, file_path)[source]

Get the data loader for a given root type. Also return an indicator of whether the file extension matches the loader.

Parameters:
  • root_type (RootType) – type of loader

  • file_path (str) – path to file

Return type:

tuple[type[LoadMaskSegmentation | LoadProject | LoadImageForBatch], bool]

Returns:

loader and an indicator of whether the file extension matches the loader
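
The extension-matching behaviour can be sketched with stand-in loader classes (the extension lists here are assumptions for illustration, not the real loader registrations):

```python
# Stand-in loaders declaring which extensions they accept.
class LoadProject:
    extensions = (".tgz", ".gz")

class LoadImageForBatch:
    extensions = (".tif", ".tiff")

def get_loader(file_path, loaders=(LoadProject, LoadImageForBatch)):
    # Return the first loader whose extensions match, with a True flag.
    for loader in loaders:
        if file_path.lower().endswith(loader.extensions):
            return loader, True
    # No match: fall back to a default loader and signal the mismatch.
    return loaders[0], False
```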

PartSegCore.analysis.batch_processing.parallel_backend

This module contains utilities for parallel batch calculation. The main class is BatchManager, which manages parallel calculations

The main workflow is to add work with BatchManager.add_work() and consume results (BatchManager.get_result()) as long as BatchManager.has_work evaluates to True

digraph foo {
    "BatchManager" -> "BatchWorker"[arrowhead="crow"];
}
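
The add-work/consume-results loop can be sketched with a stand-in manager, so the polling pattern is runnable without PartSeg itself:

```python
# Stand-in manager exposing the same polling surface as BatchManager:
# has_work, get_result(), and a step() that simulates worker progress.
class Manager:
    def __init__(self, tasks):
        self.pending = list(tasks)
        self.results = []

    @property
    def has_work(self):
        # Work remains while tasks are pending or results are unconsumed.
        return bool(self.pending) or bool(self.results)

    def step(self):
        # Stand-in for the real workers making progress in the background.
        if self.pending:
            self.results.append(self.pending.pop(0) * 2)

    def get_result(self):
        # Drain the result queue and return its contents as a list.
        out, self.results = self.results, []
        return out

manager = Manager([1, 2, 3])
collected = []
while manager.has_work:
    manager.step()
    collected.extend(manager.get_result())
```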
class PartSegCore.analysis.batch_processing.parallel_backend.BatchManager[source]

Bases: object

This class is used to manage pending work. It uses BatchWorker for running calculations.

add_work(individual_parameters_list, global_parameters, fun)[source]

This function adds new work items to internal structures. The number of work items is the length of individual_parameters_list

Parameters:
  • individual_parameters_list (List) – list of individual parameters for fun. For each element, fun will be called with that element as the first argument

  • global_parameters – second argument of fun. If it has a uuid field, then it is used as the work uuid

  • fun (Callable[[Any, Any], Any]) – two-argument function used to run the calculation. The first argument is task specific; the second is constant for the whole work.

Return type:

str

Returns:

work uuid

property finished

Check if any process is running

get_result()[source]

Drain the result queue and return its contents as a list

Return type:

List[Tuple[UUID, Any]]

Returns:

list of results as tuples, where the first element is the job uuid and the second is either the function result, or a tuple with the exception as first element and the traceback as second

property has_work: bool

Check if the manager has pending or currently processed work, or results that are not yet consumed

set_number_of_process(num)[source]

Change the number of workers used for calculation

Parameters:

num (int) – target number of processes

class PartSegCore.analysis.batch_processing.parallel_backend.BatchWorker(task_queue, order_queue, result_queue, calculation_dict)[source]

Bases: object

Worker spawned by BatchManager instance

Parameters:
  • task_queue (Queue) – Queue with task data

  • order_queue (Queue) – Queue with additional orders (like kill)

  • result_queue (Queue) – Queue to put result

  • calculation_dict (Dict[UUID, Tuple[Any, Callable[[Any, Any], Any]]]) – stores the global parameters and function for each task

calculate_task(val)[source]

Calculate a single task. val is a tuple of two elements (task_data, uuid). The function and global parameters are obtained from calculation_dict
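
The error-capturing idea behind returning either a result or an (exception, traceback) pair can be sketched as follows (a simplified free function, not the bound method):

```python
import traceback

# Run the task function; on failure, return (exception, StackSummary)
# instead of letting the worker crash, so the manager can report errors.
def calculate_task(fun, task_data, global_parameters):
    try:
        return fun(task_data, global_parameters)
    except Exception as exc:
        return exc, traceback.extract_tb(exc.__traceback__)

ok = calculate_task(lambda a, b: a + b, 1, 2)
failed = calculate_task(lambda a, b: a / b, 1, 0)
```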

run()[source]

Worker main loop

class PartSegCore.analysis.batch_processing.parallel_backend.SubprocessOrder(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Commands for worker processes, to be put in the order queue

PartSegCore.analysis.batch_processing.parallel_backend.spawn_worker(task_queue, order_queue, result_queue, calculation_dict)[source]

Function for spawning a worker. Designed to be used as the target of multiprocessing.Process().

Parameters:
  • task_queue (Queue) – Queue with tasks

  • order_queue (Queue) – Queue with additional orders (like kill)

  • result_queue (Queue) – Queue for calculation result

  • calculation_dict (Dict[UUID, Any]) – dict with global parameters
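
A thread-based sketch of the worker loop (the real code uses multiprocessing.Process; threads and queue.Queue keep this example self-contained):

```python
import queue
import threading

KILL = "kill"  # stand-in for the kill order in SubprocessOrder

def worker(task_queue, order_queue, result_queue, calculation_dict):
    while True:
        # Check the order queue first so a kill order stops the loop.
        try:
            if order_queue.get_nowait() == KILL:
                return
        except queue.Empty:
            pass
        # Fetch the next task; time out so orders are checked regularly.
        try:
            task_data, job_id = task_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        # Global parameters and the task function come from calculation_dict.
        global_parameters, fun = calculation_dict[job_id]
        result_queue.put((job_id, fun(task_data, global_parameters)))

tasks, orders, results = queue.Queue(), queue.Queue(), queue.Queue()
calc = {"job": (10, lambda a, b: a + b)}
tasks.put((5, "job"))
thread = threading.Thread(target=worker, args=(tasks, orders, results, calc))
thread.start()
job_id, value = results.get(timeout=5)
orders.put(KILL)
thread.join(timeout=5)
```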