| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from abc import ABC, abstractmethod
- import asyncio
- import random
- class I2CDevice(ABC):
- default_address = 0x08
- @abstractmethod
- def initialize(self, address: int):
- pass
- @abstractmethod
- def config_channel(self, channel: int, **kwargs):
- pass
- @abstractmethod
- def read_value(self) -> float:
- pass
- class I2C:
- def __init__(self, device_class:I2CDevice, address=None, **kwargs):
- self.device_class = device_class
- self.address = address if address is not None else device_class.default_address
- assert self.address is not None, "I2C address must be provided"
- self.kwargs = kwargs
- self.device = None
- def initialize(self):
- self.device:I2CDevice = self.device_class()
- self.device.initialize(self.address)
-
- async def read_channel(self, channel):
- if self.device is None:
- raise RuntimeError("I2C not initialized")
- self.device.config_channel(channel, **self.kwargs)
- await asyncio.sleep(0.01)
- return self.device.read_value()
-
- async def read_channels(self, channels):
- self.channels = channels if isinstance(channels, list) else [channels]
-
- # Needs to be sequential otherwise readings will be mixedup
- return {channel: await self.read_channel(channel) for channel in self.channels}
-
- class MockI2C(I2CDevice):
- def __init__(self):
- self.channels = {
- 1: 0.3, # Generic sensor
- 3: 0.8, # Tank sensor
- 4: 0.8 # Endeffector sensor
- }
- self.current_channel = 1
- def initialize(self, address):
- pass
- def config_channel(self, channel: int, gain=1, resolution=12):
- if self.current_channel not in self.channels:
- raise ValueError(f"Invalid channel {channel}")
- self.current_channel = channel
- def read_value(self) -> float:
- # Add some noise to the readings
- base_value = self.channels[self.current_channel]
- noise = random.uniform(-0.05, 0.05)
- return base_value + noise
|