h5flow.core

class h5flow.core.h5_flow_manager.H5FlowManager(config, output_filename, input_filename=None, start_position=None, end_position=None)[source]

Overarching coordination class. Creates data managers, generators, and stages according to the specified workflow. After initializing the workflow, executes each generator and stages’ init, run, and finish methods in sequence.

Also manages refreshing data in the cache and dropping datasets from the final output file.

The standard execution sequence (as implemented in h5flow.run) is:

manager = H5FlowManager(**args)

manager.init()      # initialize components
manager.run()       # execute workflow run loop
manager.finish()    # clean up components
configure_data_manager(output_filename, config)[source]

Create an H5FlowDataManager to coordinate access into output_filename. Access to the data manager is provided via:

manager = H5FlowManager(<config>, <filename>)
manager.data_manager # data manager instance
Parameters
  • output_filenamestr, output file path

  • configdict, parsed yaml config for workflow

configure_flow(config)[source]

Create instances of H5FlowStages in the sequence specified in the config. Access to the stages are provided via:

manager = H5FlowManager(<config>, <filename>)
manager.stages # data manager instance
Parameters

configdict, parsed yaml config for workflow

configure_generator(input_filename, config, start_position, end_position)[source]

Create an H5FlowGenerator to produce slices into the source dataset. Access to the generator is provided via:

manager = H5FlowManager(<config>, <filename>)
manager.generator # generator instance

If no generator configuration is found in the config, a default dataset loop generator is created.

Parameters
  • input_filenamestr, input file path passed to the generator

  • configdict, parsed yaml config for workflow

  • start_positionint, dataset start index passed to generator

  • end_positionint, dataset end index passed to generator

configure_resources(config, input_filename, start_position, end_position)[source]

Create H5FlowResource instance for each object in config that inherits from H5FlowResource

Parameters
  • input_filenamestr, input file path passed to the generator

  • configdict, parsed yaml config for workflow

  • start_positionint, dataset start index passed to generator

  • end_positionint, dataset end index passed to generator

finish()[source]

Execute finish() method of generator and stages. After all components have finished, drop datasets that are not wanted in output file.

format_requirements(requirements)[source]

Converts list from the “requires” configuration option into an list of dicts with:

name: name of object to place in cache
path: list of reference datasets (parent, child) to load for this requirement
index_only: boolean if dataset should only load reference indices rather than data
init()[source]

Execute init() method of resources, generator, and stages, in sequence and in that order.

load_requirement(req, source_name, source_slice)[source]

Loads a requirement specified by:

path: list of references to traverse
index_only: True to load only indices and not data
Parameters
  • reqdict with items path : list of datasets and index_only : bool, true to only load index in to final dataset. Loads a chain of references in a sequence of [source_name, *path]

  • source_namestr base dataset to load

  • source_sliceslice into source_name to load

run()[source]

Run loop, executing run() method of generator and stages, in sequence. Terminate once all processes return an H5FlowGenerator.EMPTY.

Also refreshes the cache with required datasets on each stage.

update_cache(cache, source_name, source_slice, requirements)[source]

Load and dereference “required” data associated with a given source - first loads the data subset of source_name specified by the source_slice. Then loops over the specification dicts in self.requires and loads data from references found in ‘path’.

Called automatically once per loop, just before calling run.

Only loads data to the cache if it is not already present.

Parameters
  • cachedict cache to update

  • source_name – a path to the source dataset group

  • source_slice – a 1D slice into the source dataset

class h5flow.core.h5_flow_generator.H5FlowGenerator(classname, dset_name, data_manager, input_filename=None, start_position=None, end_position=None, **params)[source]

Base class for generators. Provides the following attributes:

  • classname: stage class

  • class_version: a str version number ('major.minor.fix', default = '0.0.0')

  • dset_name: dataset to be accessed by each stage

  • data_manager: an H5FlowDataManager instance used to access the output file

  • input_filename: an optional input filename (default = None)

  • start_position: an optional start position to begin iterating (default = None)

  • end_position: an optional end position to stop iterating (default = None)

  • comm: MPI world communicator (if needed, else None)

  • rank: MPI group rank

  • size: MPI group size

To build a custom generator, inherit from this base class and implement the next() method.

Example:

