PartSegCore.analysis.batch_processing¶
This subpackage contains utilities for parallel batch processing.
The batch_backend module contains utilities for executing a batch calculation plan.
The parallel_backend module contains utilities for parallel execution.
PartSegCore.analysis.batch_processing.batch_backend¶
This module contains the PartSeg functions used for calculation in batch processing.
Calculation hierarchy:
![digraph calc {
rankdir="LR";
"CalculationManager"[shape=rectangle style=filled];
"BatchManager"[shape=rectangle];
"BatchWorker"[shape=rectangle];
"CalculationManager" -> "BatchManager" -> "BatchWorker"[arrowhead="crow"];
"CalculationManager" -> "DataWriter"[arrowhead="inv"];
"DataWriter"[shape=rectangle];
"FileData"[shape=rectangle];
"SheetData"[shape=rectangle];
"DataWriter" -> "FileData" -> "SheetData"[arrowhead="crow"];
}](../../_images/graphviz-ecf8315346bacb9d5e9753ba9c73b4a8509d6ed5.png)
- class PartSegCore.analysis.batch_processing.batch_backend.BatchResultDescription(errors: list[tuple[str, ErrorInfo]], global_counter: int, jobs_status: dict[uuid.UUID, int])[source]¶
Bases:
NamedTuple
Tuple to handle information about part of the calculation result.
- errors: list[tuple[str, tuple[Exception, StackSummary | tuple[dict, StackSummary]]]]¶
list of errors that occurred during calculation
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationManager[source]¶
Bases:
object
This class manages batch processing in PartSeg.
- add_calculation(calculation)[source]¶
- Parameters:
calculation (Calculation) – calculation to add
- get_results()[source]¶
Consume results from BatchWorker and transfer them to DataWriter.
- Returns:
information about calculation status
- Return type:
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationProcess[source]¶
Bases:
object
Main class to calculate a PartSeg calculation plan. To support other operations, override recursive_calculation() and call the super function so that the already defined operations remain supported.
- do_calculation(calculation)[source]¶
Main function for the calculation process.
- Parameters:
calculation (FileCalculation) – calculation to do.
- Return type:
list[Union[ResponseData,tuple[Exception,Union[StackSummary,tuple[dict,StackSummary]]]]]
- Returns:
- iterate_over(node)[source]¶
Execute the calculation on the children of a node or on a list of nodes.
- Parameters:
node (CalculationTree | list[CalculationTree])
- Returns:
- recursive_calculation(node)[source]¶
Identify the node type and then call the proper step_* function.
- Parameters:
node (CalculationTree) – node to be processed
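The extension pattern described for CalculationProcess (override recursive_calculation() and fall back to the super implementation) might look like the following minimal sketch. It uses stand-in classes instead of importing PartSeg: Node, BaseProcess, ExtendedProcess, Segmentation, Rotate and step_rotate are hypothetical names, and the real CalculationTree and step_* signatures may differ.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for CalculationTree: an operation plus child nodes."""
    operation: object
    children: list = field(default_factory=list)


class Segmentation:
    """Hypothetical operation already supported by the base class."""


class Rotate:
    """Hypothetical new operation added by the subclass."""


class BaseProcess:
    """Stand-in for CalculationProcess with a single built-in step."""

    def __init__(self):
        self.log = []

    def iterate_over(self, node):
        # Accept either a node (use its children) or a list of nodes.
        nodes = node.children if isinstance(node, Node) else node
        for child in nodes:
            self.recursive_calculation(child)

    def recursive_calculation(self, node):
        # Identify the node type, then call the matching step_* method.
        if isinstance(node.operation, Segmentation):
            self.step_segmentation(node.operation, node.children)
        else:
            raise ValueError(f"Unknown operation {type(node.operation).__name__}")

    def step_segmentation(self, operation, children):
        self.log.append("segmentation")
        self.iterate_over(children)


class ExtendedProcess(BaseProcess):
    def recursive_calculation(self, node):
        if isinstance(node.operation, Rotate):
            self.step_rotate(node.operation, node.children)
        else:
            # Delegate so the already defined operations keep working.
            super().recursive_calculation(node)

    def step_rotate(self, operation, children):
        self.log.append("rotate")
        self.iterate_over(children)


tree = Node(Rotate(), [Node(Segmentation()), Node(Segmentation(), [Node(Rotate())])])
process = ExtendedProcess()
process.recursive_calculation(tree)
print(process.log)
```

The subclass handles only the operation it introduces and delegates everything else, so the dispatch in the base class does not need to be copied.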
- step_load_mask(operation, children)[source]¶
Load a mask using the mask mapper (the mask can be defined with a suffix, a substitution, or a file with a saved mapping), then iterate over children nodes.
- Parameters:
operation (MaskMapper) – operation to perform
children (list[CalculationTree]) – list of nodes to iterate over with the mask applied
- step_mask_create(operation, children)[source]¶
Create a mask from the current segmentation state using the given definition.
- Parameters:
operation (MaskCreate) – mask creation description.
children (list[CalculationTree]) – list of nodes to iterate over after the mask is created
- step_mask_operation(operation, children)[source]¶
Generate a new mask as the sum or intersection of existing masks and iterate over children nodes.
- Parameters:
operation (MaskSum | MaskIntersection) – mask operation to perform
children (list[CalculationTree]) – list of nodes to iterate over after the mask operation
- step_mask_use(operation, children)[source]¶
Use an already defined mask and iterate over children nodes.
- Parameters:
operation (MaskUse)
children (list[CalculationTree]) – list of nodes to iterate over after the mask is applied
- step_measurement(operation)[source]¶
Calculate measurement defined in current operation.
- Parameters:
operation (MeasurementCalculate) – definition of measurement to calculate
- step_save(operation)[source]¶
Perform save operation selected in plan.
- Parameters:
operation (Save) – save definition
- step_segmentation(operation, children)[source]¶
Perform segmentation and iterate over children nodes.
- Parameters:
operation (ROIExtractionProfile) – specification of the segmentation operation
children (list[CalculationTree]) – list of nodes to iterate over after performing segmentation
- class PartSegCore.analysis.batch_processing.batch_backend.DataWriter[source]¶
Bases:
object
Handle information
- add_result(data, calculation, ind=None)[source]¶
Add calculation result to file writer
- Raises:
ValueError – when calculation.measurement_file_path has not been added with add_data_part()
- Return type:
list[tuple[Exception,Union[StackSummary,tuple[dict,StackSummary]]]]
- calculation_finished(calculation)[source]¶
Force write data for given calculation.
- Raises:
ValueError – when the measurement has not been added with add_data_part()
- Return type:
list[tuple[Exception,Union[StackSummary,tuple[dict,StackSummary]]]]
- Returns:
list of errors during write.
- is_empty_sheet(file_path, sheet_name)[source]¶
Check if given pair of file_path and sheet_name can be used.
- Parameters:
- Returns:
Whether calling FileData.add_data_part() would finish without error.
- Return type:
- class PartSegCore.analysis.batch_processing.batch_backend.FileData(calculation, write_threshold=40)[source]¶
Bases:
object
Handle information about a single file.
This class runs a separate thread for writing. This requires additional synchronisation but avoids freezing during writes.
- Parameters:
calculation (BaseCalculation) – calculation information
write_threshold (int) – number of data lines after which data are written to disk
- Variables:
component_str – separator for per component sheet information
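FileData is described as writing from a separate thread, with write_threshold controlling how often data reach the disk. The following is a minimal sketch of that idea, not PartSeg's implementation: ThresholdWriter, sink, add_row, flush and finish are hypothetical names, and the sink here is a plain list append instead of a real file.

```python
import queue
import threading


class ThresholdWriter:
    """Hypothetical buffer that hands rows to a writer thread in batches."""

    def __init__(self, sink, write_threshold=40):
        self.sink = sink  # any callable that accepts a list of rows
        self.write_threshold = write_threshold
        self._rows = []
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Writer thread: consume batches until the stop sentinel arrives.
        while True:
            batch = self._queue.get()
            if batch is None:
                break
            self.sink(batch)

    def add_row(self, row):
        self._rows.append(row)
        if len(self._rows) >= self.write_threshold:
            self.flush()

    def flush(self):
        if self._rows:
            # Hand the batch to the writer thread and start a fresh buffer.
            self._queue.put(self._rows)
            self._rows = []

    def finish(self):
        # Write whatever is left, then stop and wait for the writer thread.
        self.flush()
        self._queue.put(None)
        self._thread.join()


batches = []
writer = ThresholdWriter(batches.append, write_threshold=3)
for i in range(7):
    writer.add_row(i)
writer.finish()
print(batches)
```

Because the queue is the only object shared between the two threads, the caller can keep adding rows while an earlier batch is still being written.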
- add_data_part(calculation)[source]¶
Add a new calculation whose result will be stored in the handled file.
- Parameters:
calculation (BaseCalculation) – information about the calculation
- Raises:
ValueError – when measurement_file_path differs from the handled file or sheet_name is already in use.
- component_str = '_comp_'¶
separator for per component sheet information
- get_errors()[source]¶
Get the list of errors that occurred during the last write
- Return type:
list[tuple[Exception,Union[StackSummary,tuple[dict,StackSummary]]]]
- good_sheet_name(name)[source]¶
Check if the sheet name can be used in the current file. Return False if:
file is a text file
name contains component_str
name is already in use
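The three rejection rules listed for good_sheet_name() can be sketched as a standalone function. This is an illustration under stated assumptions, not the method itself: the function signature, the used_names argument, and the way a text file is recognised (by the suffixes in TEXT_SUFFIXES) are all hypothetical. Only the '_comp_' value comes from the documented component_str.

```python
COMPONENT_STR = "_comp_"  # value documented for FileData.component_str
TEXT_SUFFIXES = (".txt", ".csv")  # assumption: how a "text file" is recognised


def good_sheet_name(file_path, name, used_names):
    """Return False for any of the three documented rejection reasons."""
    if file_path.lower().endswith(TEXT_SUFFIXES):
        return False  # rule 1: the file is a text file
    if COMPONENT_STR in name:
        return False  # rule 2: the name contains the component separator
    return name not in used_names  # rule 3: the name is already in use


print(good_sheet_name("result.xlsx", "nuclei", {"cells"}))
print(good_sheet_name("result.xlsx", "nuclei_comp_1", set()))
```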
- wrote_data(uuid_id, data, ind=None)[source]¶
Add information to be stored in output file
- Parameters:
uuid_id (UUID) – calculation identifier
data (ResponseData) – calculation result
- class PartSegCore.analysis.batch_processing.batch_backend.ResponseData(path_to_file, values)[source]¶
Bases:
NamedTuple
- values: list[MeasurementResult]¶
Alias for field number 1
- class PartSegCore.analysis.batch_processing.batch_backend.SheetData(name, columns, raw=False)[source]¶
Bases:
object
Store single sheet information
- PartSegCore.analysis.batch_processing.batch_backend.do_calculation(file_info, calculation)[source]¶
Main function used to run a calculation. It creates a CalculationProcess and calls its method CalculationProcess.do_calculation().
- Parameters:
file_info (tuple[int,str]) – index and path of the file to be processed
calculation (BaseCalculation) – calculation description
- Return type:
tuple[int,list[Union[tuple[Exception,Union[StackSummary,tuple[dict,StackSummary]]],ResponseData]]]
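The return type above mixes successful results and errors in one list, keyed by the file index. A minimal sketch of producing and consuming a value of that shape follows. It assumes stand-ins throughout: ResponseData is redefined locally with the two documented fields, and run_one_file, split_results and measure are hypothetical helpers, with a plain callable taking the place of a calculation description.

```python
import traceback
from typing import NamedTuple


class ResponseData(NamedTuple):
    """Local stand-in with the two documented fields."""
    path_to_file: str
    values: list


def run_one_file(file_info, calculation):
    """Hypothetical worker: `calculation` is any callable taking a path."""
    index, file_path = file_info
    try:
        return index, [ResponseData(file_path, calculation(file_path))]
    except Exception as e:
        # The error is returned as data, paired with its stack summary.
        return index, [(e, traceback.extract_tb(e.__traceback__))]


def split_results(items):
    """Separate ResponseData entries from (exception, stack) entries."""
    ok = [x for x in items if isinstance(x, ResponseData)]
    errors = [x for x in items if not isinstance(x, ResponseData)]
    return ok, errors


def measure(path):
    if not path.endswith(".tif"):
        raise ValueError(f"unsupported file: {path}")
    return [len(path)]


for info in [(0, "a.tif"), (1, "a.png")]:
    index, items = run_one_file(info, measure)
    ok, errors = split_results(items)
    print(index, len(ok), len(errors))
```

Returning the index alongside the results lets a caller match each answer to its input file even when files finish out of order.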
PartSegCore.analysis.batch_processing.parallel_backend¶
This module contains utilities for parallel batch calculation.
The main class is BatchManager, which is used to manage
parallel calculation.
The main workflow is to add work with BatchManager.add_work()
and consume results (BatchManager.get_result()) as long as
BatchManager.has_work evaluates to true.
![digraph foo {
"BatchManager" -> "BatchWorker"[arrowhead="crow"];
}](../../_images/graphviz-951c22ff9636fb308b89b733e4f4a2ab40683404.png)
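The add-then-poll workflow can be sketched with a small stand-in. MiniBatchManager below is hypothetical: it only mimics the documented names add_work, get_result and has_work, runs on a thread pool from the standard library, and its return values and close() method are assumptions, not PartSeg's BatchManager behaviour.

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor


class MiniBatchManager:
    """Hypothetical stand-in that mimics the documented method names only."""

    def __init__(self, workers=2):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._pending = []  # (work uuid, future) pairs not yet consumed

    def add_work(self, individual_parameters_list, global_parameters, fun):
        # Reuse a uuid carried by global_parameters, otherwise create one.
        work_id = getattr(global_parameters, "uuid", None) or uuid.uuid4()
        for element in individual_parameters_list:
            future = self._pool.submit(fun, element, global_parameters)
            self._pending.append((work_id, future))
        return work_id

    @property
    def has_work(self):
        return bool(self._pending)

    def get_result(self):
        """Return finished (work uuid, result) pairs and forget them."""
        done, still_running = [], []
        for work_id, future in self._pending:
            (done if future.done() else still_running).append((work_id, future))
        self._pending = still_running
        return [(work_id, future.result()) for work_id, future in done]

    def close(self):
        self._pool.shutdown()


manager = MiniBatchManager()
work_id = manager.add_work([1, 2, 3], 10, lambda x, scale: x * scale)
results = []
while manager.has_work:
    results.extend(manager.get_result())
    time.sleep(0.01)  # avoid a busy loop while tasks are running
manager.close()
print(sorted(value for _, value in results))
```

Polling keeps the consumer in control of when results are handled, which suits a caller that also has other periodic work to do.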
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchManager[source]¶
Bases:
object
This class manages pending work. It uses BatchWorker to run calculations.
- add_work(individual_parameters_list, global_parameters, fun)[source]¶
This function adds new work items to the internal structures. The number of work items equals the length of individual_parameters_list.
- Parameters:
individual_parameters_list (list) – list of individual parameters for fun. For each element, fun will be called with that element as its first argument
global_parameters – second argument of fun. If it has a uuid field, that value is used as the work uuid
fun (Callable[[Any,Any],Any]) – two-argument function used to run the calculation. The first argument is task specific; the second is constant for the whole work.
- Return type:
- Returns:
work uuid
- property finished¶
Check if any process is running
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchWorker(task_queue, order_queue, result_queue, calculation_dict)[source]¶
Bases:
object
Worker spawned by a BatchManager instance
- Parameters:
- class PartSegCore.analysis.batch_processing.parallel_backend.SubprocessOrder(*values)[source]¶
Bases:
Enum
Commands for a process, to be put in the queue