Jelajahi Sumber

add I2C handling and playground; update .gitignore and requirements

SG/Cellrobot 8 bulan lalu
induk
melakukan
6fa6d2e72c

+ 2 - 1
.gitignore

@@ -1,8 +1,9 @@
 .cpython-39.pyc
 .VSCodeCounter/
 .pyc
-robot_control/src/robot_control.egg-info/
+robot-control/src/robot_control.egg-info/
 __pycache__
 .coverage
 .python-version
 logs/robot.log*
+cell_loader_env/

+ 7 - 0
.vscode/launch.json

@@ -17,6 +17,13 @@
             "request": "launch",
             "program": "${workspaceFolder}/robot_control/src/main.py",
             "console": "integratedTerminal"
+        },
+        {
+            "name": "I2C_Playground",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "playgrounds.i2c_playground",
+            "console": "integratedTerminal"
         }
     ]
 }

+ 0 - 0
playgrounds/__init__.py


+ 13 - 0
playgrounds/i2c_playground.py

@@ -0,0 +1,13 @@
+from robot_control.src.api.i2c_handler import I2C
+from time import sleep
+
+# Initialize with multiple channels (1 through 4)
+i2c = I2C([3, 4])
+i2c.initialize()
+
+while(1):
+    sleep(1)
+    readings = i2c.read()
+    for channel, value in readings.items():
+        print(f"Channel {channel}: {value}")
+

+ 1 - 1
robot_control/requirements.txt

@@ -1,6 +1,6 @@
 fastapi[all]
 uvicorn
-# opencv-python==4.10.0
+opencv-python>=4
 pylibdmtx==0.1.10
 pydantic==2.10.4
 numpy==2.0.0

+ 28 - 0
robot_control/src/api/i2c_handler.py

@@ -0,0 +1,28 @@
+import smbus2 as smbus 
+from ..vendor.mcp3428 import MCP3428, MCP3428_DEFAULT_ADDRESS
+from time import sleep
+
+class I2C:
+    def __init__(self, channels, gain = 1, resolution = 12):
+        self.channels = channels if isinstance(channels, list) else [channels]
+        self.gain = gain
+        self.resolution = resolution
+        self.mcp = None
+
+    def initialize(self):
+        bus = smbus.SMBus(1)
+        self.mcp = MCP3428(bus, MCP3428_DEFAULT_ADDRESS)
+        
+    def read_channel(self, channel):
+        if self.mcp is None:
+            raise RuntimeError("I2C not initialized")
+        self.mcp.config_command(channel, self.gain, self.resolution)
+        sleep(0.01)
+        return self.mcp.read_adc()
+    
+    def read(self):
+        return {channel: self.read_channel(channel) for channel in self.channels}
+
+
+
+

+ 60 - 0
robot_control/src/vendor/mcp3428.py

@@ -0,0 +1,60 @@
+# I2C address of the device
+MCP3428_DEFAULT_ADDRESS                = 0x68
+
+# MCP3428 Configuration Command Set
+MCP3428_CMD_NEW_CNVRSN                = 0x80 # Initiate a new conversion(One-Shot Conversion mode only)
+CHANNEL = {
+    1: 0x00, # Channel-1 Selected
+    2: 0x20, # Channel-2 Selected
+    3: 0x40, # Channel-3 Selected
+    4: 0x60 # Channel-4 Selected
+}
+
+MCP3428_CMD_MODE_CONT                = 0x10 # Continuous Conversion Mode
+MCP3428_CMD_MODE_ONESHOT            = 0x00 # One-Shot Conversion Mode
+RESOLUTION = {
+    12: 0x00, # 240 SPS (12-bit)
+    14: 0x04, # 60 SPS (14-bit)
+    16:  0x08 # 15 SPS (16-bit)
+}
+
+GAIN = {
+    1: 0x00, # PGA Gain = 1V/V
+    2: 0x01, # PGA Gain = 2V/V
+    4: 0x02, # PGA Gain = 4V/V
+    8: 0x03 # PGA Gain = 8V/V
+}
+MCP3428_CMD_READ_CNVRSN                = 0x00 # Read Conversion Result Data
+
+import smbus2 as smbus
+
+class MCP3428:
+
+    def __init__(self, bus:smbus.SMBus, address):
+        self.bus = bus
+        self.address = address
+
+    def config_command(self, channel, gain=1, resolution=12):
+        """Select the Configuration Command from the given provided values"""
+        self.gain = gain
+        self.resolution = resolution
+        CONFIG_CMD = (MCP3428_CMD_MODE_CONT | RESOLUTION[resolution] | GAIN[gain] | CHANNEL[channel])
+        self.bus.write_byte(self.address, CONFIG_CMD)
+
+    def read_adc(self):
+        """Read data back from MCP3428_CMD_READ_CNVRSN(0x00), 2 bytes
+        raw_adc MSB, raw_adc LSB"""
+        read = smbus.i2c_msg.read(self.address, 2)
+        self.bus.i2c_rdwr(read)
+        data = list(read)
+        # Convert the data to 12-bits
+        raw_adc = ((data[0] & 0x0F) * 256) + data[1]
+        if raw_adc > 2047 :
+            raw_adc -= 4095
+        return raw_adc
+
+    def read_value(self):
+        raw_adc = self.read_adc()
+        res_per_bit = 2.048/2**(self.resolution-1)
+        # ensure this works!
+        return raw_adc*res_per_bit/self.gain