ESP-AT module#
De ESP-AT module verzorgt de communicatie met de ESP8266 WiFi-(hardware)module, via de AT-opdrachten. Deze WiFi-module wordt onder andere gebruikt in de iot:bit van Elecfreaks.
Deze ESP-AT module vormt de basis voor de microbit ESP-AT gateway. Je kunt hiermee ook een microbit als IoT-knoop maken, zonder het gebruik van een gateway (ESP-AT IoT-node).
(iot:bit link)
(iot:bit github library)
AT opdrachten#
De communicatie tussen de microbit en de ESP-module gebeurt via de seriële verbinding van de microbit.
Deze verbinding moet ingesteld worden op 115200 baud.
Waarschuwing
De seriële verbinding is op het iot:bot bordje niet beschikbaar voor communicatie met de host. Voor foutzoeken is dat lastig: je kunt geen “logging” doen via de seriële verbinding. De ESP-AT-library gebruikt daarom voor de logging de microbit-radio. Dit kan interfereren met ander gebruik van deze radio.
De microbit stuurt een AT-opdracht, de ESP-module verwerkt deze, en stuurt het resultaat terug naar de microbit. De response van de ESP-module kan informatie bevatten, bijvoorbeeld over de WiFi-verbinding (IP-adres). De response van een AT-opdracht wordt altijd afgesloten met OK
of ERROR
, om aan te geven of de opdracht geslaagd is of niet.
De ESP-module stuurt ook “unsollicited” (ongevraagde) berichten, bijvoorbeeld als er een MQTT-bericht vanuit de broker ontvangen is. Ook het wegvallen van de MQTT-verbinding of van de WIFi-verbinding is een “unsollicited” bericht.
De ESP-module kan maar één opdracht tegelijk verwerken: je moet wachten tot de vorige opdracht klaar is, voordat je een nieuwe opdracht kunt geven. De verwerking van een opdracht kan vele seconden in beslag nemen, in het bijzonder bij het opzetten van een WiFi-verbinding of een MQTT-verbinding.
Eigenlijk moet je ervoor zorgen dat tijdens het wachten op de vorige opdracht, de verwerking van lokale berichten gewoon door kan gaan.
De library#
Synchrone functies#
Om ervoor te zorgen dat er niet meer dan één AT-opdracht tegelijk actief is, wacht een nieuwe opdracht altijd totdat de vorige afgerond is.
Dit wachten gebeurt met de functie: wait_at_ready()
, op basis van de boolean functie at_ready()
.
Je kunt in de event-loop er ook voor zorgen dat een at-functie altijd aangeroepen wordt als de vorige afgerond is, door het wachten in de event-loop uit te programmeren.
Sommige functies zijn volledig synchroon, dat wil zeggen dat deze wachten totdat de AT-opdracht compleet uitgevoerd is. Dit is bijvoorbeeld nodig voor functies die een resultaat opleveren (???)
Nog aanpassen#
logging optioneel, in- en uitschakelen via een functie
de functie
at_wifi_connect()
is helemaal synchroon - met een exception als het niet lukt. Is dit nodig? Is dit de beste oplossing?hoe signaleren we dat de wifi-verbinding verbroken is? of moet de toepassing daar voortdurend op controleren?
in de toepassing moet de functie voor het verwerken van de invoer van de ESP-module steeds aangeroepen worden; in principe kunnen we het via die functie signaleren?
AT string-escapes#
In een AT-opdracht kunnen strings voorkomen, genoteerd tussen dubbele quotes.
In zulke strings moeten de volgende tekens van een escape-code (\
) voorzien zijn:
"
->\"
,
->\,
(!!)\
->\\
Dit geldt voor de MQTT-string, maar ook voor MQTT topics, WiFi netwerknaam, WiFi wachtwoord, MQTT gebruikersnaam, MQTT wachtwoord, enz.
De library zorgt voor deze escape-codes: de applicatie moet de string zonder escape-codes aanbieden.
ESP-AT module code#
# ESP-AT library for iot:bit
from microbit import *
import radio
# global variables
at_ok = False
at_error = False
at_errorcode = 0
at_cmd = ''
wifi_connected = False
mqtt_connected = False
ip_address = ""
mac_address = ""
netmask = ""
# logging by radio to logging microbit
def log(txt: str):
if len(txt) > 250:
radio.send(txt[0:250])
radio.send(txt[250:])
else:
radio.send(txt)
radio.config(length=250)
radio.on()
# low-level: send command to ESP module
def sendAT(command: str, wait: int = 0):
uart.write('{a}\r\n'.format(a=command))
sleep(wait)
def escape_string(data: str) -> str:
return data.replace('"', '\\\"').replace(',','\\,')
def handle_inputline(line: str):
global at_cmd, at_ok, at_error, at_errorcode, wifi_connected, mac_address, ip_address, gateway_address, netmask
global mqtt_connected
log('>>' + line + '<<')
# first, handle unsollicited messages
# see: AT Messages
if line.startswith('WIFI CONNECTED'):
wifi_connected = True
return
if line.startswith('WIFI DISCONNECT'):
wifi_connected = False
return
if line.startswith('WIFI GOT IP'):
return
if line.startswith("+MQTTCONNECTED:"):
log("mqtt connected")
mqtt_connected = True
return
if line.startswith("+MQTTDISCONNECTED:"):
log("mqtt disconnected")
mqtt_connected = False
return
# next, handle complex (or all?) commands
if at_cmd == 'AT+CWJAP=':
if line.startswith('+CWJAP:'):
arg = line[7:]
at_errorcode = int(arg)
log('error: ' + arg)
return
elif at_cmd == "AT+CIPSTAMAC?":
if line.startswith("+CIPSTAMAC:"):
arg = line[11:]
mac_address = arg.replace(':', '').replace('"','')
log("mac-addr:" + mac_address)
return
elif at_cmd == "AT+CIPSTA?":
if line.startswith("+CIPSTA:ip:"):
ip_address = line[11:]
log("ip-addr:" + ip_address)
return
if line.startswith("+CIPSTA:gateway:"):
gateway_address = line[16:]
log("gateway-addr:" + gateway_address)
return
if line.startswith("+CIPSTA:netmask:"):
netmask = line[16:]
log("netmask:" + netmask)
return
else:
pass
if line.startswith(at_cmd):
# echo
return
if line == 'OK':
at_ok = True
at_cmd = ''
return
if line == 'ERROR':
at_error = True
at_cmd = ''
return
log("unexpected-3: " + line)
return
def log_mqtt_message(topic: str, msg: str):
log('msg-topic: ' + topic)
log(msg)
on_mqtt_message = log_mqtt_message # default message handler
input_buffer = ''
def at_check_input():
global input_buffer
if uart.any():
in_bytes = uart.read()
if in_bytes != None:
input_buffer += in_bytes.decode('utf-8')
if input_buffer.startswith('+MQTTSUBRECV:0'):
parts = input_buffer.split(',')
if len(parts) < 4:
return
topic = parts[1].strip('"')
datasize = int(parts[2])
log("mqtt-datasize: " + parts[2])
if len(parts[3]) < datasize + 2: # incl. CRLF
return
totalsize = len(parts[0]) + len(parts[1]) + len(parts[2]) + datasize + 5
# 3 comma's and CRLF
input_buffer = input_buffer[totalsize:]
on_mqtt_message(topic, parts[3][0:datasize])
return
eol_pos = input_buffer.find('\n')
if eol_pos > 0:
line = input_buffer[0:eol_pos-1] # get line and
input_buffer = input_buffer[eol_pos+1:] # remove from input_buffer
if line != '':
handle_inputline(line)
# wait until last at-command is finished (OR or ERROR)
def wait_at_ready():
global at_cmd
while at_cmd != '':
at_check_input()
def at_is_ready()-> bool:
global at_cmd
return at_cmd == ''
# send AT command - only after prev. command has finished.
# ...may wait for several seconds, e.g. when connecting to WiFi, MQTT
def at_send(cmd: str):
global at_ok, at_error, at_errorcode, at_cmd
wait_at_ready()
at_ok = False
at_error = False
at_errorcode = 0
pos = cmd.find('=')
if pos >= 0:
at_cmd = cmd[0:pos+1] # remove parameters
else:
at_cmd = cmd
log(".." + cmd)
sendAT(cmd, 0)
def at_init_ESP():
global wifi_connected, mqtt_connected, input_buffer
wifi_connected = False
mqtt_connected = False
input_buffer = ''
sendAT('AT', 1000) # wake-up
sendAT('AT+RESTORE', 1000) # restore to factory settings
sendAT('AT+RST', 1000) # reset, restart
buf = uart.read()
if buf != None:
log(">"+str(buf)+"<")
at_send('AT+CWMODE=1') # set to STA mode
wait_at_ready()
def at_wifi_connect(ssid: str, password: str):
global at_errorcode
at_send('AT+CWJAP="{a}","{b}"'.format(
a=escape_string(ssid),
b=escape_string(password)))
wait_at_ready()
if at_ok and wifi_connected:
return
else:
raise Exception("wifi connection failed", str(at_errorcode))
def at_wifi_connected() -> bool:
global wifi_connected
return wifi_connected
def at_get_mac_address() -> str:
global mac_address
at_send('AT+CIPSTAMAC?')
wait_at_ready()
return mac_address
def at_get_ip_address() -> str:
global ip_adress, wifi_connected
if not wifi_connected:
return ""
at_send('AT+CIPSTA?')
wait_at_ready()
return ip_address
def at_mqtt_set_userconfig(scheme: int, client_id: str, username: str, password: str):
at_send('AT+MQTTUSERCFG=0,{a},"{b}","{c}","{d}",0,0,""'.format(
a = scheme,
b = escape_string(client_id),
c = escape_string(username),
d = escape_string(password)
))
def at_mqtt_connect(host, port, reconnect):
at_send('AT+MQTTCONN=0,"{a}",{b},{c}'.format(
a = host,
b = port,
c = reconnect
))
def at_mqtt_connected():
global mqtt_connected
return mqtt_connected
def at_mqtt_publish(topic: str, data: str, qos: int, retain: bool):
at_send('AT+MQTTPUB=0,"{a}","{b}",{c},{d}'.format(
a = escape_string(topic),
b = escape_string(data),
c = qos,
d = 1 if retain else 0
))
def at_mqtt_subscribe(topic: str, qos: int):
at_send('AT+MQTTSUB=0,"{a}",{b}'.format(
a = escape_string(topic),
b = qos
))
# function (topic: str, msg: str)
def at_set_on_mqtt_message(function):
global on_mqtt_message
on_mqtt_message = function