浏览代码

Add initial project structure and configuration files for battery health monitor

Silas Gruen 10 月之前
父节点
当前提交
acad6a5942

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+battery-health-monitor/battery_health_monitor.egg-info
+__pycache__
+.python-version

+ 38 - 2
README.md

@@ -1,3 +1,39 @@
-# measure_ctrl
+# README.md
 
-Code for pi controlling multiple measurement devices
+# Battery Health Monitor
+
+## Overview
+
+The Battery Health Monitor is a Python application designed to assess the health of 18650 lithium-ion cells using multiple measurement devices. The application communicates with the devices via I2C and provides an MQTT interface for real-time updates on cell insertion. It also supports HTTP GET requests to retrieve detailed information about the cells.
+
+## Features
+
+- Monitor the health of 18650 cells through multiple measurement devices.
+- Utilize I2C for reading current, voltage, and temperature data.
+- MQTT interface for receiving information about cell slots.
+- HTTP GET requests for querying cell data (max voltage, min voltage, capacity).
+- Configurable measurement cycles and C values.
+
+## Installation
+
+1. Clone the repository:
+   ```
+   git clone <repository-url>
+   cd battery-health-monitor
+   ```
+
+2. Install the required packages:
+   ```
+   pip install -r requirements.txt
+   ```
+
+## Usage
+
+To run the application, execute the following command:
+```
+python src/main.py
+```
+
+## Configuration
+
+The configuration for the measurement devices can be found in `src/config/config.yaml`. Modify this file to set the number of devices, cycle count, and C value for measurements.

+ 9 - 0
battery-health-monitor/requirements.txt

@@ -0,0 +1,9 @@
+paho-mqtt==1.6.1
+smbus2==0.4.2
+smbus==1.1.post2
+aiohttp==3.8.5
+numpy==1.24.3
+pyyaml==6.0.1
+pytest==7.4.0
+pytest-asyncio==0.21.1
+Flask==2.3.3

+ 20 - 0
battery-health-monitor/setup.py

@@ -0,0 +1,20 @@
+from setuptools import setup, find_packages
+
+setup(
+    name="battery-health-monitor",
+    version="0.1.0",
+    packages=find_packages(),
+    install_requires=[
+        "paho-mqtt>=1.6.1",
+        "smbus2>=0.4.2",
+        "aiohttp>=3.8.5",
+        "numpy>=1.24.3",
+        "pyyaml>=6.0.1",
+    ],
+    entry_points={
+        'console_scripts': [
+            'battery-monitor=src.main:main',
+        ],
+    },
+    python_requires='>=3.8',
+)

+ 1 - 0
battery-health-monitor/src/__init__.py

@@ -0,0 +1 @@
+# The file is intentionally left blank.

+ 3 - 0
battery-health-monitor/src/config/__init__.py

@@ -0,0 +1,3 @@
+# FILE: /battery-health-monitor/battery-health-monitor/src/config/__init__.py
+
+# This file is intentionally left blank.

+ 32 - 0
battery-health-monitor/src/config/config.yaml

@@ -0,0 +1,32 @@
+mqtt:
+  broker: "localhost"
+  port: 1883
+  topic_prefix: "cells_inserted"
+  client_id: "battery_monitor"
+  keepalive: 60
+
+http:
+  server_url: "http://localhost:8080"
+  timeout: 5
+  endpoint: "/cell_info"
+
+measurement:
+  cycles: 3
+  c_rate: 0.25
+  sample_rate_hz: 1
+  min_voltage: 2.5
+  max_voltage: 4.2
+  max_temperature_c: 45
+  rest_time_minutes: 30
+
+devices:
+  - id: "device_1"
+    i2c_address: 0x48
+    slots: 4
+  - id: "device_2"
+    i2c_address: 0x49
+    slots: 4
+
+logging:
+  level: "INFO"
+  file: "battery_monitor.log"

+ 3 - 0
battery-health-monitor/src/controllers/__init__.py

