Midea/midea_ac_lan/midea/devices/ce/message.py

135 lines
4.5 KiB
Python
Raw Permalink Normal View History

2023-09-22 05:45:40 +00:00
from ...core.message import (
MessageType,
MessageRequest,
MessageResponse,
MessageBody,
)
class MessageFABase(MessageRequest):
def __init__(self, device_protocol_version, message_type, body_type):
super().__init__(
device_protocol_version=device_protocol_version,
device_type=0xCE,
message_type=message_type,
body_type=body_type
)
@property
def _body(self):
raise NotImplementedError
class MessageQuery(MessageFABase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0x01)
@property
def _body(self):
return bytearray([])
class MessageSet(MessageFABase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x01)
self.power = False
self.fan_speed = 0
self.link_to_ac = False
self.sleep_mode = False
self.eco_mode = False
self.aux_heating = False
self.powerful_purify = False
self.scheduled = False
self.child_lock = False
@property
def _body(self):
power = 0x80 if self.power else 0x00
link_to_ac = 0x01 if self.link_to_ac else 0x00
sleep_mode = 0x02 if self.sleep_mode else 0x00
eco_mode = 0x04 if self.eco_mode else 0x00
aux_heating = 0x08 if self.aux_heating else 0x00
powerful_purify = 0x10 if self.powerful_purify else 0x00
scheduled = 0x01 if self.scheduled else 0x00
child_lock = 0x7F if self.child_lock else 0x00
return bytearray([
power | 0x01,
self.fan_speed,
link_to_ac | sleep_mode | eco_mode | aux_heating | powerful_purify,
scheduled,
0x00,
child_lock
])
class CEGeneralMessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[1] & 0x80) > 0
self.child_lock = (body[1] & 0x20) > 0
self.scheduled = (body[1] & 0x40) > 0
self.fan_speed = body[2]
self.pm25 = (body[3] << 8) + body[4]
self.co2 = (body[5] << 8) + body[6]
if body[7] != 0xFF:
self.current_humidity = (body[7] << 8) + body[8] / 10
else:
self.current_humidity = None
if body[9] != 0xFF:
self.current_temperature = (body[9] << 8) + (body[10] - 60) / 2
else:
self.current_temperature = None
if body[11] != 0xFF:
self.hcho = (body[11] << 8) + body[12] / 1000
else:
self.hcho = None
self.link_to_ac = (body[17] & 0x01) > 0
self.sleep_mode = (body[17] & 0x02) > 0
self.eco_mode = (body[17] & 0x04) > 0
if (body[19] & 0x02) > 0:
self.aux_heating = (body[17] & 0x08) > 0
else:
self.aux_heating = None
self.powerful_purify = (body[17] & 0x10) > 0
self.filter_cleaning_reminder = (body[18] & 0x01) > 0
self.filter_change_reminder = (body[18] & 0x02) > 0
self.error_code = body[24]
class CENotifyMessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
self.pm25 = (body[1] << 8) + body[2]
self.co2 = (body[3] << 8) + body[4]
if body[5] != 0xFF:
self.current_humidity = (body[5] << 8) + body[6] / 10
else:
self.current_humidity = None
if body[7] != 0xFF:
self.current_temperature = (body[7] << 8) + (body[8] - 60) / 2
else:
self.current_temperature = None
if body[9] != 0xFF:
self.hcho = (body[9] << 8) + body[10] / 1000
else:
self.hcho = None
self.error_code = body[12]
class MessageCEResponse(MessageResponse):
def __init__(self, message):
super().__init__(message)
if (self.message_type in [MessageType.query, MessageType.set] and self.body_type == 0x01) or \
(self.message_type == MessageType.notify1 and self.body_type == 0x02):
self.set_body(CEGeneralMessageBody(super().body))
elif self.message_type == MessageType.notify1 and self.body_type == 0x01:
self.set_body(CENotifyMessageBody(super().body))
self.set_attr()