cell.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import logging
  2. from dataclasses import dataclass
  3. from datetime import datetime
  4. from src.services.prometheus_service import cell_voltage, cell_current, cell_temperature, cell_health
  5. logger = logging.getLogger(__name__)
  6. @dataclass
  7. class MeasureValues:
  8. voltage: int
  9. current: int
  10. temperature: int
  11. @dataclass
  12. class CellLimits:
  13. min_volt: int
  14. max_volt: int
  15. max_curr: int
  16. class Cell():
  17. def __init__(self, id: int, cell_limits: CellLimits, nom_capacity: float, estimated_health: float=-1.0):
  18. self.id = id
  19. self.limits = cell_limits
  20. self.limits_transmitted = False
  21. self.nom_capacity = nom_capacity
  22. self.estimated_health = estimated_health # -1.0 indicates unknown health
  23. self.measurements: list[MeasureValues] = []
  24. self.measurements_duration: list[float] = []
  25. self.last_measured_time = None
  26. def add_measurement(self, data: MeasureValues):
  27. """
  28. Add a new measurement to the list of measurements and update Prometheus metrics.
  29. """
  30. now = datetime.now()
  31. if self.last_measured_time is not None:
  32. duration = (now - self.last_measured_time).total_seconds()
  33. self.measurements_duration.append(duration)
  34. self.measurements.append(data)
  35. logger.debug(f"Added measurement for cell {self.id}: {data} ({duration:.1f}s)")
  36. # Update Prometheus metrics
  37. cell_id = str(self.id)
  38. cell_voltage.labels(cell_id=cell_id).set(data.voltage)
  39. cell_current.labels(cell_id=cell_id).set(data.current)
  40. cell_temperature.labels(cell_id=cell_id).set(data.temperature)
  41. if self.estimated_health >= 0:
  42. cell_health.labels(cell_id=cell_id).set(self.estimated_health)
  43. self.last_measured_time = datetime.now()
  44. def _find_cycles(self) -> list[tuple[int, float]]:
  45. """
  46. Find charge cycles in measurements by detecting current direction changes.
  47. Returns list of (start_idx, total_duration) tuples for positive current periods.
  48. """
  49. if not self.measurements:
  50. return []
  51. cycles = []
  52. start_idx = None
  53. current_duration = 0.0
  54. for i in range(len(self.measurements)):
  55. current = self.measurements[i].current
  56. # Detect start of positive current (charging)
  57. if start_idx is None and current > 0:
  58. start_idx = i
  59. current_duration = 0.0
  60. # Accumulate duration during positive current
  61. elif start_idx is not None and current > 0:
  62. current_duration += self.measurements_duration[i]
  63. # Detect end of positive current (charging)
  64. elif start_idx is not None and current <= 0:
  65. cycles.append((start_idx, current_duration))
  66. start_idx = None
  67. # Handle case where last cycle is incomplete
  68. if start_idx is not None:
  69. cycles.append((start_idx, current_duration))
  70. return cycles
  71. def estimate_capacity(self) -> float:
  72. """
  73. Estimate cell capacity based on charge cycle durations.
  74. Returns estimated capacity as percentage of nominal capacity.
  75. """
  76. if not self.measurements:
  77. logger.warning("No measurements available for capacity estimation.")
  78. return -1.0
  79. cycles = self._find_cycles()
  80. if not cycles:
  81. logger.warning("No charge cycles detected for capacity estimation.")
  82. return -1.0
  83. # Calculate expected cycle duration in seconds
  84. est_c_rate = max(m.current for m in self.measurements) / self.nom_capacity
  85. expected_duration = 3600 / est_c_rate # seconds for one cycle (1h/c_rate)
  86. # Calculate average cycle duration from actual measured durations
  87. actual_durations = [duration for _, duration in cycles]
  88. avg_duration = sum(actual_durations) / len(actual_durations)
  89. # Calculate capacity as ratio of actual vs expected duration
  90. capacity_ratio = min(1.0, avg_duration / expected_duration)
  91. estimated_capacity = capacity_ratio * 100.0
  92. logger.info(f"Cell {self.id} capacity estimation: {estimated_capacity:.1f}% "
  93. f"(found {len(cycles)} cycles, avg duration: {avg_duration:.1f} seconds)")
  94. self.estimated_health = estimated_capacity
  95. return estimated_capacity