188 lines
5.8 KiB
Python
188 lines
5.8 KiB
Python
|
from enum import IntEnum
|
||
|
from ...core.crc8 import calculate
|
||
|
from ...core.message import (
|
||
|
MessageType,
|
||
|
MessageRequest,
|
||
|
MessageResponse,
|
||
|
MessageBody,
|
||
|
NewProtocolMessageBody
|
||
|
)
|
||
|
|
||
|
class NewProtocolTags(IntEnum):
    """Integer tags identifying parameters in new-protocol message bodies."""

    # Tag for the `light` parameter (16-bit value, low byte 0x5B).
    light = 0x005B
|
||
|
|
||
|
|
||
|
class MessageA1Base(MessageRequest):
    """Common base for requests sent with device type 0xA1.

    Every instance takes the next value of a class-wide serial counter as its
    message id. Subclasses provide the payload through the `_body` property.
    """

    # Shared across all instances; advanced once per constructed message.
    _message_serial = 0

    def __init__(self, device_protocol_version, message_type, body_type):
        """Initialise the request and assign it the next message id."""
        super().__init__(
            device_protocol_version=device_protocol_version,
            device_type=0xA1,
            message_type=message_type,
            body_type=body_type,
        )
        next_serial = MessageA1Base._message_serial + 1
        # The counter restarts at 1 once it would reach 100.
        if next_serial >= 100:
            next_serial = 1
        MessageA1Base._message_serial = next_serial
        self._message_id = next_serial

    @property
    def _body(self):
        """Payload bytes of the concrete message; subclasses must override."""
        raise NotImplementedError

    @property
    def body(self):
        """Return body type + payload + message id, followed by a checksum.

        The checksum byte is whatever `calculate` (imported from core.crc8)
        returns for all preceding bytes.
        """
        frame = bytearray([self.body_type])
        frame.extend(self._body)
        frame.append(self._message_id)
        frame.append(calculate(frame))
        return frame
|
||
|
|
||
|
|
||
|
class MessageQuery(MessageA1Base):
    """Query request using body type 0x41."""

    def __init__(self, device_protocol_version):
        """Build a query-type message with body type 0x41."""
        super().__init__(
            device_protocol_version=device_protocol_version,
            message_type=MessageType.query,
            body_type=0x41,
        )

    @property
    def _body(self):
        """Return the fixed 19-byte payload: 0x81, 0x00, 0xFF, then 16 zero bytes."""
        header = bytearray([0x81, 0x00, 0xFF])
        zero_padding = bytearray(16)
        return header + zero_padding
|
||
|
|
||
|
|
||
|
class MessageNewProtocolQuery(MessageA1Base):
    """Query request using body type 0xB1 that asks for tagged parameters."""

    def __init__(self, device_protocol_version):
        """Build a query-type message with body type 0xB1."""
        super().__init__(
            device_protocol_version=device_protocol_version,
            message_type=MessageType.query,
            body_type=0xB1,
        )

    @property
    def _body(self):
        """Return the tag count followed by each tag as low byte, high byte."""
        requested_tags = (NewProtocolTags.light,)
        encoded = bytearray([len(requested_tags)])
        for tag in requested_tags:
            high_byte, low_byte = divmod(tag, 0x100)
            # Low byte goes first.
            encoded.extend((low_byte, high_byte))
        return encoded
|
||
|
|
||
|
|
||
|
class MessageSet(MessageA1Base):
    """Set request using body type 0x48.

    Callers adjust the public attributes after construction; `_body` encodes
    their current values into a 20-byte payload.
    """

    def __init__(self, device_protocol_version):
        """Build a set-type message with body type 0x48 and default settings."""
        super().__init__(
            device_protocol_version=device_protocol_version,
            message_type=MessageType.set,
            body_type=0x48,
        )
        self.power = False
        self.prompt_tone = False
        self.mode = 1
        self.fan_speed = 40
        self.child_lock = False
        self.target_humidity = 40
        self.swing = False
        self.anion = False
        self.water_level_set = 50

    @property
    def _body(self):
        """Return the 20-byte payload; bytes not assigned below stay 0x00."""
        payload = bytearray(20)

        # Byte 1: bit 0x02 is always set, 0x01 = power, 0x40 = prompt tone.
        flags = 0x02
        if self.power:
            flags |= 0x01
        if self.prompt_tone:
            flags |= 0x40
        payload[0] = flags

        # Bytes 2 and 3: mode and fan speed, written as-is.
        payload[1] = self.mode
        payload[2] = self.fan_speed

        # Byte 7: target humidity, written as-is.
        payload[6] = self.target_humidity

        # Bytes 8-10: one flag bit each for child lock, anion and swing.
        payload[7] = 0x80 if self.child_lock else 0x00
        payload[8] = 0x40 if self.anion else 0x00
        payload[9] = 0x08 if self.swing else 0x00

        # Byte 13: water level setting, written as-is.
        payload[12] = self.water_level_set
        return payload
