Source code for pypet.pypetlogging

"""Module containing utilities for logging."""

__author__ = 'Robert Meyer'

try:
    # Python3
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError


import configparser as cp
from io import StringIO
import logging
from logging.config import fileConfig
from logging.config import dictConfig
from logging import NullHandler
import os
import math
import sys
import ast
import copy
import multiprocessing as multip
import functools
import socket

import pypet.pypetconstants as pypetconstants
from pypet.utils.helpful_functions import progressbar, racedirs
from pypet.utils.decorators import retry
from pypet.slots import HasSlots


FILENAME_INDICATORS = set([pypetconstants.LOG_ENV,
                           pypetconstants.LOG_PROC,
                           pypetconstants.LOG_TRAJ,
                           pypetconstants.LOG_RUN,
                           pypetconstants.LOG_HOST,
                           pypetconstants.LOG_SET,
                           '.log',
                           '.txt'])
"""Set of strings that mark a log file"""

LOGGING_DICT = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'file': {
            'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'
        },
        'stream': {
            'format': '%(processName)-10s %(name)s %(levelname)-8s %(message)s'
        }
    },
    'handlers': {
        'stream': {
            'class': 'logging.StreamHandler',
            'formatter': 'stream'
        },
        'file_main': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     'LOG.txt')
        },
        'file_error': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     'ERROR.txt'),
            'level': 'ERROR'
        }
    },
    'multiproc_formatters': {
        'file': {
            'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'
        },
    },
    'multiproc_handlers': {
        'file_main': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     '%s_%s_%s_LOG.txt' % (pypetconstants.LOG_RUN,
                                                           pypetconstants.LOG_HOST,
                                                           pypetconstants.LOG_PROC))
        },
        'file_error': {
            'class': 'logging.FileHandler',
            'formatter': 'file',
            'filename': os.path.join(pypetconstants.LOG_TRAJ,
                                     pypetconstants.LOG_ENV,
                                     '%s_%s_%s_ERROR.txt' % (pypetconstants.LOG_RUN,
                                                             pypetconstants.LOG_HOST,
                                                             pypetconstants.LOG_PROC)),
            'level': 'ERROR'
        }
    }
}
"""Dictionary containing the default configuration."""


def _change_logging_kwargs(kwargs):
    """ Helper function to turn the simple logging kwargs into a `log_config`."""
    log_levels = kwargs.pop('log_level', None)
    log_folder = kwargs.pop('log_folder', 'logs')
    logger_names = kwargs.pop('logger_names', '')
    if log_levels is None:
        log_levels = kwargs.pop('log_levels', logging.INFO)
    log_multiproc = kwargs.pop('log_multiproc', True)

    if not isinstance(logger_names, (tuple, list)):
        logger_names = [logger_names]
    if not isinstance(log_levels, (tuple, list)):
        log_levels = [log_levels]
    if len(log_levels) == 1:
        log_levels = [log_levels[0] for _ in logger_names]

    # We don't want to manipulate the original dictionary
    dictionary = copy.deepcopy(LOGGING_DICT)
    prefixes = ['']
    if not log_multiproc:
        for key in list(dictionary.keys()):
            if key.startswith('multiproc_'):
                del dictionary[key]
    else:
        prefixes.append('multiproc_')

    # Add all handlers to all loggers
    for prefix in prefixes:
        for handler_dict in dictionary[prefix + 'handlers'].values():
            if 'filename' in handler_dict:
                filename = os.path.join(log_folder, handler_dict['filename'])
                filename = os.path.normpath(filename)
                handler_dict['filename'] = filename
        dictionary[prefix + 'loggers'] = {}
        logger_dict = dictionary[prefix + 'loggers']
        for idx, logger_name in enumerate(logger_names):
            logger_dict[logger_name] = {
                'level': log_levels[idx],
                'handlers': list(dictionary[prefix + 'handlers'].keys())
            }

    kwargs['log_config'] = dictionary


def use_simple_logging(kwargs):
    """Checks if simple logging is requested"""
    return any([x in kwargs for x in ('log_folder', 'logger_names',
                                      'log_levels', 'log_multiproc', 'log_level')])


