Source code for pypet.brian2.parameter

"""Module containing results and parameters that can be used to store `BRIAN2 data`_.

Parameters handling BRIAN2 data are instantiated by the
:class:`~pypet.brian2.parameter.Brian2Parameter` class for any BRIAN2 Quantity.

The :class:`~pypet.brian2.parameter.Brian2Result` can store BRIAN2 Quantities
and the :class:`~pypet.brian2.parameter.Brian2MonitorResult` extracts data from
BRIAN2 Monitors.

.. _`BRIAN2 data`: http://brian2.readthedocs.org/

"""

__author__ = ['Henri Bunting', 'Robert Meyer']

import ast

import brian2.numpy_ as np
from brian2.units.fundamentalunits import Quantity, get_dimensions
from brian2.monitors import SpikeMonitor, StateMonitor, PopulationRateMonitor
import brian2.units.allunits as allunits

import pypet.pypetexceptions as pex
from pypet.parameter import Parameter, Result, ObjectTable


ALLUNITS = {}
for name in allunits.__all__:
    ALLUNITS[name] =  getattr(allunits, name)


def get_unit_fast(x):
    """ Return a `Quantity` with value 1 and the same dimensions. """
    return Quantity.with_dimensions(1, get_dimensions(x))


def unit_from_expression(expr):
    """Takes a unit string like ``'1. * volt'`` and returns the BRIAN2 unit."""
    if expr == '1':
        return get_unit_fast(1)
    elif isinstance(expr, str):
        mod = ast.parse(expr, mode='eval')
        expr = mod.body
        return unit_from_expression(expr)
    elif expr.__class__ is ast.Name:
        return ALLUNITS[expr.id]
    elif expr.__class__ is ast.Num or expr.__class__ is ast.Constant:
        return expr.n
    elif expr.__class__ is ast.UnaryOp:
        op = expr.op.__class__.__name__
        operand = unit_from_expression(expr.operand)
        if op=='USub':
            return -operand
        else:
            raise SyntaxError("Unsupported operation "+op)
    elif expr.__class__ is ast.BinOp:
        op = expr.op.__class__.__name__
        left = unit_from_expression(expr.left)
        right = unit_from_expression(expr.right)
        if op=='Add':
            u = left+right
        elif op=='Sub':
            u = left-right
        elif op=='Mult':
            u = left*right
        elif op=='Div':
            u = left/right
        elif op=='Pow':
            n = unit_from_expression(expr.right)
            u = left**n
        elif op=='Mod':
            u = left % right
        else:
            raise SyntaxError("Unsupported operation "+op)
        return u
    else:
        raise RuntimeError('You shall not pass')


