Pārlūkot izejas kodu

Add initial project structure

Silas Gruen 1 gadu atpakaļ
vecāks
revīzija
6cb5dd6593
11 mainītis faili ar 290 papildinājumiem un 0 dzēšanām
  1. 11 0
      can_bus.py
  2. 59 0
      cell.py
  3. 13 0
      config/cell_config.yaml
  4. 24 0
      config_loader.py
  5. 26 0
      controller.py
  6. 22 0
      db_handler.py
  7. 18 0
      rack.py
  8. 28 0
      redis_handler.py
  9. 3 0
      requirements.txt
  10. 27 0
      segment.py
  11. 59 0
      subsegment.py

+ 11 - 0
can_bus.py

@@ -0,0 +1,11 @@
+
+from abc import ABC, abstractmethod
+
+class CANBus(ABC):
+    @abstractmethod
+    def send_message(self, message):
+        pass
+
+    @abstractmethod
+    def receive_message(self):
+        pass

+ 59 - 0
cell.py

@@ -0,0 +1,59 @@
+from redis_handler import RedisHandler
+from db_handler import DatabaseHandler
+import uuid
+
+class Cell:
+    def __init__(self, cell_id=None):
+        self.cell_id = cell_id or str(uuid.uuid4())
+        self.voltage = 0
+        self.current = 0
+        self.temperature = 0
+        self._redis = RedisHandler()
+        self._db = DatabaseHandler()
+        
+        # Get estimated capacity from database or use nominal as default
+        self.nominal_capacity_mah = self._db.get_cell_capacity_nom(self.cell_id)
+        self.estimated_capacity_mah = self._db.get_cell_capacity(self.cell_id) or self.nominal_capacity_mah
+
+    def measure_voltage(self):
+        # Logic to measure voltage
+        return self.voltage
+
+    def measure_current(self):
+        # Logic to measure current
+        return self.current
+
+    def measure_temperature(self):
+        # Logic to measure temperature
+        return self.temperature
+
+    def disconnect(self):
+        # Logic to disconnect the cell
+        pass
+
+    def connect(self):
+        # Logic to connect the cell
+        pass
+
+    def update_estimated_capacity(self, new_estimate_mah):
+        """Update the estimated capacity based on usage patterns and measurements"""
+        self.estimated_capacity_mah = new_estimate_mah
+        self._db.update_cell_capacity(self.cell_id, new_estimate_mah)
+
+    def get_capacity_health(self):
+        """Return cell health as a percentage of nominal capacity"""
+        return (self.estimated_capacity_mah / self.nominal_capacity_mah) * 100
+
+    def update_measurements(self, voltage, current, temperature):
+        """Update measurements and save to Redis"""
+        self.voltage = voltage
+        self.current = current
+        self.temperature = temperature
+        self._redis.save_measurement(self.cell_id, voltage, current, temperature)
+        self._update_capacity_estimate()
+
+    def _update_capacity_estimate(self):
+        """Update capacity estimate based on Redis data"""
+        new_estimate = self._redis.get_capacity_estimate(self.cell_id)
+        if new_estimate:
+            self.estimated_capacity_mah = new_estimate

+ 13 - 0
config/cell_config.yaml

@@ -0,0 +1,13 @@
+segments:
+  - id: "segment_01"
+    subsegments:
+      - id: 1
+        cell_ids: [513, 514, 515, 516]
+      - id: 2
+        cell_ids: [6, 7, 8, 12]
+  - id: "segment_02"
+    subsegments:
+      - id: 1
+        cell_ids: [1, 3, 4, 5]
+      - id: 2
+        cell_ids: [12, 24, 64, 128]

+ 24 - 0
config_loader.py

@@ -0,0 +1,24 @@
+import yaml
+from pathlib import Path
+
+class ConfigLoader:
+    def __init__(self, config_path="config/cell_config.yaml"):
+        self.config_path = Path(__file__).parent / config_path
+        with open(self.config_path, 'r') as f:
+            self.config = yaml.safe_load(f)
+
+    def get_cell_ids_for_subsegment(self, segment_id, subsegment_id):
+        """Get cell IDs for a specific subsegment"""
+        for segment in self.config['segments']:
+            if segment['id'] == segment_id:
+                for subsegment in segment['subsegments']:
+                    if subsegment['id'] == subsegment_id:
+                        return subsegment['cell_ids']  # Updated key name
+        return []
+
+    def get_segment_config(self, segment_id):
+        """Get configuration for a specific segment"""
+        for segment in self.config['segments']:
+            if segment['id'] == segment_id:
+                return segment
+        return None

+ 26 - 0
controller.py

@@ -0,0 +1,26 @@
+
+from rack import Rack
+from can_bus import CANBus
+
+class Controller:
+    def __init__(self, rack, can_bus, min_voltage, max_voltage, min_current, max_current, min_temperature, max_temperature):
+        self.rack = rack
+        self.can_bus = can_bus
+        self.min_voltage = min_voltage
+        self.max_voltage = max_voltage
+        self.min_current = min_current
+        self.max_current = max_current
+        self.min_temperature = min_temperature
+        self.max_temperature = max_temperature
+
+    def charge(self):
+        # Logic to control charging
+        pass
+
+    def discharge(self):
+        # Logic to control discharging
+        pass
+
+    def ensure_safety(self):
+        # Logic to ensure safety conditions
+        pass

+ 22 - 0
db_handler.py