@@ -0,0 +1,3 @@
+# FILE: /battery-health-monitor/battery-health-monitor/src/controllers/__init__.py
+
+# This file is intentionally left blank.

+ 33 - 0
battery-health-monitor/src/controllers/device_controller.py

@@ -0,0 +1,33 @@
+import paho.mqtt.client as mqtt
+from src.models.device import Device
+
+class DeviceController:
+    def __init__(self, mqtt_broker, devices_config):
+        self.devices = [Device(config) for config in devices_config]
+        self.mqtt_client = mqtt.Client()
+        self.mqtt_broker = mqtt_broker
+        self.setup_mqtt()
+
+    def setup_mqtt(self):
+        self.mqtt_client.on_connect = self.on_connect
+        self.mqtt_client.on_message = self.on_message
+        self.mqtt_client.connect(self.mqtt_broker)
+        self.mqtt_client.loop_start()
+
+    def on_connect(self, client, userdata, flags, rc):
+        print("Connected to MQTT broker with result code " + str(rc))
+        for device in self.devices:
+            client.subscribe(f"cells_inserted/{device.device_id}")
+
+    def on_message(self, client, userdata, msg):
+        payload = json.loads(msg.payload)
+        slot = payload['slot']
+        cell_id = payload['cell_id']
+        self.handle_cell_insertion(msg.topic, slot, cell_id)
+
+    def handle_cell_insertion(self, topic, slot, cell_id):
+        device_id = topic.split('/')[-1]
+        device = next((d for d in self.devices if d.device_id == device_id), None)
+        if device:
+            device.insert_cell(slot, cell_id)
+            print(f"Cell {cell_id} inserted in device {device_id} at slot {slot}.")

+ 105 - 0
battery-health-monitor/src/controllers/measurement_controller.py

@@ -0,0 +1,105 @@
+import asyncio
+import logging
+from typing import Dict, List
+from datetime import datetime
+from models.battery import Battery
+from models.device import Device
+from services.i2c_service import I2CService
+from services.http_service import HTTPService
+from utils.health_calculator import calculate_health
+
+logger = logging.getLogger(__name__)
+
+class MeasurementController:
+    """Controls the measurement process for multiple devices and slots."""
+    
+    def __init__(self, config: dict, i2c_service: I2CService, http_service: HTTPService):
+        self.config = config
+        self.i2c_service = i2c_service
+        self.http_service = http_service
+        self.active_measurements: Dict[str, Dict[int, asyncio.Task]] = {}
+        
+    async def start_measurement(self, device_id: str, slot: int, cell_id: str):
+        """Start measurement cycle for a specific slot."""
+        if device_id not in self.active_measurements:
+            self.active_measurements[device_id] = {}
+            
+        # Cancel existing measurement if any
+        if slot in self.active_measurements[device_id]:
+            self.active_measurements[device_id][slot].cancel()
+            
+        # Create new measurement task
+        task = asyncio.create_task(
+            self._measure_cycle(device_id, slot, cell_id)
+        )
+        self.active_measurements[device_id][slot] = task
+        
+    async def _measure_cycle(self, device_id: str, slot: int, cell_id: str):
+        """Execute measurement cycles for a battery."""
+        try:
+            # Get battery info from HTTP service
+            battery_info = await self.http_service.get_cell_info(cell_id)
+            
+            measurements = []
+            cycles = self.config['measurement']['cycles']
+            sample_rate = self.config['measurement']['sample_rate_hz']
+            
+            for cycle in range(cycles):
+                logger.info(f"Starting cycle {cycle+1}/{cycles} for cell {cell_id}")
+                cycle_measurements = []
+                
+                while True:
+                    # Read measurements
+                    voltage = await self.i2c_service.read_voltage(device_id, slot)
+                    current = await self.i2c_service.read_current(device_id, slot)
+                    temp = await self.i2c_service.read_temperature(device_id, slot)
+                    
+                    # Check safety limits
+                    if not self._check_safety_limits(voltage, temp):
+                        raise Exception("Safety limits exceeded")
+                    
+                    cycle_measurements.append({
+                        'timestamp': datetime.now().isoformat(),
+                        'voltage': voltage,
+                        'current': current,
+                        'temperature': temp
+                    })
+                    
+                    # Check if cycle complete
+                    if self._is_cycle_complete(cycle_measurements):
+                        break
+                        
+                    await asyncio.sleep(1/sample_rate)
+                
+                measurements.extend(cycle_measurements)
+                
+                # Rest period between cycles
+                await asyncio.sleep(self.config['measurement']['rest_time_minutes'] * 60)
+            
+            # Calculate health
+            health = calculate_health([m['current'] for m in measurements])
+            
+            logger.info(f"Measurement complete for cell {cell_id}. Health: {health}%")
+            
+        except Exception as e:
+            logger.error(f"Error during measurement: {str(e)}")
+        finally:
+            # Cleanup
+            if device_id in self.active_measurements and \
+               slot in self.active_measurements[device_id]:
+                del self.active_measurements[device_id][slot]
+                
+    def _check_safety_limits(self, voltage: float, temperature: float) -> bool:
+        """Check if measurements are within safety limits."""
+        return (self.config['measurement']['min_voltage'] <= voltage <= 
+                self.config['measurement']['max_voltage'] and
+                temperature <= self.config['measurement']['max_temperature_c'])
+        
+    def _is_cycle_complete(self, measurements: List[dict]) -> bool:
+        """Determine if a measurement cycle is complete."""
+        if not measurements:
+            return False
+        
+        # Add your cycle completion logic here
+        # Example: Check if voltage has reached upper limit and then lower limit
+        return len(measurements) > 100  # Simplified example

