| 123456789101112131415161718192021222324252627282930313233343536373839 |
- import asyncio
- import pigpio
- async def toggle_pin(pin: int, shared_state):
- """Toggle a GPIO pin based on shared state."""
- pi = pigpio.pi()
- if not pi.connected:
- raise RuntimeError(f"Failed to connect to pigpio daemon for pin {pin}")
-
- try:
- while True:
- pi.write(pin, shared_state['state'])
- await asyncio.sleep(0.1) # Small delay to prevent CPU overuse
- finally:
- pi.write(pin, 0)
- pi.stop()
- async def handle_input(shared_state):
- """Handle keyboard input to toggle pins."""
- while True:
- await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to toggle pins...")
- shared_state['state'] = 1 - shared_state['state'] # Toggle state
- print(f"Toggled pins to: {'ON' if shared_state['state'] else 'OFF'}")
- async def main():
- """Run input handling and pin toggling in parallel."""
- shared_state = {'state': 0}
- pins = [27]
- # pins = [17, 27, 22]
- pin_tasks = [toggle_pin(pin, shared_state) for pin in pins]
-
- await asyncio.gather(handle_input(shared_state), *pin_tasks)
- if __name__ == "__main__":
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print("Exiting...")
|