LPP implementatie#

De ulpp module verzorgt de codering en decodering van berichten in het binaire Cayenne LPP formaat. Voor een beschrijving van dit formaat, zie:

De lppjson module bevat een enkele functie, lpp_to_json, om binaire LPP berichten om te zetten in JSON string-formaat. Deze wordt o.a. gebruikt door de radio-logger.

De ulpp module is een onafhankelijke implementatie van de codering en decodering, bedoeld om het gebruik van LPP voor de microbit eenvoudig te houden.

De belangrijke elementen van deze library zijn:

  • class LppBuffer

    • LppBuffer(prefix, maxsize)

    • buffer.add_digital_input(channel, value) - etc.

    • met varianten voor elk type sensor

    • frame.to_bytes() -> bytes

  • bytes_to_dict(bytes)

  • dict_to_bytes(dict)

Via LppBuffer en buffer.add_xxsensor(channel, value) bouw je een array van getallen op die je als bytes-waarde kunt versturen.

Deze bytes-waarde kun je ook omzetten (decoderen) met bytes_to_dict() in een Python dictionary - die je vervolgens kunt omzetten in JSON, voor een MQTT bericht. Via de codering dict_to_bytes() kun je een Python dictionary (in de geschikte LPP vorm!) omzetten naar een LPP bytes-waarde.

  • dict_to_bytes() verwerkt in deze implementatie (voorlopig) alleen actuator-data: deze is bedoeld voor downlink-berichten.

Daarnaast is er nog een afzonderlijke functie:

  • json_to_lpp(s: str) -> bytes

Voorbeelden van het gebruik:

IoT-knoop: opbouwen van een reeks sensorwaarden, en versturen als bytes-waarde (uplink).

prefix = bytes([1,2,3]) # prefix before LPP data
lpp = LppBuffer(data=prefix, maxsize=60)
lpp.add_digital_input(0, 1)
lpp.add_digital_output(1, 1)
lpp.add_analog_input(2, 1234)
lpp.add_analog_output(4, -1234)
lpp.add_luminosity(7, 345)
lpp.add_presence(8, 1)
lpp.add_barometer(9, 10230)
lpp.add_temperature(5, int(23.4*10))
lpp.add_digital_input(10, 1)
radio.send_bytes(lpp.to_bytes())

IoT-knoop: omzetten van een ontvangen (binair) LPP-bericht in acties - afhandelen van actuator-data.

(Dit wordt voorlopig ad-hoc in de IoT-knoop opgelost.)

Gateway: Omzetten (decoderen) van een ontvangen byte-array in een JSON-string, om te versturen via MQTT. (Uplink-berichten in de gateway.)

# data bevat ontvangen radio-bericht: header, lpp payload
nodeID = data[1] * 256 + data[2]
counter = data[3] * 256 + data[4]
lpp_dict = bytes_to_dict(data[5:])
msg = {'nodeid': '{0:x}'.format(nodeID),  # in hex format
       'counter': counter,
       'payload': lpp_dict
       }
mqtt.publish(topic, json.dumps(msg))

Gateway: van ontvangen JSON-string naar binair LPP downlink-bericht