+ 47 - 0
battery-health-monitor/src/main.py

@@ -0,0 +1,47 @@
+import asyncio
+import logging
+import yaml
+from pathlib import Path
+from services.mqtt_service import MQTTService
+from services.i2c_service import I2CService
+from services.http_service import HTTPService
+from controllers.measurement_controller import MeasurementController
+
+async def main():
+    # Load config
+    config_path = Path(__file__).parent / "config" / "config.yaml"
+    with open(config_path) as f:
+        config = yaml.safe_load(f)
+
+    # Setup logging
+    logging.basicConfig(
+        level=getattr(logging, config['logging']['level']),
+        filename=config['logging']['file'],
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    )
+
+    # Initialize services
+    i2c_service = I2CService(config)
+    http_service = HTTPService(config)
+    measurement_controller = MeasurementController(config, i2c_service, http_service)
+
+    # Setup MQTT client with callback
+    mqtt_service = MQTTService(config)
+    
+    async def on_cell_inserted(device_id: str, slot: int, cell_id: str):
+        await measurement_controller.start_measurement(device_id, slot, cell_id)
+
+    mqtt_service.set_callback(on_cell_inserted)
+    await mqtt_service.connect()
+
+    try:
+        # Keep running until interrupted
+        while True:
+            await asyncio.sleep(1)
+    except KeyboardInterrupt:
+        logging.info("Shutting down...")
+    finally:
+        await mqtt_service.disconnect()
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 1 - 0
battery-health-monitor/src/models/__init__.py

@@ -0,0 +1 @@
+# The file is intentionally left blank.

+ 22 - 0
battery-health-monitor/src/models/battery.py

@@ -0,0 +1,22 @@
+class Battery:
+    def __init__(self, voltage, capacity):
+        self.voltage = voltage
+        self.capacity = capacity
+
+    def estimate_health(self, current_measurements):
+        """
+        Estimate the state of health of the battery based on current measurements.
+        
+        Parameters:
+        current_measurements (list): A list of current measurements during the cycles.
+
+        Returns:
+        float: Estimated state of health as a percentage.
+        """
+        # Basic health estimation logic (placeholder)
+        if not current_measurements:
+            return 0.0
+        
+        avg_current = sum(current_measurements) / len(current_measurements)
+        health = max(0, min(100, (self.capacity - avg_current) / self.capacity * 100))
+        return health

