h5flow.core
- class h5flow.core.h5_flow_manager.H5FlowManager(config, output_filename, input_filename=None, start_position=None, end_position=None)
Overarching coordination class. Creates data managers, generators, and stages according to the specified workflow. After initializing the workflow, executes the init, run, and finish methods of the generator and stages in sequence. Also manages refreshing data in the cache and dropping datasets from the final output file.
The standard execution sequence (as implemented in h5flow.run) is:
```python
manager = H5FlowManager(**args)
manager.init()    # initialize components
manager.run()     # execute workflow run loop
manager.finish()  # clean up components
```
- configure_data_manager(output_filename, config)
Create an H5FlowDataManager to coordinate access into output_filename. Access to the data manager is provided via:
```python
manager = H5FlowManager(config, output_filename)
manager.data_manager  # data manager instance
```
- Parameters
output_filename – str, output file path
config – dict, parsed YAML config for workflow
- configure_flow(config)
Create instances of H5FlowStage in the sequence specified in the config. Access to the stages is provided via:
```python
manager = H5FlowManager(config, output_filename)
manager.stages  # stage instances
```
- Parameters
config – dict, parsed YAML config for workflow
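A minimal sketch of config-driven stage construction, assuming a hypothetical config layout in which flow → stages lists stage names in execution order and each stage name maps to a block with classname, requires, and params. ToyStage and the STAGE_CLASSES registry are illustrative stand-ins; h5flow's own class lookup may work differently.

```python
# Hypothetical sketch: build stages from a parsed config in the listed order.
# The config layout and the STAGE_CLASSES registry are assumptions.

class ToyStage:
    """Stand-in for an H5FlowStage subclass (no data manager here)."""

    def __init__(self, name, classname, requires=None, **params):
        self.name = name
        self.classname = classname
        self.requires = requires or []
        self.params = params


STAGE_CLASSES = {"ToyStage": ToyStage}  # hypothetical classname -> class map


def configure_flow(config):
    stages = []
    for name in config["flow"]["stages"]:  # list order = execution order
        spec = config[name]
        cls = STAGE_CLASSES[spec["classname"]]
        stages.append(cls(name=name,
                          classname=spec["classname"],
                          requires=spec.get("requires"),
                          **spec.get("params", {})))
    return stages
```

Keeping the stage order in a single list makes the execution sequence explicit and independent of dict ordering in the rest of the config.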
- configure_generator(input_filename, config, start_position, end_position)
Create an H5FlowGenerator to produce slices into the source dataset. Access to the generator is provided via:
```python
manager = H5FlowManager(config, output_filename)
manager.generator  # generator instance
```
If no generator configuration is found in the config, a default dataset loop generator is created.
- Parameters
input_filename – str, input file path passed to the generator
config – dict, parsed YAML config for workflow
start_position – int, dataset start index passed to the generator
end_position – int, dataset end index passed to the generator
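To illustrate what a default dataset loop might do, the sketch below splits the range [start_position, end_position) of a source dataset into consecutive chunks and then returns the EMPTY sentinel. The function name loop_slices and the chunk_size argument are hypothetical, and the slicing logic is an assumption rather than h5flow's implementation.

```python
# Hypothetical sketch of a dataset loop generator's slicing logic.

EMPTY = slice(-1, -1, None)  # same value as H5FlowGenerator.EMPTY


def loop_slices(dset_length, chunk_size, start_position=None, end_position=None):
    """Yield consecutive slices covering [start, end), then EMPTY."""
    start = 0 if start_position is None else start_position
    end = dset_length if end_position is None else min(end_position, dset_length)
    for lo in range(start, end, chunk_size):
        # the final chunk is truncated so it never runs past `end`
        yield slice(lo, min(lo + chunk_size, end))
    yield EMPTY


if __name__ == "__main__":
    for s in loop_slices(10, 4, start_position=1):
        print(s)
```

With a dataset of length 10, a chunk size of 4, and start_position=1, this produces slice(1, 5), slice(5, 9), slice(9, 10), and finally EMPTY.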
- configure_resources(config, input_filename, start_position, end_position)
Create an H5FlowResource instance for each object in the config that inherits from H5FlowResource.
- Parameters
config – dict, parsed YAML config for workflow
input_filename – str, input file path passed to each resource
start_position – int, dataset start index passed to each resource
end_position – int, dataset end index passed to each resource
- finish()
Execute the finish() method of the generator and stages. After all components have finished, drop datasets that are not wanted in the output file.
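A toy model of this finish sequence: every component's finish() runs first, and only then are unwanted datasets dropped. The output "file" is a plain dict, and Component and the drop list are hypothetical stand-ins; h5flow's actual dropping mechanism may differ.

```python
# Toy sketch: finish all components, then drop unwanted datasets.

class Component:
    """Stand-in generator/stage that logs when finish() is called."""

    def __init__(self, log, name):
        self.log = log
        self.name = name

    def finish(self):
        self.log.append(self.name)


def finish(generator, stages, output, drop):
    for component in [generator, *stages]:
        component.finish()
    # dropping happens last, presumably so that no component loses a
    # dataset it still reads during its own finish()
    for path in drop:
        output.pop(path, None)  # ignore paths that were never created
```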
- format_requirements(requirements)
Converts the list from the “requires” configuration option into a list of dicts with:
name: name of object to place in cache
path: list of reference datasets (parent, child) to load for this requirement
index_only: boolean, True if the dataset should only load reference indices rather than data
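A minimal sketch of this normalization, assuming that entries in “requires” are either bare dataset names or partial dicts, and that missing fields default to path = [name] and index_only = False. These input shapes and defaults are assumptions for illustration, not h5flow's exact rules.

```python
# Hypothetical sketch of normalizing a "requires" list into uniform dicts.

def format_requirements(requirements):
    formatted = []
    for req in requirements or []:
        if isinstance(req, str):
            # bare dataset name: load it directly through a one-step path
            formatted.append(dict(name=req, path=[req], index_only=False))
        else:
            name = req["name"]
            formatted.append(dict(
                name=name,
                path=list(req.get("path", [name])),
                index_only=bool(req.get("index_only", False)),
            ))
    return formatted
```

Normalizing up front lets the loading code assume every requirement has all three keys.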
- init()
Execute the init() method of the resources, generator, and stages, in that order.
- load_requirement(req, source_name, source_slice)
Loads a requirement specified by:
path: list of references to traverse
index_only: True to load only indices and not data
- Parameters
req – dict with items path (list of datasets) and index_only (bool, True to only load the index into the final dataset). Loads a chain of references in the sequence [source_name, *path]
source_name – str, base dataset to load
source_slice – slice into source_name to load
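The chain traversal [source_name, *path] can be sketched as repeated reference lookups. This sketch assumes that each (parent, child) reference table is a list of (parent_row, child_row) pairs and that datasets are plain lists. It also collapses the result into one flat list, whereas the real method may keep per-row structure.

```python
# Hypothetical sketch of following a chain of references [source_name, *path].
# The reference-table layout is an assumption, not h5flow's storage format.

def load_requirement(datasets, refs, req, source_name, source_slice):
    # rows of the source dataset selected by the slice
    rows = list(range(*source_slice.indices(len(datasets[source_name]))))
    chain = [source_name, *req["path"]]
    for parent, child in zip(chain[:-1], chain[1:]):
        wanted = set(rows)
        # keep every child row referenced by a currently selected parent row
        rows = sorted({c for p, c in refs[(parent, child)] if p in wanted})
    if req.get("index_only", False):
        return rows  # indices into the last dataset of the chain
    return [datasets[chain[-1]][i] for i in rows]
```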
- run()
Run loop, executing the run() method of the generator and stages in sequence. Terminates once all processes return H5FlowGenerator.EMPTY. Also refreshes the cache with the required datasets for each stage.
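A toy, single-process model of this loop. With one process, "all processes return EMPTY" reduces to the generator returning EMPTY once. ListGenerator, RecordingStage, and run_loop are hypothetical stand-ins, and the cache refresh is omitted.

```python
# Toy single-process model of the run loop; MPI and the cache are ignored.

EMPTY = slice(-1, -1, None)  # mirrors H5FlowGenerator.EMPTY


class ListGenerator:
    """Hands out pre-computed slices, then EMPTY on every later call."""

    def __init__(self, slices):
        self._slices = list(slices)

    def next(self):
        return self._slices.pop(0) if self._slices else EMPTY


class RecordingStage:
    """Records each (source_name, source_slice) it is asked to process."""

    def __init__(self):
        self.seen = []

    def run(self, source_name, source_slice):
        self.seen.append((source_name, source_slice))


def run_loop(generator, stages, source_name):
    while True:
        source_slice = generator.next()
        if source_slice == EMPTY:  # slices compare by (start, stop, step)
            break
        # the real manager would refresh each stage's cache here
        for stage in stages:
            stage.run(source_name, source_slice)
```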
- update_cache(cache, source_name, source_slice, requirements)
Load and dereference “required” data associated with a given source: first loads the subset of source_name specified by source_slice, then loops over the specification dicts in requirements and loads data from the references found in each 'path'.
Called automatically once per loop, just before calling run. Only loads data into the cache if it is not already present.
- Parameters
cache – dict, cache to update
source_name – a path to the source dataset group
source_slice – a 1D slice into the source dataset
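A minimal sketch of the "only load if not already present" rule. The cache is a plain dict keyed by requirement name, and loader is a hypothetical callable that stands in for the real dataset and reference loading. The benefit shows up when two stages require the same dataset: it is loaded only once per loop iteration.

```python
# Hypothetical sketch of a skip-if-present cache update.

def update_cache(cache, source_name, source_slice, requirements, loader):
    if source_name not in cache:
        # the source data itself (req=None marks "no reference chain")
        cache[source_name] = loader(source_name, source_slice, None)
    for req in requirements:
        if req["name"] not in cache:
            cache[req["name"]] = loader(source_name, source_slice, req)
    return cache
```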
- class h5flow.core.h5_flow_generator.H5FlowGenerator(classname, dset_name, data_manager, input_filename=None, start_position=None, end_position=None, **params)
Base class for generators. Provides the following attributes:
classname: generator class
class_version: a str version number ('major.minor.fix', default = '0.0.0')
dset_name: dataset to be accessed by each stage
data_manager: an H5FlowDataManager instance used to access the output file
input_filename: an optional input filename (default = None)
start_position: an optional start position to begin iterating (default = None)
end_position: an optional end position to stop iterating (default = None)
comm: MPI world communicator (if needed, else None)
rank: MPI group rank
size: MPI group size
To build a custom generator, inherit from this base class and implement the next() method. Example:
```python
import numpy as np

from h5flow.core.h5_flow_generator import H5FlowGenerator


class ExampleGenerator(H5FlowGenerator):
    class_version = '0.0.0'

    default_max_value = 2**32 - 1
    default_chunk_size = 1024
    default_iterations = 100

    def __init__(self, **params):
        super(ExampleGenerator, self).__init__(**params)

        # get config parameters
        self.max_value = params.get('max_value', self.default_max_value)
        self.chunk_size = params.get('chunk_size', self.default_chunk_size)

        # prepare loop
        if self.end_position is None:
            self.end_position = self.default_iterations
        self.iteration = 0

    def init(self):
        # prepare output file
        self.data_manager.create_dset(self.dset_name, dtype=int)
        self.data_manager.set_attrs(self.dset_name,
                                    classname=self.classname,
                                    class_version=self.class_version,
                                    max_value=self.max_value,
                                    chunk_size=self.chunk_size,
                                    end_position=self.end_position,
                                    )

    def next(self):
        if self.iteration >= self.end_position:
            return H5FlowGenerator.EMPTY
        self.iteration += 1

        # reserve space for a new chunk and fill it with random integers
        next_slice = self.data_manager.reserve_data(self.dset_name, self.chunk_size)
        self.data_manager.write_data(self.dset_name, next_slice,
                                     np.random.randint(self.max_value, size=self.chunk_size))
        return next_slice
```
This example creates a generator that fills the dset_name dataset with random integers (below max_value) in chunks of length chunk_size. It runs for end_position iterations and then returns H5FlowGenerator.EMPTY. Note that if running with MPI, each process runs for the same number of iterations (so the data file will be N times larger, where N is the number of processes).
- EMPTY = slice(-1, -1, None)
- class_version = '0.0.0'
- class h5flow.core.h5_flow_stage.H5FlowStage(name, classname, data_manager, requires=None, **params)
Base class for a loop stage. Provides the following attributes:
name: instance name of stage (declared in configuration file)
classname: stage class
class_version: a str version number ('major.minor.fix', default = '0.0.0')
data_manager: an H5FlowDataManager instance used to access the output file
requires: a list of dataset names to load when calling H5FlowStage.load()
comm: MPI world communicator (if needed, else None)
rank: MPI group rank
size: MPI group size
To build a custom stage, inherit from this base class and implement the init() and run() methods. Example:
```python
from h5flow.core.h5_flow_stage import H5FlowStage


class ExampleStage(H5FlowStage):
    class_version = '0.0.0'  # keep track of a version number for each class

    default_custom_param = None
    default_obj_name = 'obj0'

    def __init__(self, **params):
        super(ExampleStage, self).__init__(**params)

        # grab parameters from configuration file here, e.g.
        self.custom_param = params.get('custom_param', self.default_custom_param)
        self.obj_name = self.name + '/' + params.get('obj_name', self.default_obj_name)

    def init(self, source_name):
        # declare any new datasets and set dataset metadata, e.g.
        self.data_manager.set_attrs(self.obj_name,
                                    classname=self.classname,
                                    class_version=self.class_version,
                                    custom_param=self.custom_param,
                                    )
        self.data_manager.create_dset(self.obj_name)

    def run(self, source_name, source_slice):
        # load, process, and save new data objects
        data = self.load(source_name, source_slice)
```
- class_version = '0.0.0'
- class h5flow.core.h5_flow_resource.H5FlowResource(classname, data_manager, input_filename=None, start_position=None, end_position=None, **params)
Base class for an accessible static resource. Provides:
classname: resource class
class_version: a str version number ('major.minor.fix', default = '0.0.0')
data_manager: an H5FlowDataManager instance used to access the output file
input_filename: an optional input filename (default = None)
start_position: an optional start position to begin iterating (default = None)
end_position: an optional end position to stop iterating (default = None)
comm: MPI world communicator (if needed, else None)
rank: MPI group rank
size: MPI group size
To build a custom resource, inherit from this base class and implement the init() and/or finish() methods.
To access a resource, declare it in the config file under resources:
```yaml
resources:
  - classname: ExampleResource
    params:
      a_parameter: example
```
And then access it from a stage or generator via:
```python
from h5flow.core import resources

resources['ExampleResource']
```
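Because init() runs the resources before the generator and stages, a stage's own init() can rely on a resource already being available. The sketch below models that ordering with a local dict standing in for h5flow.core.resources. In this toy the resource registers itself under its classname; in h5flow the manager presumably handles registration. ExampleResource, ExampleStage, and init_all are hypothetical.

```python
# Toy sketch of resource lookup after ordered initialization.

resources = {}  # stand-in for the shared h5flow.core.resources mapping


class ExampleResource:
    def __init__(self, a_parameter):
        self.a_parameter = a_parameter

    def init(self):
        resources["ExampleResource"] = self  # publish under its classname


class ExampleStage:
    def init(self):
        # lookup by classname, as in resources['ExampleResource']
        self.value = resources["ExampleResource"].a_parameter


def init_all(resource_objs, stages):
    for r in resource_objs:  # resources first ...
        r.init()
    for s in stages:         # ... then stages (generator omitted here)
        s.init()
```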
- class_version = '0.0.0'