| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- import logging
- from dataclasses import dataclass
- from datetime import datetime
- from src.services.prometheus_service import cell_voltage, cell_current, cell_temperature, cell_health
- logger = logging.getLogger(__name__)
- @dataclass
- class MeasureValues:
- voltage: int
- current: int
- temperature: int
- @dataclass
- class CellLimits:
- min_volt: int
- max_volt: int
- max_curr: int
- class Cell():
-
- def __init__(self, id: int, cell_limits: CellLimits, nom_capacity: float, estimated_health: float=-1.0):
- self.id = id
- self.limits = cell_limits
- self.limits_transmitted = False
- self.nom_capacity = nom_capacity
- self.estimated_health = estimated_health # -1.0 indicates unknown health
- self.measurements: list[MeasureValues] = []
- self.measurements_duration: list[float] = []
- self.last_measured_time = None
- def add_measurement(self, data: MeasureValues):
- """
- Add a new measurement to the list of measurements and update Prometheus metrics.
- """
- now = datetime.now()
- if self.last_measured_time is not None:
- duration = (now - self.last_measured_time).total_seconds()
- self.measurements_duration.append(duration)
- self.measurements.append(data)
- logger.debug(f"Added measurement for cell {self.id}: {data} ({duration:.1f}s)")
-
- # Update Prometheus metrics
- cell_id = str(self.id)
- cell_voltage.labels(cell_id=cell_id).set(data.voltage)
- cell_current.labels(cell_id=cell_id).set(data.current)
- cell_temperature.labels(cell_id=cell_id).set(data.temperature)
- if self.estimated_health >= 0:
- cell_health.labels(cell_id=cell_id).set(self.estimated_health)
-
- self.last_measured_time = datetime.now()
- def _find_cycles(self) -> list[tuple[int, float]]:
- """
- Find charge cycles in measurements by detecting current direction changes.
- Returns list of (start_idx, total_duration) tuples for positive current periods.
- """
- if not self.measurements:
- return []
- cycles = []
- start_idx = None
- current_duration = 0.0
-
- for i in range(len(self.measurements)):
- current = self.measurements[i].current
-
- # Detect start of positive current (charging)
- if start_idx is None and current > 0:
- start_idx = i
- current_duration = 0.0
- # Accumulate duration during positive current
- elif start_idx is not None and current > 0:
- current_duration += self.measurements_duration[i]
- # Detect end of positive current (charging)
- elif start_idx is not None and current <= 0:
- cycles.append((start_idx, current_duration))
- start_idx = None
-
- # Handle case where last cycle is incomplete
- if start_idx is not None:
- cycles.append((start_idx, current_duration))
-
- return cycles
- def estimate_capacity(self) -> float:
- """
- Estimate cell capacity based on charge cycle durations.
- Returns estimated capacity as percentage of nominal capacity.
- """
- if not self.measurements:
- logger.warning("No measurements available for capacity estimation.")
- return -1.0
- cycles = self._find_cycles()
- if not cycles:
- logger.warning("No charge cycles detected for capacity estimation.")
- return -1.0
- # Calculate expected cycle duration in seconds
- est_c_rate = max(m.current for m in self.measurements) / self.nom_capacity
- expected_duration = 3600 / est_c_rate # seconds for one cycle (1h/c_rate)
- # Calculate average cycle duration from actual measured durations
- actual_durations = [duration for _, duration in cycles]
- avg_duration = sum(actual_durations) / len(actual_durations)
- # Calculate capacity as ratio of actual vs expected duration
- capacity_ratio = min(1.0, avg_duration / expected_duration)
- estimated_capacity = capacity_ratio * 100.0
- logger.info(f"Cell {self.id} capacity estimation: {estimated_capacity:.1f}% "
- f"(found {len(cycles)} cycles, avg duration: {avg_duration:.1f} seconds)")
-
- self.estimated_health = estimated_capacity
- return estimated_capacity
|