"""Logging setup for the robot control package.

Provides ``LoggerSingleton`` (a lazily configured, process-wide logger) and
``ThrottlingFilter`` (a logging filter that suppresses rapidly repeated
messages with an exponentially growing back-off).
"""
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Optional

from .config import RobotConfig

# When True, a ThrottlingFilter is attached to every handler created by
# LoggerSingleton._setup_logger.
THROTTLING_FILTER_ENABLED = False

# Name of the shared logger handed out by LoggerSingleton.
_LOGGER_NAME = 'robot_control'


# TODO [SG]: Currently not used due to multiple problems:
#   - No individual logger naming for each module
#   - Throttling filtered too much
#   - Could not be created without config
class LoggerSingleton:
    """Lazily creates and caches the single ``robot_control`` logger."""

    # The configured logger, or None until get_logger() is called with a config.
    _instance: Optional[logging.Logger] = None

    @classmethod
    def get_logger(cls, config: Optional[RobotConfig] = None) -> logging.Logger:
        """Return the shared logger, configuring it on first use.

        :param config: Robot configuration; its ``logging`` section is used to
            set up the handlers the first time a config is supplied. It is
            ignored once the logger has been configured.
        :return: The ``robot_control`` logger. If no config has been supplied
            yet, the same named logger is returned without handlers, so that
            early callers hold the object that is configured later instead of
            receiving ``None``.
        """
        if cls._instance is None:
            if config is None:
                # Not configured yet: hand out the named logger without
                # caching it, so a later call with a config still runs the
                # handler setup.
                return logging.getLogger(_LOGGER_NAME)
            cls._instance = cls._setup_logger(config)
        return cls._instance

    @staticmethod
    def _setup_logger(config: RobotConfig) -> logging.Logger:
        """Build the ``robot_control`` logger from ``config.logging``.

        Attaches a rotating file handler and, when ``console_output`` is set,
        a stream handler.

        :param config: Robot configuration whose ``logging`` section provides
            ``level``, ``file_path``, ``max_file_size_mb``, ``backup_count``
            and ``console_output``.
        :return: The configured logger.
        """
        log_config = config.logging

        logger = logging.getLogger(_LOGGER_NAME)
        logger.setLevel(getattr(logging, log_config.level))

        # A bare file name has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create the directory when there is one.
        log_dir = os.path.dirname(log_config.file_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        file_handler = RotatingFileHandler(
            log_config.file_path,
            maxBytes=log_config.max_file_size_mb * 1024 * 1024,
            backupCount=log_config.backup_count,
        )
        if THROTTLING_FILTER_ENABLED:
            file_handler.addFilter(ThrottlingFilter())
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)

        if log_config.console_output:
            console_handler = logging.StreamHandler()
            if THROTTLING_FILTER_ENABLED:
                console_handler.addFilter(ThrottlingFilter())
            console_handler.setFormatter(
                logging.Formatter('%(levelname)s: %(message)s')
            )
            logger.addHandler(console_handler)

        return logger


class ThrottlingFilter(logging.Filter):
    """Suppress repeated log messages with an exponentially growing interval.

    Messages are keyed by their fully formatted text (``record.getMessage()``).
    The first occurrence always passes; each later occurrence passes only once
    the current interval has elapsed since the last one that passed, after
    which the interval is multiplied by ``throttle_multiplier`` (capped at
    ``max_throttle_interval``).
    """

    # Number of tracked messages above which stale entries are purged.
    _PRUNE_THRESHOLD = 1024

    def __init__(self, name='', initial_throttle_interval=1.0, max_throttle_interval=120.0,
                 throttle_multiplier=2.0, reset_after=120.0):
        """
        :param name: Filter name (can be left empty)
        :param initial_throttle_interval: Initial time interval (in seconds) before allowing duplicate messages
        :param max_throttle_interval: Maximum throttle interval (in seconds)
        :param throttle_multiplier: Factor by which to increase the interval for repeated messages
        :param reset_after: Time in seconds after which to reset throttle interval if message wasn't seen
        """
        super().__init__(name)
        self.initial_throttle_interval = initial_throttle_interval
        self.max_throttle_interval = max_throttle_interval
        self.throttle_multiplier = throttle_multiplier
        self.reset_after = reset_after
        # message text -> {'last_time': <monotonic seconds>, 'current_interval': <seconds>}
        self.message_states = {}
        # Size at which the next purge of stale entries is attempted.
        self._prune_at = self._PRUNE_THRESHOLD

    def _fresh_state(self, now):
        """Return the state for a message that has just passed un-throttled."""
        return {
            'last_time': now,
            'current_interval': self.initial_throttle_interval,
        }

    def _prune_stale(self, now):
        """Drop entries older than ``reset_after``.

        Such entries are treated exactly like unseen messages by ``filter``,
        so removing them does not change filtering decisions; it only bounds
        the memory used when many distinct messages are logged.
        """
        stale_keys = [
            key
            for key, state in list(self.message_states.items())
            if now - state['last_time'] >= self.reset_after
        ]
        for key in stale_keys:
            self.message_states.pop(key, None)
        # Back off so that a large set of still-active messages does not
        # trigger a full scan on every new message.
        self._prune_at = max(self._PRUNE_THRESHOLD, 2 * len(self.message_states))

    def filter(self, record):
        """Decide whether ``record`` should be emitted.

        :param record: The log record being processed.
        :return: True if the record should be logged, False if it is throttled.
        """
        # Monotonic clock: intervals must not be affected by wall-clock jumps.
        current_time = time.monotonic()
        message_key = record.getMessage()
        state = self.message_states.get(message_key)

        # New message, or one whose last emission is older than reset_after:
        # start over with the initial interval.
        # NOTE(review): the reset is measured from the last *emitted*
        # occurrence, not the last seen one. With the defaults
        # (max_throttle_interval == reset_after) a continuously repeated
        # message therefore falls back to the initial interval once the cap is
        # reached - confirm this is intended.
        if state is None or current_time - state['last_time'] >= self.reset_after:
            if state is None and len(self.message_states) >= self._prune_at:
                self._prune_stale(current_time)
            self.message_states[message_key] = self._fresh_state(current_time)
            return True

        if current_time - state['last_time'] < state['current_interval']:
            # Still inside the throttle window: suppress.
            return False

        # Message allowed - increase throttle interval for next occurrence.
        self.message_states[message_key] = {
            'last_time': current_time,
            'current_interval': min(
                state['current_interval'] * self.throttle_multiplier,
                self.max_throttle_interval,
            ),
        }
        return True