Source code for pypet.pypetlogging

"""Module containing utilities for logging."""

__author__ = 'Robert Meyer'

try:
    import ConfigParser as cp
except ImportError:
    import configparser as cp
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
import logging
from logging.config import fileConfig
try:
    from logging.config import dictConfig
except ImportError:
    from logutils.dictconfig import dictConfig
try:
    from logging import NullHandler
except ImportError:
    from logutils import NullHandler
import os
import math
import sys
import ast
import copy
import multiprocessing as multip
import functools

import pypet.pypetconstants as pypetconstants
import pypet.compat as compat
from pypet.utils.helpful_functions import progressbar
from pypet.slots import HasSlots


FILENAME_INDICATORS = set([pypetconstants.LOG_ENV,
                           pypetconstants.LOG_PROC,
                           pypetconstants.LOG_TRAJ,
                           pypetconstants.LOG_RUN,
                           '.log',
                           '.txt'])
"""Set of strings that mark a log file"""

LOGGING_DICT = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'file': {
            'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'
        },
        'stream': {
            'format': '%(processName)-10s %(name)s %(levelname)-8s %(message)s'
        }
    },
    'handlers': {
        'stream': {
            'class': 'logging.StreamHandler',
            'formatter': 'stream'
        },
        'file_main': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     'LOG.txt')
        },
        'file_error': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     'ERROR.txt'),
            'level': 'ERROR'
        }
    },
    'multiproc_formatters': {
        'file': {
            'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'
        },
    },
    'multiproc_handlers': {
        'file_main': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     '%s_%s_LOG.txt' % (pypetconstants.LOG_RUN,
                                                        pypetconstants.LOG_PROC))
        },
        'file_error': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     '%s_%s_ERROR.txt' % (pypetconstants.LOG_RUN,
                                                          pypetconstants.LOG_PROC)),
            'level': 'ERROR'
        }
    }
}
"""Dictionary containing the default configuration."""


def _change_logging_kwargs(kwargs):
    """ Helper function to turn the simple logging kwargs into a `log_config`."""
    log_folder = kwargs.pop('log_folder', 'logs')
    logger_names = kwargs.pop('logger_names', '')
    log_levels = kwargs.pop('log_levels', logging.INFO)

    if not isinstance(logger_names, (tuple, list)):
        logger_names = [logger_names]
    if not isinstance(log_levels, (tuple, list)):
        log_levels = [log_levels]
    if len(log_levels) == 1:
        log_levels = [log_levels[0] for _ in logger_names]

    # We don't want to manipulate the original dictionary
    dictionary = copy.deepcopy(LOGGING_DICT)

    # Add all handlers to all loggers
    for prefix in ('', 'multiproc_'):
        for handler_dict in dictionary[prefix + 'handlers'].values():
            if 'filename' in handler_dict:
                filename = os.path.join(log_folder, handler_dict['filename'])
                filename = os.path.normpath(filename)
                handler_dict['filename'] = filename
        dictionary[prefix + 'loggers'] = {}
        logger_dict = dictionary[prefix + 'loggers']
        for idx, logger_name in enumerate(logger_names):
            logger_dict[logger_name] = {
                'level': log_levels[idx],
                'handlers': list(dictionary[prefix + 'handlers'].keys())
            }

    kwargs['log_config'] = dictionary


def use_simple_logging(kwargs):
    """Checks if simple logging is requested"""
    return any([x in kwargs for x in ('log_folder', 'logger_names', 'log_levels')])


def simple_logging_config(func):
    """Decorator to allow a simple logging configuration.

    This encompasses passing a `log_folder`, `logger_names`, and/or `log_levels`.

    """

    @functools.wraps(func)
    def new_func(self, *args, **kwargs):
        if use_simple_logging(kwargs):
            if 'log_config' in kwargs:
                raise ValueError('Please do not specify `log_config` '
                                 'if you want to use the simple '
                                 'way of providing logging configuration '
                                 '(i.e using `log_folder`, `logger_names` and/or `log_levels`).')
            _change_logging_kwargs(kwargs)

        return func(self, *args, **kwargs)

    return new_func
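

# Illustrative sketch (not part of the original module): how `simple_logging_config`
# might be applied. `_ExampleEnvironment` is hypothetical; in pypet the decorator is
# meant for constructors that accept logging keyword arguments (e.g. the environment).
class _ExampleEnvironment(object):

    @simple_logging_config
    def __init__(self, **kwargs):
        # After decoration the simple kwargs (`log_folder`, `logger_names`,
        # `log_levels`) have already been converted into a single `log_config`:
        self.log_config = kwargs.get('log_config', None)

# Usage (hypothetical):
#     env = _ExampleEnvironment(log_folder='logs', logger_names=('', 'pypet'))
#     env.log_config  # -> dictionary in `dictConfig` format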


def try_make_dirs(filename):
    """ Tries to make directories for a given `filename`.

    Ignores any error but notifies via stderr.

    """
    try:
        dirname = os.path.dirname(filename)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
    except Exception as exc:
        sys.stderr.write('ERROR during log config file handling, could not create dirs for '
                         'filename `%s` because of: %s' % (filename, repr(exc)))



def rename_log_file(traj, filename, process_name=None):
    """ Renames a given `filename` with valid wildcard placements.

    :const:`~pypet.pypetconstants.LOG_ENV` ($env) is replaced by the name of the
    trajectory's environment.

    :const:`~pypet.pypetconstants.LOG_TRAJ` ($traj) is replaced by the name of the
    trajectory.

    :const:`~pypet.pypetconstants.LOG_RUN` ($run) is replaced by the name of the
    current run. If the trajectory is not set to a run, 'run_ALL' is used.

    :const:`~pypet.pypetconstants.LOG_SET` ($set) is replaced by the name of the
    current run set. If the trajectory is not set to a run, 'run_set_ALL' is used.

    :const:`~pypet.pypetconstants.LOG_PROC` ($proc) is replaced by the name of the
    current process.

    :param traj: A trajectory container

    :param filename: A filename string

    :param process_name:

        The name of the desired process. If `None`, the name of the current process
        as determined by the multiprocessing module is used.

    :return: The new filename

    """
    if pypetconstants.LOG_ENV in filename:
        env_name = traj.v_environment_name
        filename = filename.replace(pypetconstants.LOG_ENV, env_name)
    if pypetconstants.LOG_TRAJ in filename:
        traj_name = traj.v_name
        filename = filename.replace(pypetconstants.LOG_TRAJ, traj_name)
    if pypetconstants.LOG_RUN in filename:
        run_name = traj.f_wildcard('$')
        filename = filename.replace(pypetconstants.LOG_RUN, run_name)
    if pypetconstants.LOG_SET in filename:
        set_name = traj.f_wildcard('$set')
        filename = filename.replace(pypetconstants.LOG_SET, set_name)
    if pypetconstants.LOG_PROC in filename:
        if process_name is None:
            process_name = multip.current_process().name
        filename = filename.replace(pypetconstants.LOG_PROC, process_name)
    return filename
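

# Illustrative sketch (not part of the original module): how `rename_log_file`
# resolves wildcards. `_FakeTraj` is a hypothetical stand-in that provides only the
# attributes the function touches; real code passes a pypet `Trajectory`.
def _example_rename_log_file():
    class _FakeTraj(object):
        v_environment_name = 'environment_2016_01_01'
        v_name = 'traj_2016_01_01'

        def f_wildcard(self, card):
            return 'run_00000001' if card == '$' else 'run_set_00000000'

    filename = os.path.join('$traj', '$env', '$proc_LOG.txt')
    # Yields e.g. 'traj_2016_01_01/environment_2016_01_01/MainProcess_LOG.txt'
    return rename_log_file(_FakeTraj(), filename, process_name='MainProcess')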


def get_strings(args):
    """Returns all valid python strings inside a given argument string."""
    string_list = []
    for elem in ast.walk(ast.parse(args)):
        if isinstance(elem, ast.Str):
            string_list.append(elem.s)
    return string_list
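

# Illustrative sketch (not part of the original module): `get_strings` extracts the
# string literals from an `args` line of a logging `.ini` file, for instance the
# argument tuple of a `FileHandler` entry.
def _example_get_strings():
    args = "('logs/$traj/$env/LOG.txt', 'a')"
    # Returns ['logs/$traj/$env/LOG.txt', 'a']
    return get_strings(args)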


class HasLogger(HasSlots):
    """Abstract super class that automatically adds a logger to a class.

    To add a logger to a sub-class of yours simply call ``myobj._set_logger(name)``.
    If ``name=None`` the logger is chosen as follows:

        ``self._logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)``

    The logger can be accessed via ``myobj._logger``.

    """

    __slots__ = ('_logger',)

    def __getstate__(self):
        """Called for pickling.

        Removes the logger to allow pickling and returns a copy of `__dict__`.

        """
        state_dict = super(HasLogger, self).__getstate__()
        if '_logger' in state_dict:
            # Logger objects cannot be pickled, so we just keep the logger's name:
            state_dict['_logger'] = self._logger.name
        return state_dict

    def __setstate__(self, statedict):
        """Called after loading a pickle dump.

        Restores `__dict__` from `statedict` and adds a new logger.

        """
        super(HasLogger, self).__setstate__(statedict)
        if '_logger' in statedict:
            # If we re-instantiate the component, the logger attribute only contains
            # a name, so we also need to re-create the logger:
            self._set_logger(statedict['_logger'])

    def _set_logger(self, name=None):
        """Adds a logger with a given `name`.

        If no name is given, the name is constructed as
        ``self.__class__.__module__ + '.' + self.__class__.__name__``.

        """
        if name is None:
            cls = self.__class__
            name = '%s.%s' % (cls.__module__, cls.__name__)
        self._logger = logging.getLogger(name)
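

# Illustrative sketch (not part of the original module): a minimal `HasLogger`
# subclass. `_DemoComponent` is hypothetical; `__slots__` stays empty because
# `HasLogger` already reserves the `_logger` slot.
class _DemoComponent(HasLogger):

    __slots__ = ()

    def __init__(self):
        # Without an explicit name the logger is called '<module>._DemoComponent'
        self._set_logger()
        self._logger.debug('Component created')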


class LoggingManager(object):
    """ Manager taking care of all logging related issues.

    :param trajectory: Trajectory container or a mock thereof

    :param log_config:

        Logging configuration. Can be the full name of an `.ini` file, an already
        instantiated config parser, or a logging dictionary.

    :param log_stdout: If `stdout` should be logged.

    :param report_progress: How to report progress.

    """
    def __init__(self, trajectory=None, log_config=None, log_stdout=False,
                 report_progress=False):
        self.trajectory = trajectory
        self.log_config = log_config
        self._sp_config = None
        self._mp_config = None
        self.log_stdout = log_stdout
        self.report_progress = report_progress
        self._tools = []
        self._null_handler = NullHandler()
        # Format string for the progressbar
        self._format_string = 'PROGRESS: Finished %d/%d runs '

    def __getstate__(self):
        """ConfigParsers are not guaranteed to be picklable, so we need to remove these."""
        state_dict = self.__dict__.copy()
        if isinstance(state_dict['log_config'], cp.RawConfigParser):
            # Config parsers are not guaranteed to be picklable
            state_dict['log_config'] = True
        return state_dict

    def show_progress(self, n, total_runs):
        """Displays a progressbar"""
        if self.report_progress:
            percentage, logger_name, log_level = self.report_progress
            if logger_name == 'print':
                logger = 'print'
            else:
                logger = logging.getLogger(logger_name)
            if n == 0:
                # Compute the number of digits and avoid log10(0)
                digits = int(math.log10(total_runs + 0.1)) + 1
                self._format_string = 'PROGRESS: Finished %' + '%d' % digits + 'd/%d runs '
            fmt_string = self._format_string % (n + 1, total_runs) + '%s'
            reprint = log_level == 0
            progressbar(n, total_runs, percentage_step=percentage, logger=logger,
                        log_level=log_level, fmt_string=fmt_string, reprint=reprint)

    def add_null_handler(self):
        """Adds a NullHandler to the root logger.

        This is simply added to avoid warnings that no logger has been configured.

        """
        root = logging.getLogger()
        root.addHandler(self._null_handler)

    def remove_null_handler(self):
        """Removes the NullHandler from the root logger."""
        root = logging.getLogger()
        root.removeHandler(self._null_handler)

    @staticmethod
    def tabula_rasa():
        """Removes all loggers and logging handlers."""
        erase_dict = {'disable_existing_loggers': False, 'version': 1}
        dictConfig(erase_dict)

    @staticmethod
    def _check_and_replace_parser_args(parser, section, option, rename_func, make_dirs=True):
        """ Searches for parser settings that define filenames.

        If such settings are found, they are renamed according to the wildcard rules.
        Moreover, it is also tried to create the corresponding folders.

        :param parser: A config parser

        :param section: A config section

        :param option: The section option

        :param rename_func: A function to rename found files

        :param make_dirs: If the directories of the file should be created.

        """
        args = parser.get(section, option, raw=True)
        strings = get_strings(args)
        replace = False
        for string in strings:
            isfilename = any(x in string for x in FILENAME_INDICATORS)
            if isfilename:
                newstring = rename_func(string)
                if make_dirs:
                    try_make_dirs(newstring)
                # To work with windows path specifications we need this replacement:
                raw_string = string.replace('\\', '\\\\')
                raw_newstring = newstring.replace('\\', '\\\\')
                args = args.replace(raw_string, raw_newstring)
                replace = True
        if replace:
            parser.set(section, option, args)

    @staticmethod
    def _parser_to_string_io(parser):
        """Turns a ConfigParser into a StringIO stream."""
        memory_file = StringIO()
        parser.write(memory_file)
        memory_file.flush()
        memory_file.seek(0)
        return memory_file

    @staticmethod
    def _find_multiproc_options(parser):
        """ Searches for multiprocessing options within a ConfigParser.

        If such options are found, they are copied (without the `'multiproc_'` prefix)
        into a new parser.

        """
        sections = parser.sections()
        if not any(section.startswith('multiproc_') for section in sections):
            return None
        mp_parser = NoInterpolationParser()
        for section in sections:
            if section.startswith('multiproc_'):
                new_section = section.replace('multiproc_', '')
                mp_parser.add_section(new_section)
                options = parser.options(section)
                for option in options:
                    val = parser.get(section, option, raw=True)
                    mp_parser.set(new_section, option, val)
        return mp_parser

    @staticmethod
    def _find_multiproc_dict(dictionary):
        """ Searches for multiprocessing options in a given `dictionary`.

        If found, they are copied (without the `'multiproc_'` prefix) into a new
        dictionary.

        """
        if not any(key.startswith('multiproc_') for key in dictionary.keys()):
            return None
        mp_dictionary = {}
        for key in dictionary.keys():
            if key.startswith('multiproc_'):
                new_key = key.replace('multiproc_', '')
                mp_dictionary[new_key] = dictionary[key]
        mp_dictionary['version'] = dictionary['version']
        if 'disable_existing_loggers' in dictionary:
            mp_dictionary['disable_existing_loggers'] = dictionary['disable_existing_loggers']
        return mp_dictionary

    def check_log_config(self):
        """ Checks all settings passed to the Manager and converts them if necessary.

        Searches for multiprocessing options as well.

        """
        if self.report_progress:
            if self.report_progress is True:
                self.report_progress = (5, 'pypet', logging.INFO)
            elif isinstance(self.report_progress, (int, float)):
                self.report_progress = (self.report_progress, 'pypet', logging.INFO)
            elif isinstance(self.report_progress, compat.base_type):
                self.report_progress = (5, self.report_progress, logging.INFO)
            elif len(self.report_progress) == 2:
                self.report_progress = (self.report_progress[0],
                                        self.report_progress[1],
                                        logging.INFO)

        if self.log_config:
            if self.log_config == pypetconstants.DEFAULT_LOGGING:
                pypet_path = os.path.abspath(os.path.dirname(__file__))
                init_path = os.path.join(pypet_path, 'logging')
                self.log_config = os.path.join(init_path, 'default.ini')

            if isinstance(self.log_config, compat.base_type):
                if not os.path.isfile(self.log_config):
                    raise ValueError('Could not find the logger init file '
                                     '`%s`.' % self.log_config)
                parser = NoInterpolationParser()
                parser.read(self.log_config)
            elif isinstance(self.log_config, cp.RawConfigParser):
                parser = self.log_config
            else:
                parser = None

            if parser is not None:
                self._sp_config = self._parser_to_string_io(parser)
                self._mp_config = self._find_multiproc_options(parser)
                if self._mp_config is not None:
                    self._mp_config = self._parser_to_string_io(self._mp_config)
            elif isinstance(self.log_config, dict):
                self._sp_config = self.log_config
                self._mp_config = self._find_multiproc_dict(self._sp_config)

        if self.log_stdout:
            if self.log_stdout is True:
                self.log_stdout = ('STDOUT', logging.INFO)
            if isinstance(self.log_stdout, compat.base_type):
                self.log_stdout = (self.log_stdout, logging.INFO)
            if isinstance(self.log_stdout, int):
                self.log_stdout = ('STDOUT', self.log_stdout)

    def _handle_config_parsing(self, log_config):
        """ Checks for filenames within a config file and translates them.

        Moreover, directories for the files are created as well.

        :param log_config: Config file as a stream (like StringIO)

        """
        parser = NoInterpolationParser()
        parser.readfp(log_config)
        rename_func = lambda string: rename_log_file(self.trajectory, string)
        sections = parser.sections()
        for section in sections:
            options = parser.options(section)
            for option in options:
                if option == 'args':
                    self._check_and_replace_parser_args(parser, section, option,
                                                        rename_func=rename_func)
        return parser

    def _handle_dict_config(self, log_config):
        """Recursively walks and copies the `log_config` dict and searches for filenames.

        Translates filenames and creates directories if necessary.

        """
        new_dict = dict()
        for key in log_config.keys():
            if key == 'filename':
                filename = log_config[key]
                filename = rename_log_file(self.trajectory, filename)
                new_dict[key] = filename
                try_make_dirs(filename)
            elif isinstance(log_config[key], dict):
                inner_dict = self._handle_dict_config(log_config[key])
                new_dict[key] = inner_dict
            else:
                new_dict[key] = log_config[key]
        return new_dict

    def make_logging_handlers_and_tools(self, multiproc=False):
        """Creates logging handlers and redirects stdout."""
        log_stdout = self.log_stdout
        if multiproc and hasattr(os, 'fork'):
            # If forking is allowed and possible, we already have a redirection of stdout
            log_stdout = False

        if self.log_config:
            if multiproc:
                proc_log_config = self._mp_config
            else:
                proc_log_config = self._sp_config

            if proc_log_config:
                if isinstance(proc_log_config, dict):
                    new_dict = self._handle_dict_config(proc_log_config)
                    dictConfig(new_dict)
                else:
                    parser = self._handle_config_parsing(proc_log_config)
                    memory_file = self._parser_to_string_io(parser)
                    fileConfig(memory_file, disable_existing_loggers=False)

        if log_stdout:
            # Create a logging mock for stdout
            std_name, std_level = self.log_stdout
            stdout = StdoutToLogger(std_name, log_level=std_level)
            stdout.start()
            self._tools.append(stdout)

    def finalize(self, remove_all_handlers=True):
        """Finalizes the manager, closes and removes all handlers if desired."""
        for tool in self._tools:
            tool.finalize()
        self._tools = []
        for config in (self._sp_config, self._mp_config):
            if hasattr(config, 'close'):
                config.close()
        self._sp_config = None
        self._mp_config = None
        if remove_all_handlers:
            self.tabula_rasa()


class NoInterpolationParser(cp.ConfigParser):
    """Dummy class to solve a Python 3 bug/feature in configparser.py"""
    def __init__(self):
        try:
            # Needed for Python 3, see [http://bugs.python.org/issue21265]
            super(NoInterpolationParser, self).__init__(interpolation=None)
        except TypeError:
            # Python 2.x
            cp.ConfigParser.__init__(self)
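

# Illustrative sketch (not part of the original module): the typical life cycle of a
# `LoggingManager` as driven by the environment. `trajectory=None` suffices here
# because no `log_config` with wildcard filenames has to be renamed.
def _example_logging_manager():
    manager = LoggingManager(trajectory=None, log_config=None,
                             log_stdout=False, report_progress=True)
    manager.check_log_config()    # normalizes report_progress to (5, 'pypet', logging.INFO)
    manager.add_null_handler()    # avoid 'no handlers' warnings before configuration
    manager.make_logging_handlers_and_tools(multiproc=False)
    manager.remove_null_handler()
    manager.show_progress(0, 10)  # report progress after the first finished run
    manager.finalize()            # closes tools and removes all handlers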


class DisableLogger(object):
    """Context manager that disables logging"""
    def __enter__(self):
        logging.disable(logging.CRITICAL)

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.disable(logging.NOTSET)


class StdoutToLogger(HasLogger):
    """Fake file-like stream object that redirects writes to a logger instance."""
    def __init__(self, logger_name, log_level=logging.INFO):
        self._logger_name = logger_name
        self._log_level = log_level
        self._linebuf = ''
        self._recursion = False
        self._redirection = False
        self._original_stream = None
        self._set_logger(name=self._logger_name)

    def __getstate__(self):
        state_dict = super(StdoutToLogger, self).__getstate__()
        # The original stream cannot be pickled
        state_dict['_original_stream'] = None
        return state_dict

    def start(self):
        """Starts redirection of `stdout`"""
        if sys.stdout is not self:
            self._original_stream = sys.stdout
            sys.stdout = self
            self._redirection = True
        if self._redirection:
            print('Established redirection of `stdout`.')

    def write(self, buf):
        """Writes data from buffer to logger"""
        if not self._recursion:
            self._recursion = True
            try:
                for line in buf.rstrip().splitlines():
                    self._logger.log(self._log_level, line.rstrip())
            finally:
                self._recursion = False
        else:
            # If stderr is redirected to stdout, we can avoid further recursion by
            # writing to the original stderr directly:
            sys.__stderr__.write('ERROR: Recursion in Stream redirection!')

    def flush(self):
        """No-op to fulfil API"""
        pass

    def finalize(self):
        """Disables redirection"""
        if self._original_stream is not None and self._redirection:
            sys.stdout = self._original_stream
            print('Disabled redirection of `stdout`.')
            self._redirection = False
            self._original_stream = None
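

# Illustrative sketch (not part of the original module): typical use of the two
# helpers above. `DisableLogger` silences all logging inside a `with` block, and
# `StdoutToLogger` temporarily routes `print` output to a logger.
def _example_stream_tools():
    with DisableLogger():
        logging.getLogger('pypet').error('This message is suppressed')

    redirection = StdoutToLogger('STDOUT', log_level=logging.INFO)
    redirection.start()                     # `sys.stdout` now writes to the 'STDOUT' logger
    print('This line ends up in the log')   # handled by StdoutToLogger.write
    redirection.finalize()                  # restores the original `sys.stdout`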