[docs]class Brian2Parameter(Parameter): """A Parameter class that supports BRIAN2 Quantities. Note that only scalar BRIAN2 quantities are supported, lists, tuples or dictionaries of BRIAN2 quantities cannot be handled. Supports data for the standard :class:`~pypet.parameter.Parameter`, too. """ IDENTIFIER = '__brn2__' ''' Identification string stored into column title of hdf5 table''' __slots__ = () def __init__(self, full_name, data=None, comment=''): super(Brian2Parameter, self).__init__(full_name, data, comment)
[docs] def f_supports(self, data): """ Simply checks if data is supported """ if isinstance(data, Quantity): return True elif super(Brian2Parameter, self).f_supports(data): return True return False
def _values_of_same_type(self, val1, val2): if isinstance(val1, Quantity): try: if not val1.has_same_dimensions(val2): return False except TypeError: return False elif isinstance(val2, Quantity): try: if not val2.has_same_dimensions(val1): return False except TypeError: return False elif not super(Brian2Parameter, self)._values_of_same_type(val1, val2): return False return True def _store(self): if not isinstance(self._data, Quantity): return super(Brian2Parameter, self)._store() else: store_dict = {} unit = get_unit_fast(self._data) value = self._data/unit store_dict['data' + Brian2Parameter.IDENTIFIER] = ObjectTable(data={'value': [value], 'unit': [repr(unit)]}) if self.f_has_range(): value_list = [value_with_unit/unit for value_with_unit in self._explored_range] store_dict['explored_data' + Brian2Parameter.IDENTIFIER] = ObjectTable(data={'value': value_list}) self._locked = True return store_dict def _load(self, load_dict): if self.v_locked: raise pex.ParameterLockedException('Parameter `%s` is locked!' % self.v_full_name) try: data_table = load_dict['data' + Brian2Parameter.IDENTIFIER] unitstring = data_table['unit'][0] unit = unit_from_expression(unitstring) value = data_table['value'][0] self._data = value * unit if 'explored_data' + Brian2Parameter.IDENTIFIER in load_dict: explore_table = load_dict['explored_data' + Brian2Parameter.IDENTIFIER] value_col = explore_table['value'] explore_list = [value * unit for value in value_col] self._explored_range = explore_list self._explored = True except KeyError: super(Brian2Parameter, self)._load(load_dict) self._default = self._data self._locked = True
[docs]class Brian2Result(Result): """ A result class that can handle BRIAN2 quantities. Note that only scalar BRIAN2 quantities are supported, lists, tuples or dictionaries of BRIAN2 quantities cannot be handled. Supports also all data supported by the standard :class:`~pypet.parameter.Result`. """ IDENTIFIER = Brian2Parameter.IDENTIFIER ''' Identifier String to label brian result ''' __slots__ = () def __init__(self, full_name, *args, **kwargs): super(Brian2Result, self).__init__(full_name, *args, **kwargs)
[docs] def f_set_single(self, name, item): if Brian2Result.IDENTIFIER in name: raise AttributeError('Your result name contains the identifier for brian data,' ' please do not use %s in your result names.' % Brian2Result.IDENTIFIER) else: super(Brian2Result, self).f_set_single(name, item)
def _supports(self, data): """ Simply checks if data is supported """ if isinstance(data, Quantity): return True elif super(Brian2Result, self)._supports(data): return True return False def _store(self): store_dict = {} for key in self._data: val = self._data[key] if isinstance(val, Quantity): unit = get_unit_fast(val) value = val/unit # Potentially the results are very big in contrast to small parameters # Accordingly, an ObjectTable might not be the best choice after all for a result if isinstance(val, np.ndarray) and len(val.shape) == 0: # Convert 0-dimensional arrays to regular numpy floats value = np.float(value) store_dict[key + Brian2Result.IDENTIFIER + 'value'] = value store_dict[key + Brian2Result.IDENTIFIER + 'unit'] = repr(unit) else: store_dict[key] = val return store_dict def _load(self, load_dict): for key in load_dict: if Brian2Result.IDENTIFIER in key: new_key = key.split(Brian2Result.IDENTIFIER)[0] if new_key in self._data: # We already extracted the unit/value pair continue # Recreate the brain units from the vale as float and unit as string: unitstring = load_dict[new_key + Brian2Result.IDENTIFIER + 'unit'] unit = unit_from_expression(unitstring) value = load_dict[new_key + Brian2Result.IDENTIFIER +'value'] self._data[new_key] = value * unit else: self._data[key] = load_dict[key]
[docs]class Brian2MonitorResult(Brian2Result): """ A Result class that supports BRIAN2 monitors. Monitor attributes are extracted and added as results with the attribute names. Note the original monitors are NOT stored, only their attribute/property values are kept. Add monitor on `__init__` via `monitor=` or via `f_set(monitor=brian_monitor)` **IMPORTANT**: You can only use 1 result per monitor. Otherwise a 'TypeError' is thrown. Example: >>> brian_result = BrianMonitorResult('example.brian_test_test.mymonitor', monitor=SpikeMonitor(...), comment='Im a SpikeMonitor Example!') >>> brian_result.num_spikes 1337 Following monitors are supported and the following values are extraced: * PopulationRateMonitor * t The times of the bins. * rate An array of rates in Hz * source String representation of source * name The name of the monitor * SpikeMonitor * t The times of the spikes * i The neuron indices of the spikes * num_spikes Total number of spikes * record_variables If variables are recorded at spike time, this is the list of their names * "varname" Array of variable recorded at spike time * source String representation of source * name The name of the monitor * StateMonitor * t Recording times * record_variables List of recorded variable names * "varname" Array of variable recorded * source String representation of source * name The name of the monitor """ __slots__ = ('_monitor_type',) def __init__(self, full_name, *args, **kwargs): self._monitor_type = None super(Brian2MonitorResult, self).__init__(full_name, *args, **kwargs) def _store(self): store_dict = super(Brian2MonitorResult, self)._store() if self._monitor_type is not None: store_dict['monitor_type'] = self._monitor_type return store_dict def _load(self, load_dict): if 'monitor_type' in load_dict: self._monitor_type = load_dict.pop('monitor_type') super(Brian2MonitorResult, self)._load(load_dict) @property def v_monitor_type(self): """ The type of the stored monitor. Each MonitorResult can only manage a single Monitor. """ return self._monitor_type
[docs] def f_set_single(self, name, item): """ To add a monitor use `f_set_single('monitor', brian_monitor)`. Otherwise `f_set_single` works similar to :func:`~pypet.parameter.Result.f_set_single`. """ if type(item) in [SpikeMonitor, StateMonitor, PopulationRateMonitor]: if self.v_stored: self._logger.warning('You are changing an already stored result. If ' 'you not explicitly overwrite the data on disk, ' 'this change might be lost and not propagated to disk.') self._extract_monitor_data(item) else: super(Brian2MonitorResult, self).f_set_single(name, item)
def _extract_monitor_data(self, monitor): if self._monitor_type is not None: raise TypeError('Your result `%s` already extracted data from a `%s` monitor.' ' Please use a new empty result for a new monitor.') self._monitor_type = monitor.__class__.__name__ if isinstance(monitor, SpikeMonitor): self._extract_spike_monitor(monitor) elif isinstance(monitor, StateMonitor): self._extract_state_monitor(monitor) elif isinstance(monitor, PopulationRateMonitor): self._extract_population_rate_monitor(monitor) else: raise ValueError('Monitor Type %s is not supported (yet)' % str(type(monitor))) def _extract_state_monitor(self, monitor): self.f_set(record_variables=monitor.record_variables) self.f_set(record=monitor.record) self.f_set(when=monitor.when) self.f_set(source=str(monitor.source)) self.f_set(name=monitor.name) times=np.array(monitor.t[:]) if len(times) > 0: self.f_set(t=times) for varname in monitor.record_variables: val = getattr(monitor, varname) self.f_set(**{varname: val}) def _extract_spike_monitor(self, monitor): self.f_set(source=str(monitor.source)) self.f_set(num_spikes=monitor.num_spikes) self.f_set(when=monitor.when) self.f_set(name=monitor.name) if len(monitor.record_variables) > 0: self.f_set(record_variables=list(monitor.record_variables)) times = monitor.t[:] if len(times) > 0: self.f_set(t=times) self.f_set(i=monitor.i[:]) for varname in monitor.record_variables: val = getattr(monitor, varname) self.f_set(**{varname: val[:]}) def _extract_population_rate_monitor(self, monitor): times = monitor.t[:] self.f_set(source=str(monitor.source)) self.f_set(name=monitor.name) if len(times) > 0: self.f_set(t=times) self.f_set(rate=monitor.rate[:])