LPP implementatie#
De ulpp
module verzorgt de codering en decodering van berichten in het binaire Cayenne LPP formaat.
Voor een beschrijving van dit formaat, zie:
-
dit is een uitgebreide Python-implementatie van Cayenne LPP
deze beschrijft meer sensor-formaten dan de oorspronkelijk LPP versie
De lppjson
module bevat een enkele functie, lpp_to_json
, om binaire LPP berichten om te zetten in JSON string-formaat. Deze wordt o.a. gebruikt door de radio-logger.
De ulpp
module is een onafhankelijke implementatie van de codering en decodering, bedoeld om het gebruik van LPP voor de microbit eenvoudig te houden.
De belangrijke elementen van deze library zijn:
class LppBuffer
LppBuffer(prefix, maxsize)
buffer.add_digital_input(channel, value)
- etc.met varianten voor elk type sensor
frame.to_bytes() -> bytes
bytes_to_dict(bytes)
dict_to_bytes(dict)
Via LppBuffer
en buffer.add_xxsensor(channel, value)
bouw je een array van getallen op die je als bytes-waarde kunt versturen.
Deze bytes-waarde kun je ook omzetten (decoderen) met bytes_to_dict()
in een Python dictionary - die je vervolgens kunt omzetten in JSON, voor een MQTT bericht.
Via de codering dict_to_bytes()
kun je een Python dictionary (in de geschikte LPP vorm!) omzetten naar een LPP bytes-waarde.
dict_to_bytes()
verwerkt in deze implementatie (voorlopig) alleen actuator-data: deze is bedoeld voor downlink-berichten.
Daarnaast is er nog een afzonderlijke functie:
json_to_lpp(s: str) -> bytes
Voorbeelden van het gebruik:
IoT-knoop: opbouwen van een reeks sensorwaarden, en versturen als bytes-waarde (uplink).
prefix = bytes([1,2,3]) # prefix before LPP data
lpp = LppBuffer(data=prefix, maxsize=60)
lpp.add_digital_input(0, 1)
lpp.add_digital_output(1, 1)
lpp.add_analog_input(2, 1234)
lpp.add_analog_output(4, -1234)
lpp.add_luminosity(7, 345)
lpp.add_presence(8, 1)
lpp.add_barometer(9, 10230)
lpp.add_temperature(5, int(23.4*10))
lpp.add_digital_input(10, 1)
radio.send_bytes(lpp.to_bytes())
IoT-knoop: omzetten van een ontvangen (binair) LPP-bericht in acties - afhandelen van actuator-data.
(Dit wordt voorlopig ad-hoc in de IoT-knoop opgelost.)
Gateway: Omzetten (decoderen) van een ontvangen byte-array in een JSON-string, om te versturen via MQTT. (Uplink-berichten in de gateway.)
# data bevat ontvangen radio-bericht: header, lpp payload
nodeID = data[1] * 256 + data[2]
counter = data[3] * 256 + data[4]
lpp_dict = bytes_to_dict(data[5:])
msg = {'nodeid': '{0:x}'.format(nodeID), # in hex format
'counter': counter,
'payload': lpp_dict
}
mqtt.publish(topic, json.dumps(msg))
Gateway: van ontvangen JSON-string naar binair LPP downlink-bericht
obj = json.loads(msg)
lpp_bytes = dict_to_bytes(obj) # from JSON string to LPP byte-array
header_bytes = bytes([0x0B, nodeID // 256, nodeID % 256, 0, 0]) # downlink header
radio.send_bytes(header_bytes + lpp_bytes) # send header followed by LPP data
Radio-logger: omzetten van (binaire) LPP-bytes naar LPP in JSON: lpp_to_json(data: bytes) -> str
.
print(lpp_to_json(msg[5:]) # skip radio msg header
ULPP module code#
# module ulpp
# LPP tags
LPP_dIn = 0
LPP_dOut = 1
LPP_aIn = 2
LPP_aOut = 3
LPP_luminosity = 101
LPP_presence = 102
LPP_temperature = 103
LPP_humidity = 104
LPP_barometer = 115
class LppBuffer(object):
# some assumptions:
# - the value-parameters are in the LPP-required format and range
# i.e. scaling is done by the caller (if needed)
# all value-parameters are int
def __init__(self, data=b'', maxsize=32):
self.buffer = bytearray(data)
self.maxsize = maxsize
self.pos = 0
def __str__(self):
return str(list(self.buffer))
def to_bytes(self) -> bytes:
return bytes(self.buffer)
def add_byte(self, channel, tag, value):
if len(self.buffer) + 3 > self.maxsize:
raise OverflowError
self.buffer.append(channel)
self.buffer.append(tag)
self.buffer.append(value)
def add_unsigned_int16(self, channel, tag, value):
if len(self.buffer) + 4 > self.maxsize:
raise OverflowError
self.buffer.append(channel)
self.buffer.append(tag)
if value >= 65536:
value = value % 65536
(hi, lo) = divmod(value, 256)
self.buffer.append(hi)
self.buffer.append(lo)
def add_signed_int16(self, channel, tag, value):
while value < 0:
value = value + 65536
self.add_unsigned_int16(channel, tag, value)
def add_digital_input(self, channel, value):
self.add_byte(channel, LPP_dIn, value)
def add_digital_output(self, channel, value):
self.add_byte(channel, LPP_dOut, value)
def add_analog_input(self, channel, value):
self.add_signed_int16(channel, LPP_aIn, value)
def add_analog_output(self, channel, value):
self.add_signed_int16(channel, LPP_aOut, value)
def add_luminosity(self, channel, value):
self.add_unsigned_int16(channel, LPP_luminosity, value)
def add_presence(self, channel, value):
self.add_byte(channel, LPP_presence, value)
def add_temperature(self, channel, value):
# temperature: 0.1C, signed int
self.add_signed_int16(channel, LPP_temperature, value)
def add_humidity(self, channel, value):
# rel. humidity: 0.5% unsigned byte
self.add_byte(channel, LPP_humidity, value)
def add_barometer(self, channel, value):
# barometric pressue: 0.1 hPa unsigned int16
self.add_unsigned_int16(channel, LPP_barometer, value)
def bytes_to_dict(data: bytes) -> dict:
pos = 0
def nextbyte():
nonlocal pos
if pos >= len(data):
raise OverflowError
value = data[pos]
pos = pos + 1
return value
def nextunsignedint():
hi = nextbyte()
lo = nextbyte()
value = hi * 256 + lo
return value
def nextint():
value = nextunsignedint()
if value > 32767:
value = value - 65536
return value
pos = 0
obj = {}
while pos < len(data):
channel = nextbyte()
tag = nextbyte()
if tag == LPP_dIn:
value = nextbyte()
obj[channel] = {'dIn': value}
elif tag == LPP_dOut:
value = nextbyte()
obj[channel] = {'dOut': value}
elif tag == LPP_aIn:
value = nextint()
obj[channel] = {'aIn': value}
elif tag == LPP_aOut:
value = nextint()
obj[channel] = {'aOut': value}
elif tag == LPP_luminosity:
value = nextunsignedint()
obj[channel] = {'luminosity': value}
elif tag == LPP_presence:
value = nextbyte()
obj[channel] = {'presence': value}
elif tag == LPP_temperature:
value = nextint()
obj[channel] = {'temperature': value}
elif tag == LPP_humidity:
value = nextbyte()
obj[channel] = {'humidity': value}
elif tag == LPP_barometer:
value = nextunsignedint()
obj[channel] = {'barometer': value}
return obj
def dict_to_bytes (obj) -> bytes:
lpp = LppBuffer()
for channel in obj:
item = obj[channel] # item is an object with a single key...
if type(channel) is str:
channel = int(channel)
for key in item:
if key == 'dOut':
lpp.add_digital_output(channel, item[key])
elif key == 'aOut':
lpp.add_analog_output(channel, item[key])
else:
# not implemented, raise exception? only output allowed
raise ValueError('only output values allowed in actuator msg')
return lpp.to_bytes()
lppjson
module code#
De module lppjson
heeft 1 functie, lpp_to_json
. Deze wordt gebruikt in de radio-logger.
# LPP tags
LPP_dIn = 0
LPP_dOut = 1
LPP_aIn = 2
LPP_aOut = 3
LPP_luminosity = 101
LPP_presence = 102
LPP_temperature = 103
LPP_humidity = 104
LPP_barometer = 115
def lpp_to_json(data: bytes) -> str:
pos = 0
def nextbyte():
nonlocal pos
if pos >= len(data):
raise OverflowError
value = data[pos]
pos = pos + 1
return value
def nextunsignedint():
hi = nextbyte()
lo = nextbyte()
return hi * 256 + lo
def nextint():
value = nextunsignedint()
if value > 32767:
value = value - 65536
return value
pos = 0
json_string = '{'
sep = ''
while pos < len(data):
channel = nextbyte()
tag = nextbyte()
value = 0
tagname = ''
if tag == LPP_dIn:
value = nextbyte()
tagname = 'dIn'
elif tag == LPP_dOut:
value = nextbyte()
tagname = 'dOut'
elif tag == LPP_aIn:
value = nextint()
tagname = 'aIn'
elif tag == LPP_aOut:
value = nextint()
tagname = 'aOut'
elif tag == LPP_luminosity:
value = nextunsignedint()
tagname = 'luminosity'
elif tag == LPP_presence:
value = nextbyte()
tagname = 'presence'
elif tag == LPP_temperature:
value = nextint()
tagname = 'temperature'
elif tag == LPP_humidity:
value = nextbyte()
tagname = 'humidity'
elif tag == LPP_barometer:
value = nextunsignedint()
tagname = 'barometer'
json_string += '{0}"{1}": {{"{2}": {3}}}'.format(sep, channel, tagname, value)
sep = ', '
json_string += '}'
return json_string
functie json_to_bytes
#
def json_to_bytes (js: str) -> bytes:
obj = json.loads(js)
return dict_to_bytes(obj)
Testen#
De onderstaande testen kunnen uitgevoerd worden in Jupyter Notebook.
Test: opbouwen van een binaire LPP-waarde#
Deze code vind je in de IoT-knoop, bij het versturen van de sensorwaarden. En mogelijk bij het opbouwen van een LPP-waarde met actuator-waarden, vanuit een toepassing.
header = bytes([1,2,3,4,5])
frame = LppBuffer(data=header, maxsize=60)
frame.add_digital_input(0, 1)
frame.add_digital_output(1, 1)
frame.add_analog_input(2, 1234)
frame.add_analog_output(4, -1234)
frame.add_luminosity(7, 345)
frame.add_presence(8, 1)
frame.add_barometer(9, 10230)
frame.add_temperature(5, int(23.4*10))
frame.add_digital_input(10, 1)
# radio.send_bytes(frame.to_bytes())
buffer = frame.to_bytes()
print(list(buffer))
[1, 2, 3, 4, 5, 0, 0, 1, 1, 1, 1, 2, 2, 4, 210, 4, 3, 251, 46, 7, 101, 1, 89, 8, 102, 1, 9, 115, 39, 246, 5, 103, 0, 234, 10, 0, 1]
Test: van LPP-bytes naar JSON#
In de gateway worden ontvangen (binaire) sensor-berichten omgezet in JSON-formaat, om te versturen via MQTT.
import json
lppdata = bytes_to_dict(buffer[5:]) # skip header
lppdata
{0: {'dIn': 1},
1: {'dOut': 1},
2: {'aIn': 1234},
4: {'aOut': -1234},
7: {'luminosity': 345},
8: {'presence': 1},
9: {'barometer': 10230},
5: {'temperature': 234},
10: {'dIn': 1}}
jsondata = json.dumps(lppdata)
jsondata
'{"0": {"dIn": 1}, "1": {"dOut": 1}, "2": {"aIn": 1234}, "4": {"aOut": -1234}, "7": {"luminosity": 345}, "8": {"presence": 1}, "9": {"barometer": 10230}, "5": {"temperature": 234}, "10": {"dIn": 1}}'
Test: van JSON naar LPP-bytes#
lppdata1 = json.loads(jsondata)
lppdata1
{'0': {'dIn': 1},
'1': {'dOut': 1},
'2': {'aIn': 1234},
'4': {'aOut': -1234},
'7': {'luminosity': 345},
'8': {'presence': 1},
'9': {'barometer': 10230},
'5': {'temperature': 234},
'10': {'dIn': 1}}
lppbytes1 = dict_to_bytes(lppdata1)
lppbytes1
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[9], line 1
----> 1 lppbytes1 = dict_to_bytes(lppdata1)
2 lppbytes1
Cell In[1], line 159, in dict_to_bytes(obj)
156 lpp.add_analog_output(channel, item[key])
157 else:
158 # not implemented, raise exception? only output allowed
--> 159 raise ValueError('only output values allowed in actuator msg')
161 return lpp.to_bytes()
ValueError: only output values allowed in actuator msg
Bovenstaande werkt niet in de (beperkte) implementatie van dict_to_bytes. Dit werkt wel als we de dictionary beperken tot alleen actuator-waarden.
lppbytes1 = dict_to_bytes({'1': {'dOut': 1}, '4': {'aOut': -1234}})
list(lppbytes1)
[1, 1, 1, 4, 3, 251, 46]
Nog een extra voorbeeld, met alleen actuator-waarden.
lpp = LppBuffer()
lpp.add_analog_output(3, 123)
lpp.add_digital_output(5, 1)
lpp.add_analog_output(6, -12)
lppbuf = lpp.to_bytes()
lppdict = bytes_to_dict(lppbuf)
list(dict_to_bytes(lppdict))
[3, 3, 0, 123, 5, 1, 1, 6, 3, 255, 244]
Test lpp_to_json
#
lpp_to_json([1, 1, 1, 4, 3, 251, 46])
'{"1": {"dOut": 1}, "4": {"aOut": -1234}}'
lpp_to_json([3, 0, 0, 2, 0, 0, 4, 2, 0, 0, 6, 103, 0, 200])
'{"3": {"dIn": 0}, "2": {"dIn": 0}, "4": {"aIn": 0}, "6": {"temperature": 200}}'
lpp_to_json([])
'{}'
Het omgekeerde: JSON naar LPP, gaat in twee stappen: (i) JSON naar Python object (via de json loads); (ii) dict_to_lpp - maar dit is beperkt tot een aantal actuator-waarden, specifiek voor de gateway downlink.