PartSegCore.analysis.batch_processing¶
This subpackage contains utilities to perform parallel batch processing
The batch_backend
contains utilities connected with perform calculation of batch plan
In parallel_backend
there are utilities for parallelism
PartSegCore.analysis.batch_processing.batch_backend¶
This module contains PartSeg function used for calculate in batch processing
Calculation hierarchy:
digraph calc { rankdir="LR"; "CalculationManager"[shape=rectangle style=filled]; "BatchManager"[shape=rectangle]; "BatchWorker"[shape=rectangle]; "CalculationManager" -> "BatchManager" -> "BatchWorker"[arrowhead="crow"]; "CalculationManager" -> "DataWriter"[arrowhead="inv"]; "DataWriter"[shape=rectangle]; "FileData"[shape=rectangle]; "SheetData"[shape=rectangle]; "DataWriter" -> "FileData" -> "SheetData"[arrowhead="crow"]; }- class PartSegCore.analysis.batch_processing.batch_backend.BatchResultDescription(errors: list[tuple[str, ErrorInfo]], global_counter: int, jobs_status: dict[uuid.UUID, int])[source]¶
Bases:
NamedTuple
Tuple to handle information about part of calculation result.
- errors: list[tuple[str, ErrorInfo]]¶
list of errors occurred during calculation
- global_counter: int¶
total number of calculated steps
- jobs_status: dict[uuid.UUID, int]¶
for each job information about progress
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationManager[source]¶
Bases:
object
This class manage batch processing in PartSeg.
- add_calculation(calculation)[source]¶
- Parameters:
calculation (
Calculation
) – Calculation
- get_results()[source]¶
Consume results from
BatchWorker
and transfer it toDataWriter
- Returns:
information about calculation status
- Return type:
- class PartSegCore.analysis.batch_processing.batch_backend.CalculationProcess[source]¶
Bases:
object
Main class to calculate PartSeg calculation plan. To support other operations overwrite
recursive_calculation()
call super function to support already defined operations.- do_calculation(calculation)[source]¶
Main function for calculation process
- Parameters:
calculation (
FileCalculation
) – calculation to do.- Return type:
- Returns:
- iterate_over(node)[source]¶
Execute calculation on node children or list oof nodes
- Parameters:
node (
CalculationTree
|list
[CalculationTree
])- Returns:
- recursive_calculation(node)[source]¶
Identify node type and then call proper
step_*
function- Parameters:
node (CalculationTree) – Node to be proceed
- step_load_mask(operation, children)[source]¶
Load mask using mask mapper (mask can be defined with suffix, substitution, or file with mapping saved, then iterate over
children
nodes.- Parameters:
operation (MaskMapper) – operation to perform
children (List[CalculationTree]) – list of nodes to iterate over with applied mask
- step_mask_create(operation, children)[source]¶
Create mask from current segmentation state using definition
- Parameters:
operation (MaskCreate) – mask create description.
children (List[CalculationTree]) – list of nodes to iterate over after perform segmentation
- step_mask_operation(operation, children)[source]¶
Generate new mask by sum or intersection of existing and iterate over
children
nodes- Parameters:
operation (Union[MaskSum, MaskIntersection]) – mask operation to perform
children (List[CalculationTree]) – list of nodes to iterate over after perform segmentation
- step_mask_use(operation, children)[source]¶
use already defined mask and iterate over
children
nodes- Parameters:
operation (MaskUse)
children (List[CalculationTree]) – list of nodes to iterate over after perform segmentation
- step_measurement(operation)[source]¶
Calculate measurement defined in current operation.
- Parameters:
operation (MeasurementCalculate) – definition of measurement to calculate
- step_save(operation)[source]¶
Perform save operation selected in plan.
- Parameters:
operation (Save) – save definition
- step_segmentation(operation, children)[source]¶
Perform segmentation and iterate over
children
nodes- Parameters:
operation (ROIExtractionProfile) – Specification of segmentation operation
children (List[CalculationTree]) – list of nodes to iterate over after perform segmentation
- class PartSegCore.analysis.batch_processing.batch_backend.DataWriter[source]¶
Bases:
object
Handle information
- add_result(data, calculation, ind=None)[source]¶
Add calculation result to file writer
- Raises:
ValueError – when calculation.measurement_file_path is not added with
add_data_part()
- Return type:
list
[Tuple
[Exception
,Union
[StackSummary
,Tuple
[Dict
,StackSummary
]]]]
- calculation_finished(calculation)[source]¶
Force write data for given calculation.
- Raises:
ValueError – when measurement is not added with
add_data_part()
- Return type:
list
[Tuple
[Exception
,Union
[StackSummary
,Tuple
[Dict
,StackSummary
]]]]- Returns:
list of errors during write.
- is_empty_sheet(file_path, sheet_name)[source]¶
Check if given pair of file_path and sheet_name can be used.
- Parameters:
- Returns:
If calling
FileData.add_data_part()
finish without error.- Return type:
- class PartSegCore.analysis.batch_processing.batch_backend.FileData(calculation, write_threshold=40)[source]¶
Bases:
object
Handle information about single file.
This class run separate thread for writing purpose. This need additional synchronisation. but not freeze
- Parameters:
calculation (BaseCalculation) – calculation information
write_threshold (int) – every how many lines of data are written to disk
- Variables:
component_str – separator for per component sheet information
- add_data_part(calculation)[source]¶
Add new calculation which result will be stored in handled file.
- Parameters:
calculation (BaseCalculation) – information about calculation
- Raises:
ValueError – when
measurement_file_path
is different to handled file orsheet_name
name already is in use.
- component_str = '_comp_'¶
separator for per component sheet information
- get_errors()[source]¶
Get list of errors occurred in last write
- Return type:
list
[Tuple
[Exception
,Union
[StackSummary
,Tuple
[Dict
,StackSummary
]]]]
- good_sheet_name(name)[source]¶
Check if sheet name can be used in current file. Return False if:
file is text file
contains
component_str
in namename is already in use
- wrote_data(uuid_id, data, ind=None)[source]¶
Add information to be stored in output file
- Parameters:
uuid_id (uuid.UUID) – calculation identifier
data (ResponseData) – calculation result
ind (Optional[int]) – element index
- class PartSegCore.analysis.batch_processing.batch_backend.FileType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Enum
- class PartSegCore.analysis.batch_processing.batch_backend.ResponseData(path_to_file, values)[source]¶
Bases:
NamedTuple
-
values:
list
[MeasurementResult
]¶ Alias for field number 1
-
values:
- class PartSegCore.analysis.batch_processing.batch_backend.SheetData(name, columns, raw=False)[source]¶
Bases:
object
Store single sheet information
- PartSegCore.analysis.batch_processing.batch_backend.do_calculation(file_info, calculation)[source]¶
Main function which will be used for run calculation. It create
CalculationProcess
and call it methodCalculationProcess.do_calculation()
- Parameters:
file_info (
tuple
[int
,str
]) – index and path to file which should be processedcalculation (
BaseCalculation
) – calculation description
- Return type:
Tuple
[int
,List
[Union
[Tuple
[Exception
,Union
[StackSummary
,Tuple
[Dict
,StackSummary
]]],ResponseData
]]]
PartSegCore.analysis.batch_processing.parallel_backend¶
This module contains utils for parallel batch calculation.
Main class is BatchManager
which is used to manage
parallel calculation
Main workflow is to add work with BatchManager.add_work()
and consume results (BatchManager.get_result()
) until
BatchManager.has_work
is evaluating to true
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchManager[source]¶
Bases:
object
This class is used for manage pending works. It use
BatchWorker
for running calculation.- add_work(individual_parameters_list, global_parameters, fun)[source]¶
This function add next works to internal structures. Number of works is length of
individual_parameters_list
- Parameters:
individual_parameters_list (
List
) – list of individual parameters for fun. For each elementfun
will be called with element as first argumentglobal_parameters – second argument of fun. If has field uuid then it is used as work uuid
fun (
Callable
[[Any
,Any
],Any
]) – two argument function which will be used to run calculation. First argument is task specific, second is const for whole work.
- Return type:
- Returns:
work uuid
- property finished¶
Check if any process is running
- class PartSegCore.analysis.batch_processing.parallel_backend.BatchWorker(task_queue, order_queue, result_queue, calculation_dict)[source]¶
Bases:
object
Worker spawned by
BatchManager
instance- Parameters:
- class PartSegCore.analysis.batch_processing.parallel_backend.SubprocessOrder(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Enum
Commands for process to put in queue