PartSegCore.analysis.batch_processing
This subpackage contains utilities for parallel batch processing.
The batch_backend module contains utilities for executing a batch calculation plan.
The parallel_backend module contains utilities for parallelism.
PartSegCore.analysis.batch_processing.batch_backend
This module contains the PartSeg functions used for calculations in batch processing.
Calculation hierarchy:
digraph calc {
    rankdir="LR";
    "CalculationManager"[shape=rectangle style=filled];
    "BatchManager"[shape=rectangle];
    "BatchWorker"[shape=rectangle];
    "CalculationManager" -> "BatchManager" -> "BatchWorker"[arrowhead="crow"];
    "CalculationManager" -> "DataWriter"[arrowhead="inv"];
    "DataWriter"[shape=rectangle];
    "FileData"[shape=rectangle];
    "SheetData"[shape=rectangle];
    "DataWriter" -> "FileData" -> "SheetData"[arrowhead="crow"];
}
- class PartSegCore.analysis.batch_processing.batch_backend.BatchResultDescription(errors: List[Tuple[str, Tuple[Exception, Union[traceback.StackSummary, Tuple[Dict, traceback.StackSummary]]]]], global_counter: int, jobs_status: Dict[uuid.UUID, int])
Bases: NamedTuple
Tuple holding information about a part of the calculation result.
- errors: List[Tuple[str, Tuple[Exception, Union[traceback.StackSummary, Tuple[Dict, traceback.StackSummary]]]]]
list of errors that occurred during calculation
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationManager
Bases: object
This class manages batch processing in PartSeg.
- add_calculation(calculation)
- Parameters
calculation (Calculation) – calculation to add
- get_results()
Consume results from BatchWorker and transfer them to DataWriter.
- Returns
information about calculation status
- Return type
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationProcess
Bases: object
Main class for executing a PartSeg calculation plan. To support other operations, override recursive_calculation() and call the super function to keep support for the already defined operations.
- do_calculation(calculation)
Main function of the calculation process.
- Parameters
calculation (FileCalculation) – calculation to perform
- Return type
- Returns
- iterate_over(node)
Execute the calculation on the children of a node or on a list of nodes.
- Parameters
node (Union[CalculationTree, List[CalculationTree]]) –
- Returns
- recursive_calculation(node)
Identify the node type and then call the proper step_* function.
- Parameters
node (CalculationTree) – node to be processed
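The extension pattern described for CalculationProcess (override recursive_calculation() and defer to the parent for the operations it already handles) can be sketched with stand-in classes. Everything below is hypothetical: ToyNode, ToyProcess, and the operation classes Segment, Measure, and Blur are not PartSeg types, and the real method bodies are not shown in this reference.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class ToyNode:
    """Hypothetical stand-in for CalculationTree: an operation plus child nodes."""
    operation: Any
    children: List["ToyNode"] = field(default_factory=list)


class Segment:  # hypothetical operation type
    pass


class Measure:  # hypothetical operation type
    pass


class Blur:  # hypothetical operation known only to the subclass
    pass


class ToyProcess:
    """Dispatch on the operation type, mirroring the step_* naming."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def iterate_over(self, nodes: List[ToyNode]) -> None:
        for node in nodes:
            self.recursive_calculation(node)

    def recursive_calculation(self, node: ToyNode) -> None:
        if isinstance(node.operation, Segment):
            self.step_segmentation(node.operation, node.children)
        elif isinstance(node.operation, Measure):
            self.step_measurement(node.operation)
        else:
            raise ValueError(f"Unknown operation {type(node.operation).__name__}")

    def step_segmentation(self, operation: Segment, children: List[ToyNode]) -> None:
        self.log.append("segmentation")
        self.iterate_over(children)

    def step_measurement(self, operation: Measure) -> None:
        self.log.append("measurement")


class ExtendedProcess(ToyProcess):
    """Handle the new operation, then fall back to the parent dispatch."""

    def recursive_calculation(self, node: ToyNode) -> None:
        if isinstance(node.operation, Blur):
            self.log.append("blur")
            self.iterate_over(node.children)
        else:
            super().recursive_calculation(node)


plan = ToyNode(Blur(), [ToyNode(Segment(), [ToyNode(Measure())])])
process = ExtendedProcess()
process.recursive_calculation(plan)
print(process.log)  # ['blur', 'segmentation', 'measurement']
```

Because iterate_over() calls self.recursive_calculation(), children of a parent-handled node still pass through the subclass dispatch first, so new operations can appear at any depth of the plan.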
- step_load_mask(operation, children)
Load a mask using the mask mapper (the mask can be defined by a suffix, a substitution, or a file with a saved mapping), then iterate over the children nodes.
- Parameters
operation (MaskMapper) – operation to perform
children (List[CalculationTree]) – list of nodes to iterate over with the mask applied
- step_mask_create(operation, children)
Create a mask from the current segmentation state using the given definition.
- Parameters
operation (MaskCreate) – mask creation description
children (List[CalculationTree]) – list of nodes to iterate over after performing segmentation
- step_mask_operation(operation, children)
Generate a new mask as the sum or intersection of existing masks and iterate over the children nodes.
- Parameters
operation (Union[MaskSum, MaskIntersection]) – mask operation to perform
children (List[CalculationTree]) – list of nodes to iterate over after performing segmentation
- step_mask_use(operation, children)
Use an already defined mask and iterate over the children nodes.
- Parameters
operation (MaskUse) –
children (List[CalculationTree]) – list of nodes to iterate over after performing segmentation
- step_measurement(operation)
Calculate the measurement defined in the current operation.
- Parameters
operation (MeasurementCalculate) – definition of the measurement to calculate
- step_save(operation)
Perform the save operation selected in the plan.
- Parameters
operation (Save) – save definition
- step_segmentation(operation, children)
Perform segmentation and iterate over the children nodes.
- Parameters
operation (ROIExtractionProfile) – specification of the segmentation operation
children (List[CalculationTree]) – list of nodes to iterate over after performing segmentation
- class PartSegCore.analysis.batch_processing.batch_backend.DataWriter
Bases: object
Handle information
- add_result(data, calculation, ind=None)
Add a calculation result to the file writer.
- Raises
ValueError – when calculation.measurement_file_path was not added with add_data_part()
- Return type
List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]
- calculation_finished(calculation)
Force writing of the data for the given calculation.
- Raises
ValueError – when the measurement was not added with add_data_part()
- Return type
List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]
- Returns
list of errors that occurred during the write
- is_empty_sheet(file_path, sheet_name)
Check whether the given pair of file_path and sheet_name can be used.
- Parameters
- Returns
whether a call to FileData.add_data_part() would finish without error
- Return type
- class PartSegCore.analysis.batch_processing.batch_backend.FileData(calculation, write_threshold=40)
Bases: object
Handle information about a single file.
This class runs a separate thread for writing. This requires additional synchronisation, but does not freeze
- Parameters
calculation (BaseCalculation) – calculation information
write_threshold (int) – number of data lines after which the data are written to disk
- Variables
component_str – separator for per component sheet information
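The role of write_threshold can be illustrated with a small buffer that flushes once enough rows have accumulated. This is only a sketch of the general idea: ThresholdBuffer and its sink callback are hypothetical names, the sketch is single-threaded, and it does not reproduce how FileData schedules writes on its writer thread.

```python
from typing import Callable, List


class ThresholdBuffer:
    """Hypothetical buffer: collect rows and hand them to a sink in batches."""

    def __init__(self, sink: Callable[[List[list]], None], write_threshold: int = 40) -> None:
        self.sink = sink
        self.write_threshold = write_threshold
        self._rows: List[list] = []

    def add_row(self, row: list) -> None:
        self._rows.append(row)
        # Write only when the threshold is reached, to avoid one write per row.
        if len(self._rows) >= self.write_threshold:
            self.flush()

    def flush(self) -> None:
        """Force out whatever is buffered, even below the threshold."""
        if self._rows:
            self.sink(self._rows)
            self._rows = []


batches: List[int] = []  # records the size of every write
buffer = ThresholdBuffer(lambda rows: batches.append(len(rows)), write_threshold=3)
for i in range(7):
    buffer.add_row([i])
buffer.flush()  # write the remaining row
print(batches)  # [3, 3, 1]
```

A larger threshold means fewer writes at the cost of holding more rows in memory and losing more unwritten rows if the process stops unexpectedly.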
- add_data_part(calculation)
Add a new calculation whose result will be stored in the handled file.
- Parameters
calculation (BaseCalculation) – information about calculation
- Raises
ValueError – when measurement_file_path differs from the handled file or sheet_name is already in use
- component_str = '_comp_'
separator for per component sheet information
- get_errors()
Get the list of errors that occurred in the last write.
- Return type
List[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]]]
- good_sheet_name(name)
Check if the sheet name can be used in the current file. Return False if:
the file is a text file
the name contains component_str
the name is already in use
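The three rejection rules listed for good_sheet_name() can be restated as a small standalone function. This is a hedged sketch, not the method itself: sheet_name_is_usable and its parameters are hypothetical, and the actual return value of good_sheet_name() may carry more information than a plain boolean.

```python
from typing import Set

COMPONENT_STR = "_comp_"  # value of FileData.component_str given in this reference


def sheet_name_is_usable(name: str, is_text_file: bool, used_names: Set[str]) -> bool:
    """Hypothetical restatement of the documented rejection rules."""
    if is_text_file:
        # Text output has no sheets, so no sheet name is acceptable.
        return False
    if COMPONENT_STR in name:
        # The separator is reserved for per component sheets.
        return False
    if name in used_names:
        return False
    return True


used = {"nuclei"}
print(sheet_name_is_usable("cells", False, used))         # True
print(sheet_name_is_usable("cells_comp_1", False, used))  # False
print(sheet_name_is_usable("nuclei", False, used))        # False
print(sheet_name_is_usable("cells", True, used))          # False
```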
- wrote_data(uuid_id, data, ind=None)
Add information to be stored in the output file.
- Parameters
uuid_id (uuid.UUID) – calculation identifier
data (ResponseData) – calculation result
ind (Optional[int]) – element index
- class PartSegCore.analysis.batch_processing.batch_backend.FileType(value)
Bases: enum.Enum
An enumeration.
- class PartSegCore.analysis.batch_processing.batch_backend.ResponseData(path_to_file, values)
Bases: NamedTuple
- values: List[PartSegCore.analysis.measurement_calculation.MeasurementResult]
Alias for field number 1
- class PartSegCore.analysis.batch_processing.batch_backend.SheetData(name, columns, raw=False)
Bases: object
Store information about a single sheet.
- PartSegCore.analysis.batch_processing.batch_backend.do_calculation(file_info, calculation)
Main function used to run a calculation. It creates a CalculationProcess and calls its method CalculationProcess.do_calculation().
- Parameters
file_info (Tuple[int, str]) – index and path of the file to be processed
calculation (BaseCalculation) – calculation description
- Return type
Tuple[int, List[Union[Tuple[Exception, Union[StackSummary, Tuple[Dict, StackSummary]]], ResponseData]]]
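Since the returned list mixes error tuples with ResponseData items, a caller typically needs to separate the two. The sketch below shows one way to do that under stated assumptions: ToyResponse is a hypothetical stand-in that only copies the two field names of ResponseData, split_results is not a PartSeg function, and the sample data is made up for illustration.

```python
import traceback
from typing import Any, List, NamedTuple, Tuple


class ToyResponse(NamedTuple):
    """Hypothetical stand-in for ResponseData (same two field names)."""
    path_to_file: str
    values: List[Any]


def split_results(result: Tuple[int, list]) -> Tuple[int, list, list]:
    """Separate successful responses from error tuples for one file index."""
    index, items = result
    successes = [item for item in items if isinstance(item, ToyResponse)]
    # Anything that is not a response is treated as an (exception, traceback) entry.
    errors = [item for item in items if not isinstance(item, ToyResponse)]
    return index, successes, errors


# Made-up result in the documented shape: (file index, list of mixed items).
fake_error = (ValueError("bad file"), traceback.StackSummary.from_list([]))
result = (3, [ToyResponse("a.tif", [1.0]), fake_error])

index, successes, errors = split_results(result)
print(index, len(successes), len(errors))  # 3 1 1
```

The isinstance check matters because a NamedTuple is itself a tuple, so testing for plain tuple would not distinguish the two kinds of item.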
PartSegCore.analysis.batch_processing.parallel_backend
This module contains utilities for parallel batch calculation.
The main class is BatchManager, which is used to manage parallel calculation.
The main workflow is to add work with BatchManager.add_work() and consume results with BatchManager.get_result() for as long as BatchManager.has_work evaluates to true.
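The add work, then poll for results workflow can be sketched without PartSeg itself. ToyManager below is a hypothetical stand-in built on a thread pool; it borrows only the names add_work(), get_result(), and has_work from this reference. The shape of the returned results, the fallback to a fresh uuid, and the shutdown() helper are assumptions made for the example.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


class ToyManager:
    """Hypothetical stand-in exposing add_work(), get_result() and has_work."""

    def __init__(self, workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._pending: List[Tuple[uuid.UUID, Future]] = []

    def add_work(self, individual_parameters_list: List[Any], global_parameters: Any,
                 fun: Callable[[Any, Any], Any]) -> uuid.UUID:
        # Assumption: use a fresh uuid when global_parameters carries none.
        work_id = getattr(global_parameters, "uuid", None) or uuid.uuid4()
        for element in individual_parameters_list:
            future = self._pool.submit(fun, element, global_parameters)
            self._pending.append((work_id, future))
        return work_id

    @property
    def has_work(self) -> bool:
        return bool(self._pending)

    def get_result(self) -> List[Tuple[uuid.UUID, Any]]:
        """Return the results that are ready now, without blocking."""
        ready, waiting = [], []
        for work_id, future in self._pending:
            (ready if future.done() else waiting).append((work_id, future))
        self._pending = waiting
        return [(work_id, future.result()) for work_id, future in ready]

    def shutdown(self) -> None:
        self._pool.shutdown()


def scale(value: int, factor: int) -> int:
    """Task function: first argument is per task, second is shared."""
    return value * factor


manager = ToyManager()
work_id = manager.add_work([1, 2, 3], 10, scale)

results: List[Tuple[uuid.UUID, Any]] = []
while manager.has_work:
    results.extend(manager.get_result())
    time.sleep(0.01)  # avoid a busy loop while tasks are running
manager.shutdown()

print(sorted(value for _, value in results))  # [10, 20, 30]
```

Polling in a loop keeps the caller responsive, which is useful when the consumer also has to update a progress display or forward results to a writer.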
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchManager
Bases: object
This class is used to manage pending work. It uses BatchWorker to run calculations.
- add_work(individual_parameters_list, global_parameters, fun)
This function adds new work to the internal structures. The number of work items equals the length of individual_parameters_list.
- Parameters
individual_parameters_list (List) – list of individual parameters for fun. For each element, fun will be called with the element as its first argument
global_parameters – second argument of fun. If it has a uuid field, that field is used as the work uuid
fun (Callable[[Any, Any], Any]) – two-argument function used to run the calculation. The first argument is task specific, the second is constant for the whole work.
- Return type
- Returns
work uuid
- property finished
Check if any process is running
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchWorker(task_queue, order_queue, result_queue, calculation_dict)
Bases: object
Worker spawned by a BatchManager instance.
- Parameters
- class PartSegCore.analysis.batch_processing.parallel_backend.SubprocessOrder(value)
Bases: enum.Enum
Commands to put in the queue for a process