""" Module containing the storage services.
Contains the standard :class:`~pypet.storageservice.HDF5StorageSerivce`
as well wrapper classes to allow thread safe multiprocess storing.
"""
__author__ = 'Robert Meyer'
import tables as pt
import tables.parameters as ptpa
import os
import warnings
import time
import sys
try:
import queue
except ImportError:
import Queue as queue
import numpy as np
from pandas import DataFrame, Series, Panel, Panel4D, HDFStore
import pypet.compat as compat
import pypet.utils.ptcompat as ptcompat
import pypet.pypetconstants as pypetconstants
import pypet.pypetexceptions as pex
from pypet._version import __version__ as VERSION
from pypet.parameter import ObjectTable
import pypet.naturalnaming as nn
from pypet.pypetlogging import HasLogger
class MultiprocWrapper(object):
"""Abstract class definition of a Wrapper.
Note that only storing is required, loading is optional.
ABSTRACT: Needs to be defined in subclass
"""
def store(self, *args, **kwargs):
raise NotImplementedError('Implement this!')
[docs]class QueueStorageServiceSender(MultiprocWrapper, HasLogger):
""" For multiprocessing with :const:`~pypet.pypetconstants.WRAP_MODE_QUEUE`, replaces the
original storage service.
All storage requests are send over a queue to the process running the
:class:`~pypet.storageservice.QueueStorageServiceWriter`.
Does not support loading of data!
"""
def __init__(self):
self._queue = None
self._set_logger()
@property
def queue(self):
"""The queue to put data on"""
return self._queue
@queue.setter
[docs] def queue(self, queue):
self._queue = queue
def __setstate__(self, statedict):
self.__dict__.update(statedict)
self._set_logger()
def __getstate__(self):
result = self.__dict__.copy()
result['_queue'] = None
del result['_logger']
return result
def load(self, *args, **kwargs):
raise NotImplementedError('Queue wrapping does not support loading. If you want to '
'load data in a multiprocessing environment, use the Lock '
'wrapping.')
[docs] def store(self, *args, **kwargs):
"""Puts data to store on queue."""
try:
self._queue.put(('STORE', args, kwargs))
except IOError:
# # This is due to a bug in Python, repeating the operation works :-/
# # See http://bugs.python.org/issue5155
try:
self._logger.error('Failed sending task %s to queue, I will try again.' %
str(('STORE', args, kwargs)))
self._queue.put(('STORE', args, kwargs))
self._logger.error('Second queue sending try was successful!')
except IOError:
self._logger.error('Failed sending task %s to queue, I will try one last time.' %
str(('STORE', args, kwargs)))
self._queue.put(('STORE', args, kwargs))
self._logger.error('Third queue sending try was successful!')
except TypeError as e:
# This handles a weird bug under python 3.4 (not 3.3) that sometimes
# a NoneType is put on the queue instead of real data which needs to be ignored
self._logger.error('Could not put %s because of: %s, '
'I will try again.' %
(str(('STORE', args, kwargs)), str(e)))
try:
self._logger.error(
'Failed sending task %s to queue due to: %s. I will try again.' %
(str(('STORE', args, kwargs)), str(e)))
self._queue.put(('STORE', args, kwargs))
self._logger.error('Second queue sending try was successful!')
except TypeError as e:
self._logger.error(
'Failed sending task %s to queue due to: %s. I will try one last '
'time again.' %
(str(('STORE', args, kwargs)), str(e)))
self._queue.put(('STORE', args, kwargs))
self._logger.error('Third queue sending try was successful!')
except Exception as e:
self._logger.error('Could not put %s because of: %s' %
(str(('STORE', args, kwargs)), str(e)))
raise
[docs] def send_done(self):
"""Signals the writer that it can stop listening to the queue"""
self._queue.put(('DONE', [], {}))
[docs]class QueueStorageServiceWriter(HasLogger):
"""Wrapper class that listens to the queue and stores queue items via the storage service."""
def __init__(self, storage_service, queue):
self._storage_service = storage_service
self._queue = queue
self._trajectory_name = None
self._set_logger()
[docs] def run(self):
"""Starts listening to the queue."""
while True:
try:
to_store_list = []
stop_listening = False
while True:
# We try to grap more data from the queue to avoid reopneing the
# hdf5 file all the time
try:
msg, args, kwargs = self._queue.get(False)
if msg == 'DONE':
stop_listening = True
elif msg == 'STORE':
if 'msg' in kwargs:
store_msg = kwargs.pop('msg')
else:
store_msg = args[0]
args = args[1:]
if 'stuff_to_store' in kwargs:
stuff_to_store = kwargs.pop('stuff_to_store')
else:
stuff_to_store = args[0]
args = args[1:]
trajectory_name = kwargs.pop('trajectory_name')
if self._trajectory_name is None:
self._trajectory_name = trajectory_name
elif self._trajectory_name != trajectory_name:
raise RuntimeError('Cannot store into two different trajectories. '
'I am supposed to store into %s, '
'but I should also '
'store into %s.' %
(self._trajectory_name, trajectory_name))
to_store_list.append((store_msg, stuff_to_store, args, kwargs))
else:
raise RuntimeError(
'You queued something that was not intended to be queued!')
except queue.Empty:
break
except TypeError as e:
self._logger.error('Could not get %s because of: %s, '
'I will ignore the error and cwhite for another '
'try.' %
(str(('STORE', args, kwargs)), str(e)))
# Under python 3.4 sometimes NoneTypes are put on the queue
# causing a TypeError. Apart form this sketchy handling, I have
# so far no solution
except Exception as e:
self._logger.error('Could not get %s because of: %s' %
(str(('STORE', args, kwargs)), str(e)))
raise
if to_store_list:
self._storage_service.store(pypetconstants.LIST, to_store_list,
trajectory_name=self._trajectory_name)
if stop_listening:
break
except:
raise
finally:
try:
self._queue.task_done()
except ValueError:
pass
[docs]class LockWrapper(MultiprocWrapper, HasLogger):
"""For multiprocessing in :const:`~pypet.pypetconstants.WRAP_MODE_LOCK` mode,
augments a storage service with a lock.
The lock is acquired before storage or loading and released afterwards.
"""
def __init__(self, storage_service, lock):
self._storage_service = storage_service
self._lock = lock
self._set_logger()
[docs] def store(self, *args, **kwargs):
"""Acquires a lock before storage and releases it afterwards."""
try:
self._lock.acquire()
self._storage_service.store(*args, **kwargs)
finally:
if self._lock is not None:
try:
self._lock.release()
except RuntimeError:
self._logger.error('Could not release lock `%s`!' % str(self._lock))
[docs] def load(self, *args, **kwargs):
"""Acquires a lock before loading and releases it afterwards."""
try:
self._lock.acquire()
self._storage_service.load(*args, **kwargs)
finally:
if self._lock is not None:
try:
self._lock.release()
except RuntimeError:
self._logger.error('Could not release lock `%s`!' % str(self._lock))
class StorageService(object):
"""Abstract base class defining the storage service interface."""
def store(self, msg, stuff_to_store, *args, **kwargs):
"""See :class:`pypet.storageservice.HDF5StorageService` for an example of an
implementation and requirements for the API.
ABSTRACT: Needs to be defined in subclass
"""
raise NotImplementedError('Implement this!')
def load(self, msg, stuff_to_load, *args, **kwargs):
""" See :class:`pypet.storageservice.HDF5StorageService` for an example of an
implementation and requirements for the API.
ABSTRACT: Needs to be defined in subclass
"""
raise NotImplementedError('Implement this!')
[docs]class LazyStorageService(StorageService):
"""This lazy guy does nothing! Only for debugging purposes.
Ignores all storage and loading requests and simply executes `pass` instead.
"""
[docs] def load(self, *args, **kwargs):
"""Nope, I won't care, dude!"""
pass
[docs] def store(self, *args, **kwargs):
"""Do whatever you want, I won't store anything!"""
pass
class NodeProcessingTimer(HasLogger):
"""Simple Class to display the processing of nodes"""
def __init__(self, display_time=30, logger_name=None):
self._start_time = time.time()
self._last_time = self._start_time
self._display_time = display_time
self._set_logger(logger_name)
self._updates = 0
self._last_updates = 0
def signal_update(self):
"""Signals the process timer.
If more time than the display time has passed a message is emitted.
"""
self._updates += 1
current_time = time.time()
dt = current_time - self._last_time
if dt > self._display_time:
dfullt = current_time - self._start_time
seconds = int(dfullt) % 60
minutes = int(dfullt) / 60
if minutes == 0:
formatted_time = '%ds' % seconds
else:
formatted_time = '%dm%02ds' % (minutes, seconds)
nodespersecond = self._updates / dfullt
message = 'Processed %d nodes in %s (%.2f nodes/s).' % \
(self._updates, formatted_time, nodespersecond)
self._logger.info(message)
self._last_time = current_time
class PTItemMock(object):
"""Class that mocks a PyTables item and wraps around a dictionary"""
def __init__(self, dictionary):
self._v_attrs = dictionary
def _f_setattr(self, name, val):
self._v_attrs[name] = val
def _f_setAttr(self, name, val):
self._v_attrs[name] = val
[docs]class HDF5StorageService(StorageService, HasLogger):
"""Storage Service to handle the storage of a trajectory/parameters/results into hdf5 files.
Normally you do not interact with the storage service directly but via the trajectory,
see :func:`pypet.trajectory.Trajectory.f_store` and :func:`pypet.trajectory.Trajectory.f_load`.
The service is not thread safe. For multiprocessing the service needs to be wrapped either
by the :class:`~pypet.storageservice.LockWrapper` or with a combination of
:class:`~pypet.storageservice.QueueStorageServiceSender` and
:class:`~pypet.storageservice.QueueStorageServiceWriter`.
The storage service supports two operations *store* and *load*.
Requests for these two are always passed as
`msg, what_to_store_or_load, *args, **kwargs`
For example:
>>> HDF5StorageService.load(pypetconstants.LEAF, myresult, load_only=['spikestimes','nspikes'])
For a list of supported items see :func:`~pypet.storageservice.HDF5StorageService.store`
and :func:`~pypet.storageservice.HDF5StorageService.load`.
"""
ADD_ROW = 'ADD'
''' Adds a row to an overview table'''
REMOVE_ROW = 'REMOVE'
''' Removes a row from an overview table'''
MODIFY_ROW = 'MODIFY'
''' Changes a row of an overview table'''
COLL_TYPE = 'COLL_TYPE'
'''Type of a container stored to hdf5, like list,tuple,dict,etc
Must be stored in order to allow perfect reconstructions.
'''
COLL_LIST = 'COLL_LIST'
''' Container was a list'''
COLL_TUPLE = 'COLL_TUPLE'
''' Container was a tuple'''
COLL_NDARRAY = 'COLL_NDARRAY'
''' Container was a numpy array'''
COLL_MATRIX = 'COLL_MATRIX'
''' Container was a numpy matrix'''
COLL_DICT = 'COLL_DICT'
''' Container was a dictionary'''
COLL_SCALAR = 'COLL_SCALAR'
''' No container, but the thing to store was a scalar'''
SCALAR_TYPE = 'SCALAR_TYPE'
''' Type of scalars stored into a container'''
### Overview Table constants
CONFIG = 'config'
PARAMETERS = 'parameters'
RESULTS = 'results'
EXPLORED_PARAMETERS = 'explored_parameters'
DERIVED_PARAMETERS = 'derived_parameters'
NAME_TABLE_MAPPING = {
'_overview_config': 'config',
'_overview_parameters': 'parameters',
'_overview_derived_parameters_trajectory': 'derived_parameters_trajectory',
'_overview_derived_parameters_runs': 'derived_parameters_runs',
'_overview_results_trajectory': 'results_trajectory',
'_overview_results_runs': 'results_runs',
'_overview_explored_parameters': 'explored_parameters',
'_overview_derived_parameters_runs_summary': 'derived_parameters_runs_summary',
'_overview_results_runs_summary': 'results_runs_summary'
}
''' Mapping of trajectory config names to the tables'''
PR_ATTR_NAME_MAPPING = {
'_derived_parameters_per_run': 'derived_parameters_per_run',
'_results_per_run': 'results_per_run',
'_purge_duplicate_comments': 'purge_duplicate_comments',
'_overview_explored_parameters_runs': 'explored_parameters_runs'
}
'''Mapping of Attribute names for hdf5_settings table'''
ATTR_LIST = [
'complevel',
'complib',
'shuffle',
'fletcher32',
'pandas_format',
'pandas_append',
'encoding'
]
'''List of HDF5StorageService Attributes that have to be stored into the hdf5_settings table'''
### Storing Data Constants
STORAGE_TYPE = 'SRVC_STORE'
'''Flag, how data was stored'''
ARRAY = 'ARRAY'
'''Stored as array_
.. _array: http://pytables.github.io/usersguide/libref/homogenous_storage.html#the-array-class
'''
CARRAY = 'CARRAY'
'''Stored as carray_
.. _carray: http://pytables.github.io/usersguide/libref/homogenous_storage.html#the-carray-class
'''
EARRAY = 'EARRAY' # not supported yet
''' Stored as earray_
Not supported yet, maybe in near future.
.. _earray: http://pytables.github.io/usersguide/libref/homogenous_storage.html#the-earray-class
'''
VLARRAY = 'VLARRAY' # not supported yet
'''Stored as vlarray_
Not supported yet, maybe in near future.
.. _vlarray: http://pytables.github.io/usersguide/libref/homogenous_storage.html#the-vlarray-class
'''
DICT = 'DICT'
''' Stored as dict.
In fact, stored as pytable, but the dictionary wil be reconstructed.
'''
TABLE = 'TABLE'
'''Stored as pytable_
.. _pytable: http://pytables.github.io/usersguide/libref/structured_storage.html#the-table-class
'''
FRAME = 'FRAME'
''' Stored as pandas DataFrame_
.. _DataFrame: http://pandas.pydata.org/pandas-docs/dev/io.html#hdf5-pytables
'''
SERIES = 'SERIES'
''' Store data as pandas Series '''
PANEL = 'PANEL'
''' Store data as pandas Panel(4D) '''
SPLIT_TABLE = 'SPLIT_TABLE'
''' If a table was split due to too many columns'''
DATATYPE_TABLE = 'DATATYPE_TABLE'
'''If a table contains the data types instead of the attrs'''
TYPE_FLAG_MAPPING = {
ObjectTable: TABLE,
list: ARRAY,
tuple: ARRAY,
dict: DICT,
np.ndarray: CARRAY,
np.matrix: CARRAY,
DataFrame: FRAME,
Series: SERIES,
Panel: PANEL,
Panel4D: PANEL
}
''' Mapping from object type to storage flag'''
# Python native data should alwys be stored as an ARRAY
for item in pypetconstants.PARAMETER_SUPPORTED_DATA:
TYPE_FLAG_MAPPING[item] = ARRAY
FORMATTED_COLUMN_PREFIX = 'SRVC_COLUMN_%s_'
''' Stores data type of a specific pytables column for perfect reconstruction'''
DATA_PREFIX = 'SRVC_DATA_'
''' Stores data type of a pytables carray or array for perfect reconstruction'''
# ANNOTATION CONSTANTS
ANNOTATION_PREFIX = 'SRVC_AN_'
''' Prefix to store annotations as node attributes_
.. _attributes: http://pytables.github.io/usersguide/libref/declarative_classes.html#the-attributeset-class
'''
ANNOTATED = 'SRVC_ANNOTATED'
''' Whether an item was annotated'''
# Stuff necessary to construct parameters and result
INIT_PREFIX = 'SRVC_INIT_'
''' Hdf5 attribute prefix to store class name of parameter or result'''
CLASS_NAME = INIT_PREFIX + 'CLASS_NAME'
''' Name of a parameter or result class, is converted to a constructor'''
COMMENT = INIT_PREFIX + 'COMMENT'
''' Comment of parameter or result'''
LENGTH = INIT_PREFIX + 'LENGTH'
''' Length of a parameter if it is explored'''
LEAF = 'SRVC_LEAF'
''' Whether an hdf5 node is a leaf node'''
def __init__(self, filename=None, file_title='Experiment', display_time=30):
self._filename = filename
self._file_title = file_title
self._trajectory_name = None
self._trajectory_index = None
self._hdf5file = None
self._hdf5store = None
self._trajectory_group = None # link to the top group in hdf5 file which is the start
# node of a trajectory
# remembers whether to purge duplicate comments
self._set_logger()
self._filters = None
self._complevel = 9
self._complib = 'zlib'
self._fletcher32 = False
self._shuffle = True
self._encoding = 'utf8'
self._node_processing_timer = None
self._display_time = display_time
self._pandas_append = False
self._pandas_format = 'fixed'
self._purge_duplicate_comments = False
self._results_per_run = False
self._derived_parameters_per_run = False
self._overview_parameters = False
self._overview_config = False
self._overview_explored_parameters = False
self._overview_explored_parameters_runs = False
self._overview_derived_parameters_trajectory = False
self._overview_derived_parameters_runs = False
self._overview_derived_parameters_runs_summary = False
self._overview_results_trajectory = False
self._overview_results_runs = False
self._overview_results_runs_summary = False
self._mode = None
# We don't want the NN warnings of pytables to display because they can be
# annoying as hell
warnings.simplefilter('ignore', pt.NaturalNameWarning)
@property
def encoding(self):
""" How unicode strings are encoded"""
return self._encoding
@encoding.setter
[docs] def encoding(self, encoding):
self._encoding = encoding
@property
def display_time(self):
"""Time interval in seconds, when to display the storage or loading of nodes"""
return self._display_time
@display_time.setter
[docs] def display_time(self, display_time):
self._display_time = display_time
@property
def complib(self):
"""Compression library used"""
return self._complib
@complib.setter
[docs] def complib(self, complib):
self._complib = complib
self._filters = None
@property
def complevel(self):
"""Compression level used"""
return self._complevel
@complevel.setter
[docs] def complevel(self, complevel):
self._complevel = complevel
self._filters = None
@property
def fletcher32(self):
""" Whether fletcher 32 should be used """
return self._fletcher32
@fletcher32.setter
[docs] def fletcher32(self, fletcher32):
self._fletcher32 = bool(fletcher32)
self._filters = None
@property
def shuffle(self):
""" Whether shuffle filtering should be used"""
return self._shuffle
@shuffle.setter
[docs] def shuffle(self, shuffle):
self._shuffle = bool(shuffle)
self._filters = None
@property
def pandas_append(self):
""" If pandas should create storage in append mode"""
return self._pandas_append
@pandas_append.setter
[docs] def pandas_append(self, pandas_append):
self._pandas_append = bool(pandas_append)
@property
def pandas_format(self):
"""Format of pandas data. Applicable formats are 'table' (or 't') and 'fixed' (or 'f')"""
return self._pandas_format
@pandas_format.setter
@property
def filename(self):
"""The name and path of the underlying hdf5 file."""
return self._filename
@filename.setter
[docs] def filename(self, filename):
self._filename = filename
@property
def _overview_group(self):
"""Direct link to the overview group"""
return self._all_create_or_get_groups('overview')[0]
def _get_filters(self):
"""Makes filter and closes pandas store"""
if self._filters is None:
self._filters = pt.Filters(complib=self._complib, complevel=self._complevel,
shuffle=self._shuffle, fletcher32=self._fletcher32)
self._hdf5store._filters = self._filters
self._hdf5store._complevel = self._complevel
self._hdf5store._complib = self._complib
self._hdf5store._fletcher32 = self._fletcher32
return self._filters
[docs] def load(self, msg, stuff_to_load, *args, **kwargs):
"""Loads a particular item from disk.
The storage service always accepts these parameters:
:param trajectory_name: Name of current trajectory and name of top node in hdf5 file.
:param trajectory_index:
If no `trajectory_name` is provided, you can specify an integer index.
The trajectory at the index position in the hdf5 file is considered to loaded.
Negative indices are also possible for reverse indexing.
:param filename: Name of the hdf5 file
The following messages (first argument msg) are understood and the following arguments
can be provided in combination with the message:
* :const:`pypet.pypetconstants.TRAJECTORY` ('TRAJECTORY')
Loads a trajectory.
:param stuff_to_load: The trajectory
:param as_new: Whether to load trajectory as new
:param load_parameters: How to load parameters and config
:param load_derived_parameters: How to load derived parameters
:param load_results: How to load results
:param force: Force load in case there is a pypet version mismatch
You can specify how to load the parameters, derived parameters and results
as follows:
:const:`pypet.pypetconstants.LOAD_NOTHING`: (0)
Nothing is loaded
:const:`pypet.pypetconstants.LOAD_SKELETON`: (1)
The skeleton including annotations are loaded, i.e. the items are empty.
Non-empty items in RAM are left untouched.
:const:`pypet.pypetconstants.LOAD_DATA`: (2)
The whole data is loaded.
Only empty or in RAM non-existing instance are filled with the
data found on disk.
:const:`pypet.pypetconstants.OVERWRITE_DATA`: (3)
The whole data is loaded.
If items that are to be loaded are already in RAM and not empty,
they are emptied and new data is loaded from disk.
* :const:`pypet.pypetconstants.LEAF` ('LEAF')
Loads a parameter or result.
:param stuff_to_load: The item to be loaded
:param load_only:
If you load a result, you can partially load it and ignore the
rest of the data. Just specify the name of the data you want to load.
You can also provide a list,
for example `load_only='spikes'`, `load_only=['spikes','membrane_potential']`.
Issues a warning if items cannot be found.
:param load_except:
If you load a result you can partially load in and specify items
that should NOT be loaded here. You cannot use `load_except` and
`load_only` at the same time.
* :const:`pypet.pypetconstants.TREE` ('TREE')
Loads a whole subtree
:param stuff_to_load: The parent node (!) not the one where loading starts!
:param child_name: Name of child node that should be loaded
:param recursive: Whether to load recursively the subtree below child
:param load_data:
How to load stuff, accepted values as above for loading the trajectory
:param trajectory: The trajectory object
* :const:`pypet.pypetconstants.LIST` ('LIST')
Analogous to :ref:`storing lists <store-lists>`
:raises:
NoSuchServiceError if message or data is not understood
DataNotInStorageError if data to be loaded cannot be found on disk
"""
opened = True
try:
self._srvc_extract_file_information(kwargs)
args = list(args)
opened = self._srvc_opening_routine('r')
if msg == pypetconstants.TRAJECTORY:
self._trj_load_trajectory(msg, stuff_to_load, *args, **kwargs)
elif msg == pypetconstants.LEAF:
self._prm_load_parameter_or_result(stuff_to_load, *args, **kwargs)
elif msg == pypetconstants.TREE:
self._tree_load_tree(stuff_to_load, *args, **kwargs)
elif msg == pypetconstants.LIST:
self._srvc_load_several_items(stuff_to_load, *args, **kwargs)
else:
raise pex.NoSuchServiceError('I do not know how to handle `%s`' % msg)
self._srvc_closing_routine(opened)
except pt.NoSuchNodeError as e:
self._srvc_closing_routine(opened)
self._logger.error('Failed loading `%s`' % str(stuff_to_load))
raise pex.DataNotInStorageError(str(e))
except:
self._srvc_closing_routine(opened)
self._logger.error('Failed loading `%s`' % str(stuff_to_load))
raise
[docs] def store(self, msg, stuff_to_store, *args, **kwargs):
""" Stores a particular item to disk.
The storage service always accepts these parameters:
:param trajectory_name: Name or current trajectory and name of top node in hdf5 file
:param filename: Name of the hdf5 file
:param file_title: If file needs to be created, assigns a title to the file.
The following messages (first argument msg) are understood and the following arguments
can be provided in combination with the message:
* :const:`pypet.pypetconstants.PREPARE_MERGE` ('PREPARE_MERGE'):
Called to prepare a trajectory for merging, see also 'MERGE' below.
Will also be called if merging cannot happen within the same hdf5 file.
Stores already enlarged parameters and updates meta information.
:param stuff_to_store: Trajectory that is about to be extended by another one
:param changed_parameters:
List containing all parameters that were enlarged due to merging
* :const:`pypet.pypetconstants.MERGE` ('MERGE')
Note that before merging within HDF5 file, the storage service will be called
with msg='PREPARE_MERGE' before, see above.
Raises a ValueError if the two trajectories are not stored within the very
same hdf5 file. Then the current trajectory needs to perform the merge slowly
item by item.
Merges two trajectories, parameters are:
:param stuff_to_store: The trajectory data is merged into
:param other_trajectory_name: Name of the other trajectory
:param rename_dict:
Dictionary containing the old result and derived parameter names in the
other trajectory and their new names in the current trajectory.
:param move_nodes:
Whether to move the nodes from the other to the current trajectory
:param delete_trajectory:
Whether to delete the other trajectory after merging.
* :const:`pypet.pypetconstants.BACKUP` ('BACKUP')
:param stuff_to_store: Trajectory to be backed up
:param backup_filename:
Name of file where to store the backup. If None the backup file will be in
the same folder as your hdf5 file and named 'backup_XXXXX.hdf5'
where 'XXXXX' is the name of your current trajectory.
* :const:`pypet.pypetconstants.TRAJECTORY` ('TRAJECTORY')
Stores the whole trajectory
:param stuff_to_store: The trajectory to be stored
:param only_init:
If you just want to initialise the store. If yes, only meta information about
the trajectory is stored and none of the nodes/leaves within the trajectory.
* :const:`pypet.pypetconstants.SINGLE_RUN` ('SINGLE_RUN')
:param stuff_to_store: The single run to be stored
:param store_data: If all data below `run_XXXXXXXX` should be stored
:param store_final: If final meta info should be stored
* :const:`pypet.pypetconstants.LEAF`
Stores a parameter or result.
Modification of results is not supported (yet). Everything stored to disk is
set in stone!
Note that everything that is supported by the storage service and that is
stored to disk will be perfectly recovered.
For instance, you store a tuple of numpy 32 bit integers, you will get a tuple
of numpy 32 bit integers after loading independent of the platform!
:param stuff_to_sore: Result or parameter to store
In order to determine what to store, the function '_store' of the parameter or
result is called. This function returns a dictionary with name keys and data to
store as values. In order to determine how to store the data, the storage flags
are considered, see below.
The function '_store' has to return a dictionary containing values only from
the following objects:
* python natives (int, long, str, bool, float, complex),
*
numpy natives, arrays and matrices of type np.int8-64, np.uint8-64,
np.float32-64, np.complex, np.str
*
python lists and tuples of the previous types
(python natives + numpy natives and arrays)
Lists and tuples are not allowed to be nested and must be
homogeneous, i.e. only contain data of one particular type.
Only integers, or only floats, etc.
*
python dictionaries of the previous types (not nested!), data can be
heterogeneous, keys must be strings. For example, one key-value-pair
of string and int and one key-value pair of string and float, and so
on.
* pandas DataFrames_
* :class:`~pypet.parameter.ObjectTable`
.. _DataFrames: http://pandas.pydata.org/pandas-docs/dev/dsintro.html#dataframe
The keys from the '_store' dictionaries determine how the data will be named
in the hdf5 file.
:param store_flags: Flags describing how to store data.
:const:`~pypet.HDF5StorageService.ARRAY` ('ARRAY')
Store stuff as array
:const:`~pypet.HDF5StorageService.CARRAY` ('CARRAY')
Store stuff as carray
:const:`~pypet.HDF5StorageService.TABLE` ('TABLE')
Store stuff as pytable
:const:`~pypet.HDF5StorageService.DICT` ('DICT')
Store stuff as pytable but reconstructs it later as dictionary
on loading
:const:`~pypet.HDF%StorageService.FRAME` ('FRAME')
Store stuff as pandas data frame
Storage flags can also be provided by the parameters and results themselves
if they implement a function '_store_flags' that returns a dictionary
with the names of the data to store as keys and the flags as values.
If no storage flags are provided, they are automatically inferred from the
data. See :const:`pypet.HDF5StorageService.TYPE_FLAG_MAPPING` for the mapping
from type to flag.
:param overwrite:
Can be used if parts of a leaf should be replaced. Either a list of
HDF5 names or `True` if this should account for all.
* :const:`pypet.pypetconstants.DELETE` ('DELETE')
Removes an item from disk. Empty group nodes, results and non-explored
parameters can be removed.
:param stuff_to_store: The item to be removed.
:param remove_empty_groups:
Whether to also remove groups that become empty due to removal.
default is False.
:param delete_only:
Potential list of parts of a leaf node that should be deleted.
:param remove_from_item:
If `delete_only` is used, whether deleted nodes should also be erased
from the leaf nodes themseleves.
* :const:`pypet.pypetconstants.GROUP` ('GROUP')
:param stuff_to_store: The group to store
* :const:`pypet.pypetconstants.TREE`
Stores a single node or a full subtree
:param stuff_to_store: Node to store
:param recursive: Whether to store recursively the whole sub-tree
* :const:`pypet.pypetconstants.LIST`
.. _store-lists:
Stores several items at once
:param stuff_to_store:
Iterable whose items are to be stored. Iterable must contain tuples,
for example `[(msg1,item1,arg1,kwargs1),(msg2,item2,arg2,kwargs2),...]`
:raises: NoSuchServiceError if message or data is not understood
"""
opened = True
try:
self._srvc_extract_file_information(kwargs)
args = list(args)
opened = self._srvc_opening_routine('a', msg)
if msg == pypetconstants.MERGE:
self._trj_merge_trajectories(*args, **kwargs)
elif msg == pypetconstants.BACKUP:
self._trj_backup_trajectory(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.PREPARE_MERGE:
self._trj_prepare_merge(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.TRAJECTORY:
self._trj_store_trajectory(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.SINGLE_RUN:
self._srn_store_single_run(stuff_to_store, *args, **kwargs)
elif msg in pypetconstants.LEAF:
self._prm_store_parameter_or_result(msg, stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.DELETE:
self._all_delete_parameter_or_result_or_group(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.GROUP:
self._grp_store_group(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.TREE:
self._tree_store_tree(stuff_to_store, *args, **kwargs)
elif msg == pypetconstants.LIST:
self._srvc_store_several_items(stuff_to_store, *args, **kwargs)
else:
raise pex.NoSuchServiceError('I do not know how to handle `%s`' % msg)
self._srvc_closing_routine(opened)
except:
self._srvc_closing_routine(opened)
self._logger.error('Failed storing `%s`' % str(stuff_to_store))
raise
def _srvc_load_several_items(self, iterable, *args, **kwargs):
"""Loads several items from an iterable
Iterables are supposed to be of a format like `[(msg, item, args, kwarg),...]`
If `args` and `kwargs` are not part of a tuple, they are taken from the
current `args` and `kwargs` provided to this function.
"""
for input_tuple in iterable:
msg = input_tuple[0]
item = input_tuple[1]
if len(input_tuple) > 2:
args = input_tuple[2]
if len(input_tuple) > 3:
kwargs = input_tuple[3]
if len(input_tuple) > 4:
raise RuntimeError('You shall not pass!')
self.load(msg, item, *args, **kwargs)
def _srvc_check_hdf_properties(self, traj):
"""Reads out the properties for storing new data into the hdf5file
:param traj:
The trajectory
"""
for attr_name in HDF5StorageService.ATTR_LIST:
try:
config = traj.f_get('config.hdf5.' + attr_name).f_get()
setattr(self, attr_name, config)
except AttributeError:
self._logger.warning('Could not find `%s` in traj, '
'using default value.' % attr_name)
for attr_name, table_name in HDF5StorageService.NAME_TABLE_MAPPING.items():
try:
config = traj.f_get('config.hdf5.overview.' + table_name).f_get()
setattr(self, attr_name, config)
except AttributeError:
self._logger.warning('Could not find `%s` in traj, '
'using default value.' % table_name)
for attr_name, name in HDF5StorageService.PR_ATTR_NAME_MAPPING.items():
try:
config = traj.f_get('config.hdf5.' + name).f_get()
setattr(self, attr_name, config)
except AttributeError:
self._logger.warning('Could not find `%s` in traj, '
'using default value.' % name)
self._filters = None
def _srvc_store_several_items(self, iterable, *args, **kwargs):
"""Stores several items from an iterable
Iterables are supposed to be of a format like `[(msg, item, args, kwarg),...]`
If `args` and `kwargs` are not part of a tuple, they are taken from the
current `args` and `kwargs` provided to this function.
"""
for input_tuple in iterable:
msg = input_tuple[0]
item = input_tuple[1]
if len(input_tuple) > 2:
args = input_tuple[2]
if len(input_tuple) > 3:
kwargs = input_tuple[3]
if len(input_tuple) > 4:
raise RuntimeError('You shall not pass!')
self.store(msg, item, *args, **kwargs)
def _srvc_opening_routine(self, mode, msg=None):
"""Opens an hdf5 file for reading or writing
The file is only opened if it has not been opened before (i.e. `self._hdf5file is None`).
:param mode:
'w' for writing
'a' for appending
'r' for reading
Unfortunately, pandas currently does not work with read-only mode.
Thus, if mode is chosen to be 'r', the file will still be opened in
append mode.
:param msg:
Message provided to `load` or `store`. Only considered to check if a trajectory
was stored before.
:return:
`True` if file is opened
`False` if the file was already open before calling this function
"""
self._mode = mode
if self._hdf5file is None:
if 'a' in mode or 'w' in mode:
(path, filename) = os.path.split(self._filename)
if not os.path.exists(path):
os.makedirs(path)
self._hdf5store = HDFStore(self._filename, self._mode, complib=self._complib,
complevel=self._complevel, fletcher32=self._fletcher32)
self._hdf5file = self._hdf5store._handle
self._hdf5file.title = self._file_title
if not ('/' + self._trajectory_name) in self._hdf5file:
# If we want to store individual items we we have to check if the
# trajectory has been stored before
if not msg == pypetconstants.TRAJECTORY:
raise ValueError('Your trajectory cannot be found in the hdf5file, '
'please use >>traj.f_store()<< '
'before storing anyhting else.')
# If we want to store a trajectory it has not been stored before
# create a new trajectory group
ptcompat.create_group(self._hdf5file, where='/', name=self._trajectory_name,
title=self._trajectory_name)
# Store a reference to the top trajectory node
self._trajectory_group = ptcompat.get_node(self._hdf5file,
'/' + self._trajectory_name)
elif mode == 'r':
if not self._trajectory_name is None and not self._trajectory_index is None:
raise ValueError('Please specify either a name of a trajectory or an index, '
'but not both at the same time.')
if not os.path.isfile(self._filename):
raise ValueError('File `' + self._filename + '` does not exist.')
self._hdf5store = HDFStore(self._filename, self._mode, complib=self._complib,
complevel=self._complevel, fletcher32=self._fletcher32)
self._hdf5file = self._hdf5store._handle
if not self._trajectory_index is None:
# If an index is provided pick the trajectory at the corresponding
# position in the trajectory node list
nodelist = ptcompat.list_nodes(self._hdf5file, where='/')
if (self._trajectory_index >= len(nodelist) or
self._trajectory_index < -len(nodelist)):
raise ValueError('Trajectory No. %d does not exists, there are only '
'%d trajectories in %s.'
% (self._trajectory_index, len(nodelist), self._filename))
self._trajectory_group = nodelist[self._trajectory_index]
self._trajectory_name = self._trajectory_group._v_name
elif not self._trajectory_name is None:
# Otherwise pick the trajectory group by name
if not ('/' + self._trajectory_name) in self._hdf5file:
raise ValueError('File %s does not contain trajectory %s.'
% (self._filename, self._trajectory_name))
self._trajectory_group = ptcompat.get_node(self._hdf5file,
'/' + self._trajectory_name)
else:
raise ValueError('Please specify a name of a trajectory to load or its '
'index, otherwise I cannot open one.')
else:
raise RuntimeError('You shall not pass!')
self._node_processing_timer = NodeProcessingTimer(display_time=self._display_time,
logger_name=self._logger.name)
return True
else:
return False
def _srvc_closing_routine(self, closing):
"""Routine to close an hdf5 file
The file is closed only when `closing=True`. `closing=True` means that
the file was opened in the current highest recursion level. This prevents re-opening
and closing of the file if `store` or `load` are called recursively.
"""
if closing and self._hdf5file is not None and self._hdf5file.isopen:
f_fd = self._hdf5file.fileno()
self._hdf5file.flush()
os.fsync(f_fd)
try:
self._hdf5store.flush(fsync=True)
except TypeError:
f_fd = self._hdf5store._handle.fileno()
self._hdf5store.flush()
os.fsync(f_fd)
self._hdf5store.close()
self._hdf5store = None
self._hdf5file = None
self._trajectory_group = None
self._trajectory_name = None
self._trajectory_index = None
return True
else:
return False
def _srvc_extract_file_information(self, kwargs):
"""Extracts file information from kwargs.
Note that `kwargs` is not passed as `**kwargs` in order to also
`pop` the elements on the level of the function calling `_srvc_extract_file_information`.
"""
if 'filename' in kwargs:
self._filename = kwargs.pop('filename')
if 'file_title' in kwargs:
self._file_title = kwargs.pop('file_title')
if 'trajectory_name' in kwargs:
self._trajectory_name = kwargs.pop('trajectory_name')
if 'trajectory_index' in kwargs:
self._trajectory_index = kwargs.pop('trajectory_index')
########################### Merging ###########################################################
def _trj_backup_trajectory(self, traj, backup_filename=None):
"""Backs up a trajectory.
:param traj: Trajectory that should be backed up
:param backup_filename:
Path and filename of backup file. If None is specified the storage service
defaults to `path_to_trajectory_hdf5_file/backup_trajectory_name.hdf`.
"""
self._logger.info('Storing backup of %s.' % traj.v_name)
mypath, filename = os.path.split(self._filename)
if backup_filename is None:
backup_filename = '%s/backup_%s.hdf5' % (mypath, traj.v_name)
backup_hdf5file = ptcompat.open_file(filename=backup_filename,
mode='a', title=backup_filename)
if ('/' + self._trajectory_name) in backup_hdf5file:
raise ValueError('I cannot backup `%s` into file `%s`, there is already a '
'trajectory with that name.' % (traj.v_name, backup_filename))
backup_root = backup_hdf5file.root
self._trajectory_group._f_copy(newparent=backup_root, recursive=True)
backup_hdf5file.flush()
backup_hdf5file.close()
self._logger.info('Finished backup of %s.' % traj.v_name)
def _trj_copy_table_entries(self, rename_dict, other_trajectory_name):
"""Copy the overview table entries from another trajectory into the current one.
:param rename_dict:
Dictionary with old names (keys) and new names (values).
:param other_trajectory_name:
Name of other trajectory
"""
self._trj_copy_table_entries_from_table_name('derived_parameters_runs', rename_dict,
other_trajectory_name)
self._trj_copy_table_entries_from_table_name('results_trajectory', rename_dict,
other_trajectory_name)
self._trj_copy_table_entries_from_table_name('results_runs', rename_dict,
other_trajectory_name)
self._trj_copy_summary_table_entries_from_table_name('results_runs_summary', rename_dict,
other_trajectory_name)
self._trj_copy_summary_table_entries_from_table_name('derived_parameters_runs_summary',
rename_dict, other_trajectory_name)
def _trj_copy_summary_table_entries_from_table_name(self, tablename, rename_dict,
other_trajectory_name):
"""Copy a the summary table entries from another trajectory into the current one
:param tablename:
Name of overview table
:param rename_dict:
Dictionary with old names (keys) and new names (values).
:param other_trajectory_name:
Name of other trajectory
"""
count_dict = {} # Dictionary containing the number of items that are merged for
# a particular result summary
for old_name in rename_dict:
# Check for the summary entry in the other trajectory
# In the overview table location literally contains `run_XXXXXXXX`
run_mask = pypetconstants.RUN_NAME + 'X' * pypetconstants.FORMAT_ZEROS
old_split_name = old_name.split('.')
for idx, name in enumerate(old_split_name):
add_to_count = False
if (name.startswith(pypetconstants.RUN_NAME) and
name != pypetconstants.RUN_NAME_DUMMY):
old_split_name[idx] = run_mask
add_to_count = True
break
if add_to_count:
old_mask_name = '.'.join(old_split_name)
if not old_mask_name in count_dict:
count_dict[old_mask_name] = 0
count_dict[old_mask_name] += 1
try:
# get the other table
other_table = ptcompat.get_node(self._hdf5file,
'/' + other_trajectory_name + '/overview/' + tablename)
new_row_dict = {} # Dictionary with full names as keys and summary row dicts as values
for row in other_table:
# Iterate through the rows of the overview table
location = compat.tostrtype(row['location'])
name = compat.tostrtype(row['name'])
full_name = location + '.' + name
# If we need the overview entry mark it for merging
if full_name in count_dict:
new_row_dict[full_name] = self._trj_read_out_row(other_table.colnames, row)
# Get the summary table in the current trajectory
table = ptcompat.get_node(self._hdf5file,
'/' + self._trajectory_name + '/overview/' + tablename)
for row in table:
# Iterate through the current rows
location = compat.tostrtype(row['location'])
name = compat.tostrtype(row['name'])
full_name = location + '.' + name
# Update the number of items according to the number or merged items
if full_name in new_row_dict:
row['number_of_items'] = row['number_of_items'] + count_dict[full_name]
# Delete used row dicts
del new_row_dict[full_name]
row.update()
table.flush()
self._hdf5file.flush()
# Finally we need to create new rows for results that are part of the other
# trajectory but which could not be found in the current one
for key in sorted(new_row_dict.keys()):
not_inserted_row = new_row_dict[key]
new_row = table.row
for col_name in table.colnames:
# This is to allow backwards compatibility
if (col_name == 'example_item_run_name' and
not col_name in other_table.colnames):
continue
if col_name == 'example_item_run_name':
# The example item run name has changed due to merging
old_run_idx = not_inserted_row[col_name]
old_run_name = pypetconstants.FORMATTED_RUN_NAME % old_run_idx
new_run_name = rename_dict[old_run_name]
new_run_idx = int(new_run_name.split(pypetconstants.RUN_NAME)[1])
new_row[col_name] = new_run_idx
else:
new_row[col_name] = not_inserted_row[col_name]
new_row.append()
table.flush()
self._hdf5file.flush()
except pt.NoSuchNodeError:
self._logger.info('Did not find table `%s` in one of the trajectories,'
' skipped copying.' % tablename)
@staticmethod
def _trj_read_out_row(colnames, row):
"""Reads out a row and returns a dictionary containing the row content.
:param colnames: List of column names
:param row: A pytables table row
:return: A dictionary with colnames as keys and content as values
"""
result_dict = {}
for colname in colnames:
result_dict[colname] = row[colname]
return result_dict
def _trj_copy_table_entries_from_table_name(self, tablename, rename_dict,
other_trajectory_name):
"""Copy overview tables (not summary) from other trajectory into the current one.
:param tablename:
Name of overview table
:param rename_dict:
Dictionary with old names (keys) and new names (values).
:param other_trajectory_name:
Name of other trajectory
"""
try:
other_table = ptcompat.get_node(self._hdf5file,
'/' + other_trajectory_name + '/overview/' + tablename)
new_row_dict = {}
for row in other_table.iterrows():
# Iterate through the summary table
location = compat.tostrtype(row['location'])
name = compat.tostrtype(row['name'])
full_name = location + '.' + name
if full_name in rename_dict:
# If the item is marked for merge we need to copy its overview info
read_out_row = self._trj_read_out_row(other_table.colnames, row)
new_location = rename_dict[full_name].split('.')
new_location = '.'.join(new_location[0:-1])
read_out_row['location'] = new_location
new_row_dict[rename_dict[full_name]] = read_out_row
table = ptcompat.get_node(self._hdf5file,
'/' + self._trajectory_name + '/overview/' + tablename)
# Insert data into the current overview table
for row in table.iterrows():
location = compat.tostrtype(row['location'])
name = compat.tostrtype(row['name'])
full_name = location + '.' + name
if full_name in new_row_dict:
for col_name in table.colnames:
row[col_name] = new_row_dict[full_name][col_name]
del new_row_dict[full_name]
row.update()
table.flush()
self._hdf5file.flush()
# It may be the case that the we need to insert a new row
for key in sorted(new_row_dict.keys()):
not_inserted_row = new_row_dict[key]
new_row = table.row
for col_name in table.colnames:
new_row[col_name] = not_inserted_row[col_name]
new_row.append()
table.flush()
self._hdf5file.flush()
except pt.NoSuchNodeError:
self._logger.info('Did not find table `%s` in one of the trajectories,'
' skipped copying.' % tablename)
def _trj_merge_trajectories(self, other_trajectory_name, rename_dict, move_nodes=False,
delete_trajectory=False):
"""Merges another trajectory into the current trajectory (as in self._trajectory_name).
:param other_trajectory_name: Name of other trajectory
:param rename_dict: Dictionary with old names (keys) and new names (values).
:param move_nodes: Whether to move hdf5 nodes or copy them
:param delete_trajectory: Whether to delete the other trajectory
"""
if not ('/' + other_trajectory_name) in self._hdf5file:
raise ValueError('Cannot merge `%s` and `%s`, because the second trajectory cannot '
'be found in my file.')
for old_name in rename_dict:
new_name = rename_dict[old_name]
# Iterate over all items that need to be merged
split_name = old_name.split('.')
old_location = '/' + other_trajectory_name + '/' + '/'.join(split_name)
split_name = new_name.split('.')
new_location = '/' + self._trajectory_name + '/' + '/'.join(split_name)
# Get the data from the other trajectory
old_group = ptcompat.get_node(self._hdf5file, old_location)
for node in old_group:
# Now move or copy the data
if move_nodes:
ptcompat.move_node(self._hdf5file,
where=old_location, newparent=new_location,
name=node._v_name, createparents=True)
else:
ptcompat.copy_node(self._hdf5file,
where=old_location, newparent=new_location,
name=node._v_name, createparents=True,
recursive=True)
# And finally copy the attributes of leaf nodes
# Attributes of group nodes are NOT copied, this has to be done
# by the trajectory
old_group._v_attrs._f_copy(where=ptcompat.get_node(self._hdf5file, new_location))
self._trj_copy_table_entries(rename_dict, other_trajectory_name)
if delete_trajectory:
ptcompat.remove_node(self._hdf5file,
where='/', name=other_trajectory_name, recursive=True)
def _trj_prepare_merge(self, traj, changed_parameters):
"""Prepares a trajectory for merging.
This function will already store extended parameters.
:param traj: Target of merge
:param changed_parameters: List of extended parameters (i.e. their names).
"""
if not traj._stored:
traj.f_store()
# Update meta information
infotable = getattr(self._overview_group, 'info')
insert_dict = self._all_extract_insert_dict(traj, infotable.colnames)
self._all_add_or_modify_row(traj.v_name, insert_dict, infotable, index=0,
flags=(HDF5StorageService.MODIFY_ROW,))
# Store extended parameters
for param_name in changed_parameters:
param = traj.f_get(param_name)
# First we delete the parameter
try:
self.store(pypetconstants.DELETE, param)
except pt.NoSuchNodeError:
pass # We can end up here if the parameter was never stored
# And then we add it again
self.store(pypetconstants.LEAF, param)
# Increase the run table by the number of new runs
run_table = getattr(self._overview_group, 'runs')
actual_rows = run_table.nrows
self._all_fill_run_table_with_dummys(actual_rows, len(traj))
add_table = self._overview_explored_parameters_runs
# Extract parameter summary and if necessary create new explored parameter tables
# in the result groups
for run_name in traj.f_get_run_names():
run_info = traj.f_get_run_information(run_name)
run_info['name'] = run_name
idx = run_info['idx']
traj._set_explored_parameters_to_idx(idx)
create_run_group = ('results.runs.%s' % run_name) in traj
run_summary = self._srn_add_explored_params(run_name,
compat.listvalues(
traj._explored_parameters),
add_table,
create_run_group=create_run_group)
run_info['parameter_summary'] = run_summary
self._all_add_or_modify_row(run_name, run_info, run_table, index=idx,
flags=(HDF5StorageService.MODIFY_ROW,))
traj.f_restore_default()
######################## Loading a Trajectory #################################################
def _trj_load_trajectory(self, msg, traj, as_new, load_parameters, load_derived_parameters,
load_results, load_other_data, force):
"""Loads a single trajectory from a given file.
:param traj: The trajectory
:param as_new: Whether to load trajectory as new
:param load_parameters: How to load parameters and config
:param load_derived_parameters: How to load derived parameters
:param load_results: How to load results
:param load_other_data: How to load anything not within the four subbranches
:param force: Force load in case there is a pypet version mismatch
You can specify how to load the parameters, derived parameters and results
as follows:
:const:`pypet.pypetconstants.LOAD_NOTHING`: (0)
Nothing is loaded
:const:`pypet.pypetconstants.LOAD_SKELETON`: (1)
The skeleton including annotations are loaded, i.e. the items are empty.
Non-empty items in RAM are left untouched.
:const:`pypet.pypetconstants.LOAD_DATA`: (2)
The whole data is loaded.
Only empty or in RAM non-existing instance are filled with the
data found on disk.
:const:`pypet.pypetconstants.OVERWRITE_DATA`: (3)
The whole data is loaded.
If items that are to be loaded are already in RAM and not empty,
they are emptied and new data is loaded from disk.
If `as_new=True` the old trajectory is loaded into the new one, only parameters can be
loaded. If `as_new=False` the current trajectory is completely replaced by the one
on disk, i.e. the name from disk, the timestamp, etc. are assigned to `traj`.
"""
# Some validity checks, if `as_new` is used correctly
if (as_new and (load_derived_parameters != pypetconstants.LOAD_NOTHING or load_results !=
pypetconstants.LOAD_NOTHING or
load_other_data != pypetconstants.LOAD_NOTHING)):
raise ValueError('You cannot load a trajectory as new and load the derived '
'parameters and results. Only parameters are allowed.')
if as_new and load_parameters != pypetconstants.LOAD_DATA:
raise ValueError('You cannot load the trajectory as new and not load the data of '
'the parameters.')
loadconstants = (pypetconstants.LOAD_NOTHING, pypetconstants.LOAD_SKELETON,
pypetconstants.LOAD_DATA, pypetconstants.OVERWRITE_DATA)
if not (load_parameters in loadconstants and load_derived_parameters in loadconstants and
load_results in loadconstants and load_other_data in loadconstants):
raise ValueError('Please give a valid option on how to load data. Options for '
'`load_parameter`, `load_derived_parameters`, `load_results`, '
'and `load_other_data` are %s. See function documentation for '
'the semantics of the values.' % str(loadconstants))
traj._stored = not as_new
# Loads meta data like the name, timestamps etc.
self._trj_load_meta_data(traj, as_new, force)
if (load_parameters != pypetconstants.LOAD_NOTHING or
load_derived_parameters != pypetconstants.LOAD_NOTHING or
load_results != pypetconstants.LOAD_NOTHING or
load_other_data != pypetconstants.LOAD_NOTHING):
self._logger.info('Loading trajectory `%s`.' % traj.v_name)
else:
self._logger.info('Checked meta data of trajectory `%s`.' % traj.v_name)
# Load the annotations in case they have not been loaded before
if traj.v_annotations.f_is_empty():
self._ann_load_annotations(traj, self._trajectory_group)
maximum_display_other = 10
counter = 0
children = self._trajectory_group._v_children
for hdf5group_name in children:
hdf5group = children[hdf5group_name]
what = hdf5group._v_name
load_subbranch = True
if what == 'config':
loading = load_parameters
elif what == 'parameters':
loading = load_parameters
elif what == 'results':
loading = load_results
elif what == 'derived_parameters':
loading = load_derived_parameters
elif what == 'overview':
continue
else:
loading = load_other_data
load_subbranch = False
if load_subbranch:
# If the trajectory is loaded as new, we don't care about old config stuff
# and only load the parameters
if as_new and what == 'config':
loading = pypetconstants.LOAD_NOTHING
# Load the subbranches recursively
if loading != pypetconstants.LOAD_NOTHING:
self._logger.info('Loading branch `%s` in mode `%s`.' % (what, str(loading)))
self._tree_load_sub_branch(traj, traj, what, self._trajectory_group, loading,
recursive=True,
as_new=as_new)
else:
if loading != pypetconstants.LOAD_NOTHING:
counter += 1
if counter <= maximum_display_other:
self._logger.info(
'Loading branch/node `%s` in mode `%s`.' % (what, str(loading)))
if counter == maximum_display_other:
self._logger.info('To many branchs or nodes at root for display. '
'I will not inform you about loading anymore. '
'Branches are loaded silently in the background. '
'Do not worry, I will not freeze! Pinky promise!!!')
self._tree_load_nodes(traj, traj, hdf5group, loading, recursive=True)
def _trj_load_meta_data(self, traj, as_new, force):
"""Loads meta information about the trajectory
Checks if the version number does not differ from current pypet version
Loads, comment, timestamp, name, version from disk in case trajectory is not loaded
as new. Updates the run information as well.
"""
metatable = self._overview_group.info
metarow = metatable[0]
try:
version = compat.tostrtype(metarow['version'])
except KeyError as ke:
self._logger.error('Could not check version due to: %s' % str(ke))
version = '`COULD NOT BE LOADED`'
try:
python = compat.tostrtype(metarow['python'])
except KeyError as ke:
self._logger.error('Could not check version due to: %s' % str(ke))
python = '`COULD NOT BE LOADED`'
self._trj_check_version(version, python, force)
if as_new:
length = int(metarow['length'])
for irun in range(length):
traj._add_run_info(irun)
else:
traj._comment = compat.tostrtype(metarow['comment'])
traj._timestamp = float(metarow['timestamp'])
traj._trajectory_timestamp = traj._timestamp
traj._time = compat.tostrtype(metarow['time'])
traj._trajectory_time = traj._time
traj._name = compat.tostrtype(metarow['name'])
traj._trajectory_name = traj._name
traj._version = version
traj._python = python
single_run_table = self._overview_group.runs
# Load the run information about the single runs
for row in single_run_table.iterrows():
name = compat.tostrtype(row['name'])
idx = int(row['idx'])
timestamp = float(row['timestamp'])
time = compat.tostrtype(row['time'])
completed = int(row['completed'])
summary = compat.tostrtype(row['parameter_summary'])
hexsha = compat.tostrtype(row['short_environment_hexsha'])
# To allow backwards compatibility we need this try catch block
try:
runtime = compat.tostrtype(row['runtime'])
finish_timestamp = float(row['finish_timestamp'])
except KeyError as ke:
runtime = ''
finish_timestamp = 0.0
self._logger.warning('Could not load runtime, ' + repr(ke))
traj._single_run_ids[idx] = name
traj._single_run_ids[name] = idx
info_dict = {}
info_dict['idx'] = idx
info_dict['timestamp'] = timestamp
info_dict['time'] = time
info_dict['completed'] = completed
info_dict['name'] = name
info_dict['parameter_summary'] = summary
info_dict['short_environment_hexsha'] = hexsha
info_dict['runtime'] = runtime
info_dict['finish_timestamp'] = finish_timestamp
traj._run_information[name] = info_dict
# Load the hdf5 config data:
try:
if 'hdf5_settings' in self._overview_group:
hdf5_table = self._overview_group.hdf5_settings
hdf5_row = hdf5_table[0]
self.complib = compat.tostrtype(hdf5_row['complib'])
self.complevel = int(hdf5_row['complevel'])
self.shuffle = bool(hdf5_row['shuffle'])
self.fletcher32 = bool(hdf5_row['fletcher32'])
self.pandas_format = compat.tostrtype(hdf5_row['pandas_format'])
self.pandas_append = bool(hdf5_row['pandas_append'])
self.encoding = compat.tostrtype(hdf5_row['encoding'])
self._results_per_run = int(hdf5_row['results_per_run'])
self._derived_parameters_per_run = int(
hdf5_row['derived_parameters_per_run'])
self._purge_duplicate_comments = bool(hdf5_row['purge_duplicate_comments'])
self._overview_explored_parameters_runs = bool(
hdf5_row['explored_parameters_runs'])
for attr_name, table_name in self.NAME_TABLE_MAPPING.items():
attr_value = bool(hdf5_row[table_name])
setattr(self, attr_name, attr_value)
else:
self._logger.warning(
'Could not find `hdf5_settings` overview table. I will use the '
'standard settings (for `complib`, `complevel` etc.) instead.')
except Exception as e:
self._logger.error('Using default hdf5 settings, '
'could not extract hdf5 settings because of: %s' % str(e))
def _tree_load_sub_branch(self, traj, traj_node, branch_name, hdf5_group, load_data,
recursive=True, as_new=False):
"""Loads data starting from a node along a branch and starts recursively loading
all data at end of branch.
:param traj: The trajectory
:param traj_node: The node from where loading starts
:param branch_name:
A branch along which loading progresses. Colon Notation is used:
'group1.group2.group3' loads 'group1', then 'group2', then 'group3' and then finally
recursively all children and children's children below 'group3'
:param hdf5_group:
HDF5 node in the file corresponding to `traj_node`.
:param load_data:
How to load the data
:param recursive:
If loading recursively
:param as_new:
If trajectory is loaded as new
"""
split_names = branch_name.split('.')
final_group_name = split_names.pop()
for name in split_names:
# First load along the branch
hdf5_group = getattr(hdf5_group, name)
self._tree_load_nodes(traj, traj_node, hdf5_group, load_data, recursive=False,
as_new=as_new)
traj_node = traj_node._children[name]
# Then load recursively all data in the last group and below
hdf5_group = getattr(hdf5_group, final_group_name)
self._tree_load_nodes(traj, traj_node, hdf5_group, load_data, recursive=recursive)
def _trj_check_version(self, version, python, force):
"""Checks for version mismatch
Raises a VersionMismatchError if version of loaded trajectory and current pypet version
do not match. In case of `force=True` error is not raised only a warning is emitted.
"""
curr_python = compat.python_version_string
if (version != VERSION or curr_python != python) and not force:
raise pex.VersionMismatchError('Current pypet version is %s used under python %s '
' but your trajectory'
' was created with version %s and python %s.'
' Use >>force=True<< to perform your load regardless'
' of version mismatch.' %
(VERSION, curr_python, version, python))
elif version != VERSION or curr_python != python:
self._logger.warning('Current pypet version is %s with python %s but your trajectory'
' was created with version %s under python %s.'
' Yet, you enforced the load, so I will'
' handle the trajectory despite the'
' version mismatch.' %
(VERSION, curr_python, version, python))
#################################### Storing a Trajectory ####################################
def _all_fill_run_table_with_dummys(self, start, stop):
"""Fills the `run` overview table with dummy information.
The table is later on filled by the single runs with the real information.
`start` specifies how large the table is when calling this function.
The table might not be emtpy because a trajectory is enlarged due to expanding.
"""
runtable = getattr(self._overview_group, 'runs')
for idx in range(start, stop):
insert_dict = {}
insert_dict['idx'] = idx
insert_dict['name'] = pypetconstants.FORMATTED_RUN_NAME % idx
self._all_add_or_modify_row('Dummy Row', insert_dict, runtable,
flags=(HDF5StorageService.ADD_ROW,))
runtable.flush()
def _trj_store_meta_data(self, traj):
""" Stores general information about the trajectory in the hdf5file.
The `info` table will contain the name of the trajectory, it's timestamp, a comment,
the length (aka the number of single runs), and the current version number of pypet.
Also prepares the desired overview tables and fills the `run` table with dummies.
"""
# Description of the `info` table
descriptiondict = {'name': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_LOCATION_LENGTH,
pos=0),
'time': pt.StringCol(len(traj.v_time), pos=1),
'timestamp': pt.FloatCol(pos=3),
'comment': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH,
pos=4),
'length': pt.IntCol(pos=2),
'version': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_NAME_LENGTH,
pos=5),
'python': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_NAME_LENGTH,
pos=5)}
# 'loaded_from' : pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_LOCATION_LENGTH)}
infotable = self._all_get_or_create_table(where=self._overview_group, tablename='info',
description=descriptiondict,
expectedrows=len(traj))
insert_dict = self._all_extract_insert_dict(traj, infotable.colnames)
self._all_add_or_modify_row(traj.v_name, insert_dict, infotable, index=0,
flags=(HDF5StorageService.ADD_ROW,
HDF5StorageService.MODIFY_ROW))
# Description of the `run` table
rundescription_dict = {'name': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_NAME_LENGTH,
pos=1),
'time': pt.StringCol(len(traj.v_time), pos=2),
'timestamp': pt.FloatCol(pos=3),
'idx': pt.IntCol(pos=0),
'completed': pt.IntCol(pos=8),
'parameter_summary': pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH,
pos=6),
'short_environment_hexsha': pt.StringCol(7, pos=7),
'finish_timestamp': pt.FloatCol(pos=4),
'runtime': pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_RUNTIME_LENGTH,
pos=5)}
runtable = self._all_get_or_create_table(where=self._overview_group,
tablename='runs',
description=rundescription_dict)
hdf5_description_dict = {'complib': pt.StringCol(7, pos=0),
'complevel': pt.IntCol(pos=1),
'shuffle': pt.BoolCol(pos=2),
'fletcher32': pt.BoolCol(pos=3),
'pandas_append': pt.BoolCol(pos=4),
'pandas_format': pt.StringCol(7, pos=5),
'encoding': pt.StringCol(11, pos=6)}
pos = 7
for name, table_name in HDF5StorageService.NAME_TABLE_MAPPING.items():
hdf5_description_dict[table_name] = pt.BoolCol(pos=pos)
pos += 1
# Store the hdf5 properties in an overview table
hdf5_description_dict.update({'purge_duplicate_comments': pt.BoolCol(pos=pos + 2),
'results_per_run': pt.IntCol(pos=pos + 3),
'derived_parameters_per_run': pt.IntCol(pos=pos + 4),
'explored_parameters_runs': pt.BoolCol(pos=pos + 1)})
hdf5table = self._all_get_or_create_table(where=self._overview_group,
tablename='hdf5_settings',
description=hdf5_description_dict)
insert_dict = {}
for attr_name in self.ATTR_LIST:
insert_dict[attr_name] = getattr(self, attr_name)
for attr_name, table_name in self.NAME_TABLE_MAPPING.items():
insert_dict[table_name] = getattr(self, attr_name)
for attr_name, name in self.PR_ATTR_NAME_MAPPING.items():
insert_dict[name] = getattr(self, attr_name)
self._all_add_or_modify_row(traj.v_name, insert_dict, hdf5table, index=0,
flags=(
HDF5StorageService.ADD_ROW, HDF5StorageService.MODIFY_ROW))
# Fill table with dummy entries starting from the current table size
actual_rows = runtable.nrows
self._all_fill_run_table_with_dummys(actual_rows, len(traj))
# Store the annotations in the trajectory node
self._ann_store_annotations(traj, self._trajectory_group)
# Prepare the overview tables
tostore_tables = []
for name, table_name in HDF5StorageService.NAME_TABLE_MAPPING.items():
# Check if we want the corresponding overview table
# If the trajectory does not contain information about the table
# we assume it should be created.
if getattr(self, name):
tostore_tables.append(table_name)
self._srvc_make_overview_tables(tostore_tables, traj)
def _srvc_make_overview_tables(self, tables_to_make, traj=None):
"""Creates the overview tables in overview group"""
for table_name in tables_to_make:
# Prepare the tables desciptions, depending on which overview table we create
# we need different columns
paramdescriptiondict = {}
expectedrows = 0
# Every overview table has a name and location column
paramdescriptiondict['location'] = pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_LOCATION_LENGTH,
pos=0)
paramdescriptiondict['name'] = pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_NAME_LENGTH,
pos=1)
if not table_name == 'explored_parameters' and not 'groups' in table_name:
paramdescriptiondict['value'] = pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_VALUE_LENGTH)
if table_name == 'config':
if traj is not None:
expectedrows = len(traj._config)
if table_name == 'parameters':
if traj is not None:
expectedrows = len(traj._parameters)
if table_name == 'explored_parameters':
paramdescriptiondict['range'] = pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_ARRAY_LENGTH)
if traj is not None:
expectedrows = len(traj._explored_parameters)
if table_name == 'results_trajectory':
if traj is not None:
expectedrows = len(traj._results)
if table_name == 'derived_parameters_trajectory':
if traj is not None:
expectedrows = len(traj._derived_parameters)
if table_name in ['derived_parameters_trajectory', 'results_trajectory',
'derived_parameters_runs_summary', 'results_runs_summary',
'config', 'parameters', 'explored_parameters']:
if table_name.startswith('derived') or table_name.endswith('parameters'):
paramdescriptiondict['length'] = pt.IntCol()
paramdescriptiondict['comment'] = pt.StringCol(
pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH)
if table_name.endswith('summary'):
paramdescriptiondict['number_of_items'] = pt.IntCol(dflt=1)
paramdescriptiondict['example_item_run_name'] = \
pt.StringCol(len(pypetconstants.RUN_NAME) + pypetconstants.FORMAT_ZEROS + 3,
pos=2)
# Check if the user provided an estimate of the amount of results per run
# This can help to speed up storing
if table_name.startswith('derived_parameters_runs'):
expectedrows = self._derived_parameters_per_run
if not table_name.endswith('summary') and traj is not None:
expectedrows *= len(traj)
if table_name.startswith('results_runs'):
expectedrows = self._results_per_run
if not expectedrows <= 0:
if not table_name.endswith('summary') and traj is not None:
expectedrows *= len(traj)
if expectedrows > 0:
paramtable = self._all_get_or_create_table(where=self._overview_group,
tablename=table_name,
description=paramdescriptiondict,
expectedrows=expectedrows)
else:
paramtable = self._all_get_or_create_table(where=self._overview_group,
tablename=table_name,
description=paramdescriptiondict)
# # Index the summary tables for faster look up
# # They are searched by the individual runs later on
# if table_name.endswith('summary'):
# try:
# paramtable.autoindex=True
# except AttributeError:
# paramtable.autoIndex=True
# if not paramtable.indexed:
# try:
# paramtable.cols.location.create_index(optlevel=8, kind='full')
# paramtable.cols.name.create_index(optlevel=8, kind='full')
# except AttributeError:
# paramtable.cols.location.createIndex(optlevel=8, kind='full')
# paramtable.cols.name.createIndex(optlevel=8, kind='full')
paramtable.flush()
def _trj_store_trajectory(self, traj, only_init=False):
""" Stores a trajectory to an hdf5 file
Stores all groups, parameters and results
"""
if not only_init:
self._logger.info('Start storing Trajectory `%s`.' % self._trajectory_name)
else:
self._logger.info('Initialising storage or updating meta data of Trajectory `%s`.' %
self._trajectory_name)
# In case we accidentally chose a trajectory name that already exist
# We do not want to mess up the stored trajectory but raise an Error
if not traj._stored and self._trajectory_group._v_nchildren > 0:
raise RuntimeError('You want to store a completely new trajectory with name'
' `%s` but this trajectory is already found in file `%s`' %
(traj.v_name, self._filename))
# Extract HDF5 settings from the trajectory
self._srvc_check_hdf_properties(traj)
# Store meta information
self._trj_store_meta_data(traj)
# # Store recursively the config subtree
# self._tree_store_recursively(pypetconstants.LEAF,traj.config,self._trajectory_group)
if not only_init:
counter = 0
maximum_display_other = 10
name_set = set(['parameters', 'config', 'derived_parameters', 'results'])
for child_name in traj._children:
if child_name in name_set:
self._logger.info('Storing branch `%s`.' % child_name)
else:
counter += 1
if counter <= maximum_display_other:
self._logger.info('Storing branch/node `%s`.' % child_name)
if counter == maximum_display_other:
self._logger.info('To many branches or nodes at root for display. '
'I will not inform you about storing anymore. '
'Branches are stored silently in the background. '
'Do not worry, I will not freeze! Pinky promise!!!')
# Store recursively the derived parameters subtree
self._tree_store_nodes(pypetconstants.LEAF, traj._children[child_name],
self._trajectory_group)
self._logger.info('Finished storing Trajectory `%s`.' % self._trajectory_name)
else:
self._logger.info('Finished init or meta data update for `%s`.' %
self._trajectory_name)
traj._stored = True
def _tree_store_sub_branch(self, msg, traj_node, branch_name, hdf5_group, recursive=True):
"""Stores data starting from a node along a branch and starts recursively loading
all data at end of branch.
:param msg: 'LEAF'
:param traj_node: The node where storing starts
:param branch_name:
A branch along which storing progresses. Colon Notation is used:
'group1.group2.group3' loads 'group1', then 'group2', then 'group3', and then finally
recursively all children and children's children below 'group3'.
:param hdf5_group:
HDF5 node in the file corresponding to `traj_node`
"""
split_names = branch_name.split('.')
leaf_name = split_names.pop()
for name in split_names:
# Store along a branch
traj_node = traj_node._children[name]
self._tree_store_nodes(msg, traj_node, hdf5_group, recursive=False)
hdf5_group = getattr(hdf5_group, name)
# Store final group and recursively everything below it
traj_node = traj_node._children[leaf_name]
self._tree_store_nodes(msg, traj_node, hdf5_group, recursive)
######################## Storing and Loading Sub Trees #######################################
def _tree_store_tree(self, traj_node, child_name, recursive):
"""Stores a node and potentially recursively all nodes below
:param traj_node: Parent node where storing starts
:param child_name: Name of child node
:param recursive: Whether to store everything below `traj_node`.
"""
location = traj_node.v_full_name
# Get parent hdf5 node
hdf5_location = location.replace('.', '/')
try:
if location == '':
parent_hdf5_node = self._trajectory_group
else:
parent_hdf5_node = ptcompat.get_node(self._hdf5file,
where=self._trajectory_group,
name=hdf5_location)
# Store node and potentially everything below it
self._tree_store_sub_branch(pypetconstants.LEAF, traj_node, child_name,
parent_hdf5_node,
recursive=recursive)
except pt.NoSuchNodeError:
self._logger.warning('Cannot store `%s` the parental hdf5 node with path `%s` does '
'not exist on disk.' %
(traj_node.v_name, hdf5_location))
if traj_node.v_is_leaf:
self._logger.error('Cannot store `%s` the parental hdf5 node with path `%s` does '
'not exist on disk! The child you want to store is a leaf node,'
'that cannot be stored without the parental node existing on '
'disk.' %
(traj_node.v_name, hdf5_location))
raise
else:
self._logger.warning('I will try to store the path from trajectory root to '
'the child now.')
self._tree_store_sub_branch(pypetconstants.LEAF,
traj_node._nn_interface._root_instance,
traj_node.v_full_name + '.' + child_name,
self._trajectory_group,
recursive=recursive)
def _tree_load_tree(self, parent_traj_node, child_name, recursive, load_data, trajectory):
"""Loads a specific tree node and potentially all nodes below
:param parent_traj_node: parent node of node to load in trajectory
:param child_name: Name (!) of node to be loaded
:param recursive: Whether to load everything below the child
:param load_data: How to load the data
:param trajectory: The trajectory object
"""
hdf5_node_name = parent_traj_node.v_full_name.replace('.', '/')
# Get child node to load
if hdf5_node_name == '':
hdf5_node = self._trajectory_group
else:
try:
hdf5_node = ptcompat.get_node(self._hdf5file,
where=self._trajectory_group,
name=hdf5_node_name)
except pt.NoSuchNodeError:
self._logger.error('Cannot load `%s` the hdf5 node `%s` does not exist!'
% (child_name, hdf5_node_name))
raise
self._tree_load_sub_branch(trajectory, parent_traj_node, child_name, hdf5_node,
load_data=load_data,
recursive=recursive)
def _tree_load_nodes(self, traj, parent_traj_node, hdf5group,
load_data=pypetconstants.UPDATE_SKELETON, recursive=True,
as_new=False):
"""Loads a node from hdf5 file and if desired recursively everything below
:param traj: The trajectory object
:param parent_traj_node: The parent node whose child should be loaded
:param hdf5group: The hdf5 group containing the child to be loaded
:param load_data: How to load the data
:param recursive: Whether loading recursively below hdf5group
:param as_new: If trajectory is loaded as new
"""
if load_data == pypetconstants.LOAD_NOTHING:
return
path_name = parent_traj_node.v_full_name
name = hdf5group._v_name
is_leaf = self._all_get_from_attrs(hdf5group, HDF5StorageService.LEAF)
if is_leaf:
# In case we have a leaf node, we need to check if we have to create a new
# parameter or result
full_name = '%s.%s' % (path_name, name)
in_trajectory = name in parent_traj_node._children
if in_trajectory:
instance = parent_traj_node._children[name]
# If we want to update data and the item already contains some we're good
if load_data == pypetconstants.OVERWRITE_DATA:
if instance.v_is_parameter:
instance.f_unlock()
instance.f_empty()
instance.v_annotations.f_empty()
# Load annotations if they are empty
if instance.v_annotations.f_is_empty():
self._ann_load_annotations(instance, hdf5group)
# If we want to update the skeleton and the item exists we're good
if load_data == pypetconstants.UPDATE_SKELETON:
return
# If the instance is non-empty we do not need to load it
if not instance.f_is_empty():
return
# Otherwise we need to create a new instance
if not in_trajectory:
class_name = self._all_get_from_attrs(hdf5group, HDF5StorageService.CLASS_NAME)
comment = self._all_get_from_attrs(hdf5group, HDF5StorageService.COMMENT)
if comment is None:
comment = ''
range_length = self._all_get_from_attrs(hdf5group, HDF5StorageService.LENGTH)
if not range_length is None and range_length != len(traj):
raise RuntimeError('Something is completely odd. You load parameter'
' `%s` of length %d into a trajectory of length'
' %d. They should be equally long!' %
(full_name, range_length, len(traj)))
# Create the instance with the appropriate constructor
class_constructor = traj._create_class(class_name)
instance = class_constructor(name,
comment=comment)
instance._stored = not as_new
if instance.v_is_parameter:
instance._explored = range_length is not None
# Add the instance to the trajectory tree
parent_traj_node._nn_interface._add_generic(parent_traj_node,
type_name=nn.LEAF,
group_type_name=nn.GROUP,
args=(instance,), kwargs={},
add_prefix=False)
# If it has a range we add it to the explored parameters
if range_length:
traj._explored_parameters[instance.v_full_name] = instance
self._ann_load_annotations(instance, node=hdf5group)
if load_data in (pypetconstants.LOAD_DATA, pypetconstants.OVERWRITE_DATA):
# Load data into the instance
self._prm_load_parameter_or_result(instance, _hdf5_group=hdf5group)
self._node_processing_timer.signal_update()
else:
# Else we are dealing with a group node
if not name in parent_traj_node._children:
# If the group does not exist create it
comment = self._all_get_from_attrs(hdf5group, HDF5StorageService.COMMENT)
if comment is None:
comment = ''
new_traj_node = parent_traj_node._nn_interface._add_generic(
parent_traj_node,
type_name=nn.GROUP,
group_type_name=nn.GROUP,
args=(name, comment),
kwargs={},
add_prefix=False)
new_traj_node._stored = not as_new
else:
new_traj_node = parent_traj_node._children[name]
if load_data == pypetconstants.OVERWRITE_DATA:
new_traj_node.v_annotations.f_empty()
# Load annotations if they are empty
if new_traj_node.v_annotations.f_is_empty():
self._ann_load_annotations(new_traj_node, hdf5group)
self._node_processing_timer.signal_update()
if recursive:
# We load recursively everything below it
children = hdf5group._v_children
for new_hdf5group_name in children:
new_hdf5group = children[new_hdf5group_name]
if not isinstance(new_hdf5group, pt.Group):
continue
self._tree_load_nodes(traj, new_traj_node, new_hdf5group, load_data)
def _tree_store_nodes(self, msg, traj_node, parent_hdf5_group, recursive=True):
"""Stores a node to hdf5 and if desired stores recursively everything below it.
:param msg: 'LEAF'
:param traj_node: Node to be stored
:param parent_hdf5_group: Parent hdf5 groug
:param recursive: Whether to store recursively the subtree
"""
name = traj_node.v_name
store_new = False
store_msg = msg
# If the node does not exist in the hdf5 file create it
if not hasattr(parent_hdf5_group, name):
store_new = True
new_hdf5_group = ptcompat.create_group(self._hdf5file, where=parent_hdf5_group,
name=name)
else:
new_hdf5_group = getattr(parent_hdf5_group, name)
if traj_node.v_is_leaf:
if store_new:
# If we have a leaf node, store it as a parameter or result
self._prm_store_parameter_or_result(store_msg, traj_node,
_hdf5_group=new_hdf5_group,
_newly_created=True)
else:
self._logger.debug('Already found `%s` on disk I will not store it!' %
traj_node.v_full_name)
self._node_processing_timer.signal_update()
else:
# Else store it as a group node
# We have to do the following check to be sure that the group node was not
# build on the fly via `f_store_item`.
store_new = (store_new or
(not traj_node.v_annotations.f_is_empty()
and not HDF5StorageService.ANNOTATED in new_hdf5_group._v_attrs) or
(traj_node.v_comment != '' and
not HDF5StorageService.COMMENT not in new_hdf5_group._v_attrs))
if store_new:
self._grp_store_group(traj_node, _hdf5_group=new_hdf5_group)
else:
self._logger.debug('Already found `%s` on disk I will not store it!' %
traj_node.v_full_name)
self._node_processing_timer.signal_update()
if recursive:
# And if desired store recursively the subtree
for child in compat.itervalues(traj_node._children):
self._tree_store_nodes(store_msg, child, new_hdf5_group)
######################## Storing a Single Run ##########################################
def _srn_store_single_run(self, single_run, store_data, store_final):
""" Stores a single run instance to disk (only meta data)"""
if store_data:
self._logger.info('Storing Data of single run `%s`.' % single_run.v_name)
for group_name in single_run._run_parent_groups:
group = single_run._run_parent_groups[group_name]
if group.f_contains(single_run.v_name):
self._tree_store_tree(group, single_run.v_name, recursive=True)
if store_final:
self._logger.info('Finishing Storage of single run `%s`.' % single_run.v_name)
idx = single_run.v_idx
add_table = self._overview_explored_parameters_runs
# For better readability and if desired add the explored parameters to the results
# Also collect some summary information about the explored parameters
# So we can add this to the `run` table
run_summary = self._srn_add_explored_params(single_run.v_name,
compat.listvalues(
single_run._explored_parameters),
add_table)
# Finally, add the real run information to the `run` table
runtable = getattr(self._overview_group, 'runs')
# If the table is not large enough already (maybe because the trajectory got expanded
# We have to manually increase it here
actual_rows = runtable.nrows
if idx + 1 > actual_rows:
self._all_fill_run_table_with_dummys(actual_rows, idx + 1)
insert_dict = self._all_extract_insert_dict(single_run, runtable.colnames)
insert_dict['parameter_summary'] = run_summary
insert_dict['completed'] = 1
self._hdf5file.flush()
self._all_add_or_modify_row(single_run, insert_dict, runtable,
index=idx, flags=(HDF5StorageService.MODIFY_ROW,))
def _srn_add_explored_params(self, run_name, paramlist, add_table, create_run_group=False):
"""If desired adds an explored parameter overview table to the results in each
single run and summarizes the parameter settings.
:param run_name: Name of the single run
:param paramlist: List of explored parameters
:param add_table: Whether to add the overview table
:param create_run_group:
If a group with the particular name should be created if it does not exist.
Might be necessary when trajectories are merged.
"""
# Layout of overview table
paramdescriptiondict = {'name': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_NAME_LENGTH),
'value': pt.StringCol(pypetconstants.HDF5_STRCOL_MAX_VALUE_LENGTH)}
location = 'results.runs.' + run_name
where = location
where = where.replace('.', '/')
if not where in self._trajectory_group:
if create_run_group:
self._all_create_or_get_groups(location)
else:
add_table = False
if add_table:
rungroup = getattr(self._trajectory_group, where)
# Check if the table already exists
if 'explored_parameters' in rungroup:
# This can happen if trajectories are merged
# If the number of explored parameters changed due to merging we
# need to create the table new in order to show the correct parameters
paramtable = getattr(rungroup, 'explored_parameters')
# If more parameters became explored we need to create the table new
if paramtable.nrows != len(paramlist):
del paramtable
ptcompat.remove_node(self._hdf5file, where=rungroup,
name='explored_parameters')
else:
add_table = False
if not 'explored_parameters' in rungroup:
paramtable = ptcompat.create_table(self._hdf5file,
where=rungroup,
name='explored_parameters',
description=paramdescriptiondict,
title='explored_parameters',
filters=self._get_filters())
runsummary = ''
paramlist = sorted(paramlist, key=lambda name: name.v_name + name.v_location)
for idx, expparam in enumerate(paramlist):
# Create the run summary for the `run` overview
if idx > 0:
runsummary += ', '
valstr = expparam.f_val_to_str()
if len(valstr) >= pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH:
valstr = valstr[0:pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH - 3]
valstr += '...'
if expparam.v_name in runsummary:
param_name = expparam.v_full_name
else:
param_name = expparam.v_name
runsummary = runsummary + param_name + ': ' + valstr
# If Add the explored parameter overview table if dersired and necessary
if add_table:
self._all_store_param_or_result_table_entry(expparam, paramtable,
(HDF5StorageService.ADD_ROW,))
return runsummary
################# Methods used across Storing and Loading different Items #####################
@staticmethod
def _all_find_param_or_result_entry_and_return_iterator(param_or_result, table):
"""Searches for a particular entry in `table` based on the name and location
of `param_or_result` and returns an iterator over the found rows
(should contain only a single row).
"""
location = param_or_result.v_location
name = param_or_result.v_name
condvars = {'namecol': table.cols.name, 'locationcol': table.cols.location,
'name': name, 'location': location}
condition = """(namecol == name) & (locationcol == location)"""
return table.where(condition, condvars=condvars)
@staticmethod
def _all_get_table_name(where, creator_name):
"""Returns an overview table name for a given subtree name
:param where:
Either `parameters`, `config`, `derived_parameters`, or `results`
:param creator_name:
Either `trajectory` or `run_XXXXXXXXX`
:return: Name of overview table
"""
if where in ['config', 'parameters']:
return where
else:
if creator_name == 'trajectory':
return '%s_trajectory' % where
else:
return '%s_runs' % where
def _all_store_param_or_result_table_entry(self, instance, table, flags,
additional_info=None):
"""Stores a single row into an overview table
:param instance: A parameter or result instance
:param table: Table where row will be inserted
:param flags:
Flags how to insert into the table. Potential Flags are
`ADD_ROW`, `REMOVE_ROW`, `MODIFY_ROW`
:param additional_info:
Dictionary containing information that cannot be extracted from
`instance`, but needs to be inserted, too.
"""
# assert isinstance(table, pt.Table)
location = instance.v_location
name = instance.v_name
fullname = instance.v_full_name
if flags == (HDF5StorageService.ADD_ROW,):
# If we are sure we only want to add a row we do not need to search!
condvars = None
condition = None
else:
# Condition to search for an entry
condvars = {'namecol': table.cols.name, 'locationcol': table.cols.location,
'name': name, 'location': location}
condition = """(namecol == name) & (locationcol == location)"""
colnames = set(table.colnames)
if HDF5StorageService.REMOVE_ROW in flags:
# If we want to remove a row, we don't need to extract information
insert_dict = {}
else:
# Extract information to insert from the instance and the additional info dict
insert_dict = self._all_extract_insert_dict(instance, colnames, additional_info)
# Write the table entry
self._all_add_or_modify_row(fullname, insert_dict, table, condition=condition,
condvars=condvars, flags=flags)
def _all_get_or_create_table(self, where, tablename, description, expectedrows=None):
"""Creates a new table, or if the table already exists, returns it."""
where_node = ptcompat.get_node(self._hdf5file, where)
if not tablename in where_node:
if not expectedrows is None:
table = ptcompat.create_table(self._hdf5file,
where=where_node, name=tablename,
description=description, title=tablename,
expectedrows=expectedrows,
filters=self._get_filters())
else:
table = ptcompat.create_table(self._hdf5file,
where=where_node, name=tablename,
description=description, title=tablename,
filters=self._get_filters())
else:
table = ptcompat.get_child(where_node, tablename)
return table
def _all_get_node_by_name(self, name):
"""Returns an HDF5 node by the path specified in `name`"""
path_name = name.replace('.', '/')
where = '/%s/%s' % (self._trajectory_name, path_name)
return ptcompat.get_node(self._hdf5file, where=where)
@staticmethod
def _all_attr_equals(ptitem, name, value):
"""Checks if a given hdf5 node attribute exists and if so if it matches the `value`."""
try:
return ptitem._v_attrs[name] == value
except KeyError:
return False
@staticmethod
def _all_get_from_attrs(ptitem, name):
"""Gets an attribute `name` from `ptitem`, returns None if attribute does not exist."""
# if name in ptitem._v_attrs:
# return ptitem._v_attrs[name]
# else:
# return None
try:
return ptitem._v_attrs[name]
except KeyError:
return None
@staticmethod
def _all_set_attributes_to_recall_natives(data, ptitem, prefix):
"""Stores original data type to hdf5 node attributes for preserving the data type.
:param data:
Data to be stored
:param ptitem:
HDF5 node to store data types as attributes. Can also be just a PTItemMock.
:param prefix:
String prefix to label and name data in HDF5 attributes
"""
# If `data` is a container, remember the container type
if type(data) is tuple:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_TUPLE)
elif type(data) is list:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_LIST)
elif type(data) is np.ndarray:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_NDARRAY)
elif type(data) is np.matrix:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_MATRIX)
elif type(data) in pypetconstants.PARAMETER_SUPPORTED_DATA:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_SCALAR)
strtype = type(data).__name__
if not strtype in pypetconstants.PARAMETERTYPEDICT:
raise TypeError('I do not know how to handle `%s` its type is `%s`.' %
(str(data), repr(type(data))))
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.SCALAR_TYPE, strtype)
elif type(data) is dict:
ptcompat.set_attribute(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_DICT)
else:
raise TypeError('I do not know how to handle `%s` its type is `%s`.' %
(str(data), repr(type(data))))
if type(data) in (list, tuple):
# If data is a list or tuple we need to remember the data type of the elements
# in the list or tuple.
# We do NOT need to remember the elements of `dict` explicitly, though.
# `dict` is stored
# as an `ObjectTable` and thus types are already conserved.
if len(data) > 0:
strtype = type(data[0]).__name__
if not strtype in pypetconstants.PARAMETERTYPEDICT:
raise TypeError('I do not know how to handle `%s` its type is '
'`%s`.' % (str(data), strtype))
ptcompat.set_attribute(ptitem, prefix +
HDF5StorageService.SCALAR_TYPE, strtype)
elif (type(data) in (np.ndarray, np.matrix) and
np.issubdtype(data.dtype, compat.unicode_type)):
ptcompat.set_attribute(ptitem, prefix +
HDF5StorageService.SCALAR_TYPE, compat.unicode_type.__name__)
def _all_recall_native_type(self, data, ptitem, prefix):
"""Checks if loaded data has the type it was stored in. If not converts it.
:param data: Data item to be checked and converted
:param ptitem: HDf5 Node or Leaf from where data was loaded
:param prefix: Prefix for recalling the data type from the hdf5 node attributes
:return:
Tuple, first item is the (converted) `data` item, second boolean whether
item was converted or not.
"""
typestr = self._all_get_from_attrs(ptitem, prefix + HDF5StorageService.SCALAR_TYPE)
type_changed = False
# Check what the original data type was from the hdf5 node attributes
if self._all_attr_equals(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_SCALAR):
# Here data item was a scalar
if isinstance(data, np.ndarray):
# If we recall a numpy scalar, pytables loads a 1d array :-/
# So we have to change it to a real scalar value
data = np.array([data])[0]
type_changed = True
if not typestr is None:
# Check if current type and stored type match
# if not convert the data
if typestr != type(data).__name__:
if typestr == compat.unicode_type.__name__:
data = data.decode(self._encoding)
else:
data = pypetconstants.PARAMETERTYPEDICT[typestr](data)
type_changed = True
elif (self._all_attr_equals(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_TUPLE) or
self._all_attr_equals(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_LIST)):
# Here data item was originally a tuple or a list
if not isinstance(data, (list, tuple)):
# If the original type cannot be recalled, first convert it to a list
type_changed = True
data = list(data)
if len(data) > 0:
first_item = data[0]
else:
first_item = None
if not first_item is None:
# Check if the type of the first item was conserved
if not typestr == type(first_item).__name__:
if not isinstance(data, list):
data = list(data)
# If type was not conserved we need to convert all items
# in the list or tuple
for idx, item in enumerate(data):
if typestr == compat.unicode_type.__name__:
data[idx] = data[idx].decode(self._encoding)
else:
data[idx] = pypetconstants.PARAMETERTYPEDICT[typestr](item)
type_changed = True
if self._all_attr_equals(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_TUPLE):
# If it was originally a tuple we need to convert it back to tuple
if not isinstance(data, tuple):
data = tuple(data)
type_changed = True
elif isinstance(data, np.ndarray):
if typestr == compat.unicode_type.__name__:
data = np.core.defchararray.decode(data, self._encoding)
type_changed = True
if (self._all_attr_equals(ptitem, prefix + HDF5StorageService.COLL_TYPE,
HDF5StorageService.COLL_MATRIX)):
# Here data item was originally a matrix
data = np.matrix(data)
type_changed = True
return data, type_changed
def _all_add_or_modify_row(self, item_name, insert_dict, table, index=None, condition=None,
condvars=None,
flags=(ADD_ROW, MODIFY_ROW,)):
"""Adds or changes a row in a pytable.
:param item_name: Name of item, the row is about, only important for throwing errors.
:param insert_dict:
Dictionary of data that is about to be inserted into the pytables row.
:param table:
The table to insert or modify a row in
:param index:
Index of row to be modified. Instead of an index a search condition can be
used as well, see below.
:param condition:
Condition to search for in the table
:param condvars:
Variables for the search condition
:param flags:
Flags whether to add, modify, or remove a row in the table
"""
# You can only specify either an index or a condition not both
if not index is None and not condition is None:
raise ValueError('Please give either a condition or an index or none!')
elif not condition is None:
row_iterator = table.where(condition, condvars=condvars)
elif not index is None:
row_iterator = table.iterrows(index, index + 1)
else:
row_iterator = None
try:
row = next(row_iterator)
except TypeError:
row = None
except StopIteration:
row = None
if ((HDF5StorageService.MODIFY_ROW in flags or HDF5StorageService.ADD_ROW in flags) and
HDF5StorageService.REMOVE_ROW in flags):
# You cannot remove and modify or add at the same time
raise ValueError('You cannot add or modify and remove a row at the same time.')
if row is None and HDF5StorageService.ADD_ROW in flags:
# Here we add a new row
row = table.row
self._all_insert_into_row(row, insert_dict)
row.append()
elif row is not None and HDF5StorageService.MODIFY_ROW in flags:
# Here we modify an existing row
self._all_insert_into_row(row, insert_dict)
row.update()
elif HDF5StorageService.REMOVE_ROW in flags:
# Here we delete an existing row
if row is not None:
# Only delete if the row does exist otherwise we do not have to do anything
rownumber = row.nrow
multiple_entries = False
try:
next(row_iterator)
multiple_entries = True
except StopIteration:
pass
if multiple_entries:
raise RuntimeError('There is something entirely wrong, `%s` '
'appears more than once in table %s.'
% (item_name, table._v_name))
try:
ptcompat.remove_rows(table, rownumber)
except NotImplementedError:
pass # We get here if we try to remove the last row of a table
# there is nothing we can do except for keeping it :(
else:
raise ValueError('Something is wrong, you might not have found '
'a row, or your flags are not set approprialty')
# # Check if there are 2 entries which should not happen
multiple_entries = False
try:
next(row_iterator)
multiple_entries = True
except TypeError:
pass
except StopIteration:
pass
if multiple_entries:
raise RuntimeError('There is something entirely wrong, `%s` '
'appears more than once in table %s.'
% (item_name, table._v_name))
# Check if we added something. Note that row is also not None in case REMOVE_ROW,
# then it refers to the deleted row
if HDF5StorageService.REMOVE_ROW not in flags and row is None:
raise RuntimeError('Could not add or modify entries of `%s` in '
'table %s' % (item_name, table._v_name))
table.flush()
def _all_insert_into_row(self, row, insert_dict):
"""Copies data from `insert_dict` into a pytables `row`."""
for key, val in insert_dict.items():
try:
row[key] = val
except KeyError as ke:
self._logger.warning('Could not write `%s` into a table, ' % key + repr(ke))
def _all_extract_insert_dict(self, item, colnames, additional_info=None):
"""Extracts information from a given item to be stored into a pytable row.
Items can be a variety of things here, trajectories, single runs, group node,
parameters, results.
:param item: Item from which data should be extracted
:param colnames: Names of the columns in the pytable
:param additional_info: (dict)
Additional information that should be stored into the pytable row that cannot be
read out from `item`.
:return: Dictionary containing the data to be inserted into a row
"""
insert_dict = {}
if 'length' in colnames:
insert_dict['length'] = len(item)
if 'comment' in colnames:
comment = self._all_cut_string(compat.tobytetype(item.v_comment),
pypetconstants.HDF5_STRCOL_MAX_COMMENT_LENGTH,
self._logger)
insert_dict['comment'] = comment
if 'location' in colnames:
insert_dict['location'] = compat.tobytetype(item.v_location)
if 'name' in colnames:
insert_dict['name'] = compat.tobytetype(item.v_name)
if 'class_name' in colnames:
insert_dict['class_name'] = compat.tobytetype(item.f_get_class_name())
if 'value' in colnames:
insert_dict['value'] = self._all_cut_string(
compat.tobytetype(item.f_val_to_str()),
pypetconstants.HDF5_STRCOL_MAX_VALUE_LENGTH,
self._logger)
if 'example_item_run_name' in colnames:
insert_dict['example_item_run_name'] = additional_info['example_item_run_name']
if 'idx' in colnames:
insert_dict['idx'] = item.v_idx
if 'time' in colnames:
insert_dict['time'] = compat.tobytetype(item.v_time)
if 'timestamp' in colnames:
insert_dict['timestamp'] = item.v_timestamp
if 'range' in colnames:
insert_dict['range'] = self._all_cut_string(
compat.tobytetype(str(item.f_get_range())),
pypetconstants.HDF5_STRCOL_MAX_ARRAY_LENGTH,
self._logger)
# To allow backwards compatibility
if 'array' in colnames:
insert_dict['array'] = self._all_cut_string(
compat.tobytetype(str(item.f_get_range())),
pypetconstants.HDF5_STRCOL_MAX_ARRAY_LENGTH,
self._logger)
if 'version' in colnames:
insert_dict['version'] = compat.tobytetype(item.v_version)
if 'python' in colnames:
insert_dict['python'] = compat.tobytetype(item.v_python)
if 'finish_timestamp' in colnames:
insert_dict['finish_timestamp'] = item._finish_timestamp
if 'runtime' in colnames:
runtime = item._runtime
if len(runtime) > pypetconstants.HDF5_STRCOL_MAX_RUNTIME_LENGTH:
# If string is too long we cut the microseconds
runtime = runtime.split('.')[0]
insert_dict['runtime'] = compat.tobytetype(runtime)
if 'short_environment_hexsha' in colnames:
insert_dict['short_environment_hexsha'] = compat.tobytetype(
item.v_environment_hexsha[0:7])
return insert_dict
@staticmethod
def _all_cut_string(string, max_length, logger):
"""Cuts string data to the maximum length allowed in a pytables column
if string is too long.
:param string: String to be cut
:param max_length: Maximum allowed string length
:param logger: Logger where messages about truncating should be written
:return: String, cut if too long
"""
if len(string) > max_length:
logger.debug('The string `%s` was too long I truncated it to'
' %d characters' %
(string, max_length))
string = string[0:max_length - 3] + compat.tobytetype('...')
return string
def _all_create_or_get_groups(self, key):
"""Creates new or follows existing group nodes along a given colon separated `key`.
:param key:
Colon separated path along hdf5 file, e.g. `parameters.mobiles.cars`.
:return:
Final group node, e.g. group node with name `cars`.
"""
newhdf5group = self._trajectory_group
split_key = key.split('.')
created = False
for name in split_key:
if not name in newhdf5group:
newhdf5group = ptcompat.create_group(self._hdf5file, where=newhdf5group, name=name,
title=name, filters=self._get_filters())
created = True
else:
newhdf5group = ptcompat.get_child(newhdf5group, name)
return newhdf5group, created
################# Storing and loading Annotations ###########################################
def _ann_store_annotations(self, item_with_annotations, node):
"""Stores annotations into an hdf5 file."""
# Only store annotations if the item has some
if not item_with_annotations.v_annotations.f_is_empty():
anno_dict = item_with_annotations.v_annotations.__dict__
current_attrs = node._v_attrs
changed = False
for field_name in anno_dict:
val = anno_dict[field_name]
field_name_with_prefix = HDF5StorageService.ANNOTATION_PREFIX + field_name
if not field_name_with_prefix in current_attrs:
# Only store *new* annotations, if they already exist on disk, skip storage
setattr(current_attrs, field_name_with_prefix, val)
changed = True
if changed:
setattr(current_attrs, HDF5StorageService.ANNOTATED, True)
self._hdf5file.flush()
def _ann_load_annotations(self, item_with_annotations, node):
"""Loads annotations from disk."""
annotated = self._all_get_from_attrs(node, HDF5StorageService.ANNOTATED)
if annotated:
annotations = item_with_annotations.v_annotations
# You can only load into non-empty annotations, to prevent overwriting data in RAM
if not annotations.f_is_empty():
raise TypeError('Loading into non-empty annotations!')
current_attrs = node._v_attrs
for attr_name in current_attrs._v_attrnames:
if attr_name.startswith(HDF5StorageService.ANNOTATION_PREFIX):
key = attr_name
key = key.replace(HDF5StorageService.ANNOTATION_PREFIX, '')
data = getattr(current_attrs, attr_name)
setattr(annotations, key, data)
############################################## Storing Groups ################################
def _grp_store_group(self, node_in_traj, _hdf5_group=None):
"""Stores a group node.
For group nodes only annotations and comments need to be stored.
"""
if _hdf5_group is None:
_hdf5_group, _ = self._all_create_or_get_groups(node_in_traj.v_full_name)
if node_in_traj.v_comment != '' and HDF5StorageService.COMMENT not in _hdf5_group._v_attrs:
setattr(_hdf5_group._v_attrs, HDF5StorageService.COMMENT, node_in_traj.v_comment)
self._ann_store_annotations(node_in_traj, _hdf5_group)
node_in_traj._stored = True
################# Storing and Loading Parameters ############################################
@staticmethod
def _prm_extract_missing_flags(data_dict, flags_dict):
"""Extracts storage flags for data in `data_dict`
if they were not specified in `flags_dict`.
See :const:`~pypet.storageservice.HDF5StorageService.TYPE_FLAG_MAPPING`
for how to store different types of data per default.
"""
for key, data in data_dict.items():
if not key in flags_dict:
dtype = type(data)
if dtype in HDF5StorageService.TYPE_FLAG_MAPPING:
flags_dict[key] = HDF5StorageService.TYPE_FLAG_MAPPING[dtype]
else:
raise pex.NoSuchServiceError('I cannot store `%s`, I do not understand the'
'type `%s`.' % (key, str(dtype)))
def _prm_meta_remove_summary(self, instance):
"""Changes a summary table entry if the current `instance` is removed from the trajectory
and from disk.
The number of items represented by a summary is decreased, if the
number of items shrinks to zero the whole row is deleted.
"""
split_name = instance.v_full_name.split('.')
where = split_name[0]
if where in ['derived_parameters', 'results']:
# There are only summaries for derived parameters and results
creator_name = instance.v_run_branch
if creator_name.startswith(pypetconstants.RUN_NAME):
run_mask = pypetconstants.RUN_NAME + 'X' * pypetconstants.FORMAT_ZEROS
split_name[instance._run_branch_pos] = run_mask
new_full_name = '.'.join(split_name)
old_full_name = instance.v_full_name
instance._rename(new_full_name)
try:
table_name = where + '_runs_summary'
table = getattr(self._overview_group, table_name)
row_iterator = self._all_find_param_or_result_entry_and_return_iterator(
instance, table)
row = next(row_iterator)
# Decrease the number of items represented by the summary
nitems = row['number_of_items'] - 1
row['number_of_items'] = nitems
row.update()
try:
next(row_iterator)
raise RuntimeError('There is something completely wrong, found '
'`%s` twice in a table!' %
instance.v_full_name)
except StopIteration:
pass
table.flush()
if nitems == 0:
# Here the summary became obsolete
self._all_store_param_or_result_table_entry(
instance,
table,
flags=(HDF5StorageService.REMOVE_ROW,))
except pt.NoSuchNodeError:
pass
except StopIteration:
pass
finally:
# Get the old name back
instance._rename(old_full_name)
def _all_meta_add_summary(self, instance):
"""Adds data to the summary tables and returns if `instance`s comment has to be stored.
Also moves comments upwards in the hierarchy if purge_duplicate_comments is true
and a lower index run has completed. Only necessary for *multiprocessing*.
:return: Tuple
* String specifying the subtree
* Boolean whether to store the comment to `instance`s hdf5 node
"""
definitely_store_comment = True
split_name = instance.v_full_name.split('.')
where = split_name[0]
# Check if we are in the subtree that has runs overview tables
creator_name = instance.v_run_branch
if creator_name.startswith(pypetconstants.RUN_NAME):
try:
# Get the overview table
table_name = where + '_runs_summary'
# Check if the overview table exists, otherwise skip the rest of
# the meta adding
if table_name in self._overview_group:
table = getattr(self._overview_group, table_name)
else:
return where, definitely_store_comment
except pt.NoSuchNodeError:
return where, definitely_store_comment
# Create the dummy name `result.run_XXXXXXXX` as a general mask and example item
run_mask = pypetconstants.RUN_NAME + 'X' * pypetconstants.FORMAT_ZEROS
split_name[instance._run_branch_pos] = run_mask
new_full_name = '.'.join(split_name)
old_full_name = instance.v_full_name
# Rename the item for easier storage
instance._rename(new_full_name)
try:
# True if comment must be moved upwards to lower index
erase_old_comment = False
# Find the overview table entry
row_iterator = \
self._all_find_param_or_result_entry_and_return_iterator(instance, table)
row = None
try:
row = next(row_iterator)
except StopIteration:
pass
if row is not None:
# If row found we need to increase the number of items
nitems = row['number_of_items'] + 1
# Get the run name of the example
example_item_run_name = compat.tostrtype(row['example_item_run_name'])
# Get the old comment:
location_string = compat.tostrtype(row['location'])
other_parent_node_name = location_string.replace(run_mask,
example_item_run_name)
other_parent_node_name = '/' + self._trajectory_name + '/' + \
other_parent_node_name.replace('.', '/')
# If the instance actually has the name of the run as it's own name
# We need this replacement as well:
instance_name = instance.v_name.replace(run_mask, example_item_run_name)
example_item_node = ptcompat.get_node(self._hdf5file,
where=other_parent_node_name,
name=instance_name)
# Check if comment is obsolete
example_comment = ''
if HDF5StorageService.COMMENT in example_item_node._v_attrs:
example_comment = str(
example_item_node._v_attrs[HDF5StorageService.COMMENT])
definitely_store_comment = instance.v_comment != example_comment
# We can rely on lexicographic comparisons with run indices
if creator_name < example_item_run_name:
# In case the statement is true and the comments are equal, we need
# to move the comment to a result or derived parameter with a lower
# run name:
if not definitely_store_comment:
# We need to purge the comment in the other result or derived parameter
erase_old_comment = True
definitely_store_comment = True
row['example_item_run_name'] = creator_name
row['value'] = self._all_cut_string(
instance.f_val_to_str(),
pypetconstants.HDF5_STRCOL_MAX_VALUE_LENGTH,
self._logger)
else:
self._logger.warning('Your example value and comment in the overview'
' table cannot be set to the lowest index'
' item because results or derived parameters'
' with lower indices have '
' a different comment! The comment of `%s` '
' in run `%s'
' differs from the current result or'
' derived parameter in run `%s`.' %
(instance.v_name, creator_name,
example_item_run_name))
row['number_of_items'] = nitems
row.update()
try:
next(row_iterator)
raise RuntimeError('There is something completely wrong, '
'found `%s` twice in a table!' %
instance.v_full_name)
except StopIteration:
pass
table.flush()
if (self._purge_duplicate_comments and erase_old_comment and
HDF5StorageService.COMMENT in example_item_node._v_attrs):
del example_item_node._v_attrs[HDF5StorageService.COMMENT]
self._hdf5file.flush()
else:
self._all_store_param_or_result_table_entry(instance, table,
flags=(
HDF5StorageService.ADD_ROW,),
additional_info={
'example_item_run_name':
creator_name})
definitely_store_comment = True
# There are 2 cases of exceptions, either the table is switched off, or
# the entry already exists, in both cases we have to store the comments
except pt.NoSuchNodeError:
definitely_store_comment = True
finally:
# Get the old name back
instance._rename(old_full_name)
return where, definitely_store_comment
def _prm_add_meta_info(self, instance, group, msg):
"""Adds information to overview tables and meta information to
the `instance`s hdf5 `group`.
:param instance: Instance to store meta info about
:param group: HDF5 group of instance
:param msg: Whether to update leaf (we need to modify a row) or just store it
"""
flags = (HDF5StorageService.ADD_ROW,)
# Check if we need to store the comment. Maybe update the overview tables
# accordingly if the current run index is lower than the one in the table.
where, definitely_store_comment = self._all_meta_add_summary(instance)
try:
# Update the summary overview table
table_name = self._all_get_table_name(where, instance.v_run_branch)
table = getattr(self._overview_group, table_name)
self._all_store_param_or_result_table_entry(instance, table,
flags=flags)
except pt.NoSuchNodeError:
pass
if ((not self._purge_duplicate_comments or definitely_store_comment) and
instance.v_comment != ''):
# Only add the comment if necessary
setattr(group._v_attrs, HDF5StorageService.COMMENT, instance.v_comment)
# Add class name and whether node is a leaf to the HDF5 attributes
setattr(group._v_attrs, HDF5StorageService.CLASS_NAME, instance.f_get_class_name())
setattr(group._v_attrs, HDF5StorageService.LEAF, 1)
if instance.v_is_parameter and instance.f_has_range():
# If the stored parameter was an explored one we need to mark this in the
# explored overview table
setattr(group._v_attrs, HDF5StorageService.LENGTH, len(instance))
try:
tablename = 'explored_parameters'
table = getattr(self._overview_group, tablename)
self._all_store_param_or_result_table_entry(instance, table,
flags=flags)
except pt.NoSuchNodeError:
pass
def _prm_store_parameter_or_result(self, msg, instance, store_flags=None,
overwrite=None, _hdf5_group=None, _newly_created=False):
"""Stores a parameter or result to hdf5.
:param msg:
Either :const:`~pypet.pypetconstants.LEAF` for storing a new parameter or result
:param instance:
The instance to be stored
:param store_flags:
Dictionary containing how to store individual data, usually empty.
:param _hdf5_group:
The hdf5 group for storing the parameter or result
"""
fullname = instance.v_full_name
self._logger.debug('Storing %s.' % fullname)
if _hdf5_group is None:
# If no group is provided we might need to create one
_hdf5_group, newly_created = self._all_create_or_get_groups(fullname)
else:
newly_created = _newly_created
try:
# Get the data to store from the instance
if not instance.f_is_empty():
store_dict = instance._store()
else:
store_dict = {}
# If the user did not supply storage flags, we need to set it to the empty dictionary
if store_flags is None:
store_flags = {}
try:
# Ask the instance for storage flags
instance_flags = instance._store_flags()
except AttributeError:
# If it does not provide any, set it to the empty dictionary
instance_flags = {}
# User specified flags have priority over the flags from the instance
instance_flags.update(store_flags)
store_flags = instance_flags
# If we still have data in `store_dict` about which we do not know how to store
# it, pick default storage flags
self._prm_extract_missing_flags(store_dict, store_flags)
if isinstance(overwrite, compat.base_type):
overwrite = [overwrite]
if overwrite is True:
to_delete = [key for key in store_dict.keys() if key in _hdf5_group]
self._all_delete_parameter_or_result_or_group(instance,
delete_only=to_delete)
elif overwrite is not None:
overwrite_set = set(overwrite)
key_set = set(store_dict.keys())
stuff_not_to_be_overwritten = overwrite_set - key_set
if len(stuff_not_to_be_overwritten) > 0:
self._logger.warning('Cannot overwrite `%s`, these items are not supposed to '
'be stored by the leaf node.' %
str(stuff_not_to_be_overwritten))
stuff_to_overwrite = overwrite_set & key_set
if len(stuff_to_overwrite) > 0:
self._all_delete_parameter_or_result_or_group(instance,
delete_only=list(
stuff_to_overwrite))
for key, data_to_store in store_dict.items():
# Iterate through the data and store according to the storage flags
if key in _hdf5_group:
# We won't change any data that is found on disk
self._logger.debug(
'Found %s already in hdf5 node of %s, so I will ignore it.' %
(key, fullname))
continue
if store_flags[key] == HDF5StorageService.TABLE:
self._prm_store_into_pytable(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.DICT:
self._prm_store_dict_as_table(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.ARRAY:
self._prm_store_into_array(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.CARRAY:
self._prm_store_into_carray(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.FRAME:
self._prm_store_data_frame(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.SERIES:
self._prm_store_series(msg, key, data_to_store, _hdf5_group, fullname)
elif store_flags[key] == HDF5StorageService.PANEL:
self._prm_store_panel(msg, key, data_to_store, _hdf5_group, fullname)
else:
raise RuntimeError('You shall not pass!')
# Store annotations
self._ann_store_annotations(instance, _hdf5_group)
if newly_created:
# If we created a new group or the parameter was extended we need to
# update the meta information and summary tables
self._prm_add_meta_info(instance, _hdf5_group, msg)
instance._stored = True
except:
# I anything fails, we want to remove the parameter again
self._logger.error(
'Failed storing leaf `%s`. I will remove the hdf5 node corresponding to '
'the leaf again.' % fullname)
_hdf5_group._f_remove(recursive=True)
raise
def _prm_store_dict_as_table(self, msg, key, data_to_store, group, fullname):
"""Stores a python dictionary as pytable
:param msg:
Message passed to the storage service ('LEAF')
:param key:
Name of data item to store
:param data_to_store:
Dictionary to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
if key in group:
raise ValueError(
'Dictionary `%s` already exists in `%s`. Appending is not supported (yet).')
if key in group:
raise ValueError('Dict `%s` already exists in `%s`. Appending is not supported (yet).')
temp_dict = {}
for innerkey in data_to_store:
val = data_to_store[innerkey]
temp_dict[innerkey] = [val]
# Convert dictionary to object table
objtable = ObjectTable(data=temp_dict)
# Then store the object table
self._prm_store_into_pytable(msg, key, objtable, group, fullname)
new_table = ptcompat.get_child(group, key)
# Remember that the Object Table represents a dictionary
self._all_set_attributes_to_recall_natives(temp_dict, new_table,
HDF5StorageService.DATA_PREFIX)
setattr(new_table._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.DICT)
self._hdf5file.flush()
def _prm_store_series(self, msg, key, data, group, fullname):
"""Stores a pandas Series into hdf5.
:param msg:
Message passed to the storage service ('LEAF')
:param key:
Name of data item to store
:param data:
DataFrame to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
try:
if key in group:
raise ValueError(
'Series `%s` already exists in `%s`. Appending is not supported (yet).')
name = group._v_pathname + '/' + key
pandas_store = self._hdf5store
pandas_store.put(name, data)
pandas_store.flush()
self._hdf5file.flush()
frame_group = ptcompat.get_child(group, key)
setattr(frame_group._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.FRAME)
self._hdf5file.flush()
except:
self._logger.error('Failed storing Series `%s` of `%s`.' % (key, fullname))
raise
def _prm_store_panel(self, msg, key, data, group, fullname):
"""Stores a pandas Panel into hdf5.
:param msg:
Message passed to the storage service ('LEAF')
:param key:
Name of data item to store
:param data:
DataFrame to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
try:
if key in group:
raise ValueError(
'Series `%s` already exists in `%s`. Appending is not supported (yet).')
name = group._v_pathname + '/' + key
pandas_store = self._hdf5store
pandas_store.put(name, data,
format=self.pandas_format,
append=self.pandas_append)
pandas_store.flush()
self._hdf5file.flush()
frame_group = ptcompat.get_child(group, key)
setattr(frame_group._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.FRAME)
self._hdf5file.flush()
except:
self._logger.error('Failed storing Series `%s` of `%s`.' % (key, fullname))
raise
def _prm_store_data_frame(self, msg, key, data, group, fullname):
"""Stores a pandas DataFrame into hdf5.
:param msg:
Message passed to the storage service ('LEAF')
:param key:
Name of data item to store
:param data:
DataFrame to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
try:
if key in group:
raise ValueError(
'DataFrame `%s` already exists in `%s`. Appending is not supported (yet).')
name = group._v_pathname + '/' + key
pandas_store = self._hdf5store
pandas_store.put(name, data,
format=self.pandas_format,
append=self.pandas_append,
expected_rows=data.shape[0])
pandas_store.flush()
self._hdf5file.flush()
frame_group = ptcompat.get_child(group, key)
setattr(frame_group._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.FRAME)
self._hdf5file.flush()
except:
self._logger.error('Failed storing DataFrame `%s` of `%s`.' % (key, fullname))
raise
def _prm_store_into_carray(self, msg, key, data, group, fullname):
"""Stores data as carray.
:param msg:
Message passed to the storage service ('LEAF').
Not needed here.
:param key:
Name of data item to store
:param data:
Data to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
try:
if key in group:
raise ValueError(
'CArray `%s` already exists in `%s`. Appending is not supported (yet).')
# # if isinstance(data, np.ndarray):
# # size = data.size
# if hasattr(data,'__len__'):
# size = len(data)
# else:
# size = 1
#
# if size == 0:
# self._logger.warning('`%s` of `%s` is _empty, I will skip storing.' %(key,fullname))
# return
try:
carray = ptcompat.create_carray(self._hdf5file, where=group, name=key, obj=data,
filters=self._get_filters())
except ValueError:
conv_data = data[:]
conv_data = np.core.defchararray.encode(conv_data, self.encoding)
carray = ptcompat.create_carray(self._hdf5file, where=group, name=key,
obj=conv_data,
filters=self._get_filters())
# Remember the types of the original data to recall them on loading
self._all_set_attributes_to_recall_natives(data, carray,
HDF5StorageService.DATA_PREFIX)
setattr(carray._v_attrs, HDF5StorageService.STORAGE_TYPE, HDF5StorageService.CARRAY)
self._hdf5file.flush()
except:
self._logger.error('Failed storing array `%s` of `%s`.' % (key, fullname))
raise
def _prm_store_into_array(self, msg, key, data, group, fullname):
"""Stores data as array.
:param msg:
Message passed to the storage service ('LEAF').
Not needed here.
:param key:
Name of data item to store
:param data:
Data to store
:param group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
try:
if key in group:
raise ValueError(
'Array `%s` already exists in `%s`. Appending is not supported (yet).')
# if hasattr(data,'__len__'):
# size = len(data)
# else:
# size = 1
#
# if size == 0:
# self._logger.warning('`%s` of `%s` is _empty, I will skip storing.' %(key,fullname))
# return
try:
array = ptcompat.create_array(self._hdf5file, where=group,
name=key, obj=data)
except TypeError:
if isinstance(data, compat.unicode_type):
conv_data = data.encode(self._encoding)
else:
conv_data = []
for string in data:
conv_data.append(string.encode(self._encoding))
array = ptcompat.create_array(self._hdf5file, where=group,
name=key, obj=conv_data)
# Remember the types of the original data to recall them on loading
self._all_set_attributes_to_recall_natives(data, array, HDF5StorageService.DATA_PREFIX)
setattr(array._v_attrs, HDF5StorageService.STORAGE_TYPE, HDF5StorageService.ARRAY)
self._hdf5file.flush()
except:
self._logger.error('Failed storing array `%s` of `%s`.' % (key, fullname))
raise
def _all_delete_parameter_or_result_or_group(self, instance,
remove_empty_groups=False,
delete_only=None,
remove_from_item=False):
"""Removes a parameter or result or group from the hdf5 file.
:param instance: Instance to be removed
:param remove_empty_groups:
Whether to delete groups that might become empty due to deletion
:param delete_only:
List of elements if you only want to delete parts of a leaf node. Note that this
needs to list the names of the hdf5 subnodes. BE CAREFUL if you erase parts of a leaf.
Erasing partly happens at your own risk, it might be the case that you can
no longer reconstruct the leaf from the leftovers!
:param remove_from_item:
If using `delete_only` and `remove_from_item=True` after deletion the data item is
also removed from the `instance`.
"""
split_name = instance.v_location.split('.')
where = '/' + self._trajectory_name + '/' + '/'.join(split_name)
node_name = instance.v_name
if delete_only is None:
the_node = ptcompat.get_node(self._hdf5file, where=where, name=node_name)
if not instance.v_is_leaf:
if len(the_node._v_groups) != 0:
raise TypeError('You cannot remove the group `%s`, it is not empty!' %
instance.v_full_name)
if instance.v_is_leaf:
# If we delete a leaf we need to take care about overview tables
base_group = split_name[0]
table_name = self._all_get_table_name(base_group, instance.v_run_branch)
if table_name in self._overview_group:
table = getattr(self._overview_group, table_name)
self._all_store_param_or_result_table_entry(
instance,
table,
flags=(HDF5StorageService.REMOVE_ROW,))
if instance.v_is_parameter:
table_name = 'explored_parameters'
if table_name in self._overview_group:
table = getattr(self._overview_group, table_name)
self._all_store_param_or_result_table_entry(
instance,
table,
flags=(HDF5StorageService.REMOVE_ROW,))
self._prm_meta_remove_summary(instance)
the_node._f_remove(recursive=True)
if remove_empty_groups:
for irun in reversed(list(range(len(split_name)))):
where = '/' + self._trajectory_name + '/' + '/'.join(split_name[0:irun])
node_name = split_name[irun]
act_group = ptcompat.get_node(self._hdf5file,
where=where, name=node_name)
if len(act_group._v_groups) == 0:
ptcompat.remove_node(self._hdf5file,
where=where, name=node_name, recursive=True)
else:
break
else:
if not instance.v_is_leaf:
raise ValueError('You can only choose `delete_only` mode for leafs.')
if isinstance(delete_only, compat.base_type):
delete_only = [delete_only]
path_to_leaf = where + '/' + node_name
for delete_item in delete_only:
if (remove_from_item and
hasattr(instance, '__contains__') and
hasattr(instance, '__delattr__') and
delete_item in instance):
delattr(instance, delete_item)
try:
the_node = ptcompat.get_node(self._hdf5file,
where=path_to_leaf, name=delete_item)
the_node._f_remove(recursive=True)
except pt.NoSuchNodeError:
self._logger.warning('Could not delete `%s` from `%s`. HDF5 node not found!' %
(delete_item, instance.v_full_name))
def _prm_store_into_pytable(self, msg, tablename, data, hdf5group, fullname):
"""Stores data as pytable.
:param msg:
Message passed to the storage service ('LEAF').
:param tablename:
Name of the data table
:param data:
Data to store
:param hdf5group:
Group node where to store data in hdf5 file
:param fullname:
Full name of the `data_to_store`s original container, only needed for throwing errors.
"""
datasize = data.shape[0]
try:
# Get a new pytables description from the data and create a new table
description_dict, data_type_dict = self._prm_make_description(data, fullname)
description_dicts = [{}]
if len(description_dict) > pypetconstants.HDF5_MAX_OBJECT_TABLE_TYPE_ATTRS:
# For optimzation we want to store the original data types into another table
new_table_group = ptcompat.create_group(self._hdf5file, where=hdf5group,
name=tablename)
if len(description_dict) > ptpa.MAX_COLUMNS:
# For further optimization we need to split the table into several
count = 0
for innerkey in description_dict:
val = description_dict[innerkey]
if count == ptpa.MAX_COLUMNS:
description_dicts.append({})
count = 0
description_dicts[-1][innerkey] = val
count += 1
else:
description_dicts = [description_dict]
setattr(new_table_group._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.TABLE)
setattr(new_table_group._v_attrs, HDF5StorageService.SPLIT_TABLE, 1)
hdf5group = new_table_group
else:
description_dicts = [description_dict]
for idx, descr_dict in enumerate(description_dicts):
if idx == 0:
tblname = tablename
else:
tblname = tablename + '_%d' % idx
table = ptcompat.create_table(self._hdf5file,
where=hdf5group, name=tblname,
description=descr_dict,
title=tblname,
expectedrows=datasize,
filters=self._get_filters())
row = table.row
for n in range(datasize):
# Fill the columns with data, note if the parameter was extended nstart!=0
for key in descr_dict:
row[key] = data[key][n]
row.append()
# Remember the original types of the data for perfect recall
if idx == 0 and len(
description_dict) <= pypetconstants.HDF5_MAX_OBJECT_TABLE_TYPE_ATTRS:
# We only have a single table and
# we can store the original data types as attributes
for field_name in data_type_dict:
type_description = data_type_dict[field_name]
ptcompat.set_attribute(table, field_name, type_description)
setattr(table._v_attrs, HDF5StorageService.STORAGE_TYPE,
HDF5StorageService.TABLE)
table.flush()
self._hdf5file.flush()
if len(description_dict) > pypetconstants.HDF5_MAX_OBJECT_TABLE_TYPE_ATTRS:
# We have potentially many split tables and the data types are
# stored into an additional table for performance reasons
tblname = tablename + '__' + HDF5StorageService.STORAGE_TYPE
field_names, data_types = list(zip(*data_type_dict.items()))
data_type_table_dict = {'field_name': field_names, 'data_type': data_types}
descr_dict, _ = self._prm_make_description(data_type_table_dict, fullname)
table = ptcompat.create_table(self._hdf5file,
where=hdf5group, name=tblname,
description=descr_dict,
title=tblname,
expectedrows=len(field_names),
filters=self._get_filters())
row = table.row
for n in range(len(field_names)):
# Fill the columns with data, note if the parameter was extended nstart!=0
for key in data_type_table_dict:
row[key] = data_type_table_dict[key][n]
row.append()
setattr(table._v_attrs, HDF5StorageService.DATATYPE_TABLE, 1)
table.flush()
self._hdf5file.flush()
except:
self._logger.error('Failed storing table `%s` of `%s`.' % (tablename, fullname))
raise
def _prm_make_description(self, data, fullname):
""" Returns a description dictionary for pytables table creation"""
def _convert_lists_and_tuples(series_of_data):
"""Converts lists and tuples to numpy arrays"""
if isinstance(series_of_data[0],
(list, tuple)): # and not isinstance(series_of_data[0], np.ndarray):
# If the first data item is a list, the rest must be as well, since
# data has to be homogeneous
for idx, item in enumerate(series_of_data):
series_of_data[idx] = np.array(item)
descriptiondict = {} # dictionary containing the description to build a pytables table
original_data_type_dict = {} # dictionary containing the original data types
for key in data:
val = data[key]
# remember the original data types
self._all_set_attributes_to_recall_natives(val[0], PTItemMock(original_data_type_dict),
HDF5StorageService.FORMATTED_COLUMN_PREFIX %
key)
_convert_lists_and_tuples(val)
# get a pytables column from the data
col = self._prm_get_table_col(key, val, fullname)
descriptiondict[key] = col
return descriptiondict, original_data_type_dict
def _prm_get_table_col(self, key, column, fullname):
""" Creates a pytables column instance.
The type of column depends on the type of `column[0]`.
Note that data in `column` must be homogeneous!
"""
val = column[0]
try:
# # We do not want to loose int_
if type(val) is int:
return pt.IntCol()
if isinstance(val, (compat.unicode_type, compat.bytes_type)):
itemsize = int(self._prm_get_longest_stringsize(column))
return pt.StringCol(itemsize)
if isinstance(val, np.ndarray):
if (np.issubdtype(val.dtype, compat.unicode_type) or
np.issubdtype(val.dtype, compat.bytes_type)):
itemsize = int(self._prm_get_longest_stringsize(column))
return pt.StringCol(itemsize, shape=val.shape)
else:
return pt.Col.from_dtype(np.dtype((val.dtype, val.shape)))
else:
return pt.Col.from_dtype(np.dtype(type(val)))
except Exception:
self._logger.error('Failure in storing `%s` of Parameter/Result `%s`.'
' Its type was `%s`.' % (key, fullname, repr(type(val))))
raise
@staticmethod
def _prm_get_longest_stringsize(string_list):
""" Returns the longest string size for a string entry across data."""
maxlength = 1
for stringar in string_list:
if isinstance(stringar, np.ndarray):
if stringar.ndim > 0:
for string in stringar.ravel():
maxlength = max(len(string), maxlength)
else:
maxlength = max(len(stringar.tolist()), maxlength)
else:
maxlength = max(len(stringar), maxlength)
# Make the string Col longer than needed in order to allow later on slightly larger strings
return int(maxlength * 1.5)
def _prm_load_parameter_or_result(self, param, load_only=None, load_except=None,
_hdf5_group=None):
"""Loads a parameter or result from disk.
:param param:
Empty parameter or result instance
:param load_only:
List of data keys if only parts of a result should be loaded
:param load_except:
List of data key that should NOT be loaded.
:param _hdf5_group:
The corresponding hdf5 group of the instance
"""
if load_only is not None and load_except is not None:
raise ValueError('Please use either `load_only` or `load_except` and not '
'both at the same time.')
# If load only is just a name and not a list of names, turn it into a 1 element list
if isinstance(load_only, compat.base_type):
load_only = [load_only]
if isinstance(load_except, compat.base_type):
load_except = [load_except]
if load_only is not None:
self._logger.debug('I am in load only mode, I will only load %s.' %
str(load_only))
load_only = set(load_only)
elif load_except is not None:
self._logger.debug('I am in load except mode, I will load everything except %s.' %
str(load_except))
# We do not want to modify the original list
load_except = set(load_except)
if _hdf5_group is None:
_hdf5_group = self._all_get_node_by_name(param.v_full_name)
full_name = param.v_full_name
self._logger.debug('Loading %s' % full_name)
load_dict = {} # Dict that will be used to keep all data for loading the parameter or
# result
for node in _hdf5_group:
if load_only is not None:
if node._v_name not in load_only:
continue
else:
load_only.remove(node._v_name)
elif load_except is not None:
if node._v_name in load_except:
load_except.remove(node._v_name)
continue
# Recall from the hdf5 node attributes how the data was stored and reload accordingly
load_type = self._all_get_from_attrs(node, HDF5StorageService.STORAGE_TYPE)
if load_type == HDF5StorageService.DICT:
self._prm_read_dictionary(node, load_dict, full_name)
elif load_type == HDF5StorageService.TABLE:
self._prm_read_table(node, load_dict, full_name)
elif load_type in [HDF5StorageService.ARRAY, HDF5StorageService.CARRAY]:
self._prm_read_array(node, load_dict, full_name)
elif load_type in [HDF5StorageService.FRAME,
HDF5StorageService.SERIES,
HDF5StorageService.PANEL]:
self._prm_read_pandas(node, load_dict, full_name)
else:
raise pex.NoSuchServiceError('Cannot load %s, do not understand the hdf5 file '
'structure of %s [%s].' %
(full_name, str(node), str(load_type)))
if load_only is not None:
# Check if all data in `load_only` was actually found in the hdf5 file
if len(load_only) > 0:
self._logger.warning('You marked %s for load only, '
'but I cannot find these for `%s`' %
(str(load_only), full_name))
elif load_except is not None:
if len(load_except) > 0:
self._logger.warning(('You marked `%s` for not loading, but these were not part '
'of `%s` anyway.' % (str(load_except), full_name)))
# Finally tell the parameter or result to load the data, if there was any ;-)
if load_dict:
try:
param._load(load_dict)
except:
# If there happens to be any exception on loading, we want to empty the parameter
# again. This is especially important if the user hits Ctrl-C, otherwise he
# would end up with a half-loaded leaf which needs to be manually erased and
# reloaded.
self._logger.error(
'Error while reconstructing data of leaf `%s`. I will empty the '
'leaf again!' % full_name)
param.f_empty()
# ...And be nice and reraise the error
raise
def _prm_read_dictionary(self, leaf, load_dict, full_name):
"""Loads data that was originally a dictionary when stored
:param leaf:
PyTables table containing the dictionary data
:param load_dict:
Dictionary to keep the loaded data in
:param full_name:
Full name of the parameter or result whose data is to be loaded
"""
try:
# if self._all_get_from_attrs(leaf, HDF5StorageService.DICT_SPLIT):
# temp_dict = {}
# children = leaf._v_children
# for tablename in children:
# table = children[tablename]
# self._prm_read_dictionary(table, temp_dict, full_name)
# key =leaf._v_name
# load_dict[key]={}
# for dictname in temp_dict:
# dictionary = temp_dict[dictname]
# load_dict[key].update(dictionary)
# else:
temp_dict = {}
# Load as Pbject Table
self._prm_read_table(leaf, temp_dict, full_name)
key = leaf._v_name
temp_table = temp_dict[key]
# Turn the ObjectTable into a dictionary of lists (with length 1).
temp_dict = temp_table.to_dict('list')
innder_dict = {}
load_dict[key] = innder_dict
# Turn the dictionary of lists into a normal dictionary
for innerkey, vallist in temp_dict.items():
innder_dict[innerkey] = vallist[0]
except:
self._logger.error('Failed loading `%s` of `%s`.' % (leaf._v_name, full_name))
raise
def _prm_read_pandas(self, pd_node, load_dict, full_name):
"""Reads a DataFrame from dis.
:param pd_node:
hdf5 node stroing the pandas DataFrame
:param load_dict:
Dictionary to keep the loaded data in
:param full_name:
Full name of the parameter or result whose data is to be loaded
"""
try:
name = pd_node._v_name
pathname = pd_node._v_pathname
pandas_store = self._hdf5store
pandas_data = pandas_store.get(pathname)
load_dict[name] = pandas_data
except:
self._logger.error('Failed loading `%s` of `%s`.' % (pd_node._v_name, full_name))
raise
def _prm_read_table(self, table_or_group, load_dict, full_name):
"""Reads a non-nested PyTables table column by column and created a new ObjectTable for
the loaded data.
:param table_or_group:
PyTables table to read from or a group containing subtables.
:param load_dict:
Dictionary where the loaded ObjectTable will be kept
:param full_name:
Full name of the parameter or result whose data is to be loaded
"""
try:
if self._all_get_from_attrs(table_or_group, HDF5StorageService.SPLIT_TABLE):
table_name = table_or_group._v_name
data_type_table_name = table_name + '__' + HDF5StorageService.STORAGE_TYPE
data_type_table = table_or_group._v_children[data_type_table_name]
data_type_dict = {}
for row in data_type_table:
fieldname = str(row['field_name'])
data_type_dict[fieldname] = str(row['data_type'])
for sub_table in table_or_group:
sub_table_name = sub_table._v_name
if sub_table_name == data_type_table_name:
continue
for colname in sub_table.colnames:
# Read Data column by column
col = sub_table.col(colname)
data_list = list(col)
prefix = HDF5StorageService.FORMATTED_COLUMN_PREFIX % colname
for idx, data in enumerate(data_list):
# Recall original type of data
data, type_changed = self._all_recall_native_type(data,
PTItemMock(
data_type_dict),
prefix)
if type_changed:
data_list[idx] = data
else:
break
# Construct or insert into an ObjectTable
if table_name in load_dict:
load_dict[table_name][colname] = data_list
else:
load_dict[table_name] = ObjectTable(data={colname: data_list})
else:
table_name = table_or_group._v_name
for colname in table_or_group.colnames:
# Read Data column by column
col = table_or_group.col(colname)
data_list = list(col)
prefix = HDF5StorageService.FORMATTED_COLUMN_PREFIX % colname
for idx, data in enumerate(data_list):
# Recall original type of data
data, type_changed = self._all_recall_native_type(data, table_or_group,
prefix)
if type_changed:
data_list[idx] = data
else:
break
# Construct or insert into an ObjectTable
if table_name in load_dict:
load_dict[table_name][colname] = data_list
else:
load_dict[table_name] = ObjectTable(data={colname: data_list})
except:
self._logger.error(
'Failed loading `%s` of `%s`.' % (table_or_group._v_name, full_name))
raise
def _prm_read_array(self, array, load_dict, full_name):
"""Reads data from an array or carray
:param array:
PyTables array or carray to read from
:param load_dict:
Dictionary where the loaded ObjectTable will be kept
:param full_name:
Full name of the parameter or result whose data is to be loaded
"""
try:
result = array.read()
# Recall original data types
result, dummy = self._all_recall_native_type(result, array,
HDF5StorageService.DATA_PREFIX)
load_dict[array._v_name] = result
except:
self._logger.error('Failed loading `%s` of `%s`.' % (array._v_name, full_name))
raise