import h5py
import numpy as np
import logging
import os
import time
import uuid
from .. import H5FLOW_MPI
if H5FLOW_MPI:
from mpi4py import MPI
from .lib import ref_region_dtype, dereference_chain
[docs]class H5FlowDataManager(object):
'''
Coordinates access to the output data file across multiple processes.
To initialize::
hfdm = H5FlowDataManager(<path to file>, mode=<'r'/'a'>, mpi=<True/False>)
Opening and closing the underlying resource is handled automatically when
using the dedicated file access API, e.g.::
hfdm.dset_exists(...)
hfdm.create_dset(...)
hfdm.get_ref(...)
hfdm.reserve_data(...)
hfdm.write_ref(...)
hfdm[...]
...
'''
_temp_filename_fmt = 'tmp-h5flow-%y.%m.%d-%H.%M.%S-{uid}.h5'
def __init__(self, filepath, mode='a', mpi=H5FLOW_MPI, drop_list=None):
self.filepath = filepath
self._fh = None
self._temp_fh = None
self.mpi_flag = mpi
self.mode = mode
self.comm = MPI.COMM_WORLD if self.mpi_flag else None
self.rank = self.comm.Get_rank() if self.mpi_flag else 0
self.size = self.comm.Get_size() if self.mpi_flag else 1
if drop_list:
self.drop_list = drop_list
uid = uuid.uuid4()
if self.mpi_flag:
uid = self.comm.bcast(uid, root=0)
self._temp_filepath = os.path.join(os.path.dirname(self.filepath),
time.strftime(self._temp_filename_fmt).format(uid=uid))
logging.info(f'writing temporary data to {self._temp_filepath}')
else:
self.drop_list = list()
self._temp_filepath = None
def __repr__(self):
return f'H5FlowDataManager(filepath={self.filepath}, mode={self.mode}, mpi={self.mpi_flag}, drop_list={self.drop_list})'
def __getitem__(self, args):
'''
Fetch an object or load a dataset (or partial dataset) using the following convention::
dm[<object name>] # fetch a given object from the file
dm[<parent dataset name>, <child datasetname, optional>, ..., <slice into parent dataset, optional>] # load references between datasets
E.g. a file containing datasets ``'dataset0/data'``,
``'dataset1/data'``, and references between them, can be accessed
via::
dm['dataset0'] # returns 'dataset0' group
dm['dataset0/data'] # returns 'dataset0/data' dataset
dm['dataset0',:100] # returns first 100 rows from 'dataset0/data' dataset
dm['dataset0','dataset1',:100] # returns referred data in 'dataset1/data' corresponding to the first 100 rows of 'dataset0/data'
'''
if not isinstance(args, str):
sel = None
if isinstance(args[-1], slice):
sel = args[-1]
args = args[:-1]
elif isinstance(args[-1], int):
sel = slice(args[-1], args[-1] + 1)
args = args[:-1]
elif isinstance(args[-1], (str,bytes)):
pass
else:
sel = np.r_[args[-1]]
args = args[:-1]
if len(args) > 1:
path_specs = list(zip(args[:-1], args[1:]))
if sel is None:
sel = slice(0, len(self.get_dset(args[0])))
regions = None
else:
regions = [self.get_ref_region(*spec) for spec in path_specs]
refs = [self.get_ref(*spec) for spec in path_specs]
refs, ref_directions = zip(*refs)
return dereference_chain(
sel, refs, data=self.get_dset(args[-1]), regions=regions,
ref_directions=ref_directions)
else:
if sel is None:
return self.get_dset(args[0])
else:
return self.get_dset(args[0])[sel]
else:
return self._route_fh(args)[args]
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
try:
self.finish()
except:
try:
self.close_file()
except Exception as e:
raise e
[docs] def finish(self):
'''
Deletes datasets specified in the drop_list before closing file
handle.
'''
for path in self.drop_list:
logging.info(f'deleting item at {path}')
self.delete(path)
self.close_file()
if self.mpi_flag:
self.comm.barrier()
if self._temp_filepath is not None and self.rank == 0:
logging.info(f'removing temporary file {self._temp_filepath}')
os.remove(self._temp_filepath)
def _open_file(self, mpi=True, mode='a'):
if (self._fh is not None or mpi != self.mpi_flag) and self._fh:
# close file if mpi mode changes
self.close_file()
if mpi:
# open file with mpi enabled
self._fh = h5py.File(self.filepath, mode, driver='mpio', comm=self.comm)
self.comm.barrier()
if self._temp_filepath is not None:
self._temp_fh = h5py.File(self._temp_filepath, mode, driver='mpio', comm=self.comm)
else:
# open file without mpi enabled
self._fh = h5py.File(self.filepath, mode)
if self._temp_filepath is not None:
self._temp_fh = h5py.File(self._temp_filepath, mode)
# update mpi flag
self.mpi_flag = mpi
[docs] def close_file(self):
'''
Force underlying hdf5 resource to close
'''
if self._fh is not None and self._fh:
self._fh.close()
if self._temp_fh is not None and self._temp_fh:
self._temp_fh.close()
@property
def fh(self):
'''
Direct access to the underlying h5py ``File`` object. Not recommended
for use. Instead, use ``get_dset(...)``, ``write_data(...)``, or
the implemented ``__getitem__()``.
'''
if self._fh is None or not self._fh:
self._open_file(mpi=self.mpi_flag, mode=self.mode)
return self._fh
def _route_fh(self, path):
'''
Return file handle to temp file or output file depending on if
path is in drop list. Result is cached such that subsequent calls
return the same handle.
'''
if path in self.fh:
# if it already exists in output file, do not use temp file
return self.fh
elif any([d in path for d in self.drop_list]):
return self._temp_fh
else:
return self.fh
[docs] def delete(self, name):
'''
Delete object at and references to ``name``. Ignored if path is
in temp file.
:param name: ``str`` path to dataset to be deleted
'''
for ref in self.get_refs(name):
fh = self._route_fh(ref.attrs['ref_region0'])
if ref.attrs['ref_region0'] in fh and fh is not self._temp_fh:
del fh[ref.attrs['ref_region0'][:-10]] # remove reference group
fh = self._route_fh(ref.attrs['ref_region1'])
if ref.attrs['ref_region1'] in fh and fh is not self._temp_fh:
del fh[ref.attrs['ref_region1'][:-10]] # remove reference group
fh = self._route_fh(name)
if name in fh and fh is not self._temp_fh:
del fh[name] # remove object group
[docs] def exists(self, path):
'''
Check if a path exists
:param path: ``str`` path to check
:returns: ``True`` if path is present
'''
return path in self._route_fh(path)
[docs] def dset_exists(self, dataset_name):
'''
Check if data object of ``dataset_name`` exists
:param dataset_name: ``str`` path to dataset, e.g. ``stage0/obj0``
:returns: ``True`` if data object exists
'''
return self.exists(f'{dataset_name}/data')
[docs] def ref_exists(self, parent_dataset_name, child_dataset_name):
'''
Check if references for ``parent_dataset_name -> child_dataset_name`` exists
:param parent_dataset_name: ``str`` path to parent dataset, e.g. ``stage0/obj0``
:param child_dataset_name: ``str`` path to child dataset, e.g. ``stage0/obj1``
:returns: ``True`` if references exists
'''
path0 = self.exists(f'{parent_dataset_name}/ref/{child_dataset_name}/ref')
path1 = self.exists(f'{child_dataset_name}/ref/{parent_dataset_name}/ref')
return (path0 or path1)
[docs] def ref_region_exists(self, parent_dataset_name, child_dataset_name):
'''
Check if reference table for ``parent_dataset_name -> child_dataset_name`` exists
:param parent_dataset_name: ``str`` path to parent dataset, e.g. ``stage0/obj0``
:param child_dataset_name: ``str`` path to child dataset, e.g. ``stage0/obj1``
:returns: ``True`` if reference table exists
'''
path = f'{parent_dataset_name}/ref/{child_dataset_name}/ref_region'
return self.exists(path)
[docs] def attr_exists(self, name, key):
'''
Check if attribute ``key`` exists for ``name``
:param name: ``str`` path to object, e.g. ``stage0/obj0`` or ``stage0``
:param key: ``str`` attribute name
:returns: ``True`` if attribute exists
'''
if self.exists(f'{name}'):
return key in self._route_fh(name)[f'{name}'].attrs
return False
[docs] def get_dset(self, dataset_name):
'''
Get dataset of ``dataset_name``
:param dataset_name: ``str`` path to dataset, e.g. ``stage0/obj0``
:returns: ``h5py.Dataset``, e.g. ``stage0/obj0/data``
'''
dset = self._route_fh(f'{dataset_name}/data')[f'{dataset_name}/data']
return dset
[docs] def get_attrs(self, name):
'''
Get attributes of ``name``
:param name: ``str`` path to object, e.g. ``stage0``
:returns: ``h5py.AttributeManager``
'''
return self._route_fh(name)[f'{name}'].attrs
[docs] def get_ref(self, parent_dataset_name, child_dataset_name):
'''
Get references of ``parent_dataset_name -> child_dataset_name``
:param parent_dataset_name: ``str`` path to parent dataset, e.g. ``stage0/obj0``
:param child_dataset_name: ``str`` path to child dataset, e.g. ``stage0/obj1``
:returns: ``tuple`` of ``h5py.Dataset``, reference direction; e.g. ``(stage0/obj0/ref/stage0/obj1/ref, (0,1))``
'''
path = f'{parent_dataset_name}/ref/{child_dataset_name}/ref'
fh = self._route_fh(path)
if path in fh:
dset = fh[path]
return dset, (0, 1)
path = f'{child_dataset_name}/ref/{parent_dataset_name}/ref'
dset = self._route_fh(path)[path]
return dset, (1, 0)
[docs] def get_refs(self, dataset_name):
'''
Get all references involving ``dataset_name -> other``
'''
fh = self._route_fh(dataset_name)
reg_regions = list()
if dataset_name not in fh:
return list()
fh[dataset_name].visititems(lambda n, d:
reg_regions.append(d)
if isinstance(d, h5py.Dataset) and n.endswith('/ref_region')
else None
)
return [self._route_fh(d.attrs['ref'])[d.attrs['ref']] for d in reg_regions
if d.attrs['ref'] in self._route_fh(d.attrs['ref'])]
[docs] def get_ref_region(self, parent_dataset_name, child_dataset_name):
'''
Get reference lookup regions for ``parent_dataset_name -> child_dataset_name``
:param parent_dataset_name: ``str`` path to parent dataset, e.g. ``stage0/obj0``
:param child_dataset_name: ``str`` path to child dataset, e.g. ``stage0/obj1``
:returns: ``h5py.Dataset``, ``stage0/obj0/ref/stage0/obj1/ref_region``, (0,1)
'''
path = f'{parent_dataset_name}/ref/{child_dataset_name}/ref_region'
return self._route_fh(path)[path]
[docs] def set_attrs(self, name, **attrs):
'''
Update attributes of ``name``. Attribute ``key: value`` are passed
in as additional keyword arguments
:param name: ``str`` path to object, e.g. ``stage0``
'''
fh = self._route_fh(name)
if name not in fh:
fh.create_group(name)
for key, val in attrs.items():
fh[name].attrs[key] = val
[docs] def create_dset(self, dataset_name, dtype, shape=()):
'''
Create a 1D dataset of ``dataset_name`` with datatype ``dtype``, if
it doesn't already exist
:param dataset_name: ``str`` path to dataset, e.g. ``stage0/obj0``
:param dtype: ``np.dtype`` of dataset, can be a structured dtype
'''
path = f'{dataset_name}/data'
fh = self._route_fh(path)
if path not in fh:
fh.require_dataset(path, (0,) + shape, maxshape=(None,) + shape,
dtype=dtype)
[docs] def create_ref(self, parent_dataset_name, child_dataset_name):
'''
Create a 1D dataset of references of
``parent_dataset_name -> child_dataset_name``, if it doesn't already
exist. Both datasets must already exist.
:param parent_dataset_name: ``str`` path to parent dataset, e.g. ``stage0/obj0``
:param child_dataset_name: ``str`` path to child dataset, e.g. ``stage0/obj1``
'''
child_path = f'{child_dataset_name}/ref/{parent_dataset_name}'
if child_path + '/ref' in self._route_fh(child_path):
raise RuntimeError(f'References for {parent_dataset_name}->{child_dataset_name} already exist under {child_path}')
path = f'{parent_dataset_name}/ref/{child_dataset_name}'
fh = self._route_fh(path)
if path not in fh:
# create reference group, if not present
if f'{parent_dataset_name}/ref' not in fh:
fh.create_group(f'{parent_dataset_name}/ref')
# create bi-directional reference dataset
fh.require_dataset(path + '/ref', shape=(0, 2), maxshape=(None, 2),
dtype='u4')
# link to source datasets
fh[path + '/ref'].attrs['dset0'] = self.get_dset(parent_dataset_name).name
fh[path + '/ref'].attrs['dset1'] = self.get_dset(child_dataset_name).name
# create lookup table dataset
parent_dset = self.get_dset(parent_dataset_name)
child_dset = self.get_dset(child_dataset_name)
child_fh = self._route_fh(child_path)
fh.require_dataset(path + '/ref_region', shape=(len(parent_dset),), maxshape=(None,),
dtype=ref_region_dtype, fillvalue=np.zeros((1,), dtype=ref_region_dtype))
child_fh.require_dataset(child_path + '/ref_region', shape=(len(child_dset),), maxshape=(None,),
dtype=ref_region_dtype, fillvalue=np.zeros((1,), dtype=ref_region_dtype))
# link to references
fh[path + '/ref_region'].attrs['ref'] = fh[path + '/ref'].name
child_fh[child_path + '/ref_region'].attrs['ref'] = fh[path + '/ref'].name
# link back to lookup tables
fh[path + '/ref'].attrs['ref_region0'] = fh[f'{path}/ref_region'].name
fh[path + '/ref'].attrs['ref_region1'] = child_fh[f'{child_path}/ref_region'].name
def _resize_dset(self, dset, new_shape):
curr_shape = dset.shape
dset.resize(new_shape + curr_shape[1:])
if dset.name.endswith('/data'):
for ref in self.get_refs(dset.name[:-5]):
dset0 = ref.attrs['dset0']
dset1 = ref.attrs['dset1']
self._resize_dset(
self._route_fh(ref.attrs['ref_region0'])[ref.attrs['ref_region0']],
(len(self._route_fh(dset0)[dset0]),)
)
self._resize_dset(
self._route_fh(ref.attrs['ref_region1'])[ref.attrs['ref_region1']],
(len(self._route_fh(dset1)[dset1]),)
)
[docs] def reserve_data(self, dataset_name, spec):
'''
Coordinate access into ``dataset_name``. Depending on the type of
``spec`` a different access mode will be performed:
- ``int``: access in append mode - will grant access to ``spec`` rows at the end of the dataset
- ``slice`` or list of ``int`` or list of ``slice``: access a specific section(s) of the dataset - will resize dataset if section does not exist
:param dataset_name: ``str`` path to dataset, e.g. ``stage0/obj0``
:param spec: see function description
:returns: ``slice`` into ``dataset_name`` where access is given
'''
dset = self.get_dset(dataset_name)
curr_len = len(dset)
specs = self.comm.allgather(spec) if self.mpi_flag else [spec]
if isinstance(spec, int):
# create a new chunk at the end of the dataset
n = sum(specs)
self._resize_dset(dset, (curr_len + n,))
rv = slice(curr_len + sum(specs[:self.rank]), curr_len + sum(specs[:self.rank + 1]))
elif isinstance(spec, slice):
# maybe create up to a specific chunk of the dataset
new_size = max([spec.stop for spec in specs])
if new_size > curr_len:
self._resize_dset(dset, (new_size,))
rv = spec
else:
raise TypeError(f'spec {spec} is not a valid type, must be slice or integer')
return rv
[docs] def write_data(self, dataset_name, spec, data):
'''
Write ``data`` into ``dataset_name`` at ``spec``
:param dataset_name: ``str`` path to dataset, e.g. ``stage0/obj0``
:param spec: ``slice`` into ``dataset_name`` to write ``data``
:param data: numpy array or iterable to write
'''
dset = self.get_dset(dataset_name)
dset[spec] = data
def _update_ref_region(self, region_dset, sel, ref_arr, ref_offset):
# Note:: ref_arr is the 1D array of indices into region_dset to update, ref_offset is where ref_array is positioned within a larger ref dataset
max_length = int(np.max(self.comm.allgather(sel.stop))) if self.mpi_flag else sel.stop
if len(region_dset) < max_length:
self._resize_dset(region_dset, (max_length,))
region = region_dset[sel]
_, idcs, start_idcs = np.intersect1d(np.r_[sel], ref_arr, return_indices=True)
start = np.zeros(len(region), dtype='i8')
start[idcs] = ref_offset + start_idcs
start = np.where(
region['start'] != region['stop'],
np.minimum(region['start'], start),
start
)
region_dset[sel, 'start'] = start
_, idcs, stop_idcs = np.intersect1d(np.r_[sel], ref_arr[::-1], return_indices=True)
stop = np.zeros(len(region), dtype='i8')
stop[idcs] = ref_offset + len(ref_arr) - stop_idcs
stop = np.where(
region['start'] != region['stop'],
np.maximum(region['stop'], stop),
stop
)
region_dset[sel, 'stop'] = stop
[docs] def write_ref(self, parent_dataset_name, child_dataset_name, refs):
'''
Add refs for ``parent_dataset_name -> child_dataset_name``. Note
that references are never updated and can't be removed after they
are created.
:param refs: an integer array of shape (N,2) with refs[:,0] corresponding to the index in the parent dataset and refs[:,1] corresponding to the index in the child dataset
'''
ns = self.comm.allgather(len(refs)) if self.mpi_flag else [len(refs)]
ref_dset, ref_dir = self.get_ref(parent_dataset_name, child_dataset_name)
ref_offset = len(ref_dset) + sum(ns[:self.rank])
self._resize_dset(ref_dset, (len(ref_dset) + sum(ns),))
ref_slice = slice(ref_offset, ref_offset + ns[self.rank])
ref_dset[ref_slice] = refs[:, ref_dir]
parent_ref_region_dset = self.get_ref_region(parent_dataset_name, child_dataset_name)
child_ref_region_dset = self.get_ref_region(child_dataset_name, parent_dataset_name)
if len(refs):
parent_sel = slice(int(np.min(refs[:, 0])), int(np.max(refs[:, 0]) + 1))
child_sel = slice(int(np.min(refs[:, 1])), int(np.max(refs[:, 1]) + 1))
else:
parent_sel = slice(0, 0)
child_sel = slice(0, 0)
self._update_ref_region(parent_ref_region_dset, parent_sel, refs[:, 0], ref_offset)
self._update_ref_region(child_ref_region_dset, child_sel, refs[:, 1], ref_offset)