+ 36 - 0
battery-health-monitor/src/models/device.py

@@ -0,0 +1,36 @@
+class Device:
+    def __init__(self, device_id, slots):
+        """
+        Initializes a Device instance.
+
+        :param device_id: Unique identifier for the device.
+        :param slots: Number of slots available in the device.
+        """
+        self.device_id = device_id
+        self.slots = slots
+        self.slot_data = {slot: None for slot in range(slots)}
+
+    def read_i2c_data(self):
+        """
+        Reads data from the I2C bus for each slot.
+        This method should be implemented to interact with the I2C service.
+        """
+        pass
+
+    def update_slot(self, slot, cell_id):
+        """
+        Updates the specified slot with the given cell ID.
+
+        :param slot: The slot number to update.
+        :param cell_id: The ID of the cell inserted in the slot.
+        """
+        if slot in self.slot_data:
+            self.slot_data[slot] = cell_id
+        else:
+            raise ValueError("Invalid slot number.")
+
+    def get_slot_info(self):
+        """
+        Returns the current state of all slots in the device.
+        """
+        return self.slot_data

+ 1 - 0
battery-health-monitor/src/services/__init__.py

@@ -0,0 +1 @@
+# The file is intentionally left blank.

+ 30 - 0
battery-health-monitor/src/services/http_service.py

@@ -0,0 +1,30 @@
+import json
+from flask import Flask, request, jsonify
+
+class HTTPService:
+    def __init__(self, app: Flask):
+        self.app = app
+        self.app.add_url_rule('/cell_info', 'get_cell_info', self.get_cell_info, methods=['GET'])
+
+    def get_cell_info(self):
+        cell_id = request.args.get('cell_id')
+        if not cell_id:
+            return jsonify({"error": "cell_id is required"}), 400
+        
+        # Here you would typically fetch the cell information from a database or another service
+        cell_info = self.fetch_cell_info(cell_id)
+        
+        if cell_info:
+            return jsonify(cell_info), 200
+        else:
+            return jsonify({"error": "Cell not found"}), 404
+
+    def fetch_cell_info(self, cell_id):
+        # Placeholder for fetching cell information logic
+        # This should be replaced with actual data retrieval logic
+        return {
+            "cell_id": cell_id,
+            "max_voltage": 4.2,
+            "min_voltage": 2.5,
+            "capacity": 2500
+        }

+ 36 - 0
battery-health-monitor/src/services/i2c_service.py

@@ -0,0 +1,36 @@
+import smbus
+import time
+
+class I2CService:
+    def __init__(self, bus_number=1):
+        self.bus = smbus.SMBus(bus_number)
+
+    def read_current(self, device_address):
+        # Replace with actual register address for current
+        current_register = 0x00
+        current = self.bus.read_word_data(device_address, current_register)
+        return self._convert_to_current(current)
+
+    def read_voltage(self, device_address):
+        # Replace with actual register address for voltage
+        voltage_register = 0x01
+        voltage = self.bus.read_word_data(device_address, voltage_register)
+        return self._convert_to_voltage(voltage)
+
+    def read_temperature(self, device_address):
+        # Replace with actual register address for temperature
+        temperature_register = 0x02
+        temperature = self.bus.read_word_data(device_address, temperature_register)
+        return self._convert_to_temperature(temperature)
+
+    def _convert_to_current(self, raw_value):
+        # Conversion logic for current
+        return raw_value * 0.1  # Example conversion factor
+
+    def _convert_to_voltage(self, raw_value):
+        # Conversion logic for voltage
+        return raw_value * 0.01  # Example conversion factor
+
+    def _convert_to_temperature(self, raw_value):
+        # Conversion logic for temperature
+        return raw_value * 0.5  # Example conversion factor