@@ -0,0 +1,22 @@
+import sqlite3
+from pathlib import Path
+
+class DatabaseHandler:
+    def __init__(self, db_path="cell_data.db"):
+        # add code for initializing database connection
+        pass
+
+
+    def get_cell_capacity_nom(self, cell_id):
+        # add code for getting nominal cell capacity from database
+        pass
+
+    def get_cell_capacity(self, cell_id):
+        # add code for getting cell capacity from database
+        pass
+
+
+    def update_cell_capacity(self, cell_id, estimated_capacity_mah):
+        # add code for updating cell capacity in database
+        pass
+

+ 18 - 0
rack.py

@@ -0,0 +1,18 @@
+from segment import Segment
+from config_loader import ConfigLoader
+
+class Rack:
+    def __init__(self, capacity_mah=2500):
+        self.config = ConfigLoader()
+        self.segments = [
+            Segment(segment['id'])
+            for segment in self.config.config['segments']
+        ]
+
+    def measure_voltage(self):
+        # Logic to measure voltage
+        pass
+
+    def measure_power(self):
+        # Logic to measure power
+        pass

+ 28 - 0
redis_handler.py

@@ -0,0 +1,28 @@
+import redis
+from datetime import datetime
+
+class RedisHandler:
+    def __init__(self, host='localhost', port=6379, db=0):
+        self.client = redis.Redis(host=host, port=port, db=db)
+
+    def save_measurement(self, cell_id, voltage, current, temperature):
+        """Save cell measurements with timestamp"""
+        timestamp = datetime.now().isoformat()
+        key = f"cell:{cell_id}:measurements:{timestamp}"
+        self.client.hmset(key, {
+            'voltage': voltage,
+            'current': current,
+            'temperature': temperature
+        })
+        self.client.expire(key, 60 * 60 * 24 * 30)  # Keep data for 30 days
+
+    def get_capacity_estimate(self, cell_id):
+        """Calculate estimated capacity based on historical data"""
+        measurements = self.client.keys(f"cell:{cell_id}:measurements:*")
+        if not measurements:
+            return None
+            
+        # Here you would implement your capacity estimation algorithm
+        # based on voltage curves, current integration, etc.
+        # This is a simplified example
+        return float(self.client.get(f"cell:{cell_id}:estimated_capacity") or 0)

+ 3 - 0
requirements.txt

@@ -0,0 +1,3 @@
+pyyaml>=6.0.1
+pathlib>=1.0.1
+sqlite3

+ 27 - 0
segment.py

@@ -0,0 +1,27 @@
+from subsegment import Subsegment
+from config_loader import ConfigLoader
+
+class Segment:
+    def __init__(self, segment_id):
+        self.segment_id = segment_id
+        self.config = ConfigLoader()
+        segment_config = self.config.get_segment_config(segment_id)
+        if not segment_config:
+            raise ValueError(f"No configuration found for segment {segment_id}")
+            
+        self.subsegments = [
+            Subsegment(self.segment_id, subseg['id'])
+            for subseg in segment_config['subsegments']
+        ]
+
+    def measure_voltage(self):
+        # Logic to measure voltage
+        pass
+
+    def measure_power(self):
+        # Logic to measure power
+        pass
+
+    def measure_temperature(self):
+        # Logic to measure temperature
+        pass

+ 59 - 0
subsegment.py

@@ -0,0 +1,59 @@
+from cell import Cell
+from config_loader import ConfigLoader
+
+class Subsegment:
+    def __init__(self, segment_id, subsegment_id):
+        self.segment_id = segment_id
+        self.subsegment_id = subsegment_id
+        config = ConfigLoader()
+        cell_ids = config.get_cell_ids_for_subsegment(segment_id, subsegment_id)
+        
+        if not cell_ids:
+            raise ValueError(f"No cell IDs configured for subsegment {subsegment_id}")
+            
+        # Create cells with explicitly configured IDs
+        self.cells = [Cell(cell_id) for cell_id in cell_ids]
+
+    def measure_voltages(self):
+        return [cell.measure_voltage() for cell in self.cells]
+
+    def measure_currents(self):
+        return [cell.measure_current() for cell in self.cells]
+
+    def measure_temperatures(self):
+        return [cell.measure_temperature() for cell in self.cells]
+
+    # def get_power(self):
+    #     return [cell.measure_voltage() * cell.measure_current() for cell in self.cells]
+
+    # def calculate_saved_power(self):
+    #     return sum(self.get_power())
+
+    def get_voltage(self):
+        voltages = self.measure_voltages()
+        return sum(voltages) / len(voltages) if voltages else 0
+
+    def get_current(self):
+        currents = self.measure_currents()
+        return sum(currents)
+
+    def disconnect(self):
+        # Logic to disconnect the subsegment
+        pass
+
+    def connect(self):
+        # Logic to connect the subsegment
+        pass
+
+    def get_total_estimated_capacity_mah(self):
+        """For cells in parallel, total estimated capacity is sum of individual estimated capacities"""
+        return sum(cell.estimated_capacity_mah for cell in self.cells)
+    
+    def get_capacity_health_percentage(self):
+        """Estimate capacity health based on voltage and internal resistance"""
+        voltages = self.measure_voltages()
+        if not voltages:
+            return 0
+        avg_voltage = sum(voltages) / len(voltages)
+        # Basic health estimation (can be made more sophisticated)
+        return min(100, max(0, (avg_voltage - 2.5) / (4.2 - 2.5) * 100))