"""Global logging setup and an optional duplicate-message throttling filter."""

import logging
import os
import time
from logging.handlers import RotatingFileHandler

from .config import RobotConfig

THROTTLING_FILTER_ENABLED = False

# Usage:
# In main.py, after loading config:
#     from robot_control.src.utils.logging import setup_logging
#     setup_logging(config)
# In each module:
#     import logging
#     logger = logging.getLogger(__name__)
#     logger.info("message")


def _resolve_log_level(level):
    """Translate a configured level into a numeric ``logging`` level.

    :param level: A level name in any letter case (e.g. ``"info"``,
        ``"DEBUG"``) or an already-numeric level.
    :return: The numeric logging level.
    :raises ValueError: If the name does not resolve to a numeric level
        constant of the ``logging`` module.
    """
    if isinstance(level, int) and not isinstance(level, bool):
        return level
    name = str(level).strip().upper()
    # A plain getattr() on a lower-case name would return the module-level
    # helper function (e.g. logging.info) instead of the level constant,
    # so normalise the case and verify that the result is numeric.
    resolved = getattr(logging, name, None)
    if not isinstance(resolved, int):
        raise ValueError("Unknown logging level: %r" % (level,))
    return resolved


def setup_logging(config: RobotConfig) -> None:
    """
    Set up logging configuration globally using the provided config.

    Should be called once at program startup (e.g., in main.py).

    Every handler currently attached to the root logger is removed and
    closed, then a rotating file handler and, if enabled in the config, a
    console handler are installed through ``logging.basicConfig``.

    :param config: Configuration object whose ``logging`` attribute provides
        ``level``, ``file_path``, ``max_file_size_mb``, ``backup_count`` and
        ``console_output``.
    :raises ValueError: If the configured level is not a known level name.
    """
    log_config = config.logging
    log_level = _resolve_log_level(log_config.level)

    # dirname() is '' for a bare file name, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    log_dir = os.path.dirname(log_config.file_path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Remove all handlers associated with the root logger object and close
    # them so that a previously opened log file is not leaked.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
        handler.close()

    handlers = []

    # File handler
    file_handler = RotatingFileHandler(
        log_config.file_path,
        maxBytes=log_config.max_file_size_mb * 1024 * 1024,
        backupCount=log_config.backup_count,
    )
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    if THROTTLING_FILTER_ENABLED:
        file_handler.addFilter(ThrottlingFilter())
    file_handler.setFormatter(file_formatter)
    handlers.append(file_handler)

    # Console handler
    if log_config.console_output:
        console_handler = logging.StreamHandler()
        if THROTTLING_FILTER_ENABLED:
            console_handler.addFilter(ThrottlingFilter())
        console_formatter = logging.Formatter(
            '%(levelname)s: %(name)s: %(message)s'
        )
        console_handler.setFormatter(console_formatter)
        handlers.append(console_handler)

    logging.basicConfig(level=log_level, handlers=handlers)


class ThrottlingFilter(logging.Filter):
    """Suppress repeated log messages with a growing back-off interval.

    The first occurrence of a message is always let through. Each time a
    repeat is let through, the interval that must elapse before the next
    repeat is multiplied by ``throttle_multiplier`` (capped at
    ``max_throttle_interval``). Once ``reset_after`` seconds have passed
    since a message was last let through, its interval starts over.
    """

    def __init__(self, name='', initial_throttle_interval=1.0,
                 max_throttle_interval=120.0, throttle_multiplier=2.0,
                 reset_after=120.0):
        """
        :param name: Filter name (can be left empty)
        :param initial_throttle_interval: Initial time interval (in seconds)
            before allowing duplicate messages
        :param max_throttle_interval: Maximum throttle interval (in seconds)
        :param throttle_multiplier: Factor by which to increase the interval
            for repeated messages
        :param reset_after: Time in seconds after which to reset throttle
            interval if message wasn't seen
        """
        super().__init__(name)
        self.initial_throttle_interval = initial_throttle_interval
        self.max_throttle_interval = max_throttle_interval
        self.throttle_multiplier = throttle_multiplier
        self.reset_after = reset_after
        # Maps formatted message text -> {'last_time', 'current_interval'}.
        self.message_states = {}
        # Time of the last stale-entry sweep (monotonic clock).
        self._last_prune = time.monotonic()

    def _prune_stale(self, now):
        """Drop entries not let through for at least ``reset_after`` seconds.

        Such entries would be reset to the initial interval on their next
        occurrence anyway, so removing them does not change filtering
        results; it only keeps ``message_states`` from growing forever when
        many distinct messages are logged. The sweep runs at most once per
        ``reset_after`` seconds.

        :param now: Current monotonic time in seconds.
        """
        if now - self._last_prune < self.reset_after:
            return
        self._last_prune = now
        # Iterate over a snapshot so that deleting keys is safe.
        for key, state in list(self.message_states.items()):
            if now - state['last_time'] >= self.reset_after:
                self.message_states.pop(key, None)

    def filter(self, record):
        """Decide whether ``record`` should be emitted.

        :param record: The log record being processed; its formatted message
            text is used as the throttling key.
        :return: ``True`` to emit the record, ``False`` to suppress it.
        """
        # A monotonic clock keeps the measured intervals correct even when
        # the wall clock is adjusted while the program is running.
        now = time.monotonic()
        self._prune_stale(now)

        message_key = record.getMessage()
        state = self.message_states.get(message_key)

        # First occurrence, or not let through for reset_after seconds:
        # emit and start over with the initial interval.
        if state is None or now - state['last_time'] >= self.reset_after:
            self.message_states[message_key] = {
                'last_time': now,
                'current_interval': self.initial_throttle_interval,
            }
            return True

        if now - state['last_time'] >= state['current_interval']:
            # Message allowed - increase throttle interval for next occurrence
            next_interval = min(
                state['current_interval'] * self.throttle_multiplier,
                self.max_throttle_interval,
            )
            self.message_states[message_key] = {
                'last_time': now,
                'current_interval': next_interval,
            }
            return True

        return False