Source code for pypet.utils.mpwrappers

"""Module containing wrappers for multiprocessing"""

__author__ = 'Robert Meyer', 'Mehmet Nevvaf Timur'


from threading import ThreadError
import queue
import pickle
try:
    import zmq
except ImportError:
    zmq = None

from collections import deque
import copy as cp
import gc
import sys
from threading import Thread
import time
import os
import socket

import pypet.pypetconstants as pypetconstants
from pypet.pypetlogging import HasLogger
from pypet.utils.decorators import retry
from pypet.utils.helpful_functions import is_ipv6


class MultiprocWrapper(object):
    """Abstract class definition of a Wrapper.

    Note that only storing is required, loading is optional.

    ABSTRACT: Needs to be defined in subclass

    """
    @property
    def is_open(self):
        """ Normally the file is opened and closed after each insertion.

        However, the storage service may provide the option to keep the store open
        and signals this via this property.

        """
        return False

    @property
    def multiproc_safe(self):
        """This wrapper guarantees multiprocessing safety"""
        return True

    def store(self, *args, **kwargs):
        raise NotImplementedError('Implement this!')


class ZMQServer(HasLogger):
    """ Generic zmq server """

    PING = 'PING'  # for connection testing
    PONG = 'PONG'  # for connection testing
    DONE = 'DONE'  # signals stopping of server
    CLOSED = 'CLOSED'  # signals closing of server

    def __init__(self, url="tcp://127.0.0.1:7777"):
        self._url = url  # server url
        self._set_logger()
        self._context = None
        self._socket = None

    def _start(self):
        self._logger.info('Starting Server at `%s`' % self._url)
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.REP)
        self._socket.ipv6 = is_ipv6(self._url)
        self._socket.bind(self._url)

    def _close(self):
        self._logger.info('Closing Server')
        self._socket.close()
        self._context.term()


class LockerServer(ZMQServer):
    """ Manages a database of locks """

    LOCK = 'LOCK'  # command for locking a lock
    RELEASE_ERROR = 'RELEASE_ERROR'  # signals unsuccessful attempt to unlock
    MSG_ERROR = 'MSG_ERROR'  # signals error in decoding client request
    UNLOCK = 'UNLOCK'  # command for unlocking a lock
    RELEASED = 'RELEASED'  # signals successful unlocking
    LOCK_ERROR = 'LOCK_ERROR'  # signals unsuccessful attempt to lock
    GO = 'GO'  # signals successful locking and allows the client to continue
    WAIT = 'WAIT'  # signals lock is already in use and client has to wait for release

    DELIMITER = ':::'  # delimiter to split messages

    DEFAULT_LOCK = '_DEFAULT_'  # default lock name

    def __init__(self, url="tcp://127.0.0.1:7777"):
        super(LockerServer, self).__init__(url)
        self._locks = {}  # lock DB, format 'lock_name': ('client_id', 'request_id')

    def _pre_respond_hook(self, response):
        """ Hook that can be used to tamper with the server before responding

        :param response: Response to be sent

        :return: Boolean value if the response should be sent or not

        """
        return True

    def _lock(self, name, client_id, request_id):
        """Handles locking of locks

        If a lock is already locked, sends a WAIT command; otherwise LOCKs it and sends GO.

        Complains if a given client re-locks a lock without releasing it before.

        """
        if name in self._locks:
            other_client_id, other_request_id = self._locks[name]
            if other_client_id == client_id:
                response = (self.LOCK_ERROR + self.DELIMITER +
                            'Re-request of lock `%s` (old request id `%s`) by `%s` '
                            '(request id `%s`)' % (name, other_request_id,
                                                   client_id, request_id))
                self._logger.warning(response)
                return response
            else:
                return self.WAIT
        else:
            self._locks[name] = (client_id, request_id)
            return self.GO

    def _unlock(self, name, client_id, request_id):
        """Handles unlocking

        Complains if a non-existent lock should be released or if a lock
        should be released that was acquired by another client before.

        """
        if name in self._locks:
            other_client_id, other_request_id = self._locks[name]
            if other_client_id != client_id:
                response = (self.RELEASE_ERROR + self.DELIMITER +
                            'Lock `%s` was acquired by `%s` (old request id `%s`) and not by '
                            '`%s` (request id `%s`)' % (name, other_client_id,
                                                        other_request_id,
                                                        client_id, request_id))
                self._logger.error(response)
                return response
            else:
                del self._locks[name]
                return self.RELEASED
        else:
            response = (self.RELEASE_ERROR + self.DELIMITER +
                        'Lock `%s` cannot be found in database (client id `%s`, '
                        'request id `%s`)' % (name, client_id, request_id))
            self._logger.error(response)
            return response
    def run(self):
        """Runs the server"""
        try:
            self._start()
            running = True
            while running:
                msg = ''
                name = ''
                client_id = ''
                request_id = ''
                request = self._socket.recv_string()
                self._logger.log(1, 'Received REQ `%s`', request)
                split_msg = request.split(self.DELIMITER)
                if len(split_msg) == 4:
                    msg, name, client_id, request_id = split_msg
                if msg == self.LOCK:
                    response = self._lock(name, client_id, request_id)
                elif msg == self.UNLOCK:
                    response = self._unlock(name, client_id, request_id)
                elif msg == self.PING:
                    response = self.PONG
                elif msg == self.DONE:
                    response = self.CLOSED
                    running = False
                else:
                    response = (self.MSG_ERROR + self.DELIMITER +
                                'Request `%s` not understood '
                                '(or wrong number of delimiters)' % request)
                    self._logger.error(response)
                respond = self._pre_respond_hook(response)
                if respond:
                    self._logger.log(1, 'Sending REP `%s` to `%s` (request id `%s`)',
                                     response, client_id, request_id)
                    self._socket.send_string(response)
            # Close everything in the end
            self._close()
        except Exception:
            self._logger.exception('Crashed Lock Server!')
            raise

class TimeOutLockerServer(LockerServer):
    """ Lock Server where each lock is valid only for a fixed period of time. """

    def __init__(self, url, timeout):
        super(TimeOutLockerServer, self).__init__(url)
        self._timeout = timeout
        self._timeout_locks = {}

    def _lock(self, name, client_id, request_id):
        """Handles locking

        The locking time is stored to determine time-outs.
        If a lock has timed out, it can be acquired by a different client.

        """
        if name in self._locks:
            other_client_id, other_request_id, lock_time = self._locks[name]
            if other_client_id == client_id:
                response = (self.LOCK_ERROR + self.DELIMITER +
                            'Re-request of lock `%s` (old request id `%s`) by `%s` '
                            '(request id `%s`)' % (name, other_request_id,
                                                   client_id, request_id))
                self._logger.warning(response)
                return response
            else:
                current_time = time.time()
                if current_time - lock_time < self._timeout:
                    return self.WAIT
                else:
                    response = (self.GO + self.DELIMITER +
                                'Lock `%s` by `%s` (old request id `%s`) '
                                'timed out' % (name, other_client_id, other_request_id))
                    self._logger.info(response)
                    self._locks[name] = (client_id, request_id, time.time())
                    self._timeout_locks[(name, other_client_id)] = (request_id, lock_time)
                    return response
        else:
            self._locks[name] = (client_id, request_id, time.time())
            return self.GO

    def _unlock(self, name, client_id, request_id):
        """Handles unlocking"""
        if name in self._locks:
            other_client_id, other_request_id, lock_time = self._locks[name]
            if other_client_id != client_id:
                response = (self.RELEASE_ERROR + self.DELIMITER +
                            'Lock `%s` was acquired by `%s` (old request id `%s`) and not by '
                            '`%s` (request id `%s`)' % (name, other_client_id,
                                                        other_request_id,
                                                        client_id, request_id))
                self._logger.error(response)
                return response
            else:
                del self._locks[name]
                return self.RELEASED
        elif (name, client_id) in self._timeout_locks:
            other_request_id, lock_time = self._timeout_locks[(name, client_id)]
            timeout = time.time() - lock_time - self._timeout
            response = (self.RELEASE_ERROR + self.DELIMITER +
                        'Lock `%s` timed out %f seconds ago (client id `%s`, '
                        'old request id `%s`)' % (name, timeout, client_id, other_request_id))
            return response
        else:
            response = (self.RELEASE_ERROR + self.DELIMITER +
                        'Lock `%s` cannot be found in database (client id `%s`, '
                        'request id `%s`)' % (name, client_id, request_id))
            self._logger.warning(response)
            return response

class ReliableClient(HasLogger):
    """Implements a reliable client that reconnects on server failure"""

    SLEEP = 0.01  # Sleep time before reconnect in seconds
    RETRIES = 9  # Number of reconnect retries
    TIMEOUT = 2222  # Polling timeout before a reconnect in milliseconds

    def __init__(self, url):
        self.url = url
        self._context = None
        self._socket = None
        self._poll = None
        self._set_logger()

    def __getstate__(self):
        result_dict = super(ReliableClient, self).__getstate__()
        # Do not pickle zmq data
        result_dict['_context'] = None
        result_dict['_socket'] = None
        result_dict['_poll'] = None
        return result_dict

    def send_done(self):
        """Notifies the server to shut down"""
        self.start(test_connection=False)
        self._logger.debug('Sending shutdown signal')
        self._req_rep(ZMQServer.DONE)

    def test_ping(self):
        """Connection test"""
        self.start(test_connection=False)
        response = self._req_rep(ZMQServer.PING)
        if response != ZMQServer.PONG:
            raise RuntimeError('Connection Error to LockServer')

    def finalize(self):
        """Closes socket and terminates context

        NO-OP if already closed.

        """
        if self._context is not None:
            if self._socket is not None:
                self._close_socket(confused=False)
            self._context.term()
            self._context = None
            self._poll = None

    def start(self, test_connection=True):
        """Starts connection to server if not existent.

        NO-OP if connection is already established.
        Also performs a ping-pong test if desired.

        """
        if self._context is None:
            self._logger.debug('Starting Client')
            self._context = zmq.Context()
            self._poll = zmq.Poller()
            self._start_socket()
            if test_connection:
                self.test_ping()

    def _start_socket(self):
        self._socket = self._context.socket(zmq.REQ)
        self._socket.ipv6 = is_ipv6(self.url)
        self._socket.connect(self.url)
        self._poll.register(self._socket, zmq.POLLIN)

    def _close_socket(self, confused=False):
        if confused:
            self._socket.setsockopt(zmq.LINGER, 0)
        self._socket.close()
        self._poll.unregister(self._socket)
        self._socket = None

    def __del__(self):
        # For Python 3.4 to avoid a dead-lock due to wrong object clearing,
        # i.e. deleting the context before the socket
        self.finalize()

    def _req_rep(self, request):
        """Returns the server response to `request`"""
        return self._req_rep_retry(request)[0]

    def _req_rep_retry(self, request):
        """Returns response and number of retries"""
        retries_left = self.RETRIES
        while retries_left:
            self._logger.log(1, 'Sending REQ `%s`', request)
            self._send_request(request)
            socks = dict(self._poll.poll(self.TIMEOUT))
            if socks.get(self._socket) == zmq.POLLIN:
                response = self._receive_response()
                self._logger.log(1, 'Received REP `%s`', response)
                return response, self.RETRIES - retries_left
            else:
                self._logger.debug('No response from server (%d retries left)' % retries_left)
                self._close_socket(confused=True)
                retries_left -= 1
                if retries_left == 0:
                    raise RuntimeError('Server seems to be offline!')
                time.sleep(self.SLEEP)
                self._start_socket()

    def _send_request(self, request):
        """Actual sending of the request over the network"""
        self._socket.send_string(request)

    def _receive_response(self):
        """Actual receiving of the response"""
        return self._socket.recv_string()

class LockerClient(ReliableClient):
    """ Implements a Lock by requesting lock information from the LockerServer"""

    def __init__(self, url='tcp://127.0.0.1:7777', lock_name=LockerServer.DEFAULT_LOCK):
        super(LockerClient, self).__init__(url)
        self.lock_name = lock_name
        self.id = None

    def __getstate__(self):
        result_dict = super(LockerClient, self).__getstate__()
        result_dict['id'] = None
        return result_dict
    def start(self, test_connection=True):
        if self._context is None:
            self.id = self._get_id()
            cls = self.__class__
            self._set_logger('%s.%s_%s' % (cls.__module__, cls.__name__, self.id))
        super(LockerClient, self).start(test_connection)
    @staticmethod
    def _get_id():
        return socket.getfqdn().replace(LockerServer.DELIMITER, '-') + '__' + str(os.getpid())

    @staticmethod
    def _get_request_id():
        return str(time.time()).replace(LockerServer.DELIMITER, '-')

    def _compose_request(self, request_sketch):
        request = (request_sketch + LockerServer.DELIMITER +
                   self.lock_name + LockerServer.DELIMITER +
                   self.id + LockerServer.DELIMITER +
                   self._get_request_id())
        return request
    def acquire(self):
        """Acquires the lock and returns `True`.

        Blocks until the lock is available.

        """
        self.start(test_connection=False)
        while True:
            str_response, retries = self._req_rep_retry(LockerServer.LOCK)
            response = str_response.split(LockerServer.DELIMITER)
            if response[0] == LockerServer.GO:
                return True
            elif response[0] == LockerServer.LOCK_ERROR and retries > 0:
                # Message was sent but the server response was lost, so we tried again
                self._logger.error(str_response + '; Probably due to retry')
                return True
            elif response[0] == LockerServer.WAIT:
                time.sleep(self.SLEEP)
            else:
                raise RuntimeError('Response `%s` not understood' % response)
    def release(self):
        """Releases the lock"""
        # self.start(test_connection=False)
        str_response, retries = self._req_rep_retry(LockerServer.UNLOCK)
        response = str_response.split(LockerServer.DELIMITER)
        if response[0] == LockerServer.RELEASED:
            pass  # Everything is fine
        elif response[0] == LockerServer.RELEASE_ERROR and retries > 0:
            # Message was sent but the server response was lost, so we tried again
            self._logger.error(str_response + '; Probably due to retry')
        else:
            raise RuntimeError('Response `%s` not understood' % response)
    def _req_rep_retry(self, request):
        request = self._compose_request(request)
        return super(LockerClient, self)._req_rep_retry(request)

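
# --- Illustrative sketch (not part of the original pypet module) ---------------------
# Rough usage of `LockerServer` and `LockerClient`, assuming `zmq` is available and the
# (hypothetical) port 7777 is free: the server runs in a background thread, the client
# acquires and releases the default lock, and `send_done` shuts the server down.
def _example_locker_usage():
    url = 'tcp://127.0.0.1:7777'
    server = LockerServer(url)
    server_thread = Thread(target=server.run)
    server_thread.start()

    client = LockerClient(url)
    client.start()       # connects and performs a PING/PONG test
    client.acquire()     # blocks until the lock is granted
    try:
        pass             # critical section goes here
    finally:
        client.release()
    client.send_done()   # tells the server to shut down
    server_thread.join()
    client.finalize()
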
class QueuingServerMessageListener(ZMQServer):
    """ Manages the listening requests"""

    SPACE = 'SPACE'  # for space in the queue
    DATA = 'DATA'  # for sending data
    SPACE_AVAILABLE = 'SPACE_AVAILABLE'
    SPACE_NOT_AVAILABLE = 'SPACE_NOT_AVAILABLE'
    STORING = 'STORING'

    def __init__(self, url, queue, queue_maxsize):
        super(QueuingServerMessageListener, self).__init__(url)
        self.queue = queue
        if queue_maxsize == 0:
            queue_maxsize = float('inf')
        self.queue_maxsize = queue_maxsize

    def listen(self):
        """ Handles listening requests from the client.

        There are 4 types of requests:

        1. Check for space in the queue
        2. Test the socket
        3. If there is space, receive data
        4. After data has been received, put it on the queue for storing

        """
        count = 0
        self._start()
        while True:
            result = self._socket.recv_pyobj()
            if isinstance(result, tuple):
                request, data = result
            else:
                request = result
                data = None
            if request == self.SPACE:
                if self.queue.qsize() + count < self.queue_maxsize:
                    self._socket.send_string(self.SPACE_AVAILABLE)
                    count += 1
                else:
                    self._socket.send_string(self.SPACE_NOT_AVAILABLE)
            elif request == self.PING:
                self._socket.send_string(self.PONG)
            elif request == self.DATA:
                self._socket.send_string(self.STORING)
                self.queue.put(data)
                count -= 1
            elif request == self.DONE:
                self._socket.send_string(ZMQServer.CLOSED)
                self.queue.put(('DONE', [], {}))
                self._close()
                break
            else:
                raise RuntimeError('I did not understand your request %s' % request)


class QueuingServer(HasLogger):
    """ Implements the server architecture for queuing"""

    def __init__(self, url, storage_service, queue_maxsize, gc_interval):
        self._url = url
        self._storage_service = storage_service
        self._queue_maxsize = queue_maxsize
        self._gc_interval = gc_interval

    def run(self):
        main_queue = queue.Queue(maxsize=self._queue_maxsize)
        server_message_listener = QueuingServerMessageListener(self._url, main_queue,
                                                               self._queue_maxsize)
        storage_writer = QueueStorageServiceWriter(self._storage_service, main_queue,
                                                   self._gc_interval)
        server_queue = Thread(target=server_message_listener.listen, args=())
        server_queue.start()
        storage_writer.run()
        server_queue.join()


class QueuingClient(ReliableClient):
    """ Manages the client-side requests"""

    def put(self, data, block=True):
        """ Sends data to the server if there is space in the queue.

        If there is no space in the queue, the request is repeated
        every 10 milliseconds until space becomes available.

        """
        self.start(test_connection=False)
        while True:
            response = self._req_rep(QueuingServerMessageListener.SPACE)
            if response == QueuingServerMessageListener.SPACE_AVAILABLE:
                self._req_rep((QueuingServerMessageListener.DATA, data))
                break
            else:
                time.sleep(0.01)

    def _send_request(self, request):
        return self._socket.send_pyobj(request)


class ForkDetector(HasLogger):

    def _detect_fork(self):
        """Detects if the client was forked.

        Forking is detected by comparing the PID of the current process
        with the stored PID.

        """
        if self._pid is None:
            self._pid = os.getpid()
        if self._context is not None:
            current_pid = os.getpid()
            if current_pid != self._pid:
                self._logger.debug('Fork detected: My pid `%s` != os pid `%s`. '
                                   'Restarting connection.' % (str(self._pid),
                                                               str(current_pid)))
                self._context = None
                self._pid = current_pid


class ForkAwareQueuingClient(QueuingClient, ForkDetector):
    """ Queuing client that can detect forking of processes.

    In this case the context and socket are restarted.

    """

    def __init__(self, url='tcp://127.0.0.1:22334'):
        super(ForkAwareQueuingClient, self).__init__(url)
        self._pid = None

    def __getstate__(self):
        result_dict = super(ForkAwareQueuingClient, self).__getstate__()
        result_dict['_pid'] = None
        return result_dict

    def start(self, test_connection=True):
        self._detect_fork()
        super(ForkAwareQueuingClient, self).start(test_connection)

class ForkAwareLockerClient(LockerClient, ForkDetector):
    """Locker client that can detect forking of processes.

    In this case the context and socket are restarted.

    """

    def __init__(self, url='tcp://127.0.0.1:7777', lock_name=LockerServer.DEFAULT_LOCK):
        super(ForkAwareLockerClient, self).__init__(url, lock_name)
        self._pid = None

    def __getstate__(self):
        result_dict = super(ForkAwareLockerClient, self).__getstate__()
        result_dict['_pid'] = None
        return result_dict
    def start(self, test_connection=True):
        """Checks for forking and starts/restarts if desired"""
        self._detect_fork()
        super(ForkAwareLockerClient, self).start(test_connection)

class QueueStorageServiceSender(MultiprocWrapper, HasLogger):
    """ For multiprocessing with :const:`~pypet.pypetconstants.WRAP_MODE_QUEUE`,
    replaces the original storage service.

    All storage requests are sent over a queue to the process running the
    :class:`~pypet.storageservice.QueueStorageServiceWriter`.

    Does not support loading of data!

    """

    def __init__(self, storage_queue=None):
        self.queue = storage_queue
        self.pickle_queue = True
        self._set_logger()

    def __getstate__(self):
        result = super(QueueStorageServiceSender, self).__getstate__()
        if not self.pickle_queue:
            result['queue'] = None
        return result

    def load(self, *args, **kwargs):
        raise NotImplementedError('Queue wrapping does not support loading. If you want to '
                                  'load data in a multiprocessing environment, use a Lock '
                                  'wrapping.')

    @retry(9, Exception, 0.01, 'pypet.retry')
    def _put_on_queue(self, to_put):
        """Puts data on the queue"""
        old = self.pickle_queue
        self.pickle_queue = False
        try:
            self.queue.put(to_put, block=True)
        finally:
            self.pickle_queue = old
    def store(self, *args, **kwargs):
        """Puts data to be stored on the queue.

        Note that the queue will no longer be pickled if the sender itself is pickled.

        """
        self._put_on_queue(('STORE', args, kwargs))
    def send_done(self):
        """Signals the writer that it can stop listening to the queue"""
        self._put_on_queue(('DONE', [], {}))

class LockAcquisition(HasLogger):
    """Abstract class to allow lock acquisition and release.

    Assumes that implementing classes have ``lock``, ``is_locked`` and
    ``is_open`` attributes.

    Requires a ``_logger`` for error messaging.

    """

    @retry(9, TypeError, 0.01, 'pypet.retry')
    def acquire_lock(self):
        if not self.is_locked:
            self.is_locked = self.lock.acquire()

    @retry(9, TypeError, 0.01, 'pypet.retry')
    def release_lock(self):
        if self.is_locked and not self.is_open:
            try:
                self.lock.release()
            except (ValueError, ThreadError):
                self._logger.exception('Could not release lock, '
                                       'probably has been released already!')
            self.is_locked = False

class PipeStorageServiceSender(MultiprocWrapper, LockAcquisition):

    def __init__(self, storage_connection=None, lock=None):
        self.conn = storage_connection
        self.lock = lock
        self.is_locked = False
        self._set_logger()

    def __getstate__(self):
        # result = super(PipeStorageServiceSender, self).__getstate__()
        result = self.__dict__.copy()
        result['conn'] = None
        result['lock'] = None
        return result

    def load(self, *args, **kwargs):
        raise NotImplementedError('Pipe wrapping does not support loading. If you want to '
                                  'load data in a multiprocessing environment, use the Lock '
                                  'wrapping.')

    @retry(9, Exception, 0.01, 'pypet.retry')
    def _put_on_pipe(self, to_put):
        """Puts data on the pipe"""
        self.acquire_lock()
        self._send_chunks(to_put)
        self.release_lock()

    def _make_chunk_iterator(self, to_chunk, chunksize):
        return (to_chunk[i:i + chunksize] for i in range(0, len(to_chunk), chunksize))

    def _send_chunks(self, to_put):
        put_dump = pickle.dumps(to_put)
        data_size = sys.getsizeof(put_dump)
        nchunks = data_size / 20000000.  # chunks with size 20 MB
        chunksize = int(len(put_dump) / nchunks)
        chunk_iterator = self._make_chunk_iterator(put_dump, chunksize)
        for chunk in chunk_iterator:
            self.conn.send(False)  # signal that another chunk follows
            self.conn.send_bytes(chunk)
            self.conn.recv()  # wait for signal that the chunk was received
        self.conn.send(True)  # signal that all chunks have been sent
        self.conn.recv()  # wait for signal that the final message was received
    def store(self, *args, **kwargs):
        """Puts data to be stored on the pipe.

        Note that the pipe connection will no longer be pickled if the sender itself
        is pickled.

        """
        self._put_on_pipe(('STORE', args, kwargs))
    def send_done(self):
        """Signals the writer that it can stop listening to the pipe"""
        self._put_on_pipe(('DONE', [], {}))

class StorageServiceDataHandler(HasLogger):
    """Class that can store data via a storage service.

    Needs to be sub-classed to receive data.

    """

    def __init__(self, storage_service, gc_interval=None):
        self._storage_service = storage_service
        self._trajectory_name = ''
        self.gc_interval = gc_interval
        self.operation_counter = 0
        self._set_logger()

    def __repr__(self):
        return '<%s wrapping Storage Service %s>' % (self.__class__.__name__,
                                                     repr(self._storage_service))

    def _open_file(self):
        self._storage_service.store(pypetconstants.OPEN_FILE, None,
                                    trajectory_name=self._trajectory_name)
        self._logger.info('Opened the hdf5 file.')

    def _close_file(self):
        self._storage_service.store(pypetconstants.CLOSE_FILE, None)
        self._logger.info('Closed the hdf5 file.')

    def _check_and_collect_garbage(self):
        if self.gc_interval and self.operation_counter % self.gc_interval == 0:
            collected = gc.collect()
            self._logger.debug('Garbage Collection: Found %d unreachable items.' % collected)
        self.operation_counter += 1

    def _handle_data(self, msg, args, kwargs):
        """Handles data and returns `True` if the handler should stop."""
        stop = False
        try:
            if msg == 'DONE':
                stop = True
            elif msg == 'STORE':
                if 'msg' in kwargs:
                    store_msg = kwargs.pop('msg')
                else:
                    store_msg = args[0]
                    args = args[1:]
                if 'stuff_to_store' in kwargs:
                    stuff_to_store = kwargs.pop('stuff_to_store')
                else:
                    stuff_to_store = args[0]
                    args = args[1:]
                trajectory_name = kwargs['trajectory_name']
                if self._trajectory_name != trajectory_name:
                    if self._storage_service.is_open:
                        self._close_file()
                    self._trajectory_name = trajectory_name
                    self._open_file()
                self._storage_service.store(store_msg, stuff_to_store, *args, **kwargs)
                self._storage_service.store(pypetconstants.FLUSH, None)
                self._check_and_collect_garbage()
            else:
                raise RuntimeError('You queued something that was not '
                                   'intended to be queued. I did not understand message '
                                   '`%s`.' % msg)
        except Exception:
            self._logger.exception('ERROR occurred during storing!')
            time.sleep(0.01)
            pass  # We don't want to kill the queue process in case of an error
        return stop

    def run(self):
        """Starts listening for data."""
        try:
            while True:
                msg, args, kwargs = self._receive_data()
                stop = self._handle_data(msg, args, kwargs)
                if stop:
                    break
        finally:
            if self._storage_service.is_open:
                self._close_file()
            self._trajectory_name = ''

    def _receive_data(self):
        raise NotImplementedError('Implement this!')

class QueueStorageServiceWriter(StorageServiceDataHandler):
    """Wrapper class that listens to the queue and stores queue items via the storage service."""

    def __init__(self, storage_service, storage_queue, gc_interval=None):
        super(QueueStorageServiceWriter, self).__init__(storage_service,
                                                        gc_interval=gc_interval)
        self.queue = storage_queue

    @retry(9, Exception, 0.01, 'pypet.retry')
    def _receive_data(self):
        """Gets data from the queue"""
        result = self.queue.get(block=True)
        if hasattr(self.queue, 'task_done'):
            self.queue.task_done()
        return result

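
# --- Illustrative sketch (not part of the original pypet module) ---------------------
# Rough single-process wiring of the queue wrapping: a sender replaces the storage
# service on the worker side and a writer empties the queue with the real service
# (`my_storage_service` is a hypothetical placeholder). In pypet the two sides usually
# live in different processes connected by a `multiprocessing.Queue`.
def _example_queue_wrapping(my_storage_service):
    import multiprocessing as mp

    storage_queue = mp.Queue()
    sender = QueueStorageServiceSender(storage_queue)
    writer = QueueStorageServiceWriter(my_storage_service, storage_queue)

    # Workers call `store` exactly like on a normal storage service;
    # `trajectory_name` is required because the writer uses it to open the right file.
    sender.store('some_msg', 'some_data', trajectory_name='my_traj')
    sender.send_done()   # lets `writer.run()` terminate after draining the queue
    writer.run()
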
class PipeStorageServiceWriter(StorageServiceDataHandler):
    """Wrapper class that listens to the pipe and stores received items via the storage service."""

    def __init__(self, storage_service, storage_connection, max_buffer_size=10,
                 gc_interval=None):
        super(PipeStorageServiceWriter, self).__init__(storage_service,
                                                       gc_interval=gc_interval)
        self.conn = storage_connection
        if max_buffer_size == 0:
            # no maximum buffer size
            max_buffer_size = float('inf')
        self.max_size = max_buffer_size
        self._buffer = deque()
        self._set_logger()

    def _read_chunks(self):
        chunks = []
        stop = False
        while not stop:
            stop = self.conn.recv()  # `True` signals that all chunks have been sent
            if not stop:
                chunk = self.conn.recv_bytes()
                chunks.append(chunk)
            self.conn.send(True)  # acknowledge receipt
        to_load = b''.join(chunks)
        del chunks  # free unnecessary memory
        try:
            data = pickle.loads(to_load)
        except Exception:
            # We don't want to crash the storage service if reconstruction
            # of the data fails due to errors
            self._logger.exception('Could not reconstruct pickled data.')
            data = None
        return data

    @retry(9, Exception, 0.01, 'pypet.retry')
    def _receive_data(self):
        """Gets data from the pipe"""
        while True:
            while len(self._buffer) < self.max_size and self.conn.poll():
                data = self._read_chunks()
                if data is not None:
                    self._buffer.append(data)
            if len(self._buffer) > 0:
                return self._buffer.popleft()

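
# --- Illustrative sketch (not part of the original pypet module) ---------------------
# Rough wiring of the pipe wrapping: data travels in pickled 20 MB chunks through a
# `multiprocessing.Pipe`, guarded by a lock so that concurrent senders do not interleave
# their chunks. `my_storage_service` is again a hypothetical placeholder.
def _example_pipe_wrapping(my_storage_service):
    import multiprocessing as mp

    receiver_conn, sender_conn = mp.Pipe(duplex=True)
    lock = mp.Lock()
    sender = PipeStorageServiceSender(sender_conn, lock)
    writer = PipeStorageServiceWriter(my_storage_service, receiver_conn)

    # The writer must already be listening because `_send_chunks` waits for
    # acknowledgments; here it runs in a thread, typically it runs in another process.
    writer_thread = Thread(target=writer.run)
    writer_thread.start()

    sender.store('some_msg', 'some_data', trajectory_name='my_traj')
    sender.send_done()   # makes `writer.run()` terminate
    writer_thread.join()
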
class LockWrapper(MultiprocWrapper, LockAcquisition):
    """For multiprocessing in :const:`~pypet.pypetconstants.WRAP_MODE_LOCK` mode,
    augments a storage service with a lock.

    The lock is acquired before storage or loading and released afterwards.

    """

    def __init__(self, storage_service, lock=None):
        self._storage_service = storage_service
        self.lock = lock
        self.is_locked = False
        self.pickle_lock = True
        self._set_logger()

    def __getstate__(self):
        result = super(LockWrapper, self).__getstate__()
        if not self.pickle_lock:
            result['lock'] = None
        return result

    def __repr__(self):
        return '<%s wrapping Storage Service %s>' % (self.__class__.__name__,
                                                     repr(self._storage_service))

    @property
    def is_open(self):
        """ Normally the file is opened and closed after each insertion.

        However, the storage service may provide the option to keep the store open
        and signals this via this property.

        """
        return self._storage_service.is_open

    @property
    def multiproc_safe(self):
        """This wrapper provides multiprocessing safety via its lock"""
        return True
    def store(self, *args, **kwargs):
        """Acquires a lock before storage and releases it afterwards."""
        try:
            self.acquire_lock()
            return self._storage_service.store(*args, **kwargs)
        finally:
            if self.lock is not None:
                try:
                    self.release_lock()
                except RuntimeError:
                    self._logger.error('Could not release lock `%s`!' % str(self.lock))
    def __del__(self):
        # In order to prevent a dead-lock in case of an error,
        # we release the lock once again
        self.release_lock()
    def load(self, *args, **kwargs):
        """Acquires a lock before loading and releases it afterwards."""
        try:
            self.acquire_lock()
            return self._storage_service.load(*args, **kwargs)
        finally:
            if self.lock is not None:
                try:
                    self.release_lock()
                except RuntimeError:
                    self._logger.error('Could not release lock `%s`!' % str(self.lock))

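
# --- Illustrative sketch (not part of the original pypet module) ---------------------
# Rough usage of `LockWrapper`: it forwards `store`/`load` to the wrapped service
# (a hypothetical `my_storage_service`) while holding a shared lock, so several
# processes can use the same service without interleaving file access.
def _example_lock_wrapping(my_storage_service):
    import multiprocessing as mp

    wrapped = LockWrapper(my_storage_service, lock=mp.Lock())
    wrapped.store('some_msg', 'some_data', trajectory_name='my_traj')
    data = wrapped.load('some_msg', trajectory_name='my_traj')
    return data
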
class ReferenceWrapper(MultiprocWrapper):
    """Wrapper that just keeps references to data to be stored."""

    def __init__(self):
        self.references = {}
    def store(self, msg, stuff_to_store, *args, **kwargs):
        """Simply keeps a reference to the stored data"""
        trajectory_name = kwargs['trajectory_name']
        if trajectory_name not in self.references:
            self.references[trajectory_name] = []
        self.references[trajectory_name].append((msg, cp.copy(stuff_to_store), args, kwargs))
    def load(self, *args, **kwargs):
        """Not implemented"""
        raise NotImplementedError('Reference wrapping does not support loading. If you want to '
                                  'load data in a multiprocessing environment, use a Lock '
                                  'wrapping.')
    def free_references(self):
        self.references = {}

class ReferenceStore(HasLogger):
    """Class that can store references"""

    def __init__(self, storage_service, gc_interval=None):
        self._storage_service = storage_service
        self.gc_interval = gc_interval
        self.operation_counter = 0
        self._set_logger()

    def _check_and_collect_garbage(self):
        if self.gc_interval and self.operation_counter % self.gc_interval == 0:
            collected = gc.collect()
            self._logger.debug('Garbage Collection: Found %d unreachable items.' % collected)
        self.operation_counter += 1
    def store_references(self, references):
        """Stores references to disk and may collect garbage."""
        for trajectory_name in references:
            self._storage_service.store(pypetconstants.LIST, references[trajectory_name],
                                        trajectory_name=trajectory_name)
        self._check_and_collect_garbage()

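
# --- Illustrative sketch (not part of the original pypet module) ---------------------
# Rough usage of the reference wrapping: a worker collects storage requests in a
# `ReferenceWrapper` and later a `ReferenceStore` (with a hypothetical
# `my_storage_service`) flushes them to disk in one LIST operation per trajectory.
def _example_reference_wrapping(my_storage_service):
    wrapper = ReferenceWrapper()
    wrapper.store('some_msg', 'some_data', trajectory_name='my_traj')

    store = ReferenceStore(my_storage_service)
    store.store_references(wrapper.references)
    wrapper.free_references()   # drop the collected references afterwards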