Simple test¶
Ensure your device works with this simple test.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt as MQTT
### WiFi ###
# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)
# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
"""Use below for Most Boards"""
status_light = neopixel.NeoPixel(
board.NEOPIXEL, 1, brightness=0.2
) # Uncomment for Most Boards
"""Uncomment below for ItsyBitsy M4"""
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
### Topic Setup ###
# MQTT Topic
# Use this topic if you'd like to connect to a standard MQTT broker
mqtt_topic = "test/topic"
# Adafruit IO-style Topic
# Use this topic if you'd like to connect to io.adafruit.com
# mqtt_topic = 'aio_user/feeds/temperature'
### Code ###
# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connect(client, userdata, flags, rc):
# This function will be called when the client is connected
# successfully to the broker.
print("Connected to MQTT Broker!")
print("Flags: {0}\n RC: {1}".format(flags, rc))
def disconnect(client, userdata, rc):
# This method is called when the client disconnects
# from the broker.
print("Disconnected from MQTT Broker!")
def subscribe(client, userdata, topic, granted_qos):
# This method is called when the client subscribes to a new feed.
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def unsubscribe(client, userdata, topic, pid):
# This method is called when the client unsubscribes from a feed.
print("Unsubscribed from {0} with PID {1}".format(topic, pid))
def publish(client, userdata, topic, pid):
# This method is called when the client publishes data to a feed.
print("Published to {0} with PID {1}".format(topic, pid))
# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")
# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)
# Set up a MiniMQTT Client
client = MQTT.MQTT(broker = secrets['broker'],
username = secrets['user'],
password = secrets['pass'])
# Connect callback handlers to client
client.on_connect = connect
client.on_disconnect = disconnect
client.on_subscribe = subscribe
client.on_unsubscribe = unsubscribe
client.on_publish = publish
print("Attempting to connect to %s" % client.broker)
client.connect()
print("Subscribing to %s" % mqtt_topic)
client.subscribe(mqtt_topic)
print("Publishing to %s" % mqtt_topic)
client.publish(mqtt_topic, "Hello Broker!")
print("Unsubscribing from %s" % mqtt_topic)
client.unsubscribe(mqtt_topic)
print("Disconnecting from %s" % client.broker)
client.disconnect()
|