140 lines
4.4 KiB
Python
140 lines
4.4 KiB
Python
|
from ...core.crc8 import calculate
|
||
|
from ...core.message import (
|
||
|
MessageType,
|
||
|
MessageRequest,
|
||
|
MessageResponse,
|
||
|
MessageBody,
|
||
|
)
|
||
|
|
||
|
|
||
|
class MessageFDBase(MessageRequest):
|
||
|
_message_serial = 0
|
||
|
|
||
|
def __init__(self, device_protocol_version, message_type, body_type):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
device_type=0xFD,
|
||
|
message_type=message_type,
|
||
|
body_type=body_type
|
||
|
)
|
||
|
MessageFDBase._message_serial += 1
|
||
|
if MessageFDBase._message_serial >= 254:
|
||
|
MessageFDBase._message_serial = 1
|
||
|
self._message_id = MessageFDBase._message_serial
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
|
@property
|
||
|
def body(self):
|
||
|
body = bytearray([self.body_type]) + self._body + bytearray([self._message_id])
|
||
|
body.append(calculate(body))
|
||
|
return body
|
||
|
|
||
|
|
||
|
class MessageQuery(MessageFDBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0x41)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
return bytearray([
|
||
|
0x81, 0x00, 0xFF, 0x03,
|
||
|
0x00, 0x00, 0x02, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessageSet(MessageFDBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0x48)
|
||
|
self.power = False
|
||
|
self.fan_speed = 0
|
||
|
self.target_humidity = 50
|
||
|
self.prompt_tone = False
|
||
|
self.screen_display = 0x07
|
||
|
self.mode = 0x01
|
||
|
self.disinfect = None
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
power = 0x01 if self.power else 0x00
|
||
|
prompt_tone = 0x40 if self.prompt_tone else 0x00
|
||
|
disinfect = 0 if self.disinfect is None else (1 if self.disinfect else 2)
|
||
|
return bytearray([
|
||
|
power | prompt_tone | 0x02,
|
||
|
0x00,
|
||
|
self.fan_speed,
|
||
|
0x00, 0x00, 0x00,
|
||
|
self.target_humidity,
|
||
|
0x00,
|
||
|
self.screen_display,
|
||
|
self.mode,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
disinfect,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00
|
||
|
])
|
||
|
|
||
|
|
||
|
class FDC8MessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
self.power = (body[1] & 0x01) > 0
|
||
|
self.fan_speed = body[3] & 0x7F
|
||
|
self.target_humidity = body[7]
|
||
|
self.current_humidity = body[16]
|
||
|
self.current_temperature = (body[17] - 50) / 2
|
||
|
self.tank = body[10]
|
||
|
self.mode = (body[8] & 0x70) >> 4
|
||
|
self.screen_display = body[9] & 0x07
|
||
|
if len(body) > 36:
|
||
|
disinfect = body[34] & 0x03
|
||
|
if disinfect == 1:
|
||
|
self.disinfect = True
|
||
|
elif disinfect == 2:
|
||
|
self.disinfect = False
|
||
|
|
||
|
|
||
|
class FDA0MessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
self.power = (body[1] & 0x01) > 0
|
||
|
self.fan_speed = body[3] & 0x7F
|
||
|
self.target_humidity = body[7]
|
||
|
self.current_humidity = body[16]
|
||
|
self.current_temperature = (body[17] - 50) / 2
|
||
|
self.tank = body[10]
|
||
|
self.mode = body[10] & 0x07
|
||
|
self.screen_display = body[9] & 0x07
|
||
|
if len(body) > 29:
|
||
|
disinfect = body[27] & 0x03
|
||
|
if disinfect == 1:
|
||
|
self.disinfect = True
|
||
|
elif disinfect == 2:
|
||
|
self.disinfect = False
|
||
|
|
||
|
|
||
|
class MessageFDResponse(MessageResponse):
|
||
|
def __init__(self, message):
|
||
|
super().__init__(message)
|
||
|
if self.message_type in [MessageType.query, MessageType.set, MessageType.notify1]:
|
||
|
if self.body_type in [0xB0, 0xB1]:
|
||
|
pass
|
||
|
elif self.body_type == 0xA0:
|
||
|
self.set_body(FDA0MessageBody(super().body))
|
||
|
elif self.body_type == 0xC8:
|
||
|
self.set_body(FDC8MessageBody(super().body))
|
||
|
self.set_attr()
|
||
|
if hasattr(self, "fan_speed") and self.fan_speed is not None and self.fan_speed < 5:
|
||
|
self.fan_speed = 1
|