| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import logging
- from logging.handlers import RotatingFileHandler
- import os
- from .config import RobotConfig
- THROTTLING_FILTER_ENABLED = False
- # Usage:
- # In main.py, after loading config:
- # from robot_control.src.utils.logging import setup_logging
- # setup_logging(config)
- # In each module:
- # import logging
- # logger = logging.getLogger(__name__)
- # logger.info("message")
- def setup_logging(config: RobotConfig):
- """
- Set up logging configuration globally using the provided config.
- Should be called once at program startup (e.g., in main.py).
- """
- log_config = config.logging
- log_level = getattr(logging, log_config.level)
- os.makedirs(os.path.dirname(log_config.file_path), exist_ok=True)
- # Remove all handlers associated with the root logger object.
- for handler in logging.root.handlers[:]:
- logging.root.removeHandler(handler)
- handlers = []
- # File handler
- file_handler = RotatingFileHandler(
- log_config.file_path,
- maxBytes=log_config.max_file_size_mb * 1024 * 1024,
- backupCount=log_config.backup_count
- )
- file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- if THROTTLING_FILTER_ENABLED:
- file_handler.addFilter(ThrottlingFilter())
- file_handler.setFormatter(file_formatter)
- handlers.append(file_handler)
- # Console handler
- if log_config.console_output:
- console_handler = logging.StreamHandler()
- if THROTTLING_FILTER_ENABLED:
- console_handler.addFilter(ThrottlingFilter())
- console_formatter = logging.Formatter('%(levelname)s: %(name)s: %(message)s')
- console_handler.setFormatter(console_formatter)
- handlers.append(console_handler)
- logging.basicConfig(level=log_level, handlers=handlers)
- import time
- class ThrottlingFilter(logging.Filter):
- def __init__(self, name='', initial_throttle_interval=1.0, max_throttle_interval=120.0,
- throttle_multiplier=2.0, reset_after=120.0):
- """
- :param name: Filter name (can be left empty)
- :param initial_throttle_interval: Initial time interval (in seconds) before allowing duplicate messages
- :param max_throttle_interval: Maximum throttle interval (in seconds)
- :param throttle_multiplier: Factor by which to increase the interval for repeated messages
- :param reset_after: Time in seconds after which to reset throttle interval if message wasn't seen
- """
- super().__init__(name)
- self.initial_throttle_interval = initial_throttle_interval
- self.max_throttle_interval = max_throttle_interval
- self.throttle_multiplier = throttle_multiplier
- self.reset_after = reset_after
- self.message_states = {}
- def filter(self, record):
- current_time = time.time()
- message_key = record.getMessage()
- if message_key not in self.message_states:
- self.message_states[message_key] = {
- 'last_time': current_time,
- 'current_interval': self.initial_throttle_interval
- }
- return True
- state = self.message_states[message_key]
- time_since_last = current_time - state['last_time']
- # Reset throttle interval if message hasn't been seen for reset_after seconds
- if time_since_last >= self.reset_after:
- self.message_states[message_key] = {
- 'last_time': current_time,
- 'current_interval': self.initial_throttle_interval
- }
- return True
- if time_since_last >= state['current_interval']:
- # Message allowed - increase throttle interval for next occurrence
- next_interval = min(
- state['current_interval'] * self.throttle_multiplier,
- self.max_throttle_interval
- )
- self.message_states[message_key] = {
- 'last_time': current_time,
- 'current_interval': next_interval
- }
- return True
- return False
|