|
||
|
|
||
|
|
||
|
class MessageNewProtocolSet(MessageA1Base):
    """Set request using body type 0xB0 that writes tagged parameters.

    Attributes:
        light: Desired light state. ``None`` (the default) leaves the
            parameter out of the request; any other value is sent as 0x01
            when truthy and 0x00 when falsy.
    """

    def __init__(self, device_protocol_version):
        """Build a set-type message with body type 0xB0 and nothing to set."""
        super().__init__(
            device_protocol_version=device_protocol_version,
            message_type=MessageType.set,
            body_type=0xB0)
        self.light = None

    @property
    def _body(self):
        """Return the parameter count byte followed by the packed parameters."""
        pack_count = 0
        # Byte 0 is a placeholder for the count; it is filled in at the end.
        payload = bytearray([0x00])
        if self.light is not None:
            pack_count += 1
            payload.extend(
                NewProtocolMessageBody.pack(
                    # The light value must be packed under the `light` tag.
                    # NewProtocolTags defines no `indirect_wind` member, so
                    # referencing it here raised AttributeError.
                    param=NewProtocolTags.light,
                    value=bytearray([0x01 if self.light else 0x00])
                ))
        payload[0] = pack_count
        return payload
|
||
|
|
||
|
|
||
|
class A1GeneralMessageBody(MessageBody):
    """Decode a general status body into attributes by fixed byte offsets."""

    def __init__(self, body):
        """Read each status field from `body`; offsets up to 19 are accessed."""
        super().__init__(body)
        self.power = bool(body[1] & 0x01)
        self.mode = body[2] & 0x0F

        # Fan speed uses the low 7 bits; values below 5 are reported as 1.
        raw_fan_speed = body[3] & 0x7F
        self.fan_speed = raw_fan_speed if raw_fan_speed >= 5 else 1

        # Target humidity is never reported below 35.
        self.target_humidity = max(body[7], 35)
        self.child_lock = bool(body[8] & 0x80)
        self.anion = bool(body[9] & 0x40)
        self.tank = body[10] & 0x7F
        self.water_level_set = body[15]
        self.current_humidity = body[16]
        self.current_temperature = (body[17] - 50) / 2
        self.swing = bool(body[19] & 0x20)
|
||
|
|
||
|
|
||
|
class A1NewProtocolMessageBody(NewProtocolMessageBody):
    """Decode a new-protocol body, exposing `light` when its tag is present."""

    def __init__(self, body, bt):
        """Parse `body` and set `self.light` only if the light tag was parsed."""
        super().__init__(body, bt)
        parsed = self.parse()
        light_tag = NewProtocolTags.light
        if light_tag not in parsed:
            # No light parameter in this body: the attribute stays undefined.
            return
        # The first byte of the parameter value is treated as the on/off state.
        self.light = parsed[light_tag][0] > 0
|
||
|
|
||
|
|
||
|
class MessageA1Response(MessageResponse):
    """Response wrapper that picks a body parser from message and body type."""

    def __init__(self, message):
        """Parse `message`, attach the matching body object, then call `set_attr`."""
        super().__init__(message)
        message_type = self.message_type
        if message_type in (MessageType.query, MessageType.set, MessageType.notify1):
            # Body types 0xB0/0xB1/0xB5 use the tagged new-protocol layout;
            # everything else is decoded as a general status body.
            if self.body_type in (0xB0, 0xB1, 0xB5):
                parsed_body = A1NewProtocolMessageBody(super().body, self.body_type)
            else:
                parsed_body = A1GeneralMessageBody(super().body)
            self.set_body(parsed_body)
        elif message_type == MessageType.notify2 and self.body_type == 0xA0:
            self.set_body(A1GeneralMessageBody(super().body))
        # NOTE(review): the source's indentation was lost; this call is assumed
        # to run unconditionally at the end of __init__ - confirm.
        self.set_attr()
|