def simple_logging_config(func):
    """Decorator to allow a simple logging configuration.

    This encompasses giving a `log_folder`, `logger_names` as well as `log_levels`.

    """

    @functools.wraps(func)
    def new_func(self, *args, **kwargs):
        if use_simple_logging(kwargs):
            if 'log_config' in kwargs:
                raise ValueError('Please do not specify `log_config` '
                                 'if you want to use the simple '
                                 'way of providing logging configuration '
                                 '(i.e using `log_folder`, `logger_names` and/or `log_levels`).')
            _change_logging_kwargs(kwargs)

        return func(self, *args, **kwargs)

    return new_func


def try_make_dirs(filename):
    """ Tries to make directories for a given `filename`.

    Ignores any error but notifies via stderr.

    """
    try:
        dirname = os.path.dirname(os.path.normpath(filename))
        racedirs(dirname)
    except Exception as exc:
        sys.stderr.write('ERROR during log config file handling, could not create dirs for '
                         'filename `%s` because of: %s' % (filename, repr(exc)))


def get_strings(args):
    """Returns all valid python strings inside a given argument string."""
    string_list = []
    for elem in ast.walk(ast.parse(args)):
        if isinstance(elem, ast.Str):
            string_list.append(elem.s)
    return string_list


[docs]def rename_log_file(filename, trajectory=None, env_name=None, traj_name=None, set_name=None, run_name=None, process_name=None, host_name=None): """ Renames a given `filename` with valid wildcard placements. :const:`~pypet.pypetconstants.LOG_ENV` ($env) is replaces by the name of the trajectory`s environment. :const:`~pypet.pypetconstants.LOG_TRAJ` ($traj) is replaced by the name of the trajectory. :const:`~pypet.pypetconstants.LOG_RUN` ($run) is replaced by the name of the current run. If the trajectory is not set to a run 'run_ALL' is used. :const:`~pypet.pypetconstants.LOG_SET` ($set) is replaced by the name of the current run set. If the trajectory is not set to a run 'run_set_ALL' is used. :const:`~pypet.pypetconstants.LOG_PROC` ($proc) is replaced by the name fo the current process. :const:`~pypet.pypetconstant.LOG_HOST` ($host) is replaced by the name of the current host. :param filename: A filename string :param traj: A trajectory container, leave `None` if you provide all the parameters below :param env_name: Name of environemnt, leave `None` to get it from `traj` :param traj_name: Name of trajectory, leave `None` to get it from `traj` :param set_name: Name of run set, leave `None` to get it from `traj` :param run_name: Name of run, leave `None` to get it from `traj` :param process_name: The name of the desired process. If `None` the name of the current process is taken determined by the multiprocessing module. :param host_name: Name of host, leave `None` to determine it automatically with the platform module. :return: The new filename """ if pypetconstants.LOG_ENV in filename: if env_name is None: env_name = trajectory.v_environment_name filename = filename.replace(pypetconstants.LOG_ENV, env_name) if pypetconstants.LOG_TRAJ in filename: if traj_name is None: traj_name = trajectory.v_name filename = filename.replace(pypetconstants.LOG_TRAJ, traj_name) if pypetconstants.LOG_RUN in filename: if run_name is None: run_name = trajectory.f_wildcard('$') filename = filename.replace(pypetconstants.LOG_RUN, run_name) if pypetconstants.LOG_SET in filename: if set_name is None: set_name = trajectory.f_wildcard('$set') filename = filename.replace(pypetconstants.LOG_SET, set_name) if pypetconstants.LOG_PROC in filename: if process_name is None: process_name = multip.current_process().name + '-' + str(os.getpid()) filename = filename.replace(pypetconstants.LOG_PROC, process_name) if pypetconstants.LOG_HOST in filename: if host_name is None: host_name = socket.getfqdn().replace('.', '-') filename = filename.replace(pypetconstants.LOG_HOST, host_name) return filename
[docs]class HasLogger(HasSlots): """Abstract super class that automatically adds a logger to a class. To add a logger to a sub-class of yours simply call ``myobj._set_logger(name)``. If ``name=None`` the logger is chosen as follows: ``self._logger = logging.getLogger(self.__class.__.__module__ + '.' + self.__class__.__name__)`` The logger can be accessed via ``myobj._logger``. """ __slots__ = ('_logger',)
[docs] def __getstate__(self): """Called for pickling. Removes the logger to allow pickling and returns a copy of `__dict__`. """ state_dict = super(HasLogger, self).__getstate__() if '_logger' in state_dict: # Pickling does not work with loggers objects, # so we just keep the logger's name: state_dict['_logger'] = self._logger.name return state_dict
[docs] def __setstate__(self, statedict): """Called after loading a pickle dump. Restores `__dict__` from `statedict` and adds a new logger. """ super(HasLogger, self).__setstate__(statedict) if '_logger' in statedict: # If we re-instantiate the component the # logger attribute only contains a name, # so we also need to re-create the logger: self._set_logger(statedict['_logger'])
[docs] def _set_logger(self, name=None): """Adds a logger with a given `name`. If no name is given, name is constructed as `type(self).__name__`. """ if name is None: cls = self.__class__ name = '%s.%s' % (cls.__module__, cls.__name__) self._logger = logging.getLogger(name)
class LoggingManager(object): """ Manager taking care of all logging related issues. :param trajectory: Trajectory container of Mock :param log_config: Logging configuration Can be a a full name of an `ini` file. An already instantiated config parser, or a logging dictionary. :param log_stdout: If `stdout` should be logged. :param report_progress: How to report progress. """ def __init__(self, log_config=None, log_stdout=False, report_progress=False): self.log_config = log_config self._sp_config = None self._mp_config = None self.log_stdout = log_stdout self.report_progress = report_progress self._tools = [] self._null_handler = NullHandler() self._format_string = 'PROGRESS: Finished %d/%d runs ' self._stdout_to_logger = None self.env_name = None self.traj_name = None self.set_name = None self.run_name = None # Format string for the progressbar def extract_replacements(self, trajectory): """Extracts the wildcards and file replacements from the `trajectory`""" self.env_name = trajectory.v_environment_name self.traj_name = trajectory.v_name self.set_name = trajectory.f_wildcard('$set') self.run_name = trajectory.f_wildcard('$') def __getstate__(self): """ConfigParsers are not guaranteed to be picklable so we need to remove these.""" state_dict = self.__dict__.copy() if isinstance(state_dict['log_config'], cp.RawConfigParser): # Config Parsers are not guaranteed to be picklable state_dict['log_config'] = True return state_dict def show_progress(self, n, total_runs): """Displays a progressbar""" if self.report_progress: percentage, logger_name, log_level = self.report_progress if logger_name == 'print': logger = 'print' else: logger = logging.getLogger(logger_name) if n == -1: # Compute the number of digits and avoid log10(0) digits = int(math.log10(total_runs + 0.1)) + 1 self._format_string = 'PROGRESS: Finished %' + '%d' % digits + 'd/%d runs ' fmt_string = self._format_string % (n + 1, total_runs) + '%s' reprint = log_level == 0 progressbar(n, total_runs, percentage_step=percentage, logger=logger, log_level=log_level, fmt_string=fmt_string, reprint=reprint) def add_null_handler(self): """Adds a NullHandler to the root logger. This is simply added to avoid warnings that no logger has been configured. """ root = logging.getLogger() root.addHandler(self._null_handler) def remove_null_handler(self): """Removes the NullHandler from the root logger.""" root = logging.getLogger() root.removeHandler(self._null_handler) @staticmethod def tabula_rasa(): """Removes all loggers and logging handlers. """ erase_dict = {'disable_existing_loggers': False, 'version': 1} dictConfig(erase_dict) @staticmethod def _check_and_replace_parser_args(parser, section, option, rename_func, make_dirs=True): """ Searches for parser settings that define filenames. If such settings are found, they are renamed according to the wildcard rules. Moreover, it is also tried to create the corresponding folders. :param parser: A config parser :param section: A config section :param option: The section option :param rename_func: A function to rename found files :param make_dirs: If the directories of the file should be created. """ args = parser.get(section, option, raw=True) strings = get_strings(args) replace = False for string in strings: isfilename = any(x in string for x in FILENAME_INDICATORS) if isfilename: newstring = rename_func(string) if make_dirs: try_make_dirs(newstring) # To work with windows path specifications we need this replacement: raw_string = string.replace('\\', '\\\\') raw_newstring = newstring.replace('\\', '\\\\') args = args.replace(raw_string, raw_newstring) replace = True if replace: parser.set(section, option, args) @staticmethod def _parser_to_string_io(parser): """Turns a ConfigParser into a StringIO stream.""" memory_file = StringIO() parser.write(memory_file) memory_file.flush() memory_file.seek(0) return memory_file @staticmethod def _find_multiproc_options(parser): """ Searches for multiprocessing options within a ConfigParser. If such options are found, they are copied (without the `'multiproc_'` prefix) into a new parser. """ sections = parser.sections() if not any(section.startswith('multiproc_') for section in sections): return None mp_parser = NoInterpolationParser() for section in sections: if section.startswith('multiproc_'): new_section = section.replace('multiproc_', '') mp_parser.add_section(new_section) options = parser.options(section) for option in options: val = parser.get(section, option, raw=True) mp_parser.set(new_section, option, val) return mp_parser @staticmethod def _find_multiproc_dict(dictionary): """ Searches for multiprocessing options in a given `dictionary`. If found they are copied (without the `'multiproc_'` prefix) into a new dictionary """ if not any(key.startswith('multiproc_') for key in dictionary.keys()): return None mp_dictionary = {} for key in dictionary.keys(): if key.startswith('multiproc_'): new_key = key.replace('multiproc_', '') mp_dictionary[new_key] = dictionary[key] mp_dictionary['version'] = dictionary['version'] if 'disable_existing_loggers' in dictionary: mp_dictionary['disable_existing_loggers'] = dictionary['disable_existing_loggers'] return mp_dictionary def check_log_config(self): """ Checks and converts all settings if necessary passed to the Manager. Searches for multiprocessing options as well. """ if self.report_progress: if self.report_progress is True: self.report_progress = (5, 'pypet', logging.INFO) elif isinstance(self.report_progress, (int, float)): self.report_progress = (self.report_progress, 'pypet', logging.INFO) elif isinstance(self.report_progress, str): self.report_progress = (5, self.report_progress, logging.INFO) elif len(self.report_progress) == 2: self.report_progress = (self.report_progress[0], self.report_progress[1], logging.INFO) if self.log_config: if self.log_config == pypetconstants.DEFAULT_LOGGING: pypet_path = os.path.abspath(os.path.dirname(__file__)) init_path = os.path.join(pypet_path, 'logging') self.log_config = os.path.join(init_path, 'default.ini') if isinstance(self.log_config, str): if not os.path.isfile(self.log_config): raise ValueError('Could not find the logger init file ' '`%s`.' % self.log_config) parser = NoInterpolationParser() parser.read(self.log_config) elif isinstance(self.log_config, cp.RawConfigParser): parser = self.log_config else: parser = None if parser is not None: self._sp_config = self._parser_to_string_io(parser) self._mp_config = self._find_multiproc_options(parser) if self._mp_config is not None: self._mp_config = self._parser_to_string_io(self._mp_config) elif isinstance(self.log_config, dict): self._sp_config = self.log_config self._mp_config = self._find_multiproc_dict(self._sp_config) if self.log_stdout: if self.log_stdout is True: self.log_stdout = ('STDOUT', logging.INFO) if isinstance(self.log_stdout, str): self.log_stdout = (self.log_stdout, logging.INFO) if isinstance(self.log_stdout, int): self.log_stdout = ('STDOUT', self.log_stdout) def _handle_config_parsing(self, log_config): """ Checks for filenames within a config file and translates them. Moreover, directories for the files are created as well. :param log_config: Config file as a stream (like StringIO) """ parser = NoInterpolationParser() parser.readfp(log_config) rename_func = lambda string: rename_log_file(string, env_name=self.env_name, traj_name=self.traj_name, set_name=self.set_name, run_name=self.run_name) sections = parser.sections() for section in sections: options = parser.options(section) for option in options: if option == 'args': self._check_and_replace_parser_args(parser, section, option, rename_func=rename_func) return parser def _handle_dict_config(self, log_config): """Recursively walks and copies the `log_config` dict and searches for filenames. Translates filenames and creates directories if necessary. """ new_dict = dict() for key in log_config.keys(): if key == 'filename': filename = log_config[key] filename = rename_log_file(filename, env_name=self.env_name, traj_name=self.traj_name, set_name=self.set_name, run_name=self.run_name) new_dict[key] = filename try_make_dirs(filename) elif isinstance(log_config[key], dict): inner_dict = self._handle_dict_config(log_config[key]) new_dict[key] = inner_dict else: new_dict[key] = log_config[key] return new_dict def make_logging_handlers_and_tools(self, multiproc=False): """Creates logging handlers and redirects stdout.""" log_stdout = self.log_stdout if sys.stdout is self._stdout_to_logger: # If we already redirected stdout we don't neet to redo it again log_stdout = False if self.log_config: if multiproc: proc_log_config = self._mp_config else: proc_log_config = self._sp_config if proc_log_config: if isinstance(proc_log_config, dict): new_dict = self._handle_dict_config(proc_log_config) dictConfig(new_dict) else: parser = self._handle_config_parsing(proc_log_config) memory_file = self._parser_to_string_io(parser) fileConfig(memory_file, disable_existing_loggers=False) if log_stdout: # Create a logging mock for stdout std_name, std_level = self.log_stdout stdout = StdoutToLogger(std_name, log_level=std_level) stdout.start() self._tools.append(stdout) def finalize(self, remove_all_handlers=True): """Finalizes the manager, closes and removes all handlers if desired.""" for tool in self._tools: tool.finalize() self._tools = [] self._stdout_to_logger = None for config in (self._sp_config, self._mp_config): if hasattr(config, 'close'): config.close() self._sp_config = None self._mp_config = None if remove_all_handlers: self.tabula_rasa() class NoInterpolationParser(cp.ConfigParser): """Dummy class to solve a Python 3 bug/feature in configparser.py""" def __init__(self): try: # Needed for Python 3, see [http://bugs.python.org/issue21265] super(NoInterpolationParser, self).__init__(interpolation=None) except TypeError: # Python 2.x cp.ConfigParser.__init__(self) class DisableAllLogging(object): """Context Manager that disables logging""" def __enter__(self): logging.disable(logging.CRITICAL) def __exit__(self, exc_type, exc_val, exc_tb): logging.disable(logging.NOTSET) class StdoutToLogger(HasLogger): """Fake file-like stream object that redirects writes to a logger instance.""" def __init__(self, logger_name, log_level=logging.INFO): self._logger_name = logger_name self._log_level = log_level self._linebuf = '' self._recursion = False self._redirection = False self._original_steam = None #self._logger = logging.getLogger(self._logger_name) self._set_logger(name=self._logger_name) def __getstate__(self): state_dict = super(StdoutToLogger, self).__getstate__() # The original stream cannot be pickled state_dict['_original_stream'] = None def start(self): """Starts redirection of `stdout`""" if sys.stdout is not self: self._original_steam = sys.stdout sys.stdout = self self._redirection = True if self._redirection: print('Established redirection of `stdout`.') def write(self, buf): """Writes data from buffer to logger""" if not self._recursion: self._recursion = True try: for line in buf.rstrip().splitlines(): self._logger.log(self._log_level, line.rstrip()) finally: self._recursion = False else: # If stderr is redirected to stdout we can avoid further recursion by sys.__stderr__.write('ERROR: Recursion in Stream redirection!') def flush(self): """No-op to fulfil API""" pass def finalize(self): """Disables redirection""" if self._original_steam is not None and self._redirection: sys.stdout = self._original_steam print('Disabled redirection of `stdout`.') self._redirection = False self._original_steam = None # class PypetTestFileHandler(logging.FileHandler): # """Takes care that data is flushed using fsync""" # def flush(self): # """ # Flushes the stream. # """ # self.acquire() # try: # if self.stream and hasattr(self.stream, "flush"): # self.stream.flush() # try: # os.fsync(self.stream.fileno()) # except OSError: # pass # finally: # self.release() # # @retry(9, FileNotFoundError, 0.01, 'pypet.retry') # def _open(self): # try: # return logging.FileHandler._open(self) # except FileNotFoundError: # old_mode = self.mode # try: # self.mode = 'w' # try_make_dirs(self.baseFilename) # return logging.FileHandler._open(self) # finally: # self.mode = old_mode