cell.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import logging
  2. from dataclasses import dataclass
  3. from datetime import datetime
  4. logger = logging.getLogger(__name__)
  5. @dataclass
  6. class MeasureValues:
  7. voltage: int
  8. current: int
  9. temperature: int
  10. @dataclass
  11. class CellLimits:
  12. min_volt: int
  13. max_volt: int
  14. max_curr: int
  15. class Cell():
  16. def __init__(self, id: int, cell_limits: CellLimits, nom_capacity: float, estimated_health: float=-1.0):
  17. self.id = id
  18. self.limits = cell_limits
  19. self.limits_transmitted = False
  20. self.nom_capacity = nom_capacity
  21. self.estimated_health = estimated_health # -1.0 indicates unknown health
  22. self.measurements: list[MeasureValues] = []
  23. self.measurements_duration: list[float] = []
  24. self.last_measured_time = None
  25. def add_measurement(self, data: MeasureValues):
  26. """
  27. Add a new measurement to the list of measurements.
  28. """
  29. now = datetime.now()
  30. if self.last_measured_time is not None:
  31. duration = (now - self.last_measured_time).total_seconds()
  32. self.measurements_duration.append(duration)
  33. self.measurements.append(data)
  34. logger.debug(f"Added measurement for cell {self.id}: {data} ({duration:.1f}s)")
  35. self.last_measured_time = datetime.now()
  36. def _find_cycles(self) -> list[tuple[int, float]]:
  37. """
  38. Find charge cycles in measurements by detecting current direction changes.
  39. Returns list of (start_idx, total_duration) tuples for positive current periods.
  40. """
  41. if not self.measurements:
  42. return []
  43. cycles = []
  44. start_idx = None
  45. current_duration = 0.0
  46. for i in range(len(self.measurements)):
  47. current = self.measurements[i].current
  48. # Detect start of positive current (charging)
  49. if start_idx is None and current > 0:
  50. start_idx = i
  51. current_duration = 0.0
  52. # Accumulate duration during positive current
  53. elif start_idx is not None and current > 0:
  54. current_duration += self.measurements_duration[i]
  55. # Detect end of positive current (charging)
  56. elif start_idx is not None and current <= 0:
  57. cycles.append((start_idx, current_duration))
  58. start_idx = None
  59. # Handle case where last cycle is incomplete
  60. if start_idx is not None:
  61. cycles.append((start_idx, current_duration))
  62. return cycles
  63. def estimate_capacity(self) -> float:
  64. """
  65. Estimate cell capacity based on charge cycle durations.
  66. Returns estimated capacity as percentage of nominal capacity.
  67. """
  68. if not self.measurements:
  69. logger.warning("No measurements available for capacity estimation.")
  70. return -1.0
  71. cycles = self._find_cycles()
  72. if not cycles:
  73. logger.warning("No charge cycles detected for capacity estimation.")
  74. return -1.0
  75. # Calculate expected cycle duration in seconds
  76. est_c_rate = max(m.current for m in self.measurements) / self.nom_capacity
  77. expected_duration = 3600 / est_c_rate # seconds for one cycle (1h/c_rate)
  78. # Calculate average cycle duration from actual measured durations
  79. actual_durations = [duration for _, duration in cycles]
  80. avg_duration = sum(actual_durations) / len(actual_durations)
  81. # Calculate capacity as ratio of actual vs expected duration
  82. capacity_ratio = min(1.0, avg_duration / expected_duration)
  83. estimated_capacity = capacity_ratio * 100.0
  84. logger.info(f"Cell {self.id} capacity estimation: {estimated_capacity:.1f}% "
  85. f"(found {len(cycles)} cycles, avg duration: {avg_duration:.1f} seconds)")
  86. self.estimated_health = estimated_capacity
  87. return estimated_capacity