瀏覽代碼

invert dependency in I2C

SG/Cellrobot 8 月之前
父節點
當前提交
6c2f9035a5
共有 3 個文件被更改,包括 74 次插入33 次删除
  1. 29 8
      playgrounds/i2c_playground.py
  2. 33 14
      robot_control/src/api/i2c_handler.py
  3. 12 11
      robot_control/src/vendor/mcp3428.py

+ 29 - 8
playgrounds/i2c_playground.py

@@ -1,13 +1,34 @@
 from robot_control.src.api.i2c_handler import I2C
 from robot_control.src.api.i2c_handler import I2C
+from robot_control.src.vendor.mcp3428 import MCP3428
 from time import sleep
 from time import sleep
+import asyncio
 
 
-# Initialize with multiple channels (1 through 4)
-i2c = I2C([3, 4])
-i2c.initialize()
+async def main():
+    # Initialize with multiple channels (1 through 4)
+    i2c = I2C(device_class=MCP3428, gain=1, resolution=12)
+    i2c.initialize()
 
 
-while(1):
-    sleep(1)
-    readings = i2c.read()
-    for channel, value in readings.items():
-        print(f"Channel {channel}: {value}")
+    counter = 0
+    while True:
+        # Read channel 4 every iteration
+        selection = [4]
+
+        # Read channel 3 every 4 seconds
+        if counter % 4 == 0:
+            selection.append(3)
+
+        # Read channel 1 every 10 iterations
+        if counter % 10 == 0:
+            selection.append(1)
+
+        readings = await i2c.read_channels(selection)
+
+        for channel, value in readings.items():
+            print(f"Channel {channel}: {value}")
+
+        counter += 1
+        await asyncio.sleep(1)
+
+if __name__ == "__main__":
+    asyncio.run(main())
 
 

+ 33 - 14
robot_control/src/api/i2c_handler.py

@@ -1,27 +1,46 @@
 import smbus2 as smbus 
 import smbus2 as smbus 
-from ..vendor.mcp3428 import MCP3428, MCP3428_DEFAULT_ADDRESS
+from abc import ABC, abstractmethod
 from time import sleep
 from time import sleep
+import asyncio
+
+class I2CDevice(ABC):
+    @abstractmethod
+    def initialize(self, bus: smbus.SMBus, address: int):
+        pass
+
+    @abstractmethod
+    def config_channel(self, channel: int, **kwargs):
+        pass
+
+    @abstractmethod
+    def read_value(self) -> float:
+        pass
 
 
 class I2C:
 class I2C:
-    def __init__(self, channels, gain = 1, resolution = 12):
-        self.channels = channels if isinstance(channels, list) else [channels]
-        self.gain = gain
-        self.resolution = resolution
-        self.mcp = None
+    def __init__(self, device_class:I2CDevice, address=None, **kwargs):
+        self.device_class = device_class
+        self.address = address if address is not None else device_class.default_address
+        assert self.address is not None, "I2C address must be provided"
+        self.kwargs = kwargs
+        self.device = None
 
 
     def initialize(self):
     def initialize(self):
         bus = smbus.SMBus(1)
         bus = smbus.SMBus(1)
-        self.mcp = MCP3428(bus, MCP3428_DEFAULT_ADDRESS)
+        self.device:I2CDevice = self.device_class()
+        self.device.initialize(bus, self.address)
         
         
-    def read_channel(self, channel):
-        if self.mcp is None:
+    async def read_channel(self, channel):
+        if self.device is None:
             raise RuntimeError("I2C not initialized")
             raise RuntimeError("I2C not initialized")
-        self.mcp.config_command(channel, self.gain, self.resolution)
-        sleep(0.01)
-        return self.mcp.read_adc()
+        self.device.config_channel(channel, **self.kwargs)
+        await asyncio.sleep(0.01)
+        return self.device.read_value()
     
     
-    def read(self):
-        return {channel: self.read_channel(channel) for channel in self.channels}
+    async def read_channels(self, channels):
+        self.channels = channels if isinstance(channels, list) else [channels]
+        
+        # Needs to be sequential otherwise readings will be mixedup
+        return {channel: await self.read_channel(channel) for channel in self.channels}
 
 
 
 
 
 

+ 12 - 11
robot_control/src/vendor/mcp3428.py

@@ -27,21 +27,28 @@ GAIN = {
 MCP3428_CMD_READ_CNVRSN                = 0x00 # Read Conversion Result Data
 MCP3428_CMD_READ_CNVRSN                = 0x00 # Read Conversion Result Data
 
 
 import smbus2 as smbus
 import smbus2 as smbus
+from ..api.i2c_handler import I2CDevice
 
 
-class MCP3428:
+class MCP3428(I2CDevice):
+    default_address = MCP3428_DEFAULT_ADDRESS
 
 
-    def __init__(self, bus:smbus.SMBus, address):
+    def initialize(self, bus: smbus.SMBus, address: int):
         self.bus = bus
         self.bus = bus
         self.address = address
         self.address = address
 
 
-    def config_command(self, channel, gain=1, resolution=12):
+    def config_channel(self, channel: int, gain=1, resolution=12):
         """Select the Configuration Command from the given provided values"""
         """Select the Configuration Command from the given provided values"""
         self.gain = gain
         self.gain = gain
         self.resolution = resolution
         self.resolution = resolution
         CONFIG_CMD = (MCP3428_CMD_MODE_CONT | RESOLUTION[resolution] | GAIN[gain] | CHANNEL[channel])
         CONFIG_CMD = (MCP3428_CMD_MODE_CONT | RESOLUTION[resolution] | GAIN[gain] | CHANNEL[channel])
         self.bus.write_byte(self.address, CONFIG_CMD)
         self.bus.write_byte(self.address, CONFIG_CMD)
 
 
-    def read_adc(self):
+    def read_value(self) -> float:
+        raw_adc = self._read_adc()
+        res_per_bit = 2.048/2**(self.resolution-1)
+        return raw_adc*res_per_bit/self.gain
+
+    def _read_adc(self):
         """Read data back from MCP3428_CMD_READ_CNVRSN(0x00), 2 bytes
         """Read data back from MCP3428_CMD_READ_CNVRSN(0x00), 2 bytes
         raw_adc MSB, raw_adc LSB"""
         raw_adc MSB, raw_adc LSB"""
         read = smbus.i2c_msg.read(self.address, 2)
         read = smbus.i2c_msg.read(self.address, 2)
@@ -49,12 +56,6 @@ class MCP3428:
         data = list(read)
         data = list(read)
         # Convert the data to 12-bits
         # Convert the data to 12-bits
         raw_adc = ((data[0] & 0x0F) * 256) + data[1]
         raw_adc = ((data[0] & 0x0F) * 256) + data[1]
-        if raw_adc > 2047 :
+        if raw_adc > 2047:
             raw_adc -= 4095
             raw_adc -= 4095
         return raw_adc
         return raw_adc
-
-    def read_value(self):
-        raw_adc = self.read_adc()
-        res_per_bit = 2.048/2**(self.resolution-1)
-        # ensure this works!
-        return raw_adc*res_per_bit/self.gain