# mqtt-test.py

  1. import paho.mqtt.client as mqtt
  2. import time
  3. import random # Replace with actual voltage measurement code
"""
This script connects to a local MQTT broker and periodically publishes a
random slot number as a stand-in for real measurement data.
"""
  7. BROKER = "localhost"
  8. PORT = 1883 # Use 8883 for TLS
  9. TOPIC = "cells_inserted/device1" # Change for each device
  10. USERNAME = "robot"
  11. PASSWORD = "robot"
  12. def on_connect(client, userdata, flags, rc):
  13. if rc == 0:
  14. print("Connected to MQTT Broker!")
  15. else:
  16. print(f"Failed to connect, return code {rc}\n")
  17. client = mqtt.Client()
  18. client.username_pw_set(USERNAME, PASSWORD)
  19. client.on_connect = on_connect
  20. client.connect(BROKER, PORT, 60)
  21. client.loop_start()
  22. try:
  23. while True:
  24. slot = int(random.uniform(0, 5)) # Replace with actual measurement
  25. client.publish(TOPIC, slot)
  26. print(f"Published: {slot} to topic {TOPIC}")
  27. time.sleep(2) # Adjust as needed
  28. except KeyboardInterrupt:
  29. print("Disconnecting from broker")
  30. client.disconnect()
  31. client.loop_stop()