134 lines
4.5 KiB
Python
134 lines
4.5 KiB
Python
from ...core.message import (
|
|
MessageType,
|
|
MessageRequest,
|
|
MessageResponse,
|
|
MessageBody,
|
|
)
|
|
|
|
|
|
class MessageFABase(MessageRequest):
|
|
def __init__(self, device_protocol_version, message_type, body_type):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
device_type=0xCE,
|
|
message_type=message_type,
|
|
body_type=body_type
|
|
)
|
|
|
|
@property
|
|
def _body(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class MessageQuery(MessageFABase):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.query,
|
|
body_type=0x01)
|
|
|
|
@property
|
|
def _body(self):
|
|
return bytearray([])
|
|
|
|
|
|
class MessageSet(MessageFABase):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.set,
|
|
body_type=0x01)
|
|
|
|
self.power = False
|
|
self.fan_speed = 0
|
|
self.link_to_ac = False
|
|
self.sleep_mode = False
|
|
self.eco_mode = False
|
|
self.aux_heating = False
|
|
self.powerful_purify = False
|
|
self.scheduled = False
|
|
self.child_lock = False
|
|
|
|
@property
|
|
def _body(self):
|
|
power = 0x80 if self.power else 0x00
|
|
link_to_ac = 0x01 if self.link_to_ac else 0x00
|
|
sleep_mode = 0x02 if self.sleep_mode else 0x00
|
|
eco_mode = 0x04 if self.eco_mode else 0x00
|
|
aux_heating = 0x08 if self.aux_heating else 0x00
|
|
powerful_purify = 0x10 if self.powerful_purify else 0x00
|
|
scheduled = 0x01 if self.scheduled else 0x00
|
|
child_lock = 0x7F if self.child_lock else 0x00
|
|
return bytearray([
|
|
power | 0x01,
|
|
self.fan_speed,
|
|
link_to_ac | sleep_mode | eco_mode | aux_heating | powerful_purify,
|
|
scheduled,
|
|
0x00,
|
|
child_lock
|
|
])
|
|
|
|
|
|
class CEGeneralMessageBody(MessageBody):
|
|
def __init__(self, body):
|
|
super().__init__(body)
|
|
self.power = (body[1] & 0x80) > 0
|
|
self.child_lock = (body[1] & 0x20) > 0
|
|
self.scheduled = (body[1] & 0x40) > 0
|
|
self.fan_speed = body[2]
|
|
self.pm25 = (body[3] << 8) + body[4]
|
|
self.co2 = (body[5] << 8) + body[6]
|
|
if body[7] != 0xFF:
|
|
self.current_humidity = (body[7] << 8) + body[8] / 10
|
|
else:
|
|
self.current_humidity = None
|
|
if body[9] != 0xFF:
|
|
self.current_temperature = (body[9] << 8) + (body[10] - 60) / 2
|
|
else:
|
|
self.current_temperature = None
|
|
if body[11] != 0xFF:
|
|
self.hcho = (body[11] << 8) + body[12] / 1000
|
|
else:
|
|
self.hcho = None
|
|
self.link_to_ac = (body[17] & 0x01) > 0
|
|
self.sleep_mode = (body[17] & 0x02) > 0
|
|
self.eco_mode = (body[17] & 0x04) > 0
|
|
if (body[19] & 0x02) > 0:
|
|
self.aux_heating = (body[17] & 0x08) > 0
|
|
else:
|
|
self.aux_heating = None
|
|
self.powerful_purify = (body[17] & 0x10) > 0
|
|
self.filter_cleaning_reminder = (body[18] & 0x01) > 0
|
|
self.filter_change_reminder = (body[18] & 0x02) > 0
|
|
self.error_code = body[24]
|
|
|
|
|
|
class CENotifyMessageBody(MessageBody):
|
|
def __init__(self, body):
|
|
super().__init__(body)
|
|
self.pm25 = (body[1] << 8) + body[2]
|
|
self.co2 = (body[3] << 8) + body[4]
|
|
if body[5] != 0xFF:
|
|
self.current_humidity = (body[5] << 8) + body[6] / 10
|
|
else:
|
|
self.current_humidity = None
|
|
if body[7] != 0xFF:
|
|
self.current_temperature = (body[7] << 8) + (body[8] - 60) / 2
|
|
else:
|
|
self.current_temperature = None
|
|
if body[9] != 0xFF:
|
|
self.hcho = (body[9] << 8) + body[10] / 1000
|
|
else:
|
|
self.hcho = None
|
|
self.error_code = body[12]
|
|
|
|
|
|
class MessageCEResponse(MessageResponse):
|
|
def __init__(self, message):
|
|
super().__init__(message)
|
|
if (self.message_type in [MessageType.query, MessageType.set] and self.body_type == 0x01) or \
|
|
(self.message_type == MessageType.notify1 and self.body_type == 0x02):
|
|
self.set_body(CEGeneralMessageBody(super().body))
|
|
elif self.message_type == MessageType.notify1 and self.body_type == 0x01:
|
|
self.set_body(CENotifyMessageBody(super().body))
|
|
self.set_attr()
|