Source code for h5flow

#!/usr/bin/env python
import logging
import os
import sys

global H5FLOW_MPI
if os.environ.get('H5FLOW_NOMPI', False):
    logging.warning(f'Running without mpi4py because H5FLOW_NOMPI={os.environ["H5FLOW_NOMPI"]}')
    H5FLOW_MPI = False
elif '--nompi' in sys.argv:
    logging.warning(f'Running without mpi4py because --nompi flag set')
    H5FLOW_MPI = False
else:
    try:
        from mpi4py import MPI
        H5FLOW_MPI = True
    except Exception as e:
        logging.warning(f'Running without mpi4py because {e}')
        H5FLOW_MPI = False

from .core import H5FlowManager, resources
import argparse
import yaml
from yamlinclude import YamlIncludeConstructor
YamlIncludeConstructor.add_to_loader_class(loader_class=yaml.FullLoader, base_dir='./')


[docs]def run(configs, output_filename, input_filename=None, start_position=None, end_position=None, verbose=0, drop=None, nompi=False): ''' Execute a workflow specified by ``config`` writing to ``output_filename``. :param configs: ``list``, paths to configuration yamls to run in sequence :param output_filename: ``str``, path to output hdf5 file :param input_filename: ``str``, path to optional input file (default: ``None``) :param start_position: ``int``, loop start index given to generator (default: ``None``) :param end_position: ``int``, loop end index given to generator (default: ``None``) :param verbose: ``int``, verbosity level (``0 = warnings only``, ``1 = info``, ``2 = debug``) :param drop: ``list`` of ``str``, additional objects to drop from output file :param nompi: ``bool`` flag to force run without MPI ''' global H5FLOW_MPI if nompi == True and H5FLOW_MPI: H5FLOW_MPI = False rank = MPI.COMM_WORLD.Get_rank() if H5FLOW_MPI else 0 log_level = {0: 'WARNING', 1: 'INFO', 2: 'DEBUG'}[verbose] logging.basicConfig(format=f'%(asctime)s (r{rank}) %(module)s.%(funcName)s[l%(lineno)d] %(levelname)s : %(message)s', level=log_level) logging.getLogger().setLevel(log_level) if rank == 0: print('~~~ H5FLOW ~~~') print(f'output file: {output_filename}') if input_filename is not None: print(f'input file: {input_filename}') if start_position is not None: print(f'start: {start_position}') if end_position is not None: print(f'end: {end_position}') if drop is not None: print(f'drop: {drop}') if nompi is not False: print(f'no mpi: {nompi}') if verbose > 0: print(f'verbose: {log_level}') print('~~~~~~~~~~~~~~\n') for iconfig, config in enumerate(configs): # if running with multiple workflows, use output file for subsequent runs if iconfig != 0: input_filename = output_filename # refresh resource list global resources for key in list(resources.keys()): del resources[key] # load workflow configuration if rank == 0: print('~~~ WORKFLOW ~~~' if len(configs) == 1 else f'~~~ WORKFLOW ({iconfig+1}/{len(configs)}) ~~~') if verbose > 0: print(f'# {config}') with open(config, 'r') as f: for line in f.readlines(): print(line.strip('\n')) else: print(config) print('~~~~~~~~~~~~~~~~\n') with open(config, 'r') as f: config = yaml.load(f, Loader=yaml.FullLoader) if drop is not None: config['flow']['drop'] = config['flow'].get('drop', list()) + drop # execute workflow if rank == 0: print('~~~ INIT ~~~') manager = H5FlowManager(config, output_filename, input_filename=input_filename, start_position=start_position, end_position=end_position) manager.init() if rank == 0: print('~~~~~~~~~~~~\n') if rank == 0: print('~~~ RUN ~~~') manager.run() if rank == 0: print('~~~~~~~~~~~\n') if rank == 0: print('~~~ FINISH ~~~') manager.finish() if rank == 0: print('~~~~~~~~~~~~~~\n')
[docs]def main(): ''' Entry point for command line execution. Parses arguments from command line and executes ``run(**parsed_args)``. ''' parser = argparse.ArgumentParser() parser.add_argument('--verbose', '-v', action='count', default=0, help='''Increase verbosity, can specify more for more verbose (e.g. -vv)''') parser.add_argument('--input_filename', '-i', type=str, default=None, required=False, help='''input hdf5 file to loop over, optional if using a custom file generator''') parser.add_argument('--output_filename', '-o', type=str, required=True) parser.add_argument('--configs', '-c', type=str, nargs='+', required=True, help='''yaml config file(s)''') parser.add_argument('--start_position', '-s', type=int, default=None, help='''start position within source dset (for partial file processing)''') parser.add_argument('--end_position', '-e', type=int, default=None, help='''end position within source dset (for partial file processing)''') parser.add_argument('--drop', '-d', type=str, default=None, nargs='+', help='''drop objects from output file''') parser.add_argument('--nompi', action='store_true', default=False, help='''run h5flow without mpi enabled (can also be disabled by setting H5FLOW_NOMPI)''') args = parser.parse_args() run(**vars(args))