PartSegCore.analysis.batch_processing

This subpackage contains utilities for parallel batch processing.

The batch_backend module contains utilities for performing the calculations defined in a batch plan.

The parallel_backend module contains utilities for parallel execution.

PartSegCore.analysis.batch_processing.batch_backend

This module contains the PartSeg functions used for calculations in batch processing.

Calculation hierarchy:

  • CalculationManager → BatchManager → BatchWorker

  • CalculationManager → DataWriter → FileData → SheetData
class PartSegCore.analysis.batch_processing.batch_backend.BatchResultDescription(errors: List[Tuple[str, Tuple[Exception, Union[traceback.StackSummary, Tuple[Dict, traceback.StackSummary]]]]], global_counter: int, jobs_status: Dict[uuid.UUID, int])

Bases: NamedTuple

Named tuple holding information about a part of the calculation result.

errors: List[Tuple[str, Tuple[Exception, Union[traceback.StackSummary, Tuple[Dict, traceback.StackSummary]]]]]

list of errors that occurred during the calculation

global_counter: int

total number of calculated steps

jobs_status: Dict[uuid.UUID, int]

progress information for each job
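A caller of get_results() might condense a BatchResultDescription into a one-line progress report. The sketch below is illustrative only and rests on several assumptions. It defines a local ResultDescription mirror with a simplified error type so that it runs without PartSeg installed. It treats the string in each error entry as the name of the failing file. The summarize() helper is hypothetical.

```python
import traceback
import uuid
from typing import Dict, List, NamedTuple, Tuple


class ResultDescription(NamedTuple):
    """Local mirror of BatchResultDescription's fields (simplified error payload)."""
    # Assumption: the str in each entry names the file whose calculation failed.
    errors: List[Tuple[str, Tuple[Exception, traceback.StackSummary]]]
    global_counter: int
    jobs_status: Dict[uuid.UUID, int]


def summarize(desc: ResultDescription) -> str:
    """Build a one-line progress report (hypothetical helper)."""
    failed_files = [name for name, _ in desc.errors]
    return (
        f"{desc.global_counter} steps done, "
        f"{len(desc.jobs_status)} job(s) tracked, "
        f"{len(failed_files)} error(s): {', '.join(failed_files) or 'none'}"
    )


job = uuid.uuid4()
desc = ResultDescription(
    errors=[("image_3.tif", (ValueError("bad mask"), traceback.StackSummary()))],
    global_counter=12,
    jobs_status={job: 5},
)
print(summarize(desc))  # 12 steps done, 1 job(s) tracked, 1 error(s): image_3.tif
```

The sketch does not interpret the integers in jobs_status, because their exact meaning is not specified here.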

class PartSegCore.analysis.batch_processing.batch_backend.CalculationManager

Bases: object

This class manages batch processing in PartSeg.

add_calculation(calculation)

Add a calculation to be processed.

Parameters

calculation (Calculation) – calculation to add

get_results()

Consume results from BatchWorker instances and pass them to DataWriter.

Returns

information about calculation status

Return type

BatchResultDescription

property has_work: bool

Whether any calculation or data writing is still in progress.

Return type

bool

is_valid_sheet_name(excel_path, sheet_name)

Check if the sheet name can be used.

Parameters
  • excel_path (str) – path that identifies the Excel file

  • sheet_name (str) – name of the Excel sheet

Returns

whether the sheet name can be used

Return type

bool

set_number_of_workers(val)

Set number of workers to perform calculation.

Parameters

val (int) – number of workers.

class PartSegCore.analysis.batch_processing.batch_backend.CalculationProcess

Bases: object

Main class for executing a PartSeg calculation plan. To support additional operations, override recursive_calculation() and call the superclass implementation so that the already defined operations are still handled.
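The extension pattern described above can be sketched with simplified stand-ins. A subclass overrides recursive_calculation() and delegates known operations to super(). Node, Segment, Threshold, BaseProcess and ExtendedProcess are hypothetical names, not PartSeg classes. BaseProcess only imitates the node-type dispatch and the iterate_over() behaviour documented for this class.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Node:
    """Hypothetical stand-in for CalculationTree: an operation plus child nodes."""
    operation: Any
    children: List["Node"] = field(default_factory=list)


@dataclass
class Segment:  # hypothetical operation known to the base class
    name: str


@dataclass
class Threshold:  # hypothetical new operation added by a subclass
    value: float


class BaseProcess:
    """Simplified imitation of the dispatch done by recursive_calculation()."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def iterate_over(self, node) -> None:
        # Accept either a single node (process its children) or a list of nodes.
        nodes = node if isinstance(node, list) else node.children
        for child in nodes:
            self.recursive_calculation(child)

    def recursive_calculation(self, node: Node) -> None:
        if isinstance(node.operation, Segment):
            self.log.append(f"segment:{node.operation.name}")
            self.iterate_over(node)
        else:
            raise ValueError(f"Unknown operation {node.operation!r}")


class ExtendedProcess(BaseProcess):
    """Handles the new operation type and defers everything else to super()."""

    def recursive_calculation(self, node: Node) -> None:
        if isinstance(node.operation, Threshold):
            self.log.append(f"threshold:{node.operation.value}")
            self.iterate_over(node)
        else:
            super().recursive_calculation(node)


tree = Node(Segment("cells"), [Node(Threshold(0.5), [Node(Segment("nuclei"))])])
proc = ExtendedProcess()
proc.recursive_calculation(tree)
print(proc.log)  # ['segment:cells', 'threshold:0.5', 'segment:nuclei']
```

Because iterate_over() calls self.recursive_calculation(), children of a node handled by the base class are still routed through the subclass override.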

do_calculation(calculation)

Main entry point of the calculation process.

Parameters

calculation (FileCalculation) – calculation to perform.

Return type

List[ResponseData]

iterate_over(node)

Execute the calculation on the children of a node or on a list of nodes.

Parameters

node (Union[CalculationTree, List[CalculationTree]]) – node whose children should be processed, or a list of nodes

recursive_calculation(node)

Identify the node type and call the appropriate step_* method.

Parameters

node (CalculationTree) – node to be processed

step_load_mask(operation, children)

Load a mask using a mask mapper (the mask can be defined by a suffix, a substitution, or a saved mapping file), then iterate over the child nodes.

Parameters
  • operation (MaskMapper) – operation to perform

  • children (List[CalculationTree]) – list of nodes to iterate over with the mask applied

step_mask_create(operation, children)

Create a mask from the current segmentation state using the given definition.

Parameters
  • operation (MaskCreate) – mask creation description.

  • children (List[CalculationTree]) – list of nodes to iterate over after the mask is created

step_mask_operation(operation, children)

Generate a new mask as the sum or intersection of existing masks, then iterate over the child nodes.

Parameters
  • operation – description of the mask operation (sum or intersection)

  • children (List[CalculationTree]) – list of nodes to iterate over with the new mask

step_mask_use(operation, children)

Use an already defined mask and iterate over the child nodes.

Parameters
  • operation (MaskUse) – description of the mask to use

  • children (List[CalculationTree]) – list of nodes to iterate over with the mask applied

step_measurement(operation)

Calculate measurement defined in current operation.

Parameters

operation (MeasurementCalculate) – definition of measurement to calculate

step_save(operation)

Perform the save operation selected in the plan.

Parameters

operation (Save) – save definition

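step_segmentation(operation, children)

Perform segmentation and iterate over the child nodes.

Parameters
  • operation – segmentation definition

  • children (List[CalculationTree]) – list of nodes to iterate over after performing segmentation

class PartSegCore.analysis.batch_processing.batch_backend.DataWriter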

Bases: object

Handle writing of calculation results to output files.

add_data_part(calculation)

Add information about a calculation whose results will be written.

add_result(data, calculation, ind=None)

Add calculation result to file writer

Raises

ValueError – if calculation.measurement_file_path was not added with add_data_part()

Return type

List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

calculation_finished(calculation)

Force writing of the data for the given calculation.

Raises

ValueError – if the measurement was not added with add_data_part()

Return type

List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

Returns

list of errors raised while writing.

finish()

Close all files.

is_empty_sheet(file_path, sheet_name)

Check if given pair of file_path and sheet_name can be used.

Parameters
  • file_path (str) – path to file to store measurement result

  • sheet_name (str) – Name of excel sheet in which data will be stored

Returns

whether calling FileData.add_data_part() would finish without error

Return type

bool

writing_finished()

Check whether all data have been written to disk.

Return type

bool

class PartSegCore.analysis.batch_processing.batch_backend.FileData(calculation, write_threshold=40)

Bases: object

Handle information about single file.

This class runs a separate thread for writing. This requires additional synchronisation, but keeps the application from freezing while data are written.

Parameters
  • calculation (BaseCalculation) – calculation information

  • write_threshold (int) – number of data lines collected before they are written to disk

Variables

component_str – separator for per component sheet information
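The interplay of write_threshold and a background writing thread can be illustrated with a simplified buffered writer. This is a sketch under assumptions, not FileData's implementation. BufferedWriter, add_row() and the sink callable are hypothetical. The sketch starts a short-lived thread per flush, whereas FileData may keep a single long-lived thread. It also assumes that rows are added from a single thread.

```python
import threading
from typing import Callable, List, Optional


class BufferedWriter:
    """Buffer rows and hand them to `sink` on a background thread (hypothetical)."""

    def __init__(self, sink: Callable[[List[str]], None], write_threshold: int = 40):
        self._sink = sink
        self._threshold = write_threshold
        self._pending: List[str] = []
        self._lock = threading.Lock()  # guards _pending
        self._thread: Optional[threading.Thread] = None

    def add_row(self, row: str) -> None:
        with self._lock:
            self._pending.append(row)
            ready = len(self._pending) >= self._threshold
        if ready:
            self.dump_data()

    def dump_data(self) -> None:
        """Start writing pending rows in the background (after the previous write ends)."""
        if self._thread is not None:
            self._thread.join()  # one write at a time keeps batches in order
        with self._lock:
            rows, self._pending = self._pending, []
        if rows:
            self._thread = threading.Thread(target=self._sink, args=(rows,))
            self._thread.start()

    def finish(self) -> None:
        """Flush the remaining rows and wait until they are written."""
        self.dump_data()
        if self._thread is not None:
            self._thread.join()


written: List[List[str]] = []
writer = BufferedWriter(written.append, write_threshold=3)
for i in range(7):
    writer.add_row(f"row {i}")
writer.finish()
print([len(batch) for batch in written])  # [3, 3, 1]
```

The caller only blocks when a new batch is ready while the previous one is still being written. Rows below the threshold wait in memory until finish() (or another dump_data() call) flushes them.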

add_data_part(calculation)

Add a new calculation whose results will be stored in the handled file.

Parameters

calculation (BaseCalculation) – information about the calculation

Raises

ValueError – if measurement_file_path differs from the handled file or sheet_name is already in use.

component_str = '_comp_'

separator for per component sheet information

dump_data()

Trigger writing data to disk.

finished()

Check whether any data are still waiting to be written to disk.

get_errors()

Get the list of errors that occurred during the last write.

Return type

List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]

good_sheet_name(name)

Check if sheet name can be used in current file. Return False if:

  • file is text file

  • contains component_str in name

  • name is already in use

Parameters

name (str) – sheet name

Return type

Tuple[bool, str]

Returns

whether the name can be used, and an error message
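The three rejection rules listed above translate directly into code. In this hypothetical standalone version, the set of used names and the text-file flag are passed in as arguments, whereas FileData works from its own state. The error messages are invented for illustration.

```python
from typing import Iterable, Tuple

COMPONENT_STR = "_comp_"  # value of FileData.component_str


def good_sheet_name(name: str, used_names: Iterable[str], is_text_file: bool) -> Tuple[bool, str]:
    """Hypothetical re-statement of the documented rules; returns (usable, message)."""
    if is_text_file:
        return False, "Text files do not support sheets"
    if COMPONENT_STR in name:
        return False, f"Sheet name cannot contain {COMPONENT_STR!r}"
    if name in used_names:
        return False, f"Sheet name {name!r} is already in use"
    return True, ""


print(good_sheet_name("nuclei_comp_", ["nuclei"], False))
# (False, "Sheet name cannot contain '_comp_'")
```

The component_str rule presumably exists because per-component sheets are named with that separator, so a user-chosen name containing it could clash with them.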

wrote_data(uuid_id, data, ind=None)

Add information to be stored in the output file.

Parameters
  • uuid_id (uuid.UUID) – calculation identifier

  • data (ResponseData) – calculation result

  • ind (Optional[int]) – element index

wrote_data_to_file()

Main function for writing data to disk. It is executed in a separate thread.

class PartSegCore.analysis.batch_processing.batch_backend.FileType(value)

Bases: enum.Enum

Enumeration of the supported output file types.

class PartSegCore.analysis.batch_processing.batch_backend.ResponseData(path_to_file, values)

Bases: NamedTuple

path_to_file: str

Alias for field number 0

values: List[PartSegCore.analysis.measurement_calculation.MeasurementResult]

Alias for field number 1

class PartSegCore.analysis.batch_processing.batch_backend.SheetData(name, columns, raw=False)

Bases: object

Store information about a single sheet.

get_data_to_write()

Get the data to write.

Returns

sheet name and data to write

Return type

Tuple[str, pd.DataFrame]

PartSegCore.analysis.batch_processing.batch_backend.do_calculation(file_info, calculation)

Main function used to run a calculation. It creates a CalculationProcess and calls its CalculationProcess.do_calculation() method.

Parameters
  • file_info (Tuple[int, str]) – index and path to file which should be processed

  • calculation (BaseCalculation) – calculation description

Return type

Tuple[int, List[Union[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]], ResponseData]]]

PartSegCore.analysis.batch_processing.batch_backend.get_data_loader(root_type, file_path)

Get the data loader for the given root type. Also return an indicator of whether the file extension matches the loader.

Parameters
  • root_type (RootType) – type of loader

  • file_path (str) – path to file

Return type

Tuple[Union[Type[LoadMaskSegmentation], Type[LoadProject], Type[LoadImageForBatch]], bool]

Returns

loader and an indicator of whether the file extension matches the loader
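A minimal sketch of the loader lookup follows. It assumes a mapping from root type to loader class and a per-loader tuple of accepted extensions. The RootType members, ImageLoader, ProjectLoader, LOADERS and the extension lists are hypothetical placeholders. They are not PartSeg's actual loaders (LoadImageForBatch, LoadProject, LoadMaskSegmentation) or their real extensions.

```python
import os
from enum import Enum
from typing import Dict, Tuple


class RootType(Enum):
    """Hypothetical subset of batch root types."""
    Image = 0
    Project = 1


class ImageLoader:
    """Hypothetical loader; the extension list is illustrative only."""
    extensions = (".tif", ".tiff")


class ProjectLoader:
    """Hypothetical loader; the extension list is illustrative only."""
    extensions = (".tgz",)


LOADERS: Dict[RootType, type] = {RootType.Image: ImageLoader, RootType.Project: ProjectLoader}


def get_data_loader(root_type: RootType, file_path: str) -> Tuple[type, bool]:
    """Return the loader for root_type and whether file_path's extension matches it."""
    loader = LOADERS[root_type]
    extension = os.path.splitext(file_path)[1].lower()
    return loader, extension in loader.extensions


print(get_data_loader(RootType.Project, "experiment/cell_01.tif")[1])  # False
```

Returning the flag alongside the loader, instead of raising, leaves it to the caller to decide how to handle a mismatch.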

PartSegCore.analysis.batch_processing.parallel_backend

This module contains utilities for parallel batch calculation. The main class is BatchManager, which manages parallel calculation.

The main workflow is to add work with BatchManager.add_work() and consume results with BatchManager.get_result() for as long as BatchManager.has_work evaluates to True.

BatchManager → BatchWorker (one BatchManager manages multiple BatchWorker instances)
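The add-work/poll-results loop can be tried with a single-process stand-in. SerialBatchManager is hypothetical. It follows the documented add_work() calling convention: fun(item, global_parameters) is called for each item, and the work uuid is taken from global_parameters.uuid when present. It returns results in the documented (uuid, result) shape. Unlike the real BatchManager, it runs tasks serially in the calling process rather than in BatchWorker processes.

```python
import traceback
import uuid
from typing import Any, Callable, List, Tuple


class SerialBatchManager:
    """Hypothetical serial stand-in mimicking BatchManager's public methods."""

    def __init__(self) -> None:
        self._pending: List[Tuple[Any, Any, Callable[[Any, Any], Any], uuid.UUID]] = []

    def add_work(self, individual_parameters_list, global_parameters, fun) -> str:
        # Reuse global_parameters.uuid as the work id when the attribute exists.
        work_id = getattr(global_parameters, "uuid", None) or uuid.uuid4()
        for item in individual_parameters_list:
            self._pending.append((item, global_parameters, fun, work_id))
        return str(work_id)

    @property
    def has_work(self) -> bool:
        return bool(self._pending)

    def get_result(self) -> List[Tuple[uuid.UUID, Any]]:
        # Run one pending task per call to imitate results arriving over time.
        if not self._pending:
            return []
        item, global_parameters, fun, work_id = self._pending.pop(0)
        try:
            return [(work_id, fun(item, global_parameters))]
        except Exception as exc:  # failures are returned as (exception, traceback)
            return [(work_id, (exc, traceback.extract_tb(exc.__traceback__)))]


def scale(value, factor):
    return value * factor


manager = SerialBatchManager()
work_id = manager.add_work([1, 2, 3], 10, scale)
results = []
while manager.has_work:  # poll until nothing is pending
    results.extend(manager.get_result())
print([value for _, value in results])  # [10, 20, 30]
```

The same polling loop should work unchanged against any object exposing add_work(), has_work and get_result() with these semantics.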
class PartSegCore.analysis.batch_processing.parallel_backend.BatchManager

Bases: object

This class manages pending work. It uses BatchWorker instances to run calculations.

add_work(individual_parameters_list, global_parameters, fun)

Add new work to the internal structures. The number of tasks equals the length of individual_parameters_list.

Parameters
  • individual_parameters_list (List) – list of individual parameters for fun. fun is called once for each element, with the element as its first argument.

  • global_parameters – second argument of fun. If it has a uuid field, that value is used as the work uuid.

  • fun (Callable[[Any, Any], Any]) – two-argument function used to run the calculation. The first argument is task-specific; the second is constant for the whole work.

Return type

str

Returns

work uuid

property finished

Check if any process is running

get_result()

Drain the result queue and return its contents as a list.

Return type

List[Tuple[UUID, Any]]

Returns

list of results as tuples in which the first element is the job uuid and the second is either the function result or a tuple of an exception and its traceback

property has_work: bool

Check whether the manager has pending or in-progress work, or results that have not been consumed yet.

Return type

bool

set_number_of_process(num)

Change the number of workers used for calculation.

Parameters

num (int) – target number of processes

class PartSegCore.analysis.batch_processing.parallel_backend.BatchWorker(task_queue, order_queue, result_queue, calculation_dict)

Bases: object

Worker spawned by a BatchManager instance.

Parameters
  • task_queue (Queue) – Queue with task data

  • order_queue (Queue) – Queue with additional orders (like kill)

  • result_queue (Queue) – Queue for results

  • calculation_dict (Dict[UUID, Tuple[Any, Callable[[Any, Any], Any]]]) – stores the global parameters and function of each task

calculate_task(val)

Calculate a single task. val is a tuple with two elements (task_data, uuid). The function and global parameters are obtained from calculation_dict.

run()

Worker main loop

class PartSegCore.analysis.batch_processing.parallel_backend.SubprocessOrder(value)

Bases: enum.Enum

Commands that can be put in the order queue of a worker process.

PartSegCore.analysis.batch_processing.parallel_backend.spawn_worker(task_queue, order_queue, result_queue, calculation_dict)

Function for spawning worker. Designed as argument for multiprocessing.Process().

Parameters
  • task_queue (Queue) – Queue with tasks

  • order_queue (Queue) – Queue with additional orders (like kill)

  • result_queue (Queue) – Queue for calculation result

  • calculation_dict (Dict[UUID, Any]) – dict with global parameters
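The worker side, a main loop that checks an order queue before taking tasks, can be sketched with threads so that it runs anywhere. worker_loop() and Order are hypothetical stand-ins for BatchWorker.run() and SubprocessOrder. The kill member name is an assumption based on the "like kill" note in the parameter descriptions. Error capture is omitted for brevity. spawn_worker() itself is designed for multiprocessing.Process(), and a process-based version would generally need picklable functions, so a lambda would not work there.

```python
import queue
import threading
import uuid
from enum import Enum


class Order(Enum):
    """Hypothetical stand-in for SubprocessOrder."""
    kill = 1


def worker_loop(task_queue, order_queue, result_queue, calculation_dict):
    """Check for orders, then process one task; repeat until killed."""
    while True:
        try:
            if order_queue.get_nowait() is Order.kill:
                return
        except queue.Empty:
            pass
        try:
            task_data, work_id = task_queue.get(timeout=0.1)
        except queue.Empty:
            continue  # no task yet: loop back and check for orders again
        global_parameters, fun = calculation_dict[work_id]
        result_queue.put((work_id, fun(task_data, global_parameters)))


tasks, orders, results = queue.Queue(), queue.Queue(), queue.Queue()
work = uuid.uuid4()
calculation_dict = {work: (10, lambda x, offset: x + offset)}
worker = threading.Thread(
    target=worker_loop, args=(tasks, orders, results, calculation_dict), daemon=True
)
worker.start()
for n in range(3):
    tasks.put((n, work))
collected = sorted(results.get(timeout=5)[1] for _ in range(3))
orders.put(Order.kill)  # ask the worker to stop
worker.join(timeout=5)
print(collected)  # [10, 11, 12]
```

The short timeout on task_queue.get() keeps the worker responsive to orders even when no tasks arrive.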