| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import logging
- from logging.handlers import RotatingFileHandler
- import os
- from typing import Optional
- from .config import RobotConfig
- import time
# Feature flag read by LoggerSingleton._setup_logger: when True, a
# ThrottlingFilter is attached to the file handler and to the console handler.
THROTTLING_FILTER_ENABLED = False
# TODO [SG]: Currently not used due to multiple problems:
#   - No individual logger naming for each module
#   - Throttling filtered too much
#   - Could not be created without config
class LoggerSingleton:
    """Holder for the single configured ``robot_control`` logger.

    The first call to :meth:`get_logger` that receives a config builds the
    logger (rotating file handler plus optional console handler) and caches
    it; every later call returns the cached logger and ignores ``config``.
    """

    # Logger built by the first ``get_logger`` call that was given a config.
    _instance: Optional[logging.Logger] = None

    @classmethod
    def get_logger(cls, config: "Optional[RobotConfig]" = None) -> logging.Logger:
        """Return the shared ``robot_control`` logger.

        :param config: Configuration used to set the logger up on first use.
            Ignored once the logger has been configured.
        :return: The configured logger, or - when no config has been supplied
            yet - the plain ``robot_control`` logger without any handlers
            added by this class.
        """
        if cls._instance is None:
            if config is None:
                # Never hand out None: callers are promised a Logger. The
                # fallback is deliberately not cached, so a later call that
                # does carry a config still runs the setup. Because
                # logging.getLogger returns the same object per name, loggers
                # handed out here pick up that later configuration as well.
                return logging.getLogger('robot_control')
            cls._instance = cls._setup_logger(config)
        return cls._instance

    @staticmethod
    def _setup_logger(config: "RobotConfig") -> logging.Logger:
        """Configure and return the ``robot_control`` logger.

        Reads ``level``, ``file_path``, ``max_file_size_mb``, ``backup_count``
        and ``console_output`` from ``config.logging``.

        :param config: Object whose ``logging`` attribute holds the settings.
        :return: The logger with a rotating file handler and, if requested,
            a console handler attached.
        :raises AttributeError: If ``level`` is a string that does not name a
            level constant of the ``logging`` module.
        """
        log_config = config.logging
        logger = logging.getLogger('robot_control')

        # Accept level names in any case ("info", "INFO") as well as numeric
        # levels, which setLevel understands directly.
        level = log_config.level
        if isinstance(level, str):
            level = getattr(logging, level.upper())
        logger.setLevel(level)

        # A bare file name has an empty dirname, and os.makedirs('') raises;
        # only create the directory when there is one to create.
        log_dir = os.path.dirname(log_config.file_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        file_handler = RotatingFileHandler(
            log_config.file_path,
            maxBytes=log_config.max_file_size_mb * 1024 * 1024,
            backupCount=log_config.backup_count,
        )
        if THROTTLING_FILTER_ENABLED:
            file_handler.addFilter(ThrottlingFilter())
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)

        if log_config.console_output:
            console_handler = logging.StreamHandler()
            if THROTTLING_FILTER_ENABLED:
                # Each handler gets its own filter, so throttling state is
                # tracked separately for file and console output.
                console_handler.addFilter(ThrottlingFilter())
            console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            logger.addHandler(console_handler)

        return logger
class ThrottlingFilter(logging.Filter):
    """Logging filter that suppresses rapidly repeated identical messages.

    Records are keyed by their formatted message text. The first occurrence
    is let through; repeats are let through only once the current throttle
    interval has elapsed since the last one that was let through, and each
    time that happens the interval is multiplied (up to a maximum). When a
    message has not occurred at all for ``reset_after`` seconds, its state is
    forgotten and it starts over with the initial interval.
    """

    def __init__(self, name='', initial_throttle_interval=1.0, max_throttle_interval=120.0,
                 throttle_multiplier=2.0, reset_after=120.0, clock=None):
        """
        :param name: Filter name (can be left empty)
        :param initial_throttle_interval: Initial time interval (in seconds) before allowing duplicate messages
        :param max_throttle_interval: Maximum throttle interval (in seconds)
        :param throttle_multiplier: Factor by which to increase the interval for repeated messages
        :param reset_after: Time in seconds after which to reset throttle interval if message wasn't seen
        :param clock: Zero-argument callable returning the current time in seconds;
            defaults to ``time.monotonic`` so wall-clock adjustments cannot distort intervals
        """
        super().__init__(name)
        self.initial_throttle_interval = initial_throttle_interval
        self.max_throttle_interval = max_throttle_interval
        self.throttle_multiplier = throttle_multiplier
        self.reset_after = reset_after
        # message text -> {'last_time': when it was last let through,
        #                  'last_seen': when it last occurred at all,
        #                  'current_interval': wait required before the next one}
        self.message_states = {}
        self._clock = time.monotonic if clock is None else clock
        # Time of the next stale-entry sweep; scheduled on the first record.
        self._next_prune_time = None

    def filter(self, record):
        """Decide whether ``record`` should be emitted.

        :param record: The ``logging.LogRecord`` being processed.
        :return: ``True`` to let the record through, ``False`` to suppress it.
        """
        now = self._clock()
        self._prune_stale(now)

        message_key = record.getMessage()
        state = self.message_states.get(message_key)

        # Unknown message, or one that has been silent for reset_after
        # seconds: let it through and start again from the initial interval.
        if state is None or now - state['last_seen'] >= self.reset_after:
            self.message_states[message_key] = {
                'last_time': now,
                'last_seen': now,
                'current_interval': self.initial_throttle_interval,
            }
            return True

        # Suppressed occurrences still count as "seen"; otherwise a message
        # repeating without pause would be reset merely because it had been
        # throttled for reset_after seconds.
        state['last_seen'] = now

        if now - state['last_time'] >= state['current_interval']:
            # Let it through and back off further for the next occurrence.
            state['last_time'] = now
            state['current_interval'] = min(
                state['current_interval'] * self.throttle_multiplier,
                self.max_throttle_interval,
            )
            return True

        return False

    def _prune_stale(self, now):
        """Drop entries not seen for ``reset_after`` seconds.

        Such entries would be reset on their next occurrence anyway, so
        removing them does not change any decision; it only keeps
        ``message_states`` from growing without bound when message texts vary.
        The sweep runs at most once per ``reset_after`` seconds.
        """
        if self._next_prune_time is None:
            self._next_prune_time = now + self.reset_after
            return
        if now < self._next_prune_time:
            return
        self._next_prune_time = now + self.reset_after
        # Iterate over a snapshot so entries can be removed while sweeping.
        for key, state in list(self.message_states.items()):
            if now - state['last_seen'] >= self.reset_after:
                self.message_states.pop(key, None)
|