class ExampleGenerator(H5FlowGenerator):
    class_version = '0.0.0'

    default_max_value = 2**32-1
    default_chunk_size = 1024
    default_iterations = 100

    def __init__(self, **params):
        super(ExampleGenerator,self).__init__(**params)

        # get config parameters
        self.max_value = params.get('max_value', self.default_max_value)
        self.chunk_size = params.get('chunk_size', self.default_chunk_size)

        # prepare loop
        if self.end_position is None:
            self.end_position = self.default_iterations
        self.iteration = 0

    def init(self):
        # prepare output file
        self.data_manager.create_dset(self.dset_name, dtype=int)
        self.data_manager.set_attrs(self.obj_name,
            classname=self.classname,
            class_version=self.class_version,
            max_value=self.max_value,
            chunk_size=self.chunk_size,
            end_position=self.end_position,
            )

    def next(self):
        if self.iteration >= self.end_position:
            return H5FlowGenerator.EMPTY
        self.iteration += 1

        next_slice = self.data_manager.reserve_data(self.dset_name, self.chunk_size)
        self.data_manager.write_data(self.dset_name, next_slice, np.random.randint(self.max_value, self.chunk_size))

        return next_slice

This example creates a generator that will fill the dset_name dataset with random integer data (max value of max_value) in chunks of length chunk_size. The process will continue for end_position iterations until it ends. Note that if running with MPI, each process will run for the same number of iterations (and so the data file will be N times larger).

EMPTY = slice(-1, -1, None)
class_version = '0.0.0'
finish()[source]

Clean up any open files / etc, called once after run loop finishes

init()[source]

Prepare output file to be written to, called once before initializing the flow stages.

next()[source]

Generate a new slice into the source dataset in the data file. To end loop, return an empty slice (H5FlowGenerator.EMPTY).

Returns

<slice> into self.dset_name data

class h5flow.core.h5_flow_stage.H5FlowStage(name, classname, data_manager, requires=None, **params)[source]

Base class for loop stage. Provides the following attributes:

  • name: instance name of stage (declared in configuration file)

  • classname: stage class

  • class_version: a str version number ('major.minor.fix', default = '0.0.0')

  • data_manager: an H5FlowDataManager instance used to access the output file

  • requires: a list of dataset names to load when calling H5FlowStage.load()

  • comm: MPI world communicator (if needed, else None)

  • rank: MPI group rank

  • size: MPI group size

To build a custom stage, inherit from this base class and implement the init() and the run() methods.

Example:

class ExampleStage(H5FlowStage):
    class_version = '0.0.0' # keep track of a version number for each class

    default_custom_param = None
    default_obj_name = 'obj0'

    def __init__(**params):
        super(ExampleStage,self).__init__(**params)

        # grab parameters from configuration file here, e.g.
        self.custom_param = params.get('custom_param', self.default_custom_param)
        self.obj_name = self.name + '/' + params.get('obj_name', self.default_obj_name)

    def init(self, source_name):
        # declare any new datasets and set dataset metadata, e.g.

        self.data_manager.set_attrs(self.obj_name,
            classname=self.classname,
            class_version=self.class_version,
            custom_param=self.custom_param,
            )
        self.data_manager.create_dset(self.obj_name)

    def run(self, source_name, source_slice):
        # load, process, and save new data objects

        data = self.load(source_name, source_slice)
class_version = '0.0.0'
finish(source_name)[source]

Clean up any open files / etc, called once after run loop finishes

init(source_name)[source]

Called once before starting the loop. Used to create datasets and set file meta-data

Returns

None

run(source_name, source_slice, cache)[source]

Called once per source_slice provided by the h5flow generator.

Parameters
  • source_name – path to the source dataset group

  • source_slice – a 1D slice into the source dataset

  • cache – pre-loaded data from requires list

Returns

None

class h5flow.core.h5_flow_resource.H5FlowResource(classname, data_manager, input_filename=None, start_position=None, end_position=None, **params)[source]

Base class for an accessible static resource. Provides:

  • classname: resource class

  • class_version: a str version number ('major.minor.fix', default = '0.0.0')

  • data_manager: an H5FlowDataManager instance used to access the output file

  • input_filename: an optional input filename (default = None)

  • start_position: an optional start position to begin iterating (default = None)

  • end_position: an optional end position to stop iterating (default = None)

  • comm: MPI world communicator (if needed, else None)

  • rank: MPI group rank

  • size: MPI group size

To build a custom resource, implement the init() or finish() methods.

To access a resource, declare it in the config file under resources:

resources:
     - classname: ExampleResource
       params:
           a_parameter: example

And then access it from a stage or generator via:

from h5flow.core import resources

resources['ExampleResource']
class_version = '0.0.0'
finish(source_name)[source]

Called once after finishing loop and after generators and stages have finished.

Returns None

init(source_name)[source]

Called once before starting the loop and before generator has been initialized. Used to load data or configure resource.

Returns

None