+ 32 - 0
battery-health-monitor/src/services/mqtt_service.py

@@ -0,0 +1,32 @@
+import paho.mqtt.client as mqtt
+import json
+
+class MQTTService:
+    def __init__(self, broker_address, topic):
+        self.broker_address = broker_address
+        self.topic = topic
+        self.client = mqtt.Client()
+
+    def connect(self):
+        self.client.connect(self.broker_address)
+
+    def on_message(self, client, userdata, message):
+        payload = json.loads(message.payload.decode())
+        self.handle_message(payload)
+
+    def handle_message(self, payload):
+        # Handle the incoming message payload
+        print(f"Received message: {payload}")
+
+    def subscribe(self):
+        self.client.subscribe(self.topic)
+        self.client.on_message = self.on_message
+
+    def publish(self, message):
+        self.client.publish(self.topic, json.dumps(message))
+
+    def loop_start(self):
+        self.client.loop_start()
+
+    def loop_stop(self):
+        self.client.loop_stop()

+ 1 - 0
battery-health-monitor/src/utils/__init__.py

@@ -0,0 +1 @@
+# This file is intentionally left blank.

+ 28 - 0
battery-health-monitor/src/utils/health_calculator.py

@@ -0,0 +1,28 @@
+def calculate_health(current_measurements, voltage_measurements, temperature_measurements):
+    """
+    Estimate the state of health (SoH) of a battery based on current, voltage, and temperature measurements.
+
+    Parameters:
+    - current_measurements: List of current measurements (in Amperes).
+    - voltage_measurements: List of voltage measurements (in Volts).
+    - temperature_measurements: List of temperature measurements (in Celsius).
+
+    Returns:
+    - health: Estimated state of health as a percentage.
+    """
+    # Basic checks
+    if not current_measurements or not voltage_measurements or not temperature_measurements:
+        return 0.0
+
+    # Calculate average values
+    avg_current = sum(current_measurements) / len(current_measurements)
+    avg_voltage = sum(voltage_measurements) / len(voltage_measurements)
+    avg_temperature = sum(temperature_measurements) / len(temperature_measurements)
+
+    # Simple health estimation logic (placeholder)
+    health = 100 - (avg_current * 0.1) - (avg_voltage * 0.05) + (avg_temperature * 0.1)
+
+    # Ensure health is within 0 to 100
+    health = max(0, min(100, health))
+
+    return health

+ 1 - 0
battery-health-monitor/tests/__init__.py

@@ -0,0 +1 @@
+# This file is intentionally left blank.

+ 25 - 0
battery-health-monitor/tests/test_health_calculator.py

@@ -0,0 +1,25 @@
+import unittest
+from src.utils.health_calculator import calculate_health
+
+class TestHealthCalculator(unittest.TestCase):
+
+    def test_calculate_health(self):
+        # Test case for a healthy battery
+        current_measurements = [0.5, 0.5, 0.5]  # Example current measurements
+        expected_health = 100  # Expected health percentage
+        self.assertEqual(calculate_health(current_measurements), expected_health)
+
+    def test_calculate_health_low_capacity(self):
+        # Test case for a battery with low capacity
+        current_measurements = [0.1, 0.1, 0.1]  # Example current measurements
+        expected_health = 20  # Expected health percentage
+        self.assertEqual(calculate_health(current_measurements), expected_health)
+
+    def test_calculate_health_empty_measurements(self):
+        # Test case for empty measurements
+        current_measurements = []
+        expected_health = 0  # Expected health percentage
+        self.assertEqual(calculate_health(current_measurements), expected_health)
+
+if __name__ == '__main__':
+    unittest.main()

+ 8 - 0
measure_ctrl.code-workspace

@@ -0,0 +1,8 @@
+{
+	"folders": [
+		{
+			"path": "."
+		}
+	],
+	"settings": {}
+}