obj = json.loads(msg)
lpp_bytes = dict_to_bytes(obj)   # from JSON string to LPP byte-array
header_bytes = bytes([0x0B, nodeID // 256, nodeID % 256, 0, 0]) # downlink header  
radio.send_bytes(header_bytes + lpp_bytes)    # send header followed by LPP data

Radio-logger: omzetten van (binaire) LPP-bytes naar LPP in JSON: lpp_to_json(data: bytes) -> str.

print(lpp_to_json(msg[5:]) # skip radio msg header

ULPP module code#

# module ulpp

# LPP tags
LPP_dIn = 0
LPP_dOut = 1
LPP_aIn = 2
LPP_aOut = 3
LPP_luminosity = 101
LPP_presence = 102
LPP_temperature = 103
LPP_humidity = 104
LPP_barometer = 115

class LppBuffer(object):

    # some assumptions:
    # - the value-parameters are in the LPP-required format and range
    #   i.e. scaling is done by the caller (if needed)
    #   all value-parameters are int
    
    def __init__(self, data=b'', maxsize=32):
        self.buffer = bytearray(data)
        self.maxsize = maxsize
        self.pos = 0

    def __str__(self):
        return str(list(self.buffer))
    
    def to_bytes(self) -> bytes:
        return bytes(self.buffer)
    
    def add_byte(self, channel, tag, value):
        if len(self.buffer) + 3 > self.maxsize:
            raise OverflowError
        self.buffer.append(channel)
        self.buffer.append(tag)
        self.buffer.append(value)

    def add_unsigned_int16(self, channel, tag, value):
        if len(self.buffer) + 4 > self.maxsize:
            raise OverflowError        
        self.buffer.append(channel)
        self.buffer.append(tag)
        if value >= 65536:
            value = value % 65536
        (hi, lo) = divmod(value, 256)
        self.buffer.append(hi)
        self.buffer.append(lo)

    def add_signed_int16(self, channel, tag, value):
        while value < 0:
            value = value + 65536
        self.add_unsigned_int16(channel, tag, value)

    def add_digital_input(self, channel, value):
        self.add_byte(channel, LPP_dIn, value)

    def add_digital_output(self, channel, value):
        self.add_byte(channel, LPP_dOut, value)
        
    def add_analog_input(self, channel, value):
        self.add_signed_int16(channel, LPP_aIn, value)

    def add_analog_output(self, channel, value):
        self.add_signed_int16(channel, LPP_aOut, value)

    def add_luminosity(self, channel, value):
        self.add_unsigned_int16(channel, LPP_luminosity, value)

    def add_presence(self, channel, value):
        self.add_byte(channel, LPP_presence, value)

    def add_temperature(self, channel, value):
        # temperature: 0.1C, signed int
        self.add_signed_int16(channel, LPP_temperature, value)

    def add_humidity(self, channel, value):
        # rel. humidity: 0.5% unsigned byte
        self.add_byte(channel, LPP_humidity, value)

    def add_barometer(self, channel, value):
        # barometric pressue: 0.1 hPa unsigned int16
        self.add_unsigned_int16(channel, LPP_barometer, value)

def bytes_to_dict(data: bytes) -> dict:

    pos = 0

    def nextbyte():
        nonlocal pos
        if pos >= len(data):
            raise OverflowError
        value = data[pos]
        pos = pos + 1
        return value
    
    def nextunsignedint():
        hi = nextbyte()
        lo = nextbyte()
        value = hi * 256 + lo
        return value 
    
    def nextint():
        value = nextunsignedint()
        if value > 32767:
            value = value - 65536
        return value

    pos = 0
    obj = {}

    while pos < len(data):
        channel = nextbyte()
        tag = nextbyte()
        if tag == LPP_dIn:
            value = nextbyte()
            obj[channel] = {'dIn': value}
        elif tag == LPP_dOut:
            value = nextbyte()
            obj[channel] = {'dOut': value}                
        elif tag == LPP_aIn:
            value = nextint()
            obj[channel] = {'aIn': value}                
        elif tag == LPP_aOut:
            value = nextint()
            obj[channel] = {'aOut': value}
        elif tag == LPP_luminosity:
            value = nextunsignedint()
            obj[channel] = {'luminosity': value}                
        elif tag == LPP_presence:
            value = nextbyte()
            obj[channel] = {'presence': value}                
        elif tag == LPP_temperature:
            value = nextint()                
            obj[channel] = {'temperature': value}                
        elif tag == LPP_humidity:
            value = nextbyte()
            obj[channel] = {'humidity': value}                
        elif tag == LPP_barometer:
            value = nextunsignedint()
            obj[channel] = {'barometer': value}
            
    return obj

def dict_to_bytes (obj) -> bytes:
    lpp = LppBuffer()

    for channel in obj:
        item = obj[channel]  # item is an object with a single key...
        if type(channel) is str:
            channel = int(channel)
        for key in item:
            if key == 'dOut':
                lpp.add_digital_output(channel,  item[key])
            elif key == 'aOut':
                lpp.add_analog_output(channel,  item[key])
            else:
                # not implemented, raise exception? only output allowed
                raise ValueError('only output values allowed in actuator msg')

    return lpp.to_bytes()

lppjson module code#

De module lppjson heeft 1 functie, lpp_to_json. Deze wordt gebruikt in de radio-logger.

# LPP tags
LPP_dIn = 0
LPP_dOut = 1
LPP_aIn = 2
LPP_aOut = 3
LPP_luminosity = 101
LPP_presence = 102
LPP_temperature = 103
LPP_humidity = 104
LPP_barometer = 115

def lpp_to_json(data: bytes) -> str:
    
    pos = 0
    
    def nextbyte():
        nonlocal pos
        if pos >= len(data):
            raise OverflowError
        value = data[pos]
        pos = pos + 1
        return value
    
    def nextunsignedint():
        hi = nextbyte()
        lo = nextbyte()
        return hi * 256 + lo 
    
    def nextint():
        value = nextunsignedint()
        if value > 32767:
            value = value - 65536
        return value
    
    pos = 0
    json_string = '{'
    sep = ''

    while pos < len(data):
        channel = nextbyte()
        tag = nextbyte()
        value = 0
        tagname = ''
        if tag == LPP_dIn:
            value = nextbyte()
            tagname = 'dIn'
        elif tag == LPP_dOut:
            value = nextbyte()
            tagname = 'dOut'
        elif tag == LPP_aIn:
            value = nextint()
            tagname = 'aIn'
        elif tag == LPP_aOut:
            value = nextint()
            tagname = 'aOut'
        elif tag == LPP_luminosity:
            value = nextunsignedint()
            tagname = 'luminosity'
        elif tag == LPP_presence:
            value = nextbyte()
            tagname = 'presence'
        elif tag == LPP_temperature:
            value = nextint()                
            tagname = 'temperature'
        elif tag == LPP_humidity:
            value = nextbyte()
            tagname = 'humidity'
        elif tag == LPP_barometer:
            value = nextunsignedint()
            tagname = 'barometer'
        json_string += '{0}"{1}": {{"{2}": {3}}}'.format(sep, channel, tagname, value)
        sep = ', '
        
    json_string += '}' 
    return json_string

functie json_to_bytes#

def json_to_bytes (js: str) -> bytes:
    obj = json.loads(js)
    return dict_to_bytes(obj)

Testen#

De onderstaande testen kunnen uitgevoerd worden in Jupyter Notebook.

Test: opbouwen van een binaire LPP-waarde#

Deze code vind je in de IoT-knoop, bij het versturen van de sensorwaarden. En mogelijk bij het opbouwen van een LPP-waarde met actuator-waarden, vanuit een toepassing.

header = bytes([1,2,3,4,5])
frame = LppBuffer(data=header, maxsize=60)
frame.add_digital_input(0, 1)
frame.add_digital_output(1, 1)
frame.add_analog_input(2, 1234)
frame.add_analog_output(4, -1234)
frame.add_luminosity(7, 345)
frame.add_presence(8, 1)
frame.add_barometer(9, 10230)
frame.add_temperature(5, int(23.4*10))
frame.add_digital_input(10, 1)
  # radio.send_bytes(frame.to_bytes())
buffer = frame.to_bytes()
print(list(buffer))
[1, 2, 3, 4, 5, 0, 0, 1, 1, 1, 1, 2, 2, 4, 210, 4, 3, 251, 46, 7, 101, 1, 89, 8, 102, 1, 9, 115, 39, 246, 5, 103, 0, 234, 10, 0, 1]

Test: van LPP-bytes naar JSON#

In de gateway worden ontvangen (binaire) sensor-berichten omgezet in JSON-formaat, om te versturen via MQTT.

import json
lppdata = bytes_to_dict(buffer[5:]) # skip header
lppdata
{0: {'dIn': 1},
 1: {'dOut': 1},
 2: {'aIn': 1234},
 4: {'aOut': -1234},
 7: {'luminosity': 345},
 8: {'presence': 1},
 9: {'barometer': 10230},
 5: {'temperature': 234},
 10: {'dIn': 1}}
jsondata = json.dumps(lppdata)
jsondata
'{"0": {"dIn": 1}, "1": {"dOut": 1}, "2": {"aIn": 1234}, "4": {"aOut": -1234}, "7": {"luminosity": 345}, "8": {"presence": 1}, "9": {"barometer": 10230}, "5": {"temperature": 234}, "10": {"dIn": 1}}'

Test: van JSON naar LPP-bytes#

lppdata1 = json.loads(jsondata)
lppdata1
{'0': {'dIn': 1},
 '1': {'dOut': 1},
 '2': {'aIn': 1234},
 '4': {'aOut': -1234},
 '7': {'luminosity': 345},
 '8': {'presence': 1},
 '9': {'barometer': 10230},
 '5': {'temperature': 234},
 '10': {'dIn': 1}}
lppbytes1 = dict_to_bytes(lppdata1)
lppbytes1
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[9], line 1
----> 1 lppbytes1 = dict_to_bytes(lppdata1)
      2 lppbytes1

Cell In[1], line 159, in dict_to_bytes(obj)
    156             lpp.add_analog_output(channel,  item[key])
    157         else:
    158             # not implemented, raise exception? only output allowed
--> 159             raise ValueError('only output values allowed in actuator msg')
    161 return lpp.to_bytes()

ValueError: only output values allowed in actuator msg

Bovenstaande werkt niet in de (beperkte) implementatie van dict_to_bytes. Dit werkt wel als we de dictionary beperken tot alleen actuator-waarden.

lppbytes1 = dict_to_bytes({'1': {'dOut': 1}, '4': {'aOut': -1234}})
list(lppbytes1)
[1, 1, 1, 4, 3, 251, 46]

Nog een extra voorbeeld, met alleen actuator-waarden.

lpp = LppBuffer()
lpp.add_analog_output(3, 123)
lpp.add_digital_output(5, 1)
lpp.add_analog_output(6, -12)

lppbuf = lpp.to_bytes()

lppdict = bytes_to_dict(lppbuf)
list(dict_to_bytes(lppdict))
[3, 3, 0, 123, 5, 1, 1, 6, 3, 255, 244]

Test lpp_to_json#

lpp_to_json([1, 1, 1, 4, 3, 251, 46])
'{"1": {"dOut": 1}, "4": {"aOut": -1234}}'
lpp_to_json([3, 0, 0, 2, 0, 0, 4, 2, 0, 0, 6, 103, 0, 200])
'{"3": {"dIn": 0}, "2": {"dIn": 0}, "4": {"aIn": 0}, "6": {"temperature": 200}}'
lpp_to_json([])
'{}'

Het omgekeerde: JSON naar LPP, gaat in twee stappen: (i) JSON naar Python object (via de json loads); (ii) dict_to_lpp - maar dit is beperkt tot een aantal actuator-waarden, specifiek voor de gateway downlink.