i2c_handler.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from abc import ABC, abstractmethod
  2. import asyncio
  3. import random
  4. class I2CDevice(ABC):
  5. default_address = 0x08
  6. @abstractmethod
  7. def initialize(self, address: int):
  8. pass
  9. @abstractmethod
  10. def config_channel(self, channel: int, **kwargs):
  11. pass
  12. @abstractmethod
  13. def read_value(self) -> float:
  14. pass
  15. class I2C:
  16. def __init__(self, device_class:I2CDevice, address=None, **kwargs):
  17. self.device_class = device_class
  18. self.address = address if address is not None else device_class.default_address
  19. assert self.address is not None, "I2C address must be provided"
  20. self.kwargs = kwargs
  21. self.device = None
  22. def initialize(self):
  23. self.device:I2CDevice = self.device_class()
  24. self.device.initialize(self.address)
  25. async def read_channel(self, channel):
  26. if self.device is None:
  27. raise RuntimeError("I2C not initialized")
  28. self.device.config_channel(channel, **self.kwargs)
  29. await asyncio.sleep(0.01)
  30. return self.device.read_value()
  31. async def read_channels(self, channels):
  32. self.channels = channels if isinstance(channels, list) else [channels]
  33. # Needs to be sequential otherwise readings will be mixedup
  34. return {channel: await self.read_channel(channel) for channel in self.channels}
  35. class MockI2C(I2CDevice):
  36. def __init__(self):
  37. self.channels = {
  38. 1: 0.3, # Generic sensor
  39. 3: 0.8, # Tank sensor
  40. 4: 0.8 # Endeffector sensor
  41. }
  42. self.current_channel = 1
  43. def initialize(self, address):
  44. pass
  45. def config_channel(self, channel: int, gain=1, resolution=12):
  46. if self.current_channel not in self.channels:
  47. raise ValueError(f"Invalid channel {channel}")
  48. self.current_channel = channel
  49. def read_value(self) -> float:
  50. # Add some noise to the readings
  51. base_value = self.channels[self.current_channel]
  52. noise = random.uniform(-0.05, 0.05)
  53. return base_value + noise