Source code for h5flow.core.h5_flow_manager
from tqdm import tqdm
import logging
import sys
import numpy as np
from .. import H5FLOW_MPI
if H5FLOW_MPI:
from mpi4py import MPI
from ..data.lib import dereference_chain
from ..data import H5FlowDataManager
from ..modules import get_class
from .h5_flow_resource import resources, H5FlowResource
[docs]class H5FlowManager(object):
'''
Overarching coordination class. Creates data managers, generators, and
stages according to the specified workflow. After initializing
the workflow, executes each generator and stages' ``init``, ``run``, and
``finish`` methods in sequence.
Also manages refreshing data in the ``cache`` and dropping datasets from
the final output file.
The standard execution sequence (as implemented in ``h5flow.run``) is::
manager = H5FlowManager(**args)
manager.init() # initialize components
manager.run() # execute workflow run loop
manager.finish() # clean up components
'''
def __init__(self, config, output_filename, input_filename=None, start_position=None, end_position=None):
self.comm = MPI.COMM_WORLD if H5FLOW_MPI else None
self.rank = self.comm.Get_rank() if H5FLOW_MPI else 0
self.size = self.comm.Get_size() if H5FLOW_MPI else 1
self.drop_list = config['flow'].get('drop', list())
# set up the data manager
self.configure_data_manager(output_filename, config)
# set up resources
self.configure_resources(config, input_filename, start_position, end_position)
# set up the file chunk generator
self.configure_generator(input_filename, config, start_position, end_position)
# set up flow stages
self.configure_flow(config)
if H5FLOW_MPI:
self.comm.barrier()
[docs] def configure_resources(self, config, input_filename, start_position, end_position):
'''
Create ``H5FlowResource`` instance for each object in config that
inherits from ``H5FlowResource``
:param input_filename: ``str``, input file path passed to the generator
:param config: ``dict``, parsed yaml config for workflow
:param start_position: ``int``, dataset start index passed to generator
:param end_position: ``int``, dataset end index passed to generator
'''
global resources
for obj_config in config.get('resources', list()):
obj_classname = obj_config['classname']
obj_path = obj_config.get('path', None)
obj_class = get_class(obj_classname, path=obj_path)
if issubclass(obj_class, H5FlowResource):
resources[obj_classname] = obj_class(
classname=obj_classname,
data_manager=self.data_manager,
input_filename=input_filename,
start_position=start_position,
end_position=end_position,
**obj_config.get('params', dict())
)
else:
raise RuntimeError(f'failed to load resource {obj_classname} - does not inherit from H5FlowResource')
[docs] def configure_data_manager(self, output_filename, config):
'''
Create an ``H5FlowDataManager`` to coordinate access into
``output_filename``. Access to the data manager is provided via::
manager = H5FlowManager(<config>, <filename>)
manager.data_manager # data manager instance
:param output_filename: ``str``, output file path
:param config: ``dict``, parsed yaml config for workflow
'''
self.data_manager = H5FlowDataManager(output_filename, drop_list=self.drop_list)
[docs] def configure_generator(self, input_filename, config, start_position, end_position):
'''
Create an ``H5FlowGenerator`` to produce slices into the source
dataset. Access to the generator is provided via::
manager = H5FlowManager(<config>, <filename>)
manager.generator # generator instance
If no generator configuration is found in the ``config``, a default
dataset loop generator is created.
:param input_filename: ``str``, input file path passed to the generator
:param config: ``dict``, parsed yaml config for workflow
:param start_position: ``int``, dataset start index passed to generator
:param end_position: ``int``, dataset end index passed to generator
'''
source_name = config['flow']['source']
source_config = config[source_name] if source_name in config else self._default_generator_config(source_name)
self.generator = get_class(source_config['classname'], path=source_config.get('path', None))(
classname=source_config['classname'],
dset_name=source_config['dset_name'],
data_manager=self.data_manager,
input_filename=input_filename,
start_position=start_position,
end_position=end_position,
**source_config.get('params', dict())
)
[docs] def configure_flow(self, config):
'''
Create instances of ``H5FlowStages`` in the sequence specified in
the ``config``. Access to the stages are provided via::
manager = H5FlowManager(<config>, <filename>)
manager.stages # data manager instance
:param config: ``dict``, parsed yaml config for workflow
'''
stage_names = config['flow'].get('stages', list())
stage_args = [config[stage_name] for stage_name in stage_names]
self.stages = [
get_class(args['classname'], path=args.get('path',None))(
classname=args['classname'],
name=name,
data_manager=self.data_manager,
requires=self.format_requirements(args.get('requires', list())),
**args.get('params', dict()))
for name, args in zip(stage_names, stage_args)
]
def _default_generator_config(self, source_name):
if self.rank == 0:
print(f'Could not find generator description, using default loop behavior on {source_name} dataset')
return dict(
classname='H5FlowDatasetLoopGenerator',
dset_name=source_name,
path='h5flow.modules'
)
[docs] def format_requirements(self, requirements):
'''
Converts list from the "requires" configuration option into an
list of dicts with::
name: name of object to place in cache
path: list of reference datasets (parent, child) to load for this requirement
index_only: boolean if dataset should only load reference indices rather than data
'''
req = []
for r in requirements:
if isinstance(r, str):
req.append(dict(
name=r,
path=[r],
index_only=False
))
elif isinstance(r, dict):
d = dict(name=r['name'])
if 'path' in r:
if isinstance(r['path'], str):
d['path'] = [r['path']]
elif isinstance(r['path'], list):
d['path'] = r['path']
else:
raise ValueError(f'Unrecognized path specification in {r}')
else:
d['path'] = [d['name']]
d['index_only'] = r['index_only'] if 'index_only' in r else False
req.append(d)
else:
raise ValueError(f'Unrecognized requirement {r}')
return req
[docs] def init(self):
'''
Execute ``init()`` method of resources, generator, and stages, in
sequence and in that order.
'''
global resources
for classname, resource in resources.items():
resource.init(self.generator.dset_name)
self.generator.init()
for stage in self.stages:
stage.init(self.generator.dset_name)
if H5FLOW_MPI:
self.comm.barrier()
[docs] def run(self):
'''
Run loop, executing ``run()`` method of generator and stages, in
sequence. Terminate once all processes return an
``H5FlowGenerator.EMPTY``.
Also refreshes the cache with required datasets on each stage.
'''
if self.rank == 0:
print(f'Run loop on {self.generator.dset_name}:')
print(' ' + ' -> '.join([stage.name for stage in self.stages]))
loop_gen = tqdm(self.generator, smoothing=1, ascii=True) if self.rank == 0 else self.generator
stage_requirements = [[r for stage in self.stages[:i + 1] for r in stage.requires] for i in range(len(self.stages))]
for chunk in loop_gen:
cache = dict()
for i, (stage, requirements) in enumerate(zip(self.stages, stage_requirements)):
self.update_cache(cache, self.generator.dset_name, chunk, requirements)
stage.run(self.generator.dset_name, chunk, cache)
sys.stdout.flush()
if H5FLOW_MPI:
self.comm.barrier()
[docs] def finish(self):
'''
Execute ``finish()`` method of generator and stages. After all
components have finished, drop datasets that are not wanted in
output file.
'''
self.generator.finish()
if H5FLOW_MPI:
self.comm.barrier()
for stage in self.stages:
stage.finish(self.generator.dset_name)
global resources
for classname, resource in resources.items():
resource.finish(self.generator.dset_name)
self.data_manager.finish()
[docs] def update_cache(self, cache, source_name, source_slice, requirements):
'''
Load and dereference "required" data associated with a given source
- first loads the data subset of ``source_name`` specified by the
``source_slice``. Then loops over the specification dicts in ``self.requires``
and loads data from references found in `'path'`.
Called automatically once per loop, just before calling ``run``.
Only loads data to the cache if it is not already present.
:param cache: ``dict`` cache to update
:param source_name: a path to the source dataset group
:param source_slice: a 1D slice into the source dataset
'''
required_names = [r['name'] for r in requirements]
for name in list(cache.keys()).copy():
if name not in required_names and name != source_name:
del cache[name]
if source_name not in cache:
cache[source_name] = self.data_manager.get_dset(source_name)[source_slice]
for i, linked_name in enumerate(required_names):
if linked_name not in cache:
cache[linked_name] = self.load_requirement(requirements[i], source_name, source_slice)
[docs] def load_requirement(self, req, source_name, source_slice):
'''
Loads a requirement specified by::
path: list of references to traverse
index_only: True to load only indices and not data
:param req: ``dict`` with items ``path : list of datasets`` and ``index_only : bool, true to only load index in to final dataset``. Loads a chain of references in a sequence of ``[source_name, *path]``
:param source_name: ``str`` base dataset to load
:param source_slice: ``slice`` into ``source_name`` to load
'''
path = req['path']
index_only = req['index_only']
logging.debug((f'loading requirement {req["name"]}: ' + ' -> '.join([source_name] + path)) +
('' if not index_only else '(index)'))
if self.data_manager.ref_exists(source_name, path[0]):
chain = list(zip([source_name] + path[:-1], path))
elif len(path) > 1:
chain = list(zip(path[:-1], path[1:]))
else:
try:
if not index_only:
return self.data_manager.get_dset(path[0])[source_slice]
else:
return np.r_[source_slice]
except Exception as e:
logging.info('failed to load: ' + path[0]
+ ('' if not index_only else '(index)') + ' : '
+ str(e))
return None
try:
data = self.data_manager.get_dset(path[-1])
ref, ref_dir = list(zip(*[self.data_manager.get_ref(p, c) for p, c in chain]))
regions = [self.data_manager.get_ref_region(p, c) for p, c in chain]
return dereference_chain(source_slice, ref, data=data, regions=regions, ref_directions=ref_dir, indices_only=index_only)
except Exception as e:
logging.info(('failed to load: ' + ' -> '.join([source_name] + path)
+ ('' if not index_only else '(index)') + ' : '
+ str(e)))
return None