"""I2C access wrapper, abstract device interface and a mock device."""

from abc import ABC, abstractmethod
import asyncio
import random
from typing import Optional, Type


class I2CDevice(ABC):
    """Interface a concrete I2C device driver must implement.

    ``default_address`` is the bus address used by :class:`I2C` when the
    caller does not pass one explicitly.
    """

    default_address = 0x08

    @abstractmethod
    def initialize(self, address: int):
        """Prepare the device that lives at ``address``."""

    @abstractmethod
    def config_channel(self, channel: int, **kwargs):
        """Select ``channel`` for the next read, applying driver options."""

    @abstractmethod
    def read_value(self) -> float:
        """Return the reading of the currently configured channel."""


class I2C:
    """Asynchronous front end around an :class:`I2CDevice` implementation.

    Args:
        device_class: The device *class* (not an instance); it is
            instantiated with no arguments by :meth:`initialize`.
        address: Bus address. Falls back to ``device_class.default_address``
            when ``None``.
        **kwargs: Forwarded unchanged to ``config_channel`` on every read.
    """

    def __init__(self, device_class: Type[I2CDevice], address=None, **kwargs):
        self.device_class = device_class
        self.address = (
            address if address is not None else device_class.default_address
        )
        # NOTE: kept as an assert so the exception type seen by callers is
        # unchanged; be aware that it is stripped when Python runs with -O.
        assert self.address is not None, "I2C address must be provided"
        self.kwargs = kwargs
        # Stays None until initialize() has been called.
        self.device: Optional[I2CDevice] = None

    def initialize(self):
        """Instantiate the device class and initialize it at ``self.address``."""
        self.device = self.device_class()
        self.device.initialize(self.address)

    async def read_channel(self, channel):
        """Configure ``channel`` and return its reading.

        Raises:
            RuntimeError: If :meth:`initialize` has not been called yet.
        """
        if self.device is None:
            raise RuntimeError("I2C not initialized")
        self.device.config_channel(channel, **self.kwargs)
        # Yield for 10 ms between configuring and reading; presumably this
        # gives the hardware time to settle -- confirm against the driver.
        await asyncio.sleep(0.01)
        return self.device.read_value()

    async def read_channels(self, channels):
        """Read one or several channels and return ``{channel: value}``.

        ``channels`` may be a list of channels or a single channel; a single
        channel is wrapped in a one-element list. The normalized list is
        also stored on ``self.channels``.
        """
        self.channels = channels if isinstance(channels, list) else [channels]
        # Needs to be sequential, otherwise readings will be mixed up: the
        # device only has one "current channel" between config and read.
        readings = {}
        for channel in self.channels:
            readings[channel] = await self.read_channel(channel)
        return readings


class MockI2C(I2CDevice):
    """In-memory stand-in for a real device, returning noisy fixed values."""

    def __init__(self):
        self.channels = {
            1: 0.3,  # Generic sensor
            3: 0.8,  # Tank sensor
            4: 0.8,  # Endeffector sensor
        }
        self.current_channel = 1

    def initialize(self, address):
        """No-op: the mock has no hardware to set up."""

    def config_channel(self, channel: int, gain=1, resolution=12):
        """Select ``channel`` for subsequent :meth:`read_value` calls.

        ``gain`` and ``resolution`` are accepted for interface
        compatibility and ignored by the mock.

        Raises:
            ValueError: If ``channel`` is not one of the mocked channels.
                The previously selected channel is left untouched.
        """
        # Validate the *requested* channel, not the one already selected;
        # otherwise a bad channel is accepted and corrupts the mock's state.
        if channel not in self.channels:
            raise ValueError(f"Invalid channel {channel}")
        self.current_channel = channel

    def read_value(self) -> float:
        """Return the current channel's base value plus uniform noise.

        The noise is drawn from ``random.uniform(-0.05, 0.05)``.
        """
        base_value = self.channels[self.current_channel]
        noise = random.uniform(-0.05, 0.05)
        return base_value + noise