Midea/midea_ac_lan/midea/devices/ac/message.py
2023-09-22 07:45:40 +02:00

581 lines
22 KiB
Python

from enum import IntEnum
from ...core.message import (
MessageType,
MessageRequest,
MessageResponse,
MessageBody,
NewProtocolMessageBody
)
from ...core.crc8 import calculate
BB_AC_MODES = [0, 3, 1, 2, 4, 5]
class NewProtocolTags(IntEnum):
indoor_humidity = 0x0015
screen_display = 0x0017
breezeless = 0x0018
prompt_tone = 0x001A
indirect_wind = 0x0042
fresh_air_1 = 0x0233
fresh_air_2 = 0x004b
class MessageACBase(MessageRequest):
_message_serial = 0
def __init__(self, device_protocol_version, message_type, body_type):
super().__init__(
device_protocol_version=device_protocol_version,
device_type=0xAC,
message_type=message_type,
body_type=body_type
)
MessageACBase._message_serial += 1
if MessageACBase._message_serial >= 254:
MessageACBase._message_serial = 1
self._message_id = MessageACBase._message_serial
@property
def _body(self):
raise NotImplementedError
@property
def body(self):
body = bytearray([self.body_type]) + self._body + bytearray([self._message_id])
body.append(calculate(body))
return body
class MessageQuery(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0x41)
@property
def _body(self):
return bytearray([
0x81, 0x00, 0xFF, 0x03,
0xFF, 0x00, 0x02,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
])
class MessagePowerQuery(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0x41)
@property
def _body(self):
return bytearray([
0x21, 0x01, 0x44, 0x00, 0x01
])
@property
def body(self):
body = bytearray([self.body_type]) + self._body
body.append(calculate(body))
return body
class MessageSwitchDisplay(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0x41)
@property
def _body(self):
return bytearray([
0x81, 0x00, 0xFF, 0x02,
0xFF, 0x02, 0x02,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
])
class MessageNewProtocolQuery(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0xB1)
@property
def _body(self):
query_params = [
NewProtocolTags.indirect_wind,
NewProtocolTags.breezeless,
NewProtocolTags.indoor_humidity,
NewProtocolTags.screen_display,
NewProtocolTags.fresh_air_1,
NewProtocolTags.fresh_air_2
]
_body = bytearray([len(query_params)])
for param in query_params:
_body.extend([param & 0xFF, param >> 8])
return _body
class MessageSubProtocol(MessageACBase):
def __init__(self, device_protocol_version, message_type, subprotocol_query_type):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=message_type,
body_type=0xAA)
self._subprotocol_query_type = subprotocol_query_type
@property
def _subprotocol_body(self):
return bytes([])
@property
def body(self):
body = bytearray([self.body_type]) + self._body
body.append(calculate(body))
body.append(self.checksum(body))
return body
@property
def _body(self):
_subprotocol_body = self._subprotocol_body
_body = bytearray([
6 + 2 + (len(_subprotocol_body) if _subprotocol_body is not None else 0),
0x00, 0xFF, 0xFF, self._subprotocol_query_type
])
if _subprotocol_body is not None:
_body.extend(_subprotocol_body)
return _body
class MessageSubProtocolQuery(MessageSubProtocol):
def __init__(self, device_protocol_version, subprotocol_query_type):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
subprotocol_query_type=subprotocol_query_type)
class MessageSubProtocolSet(MessageSubProtocol):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
subprotocol_query_type=0x20)
self.power = False
self.mode = 0
self.target_temperature = 20.0
self.fan_speed = 102
self.boost_mode = False
self.aux_heating = False
self.dry = False
self.eco_mode = False
self.sleep_mode = False
self.sn8_flag = False
self.timer = False
self.prompt_tone = False
@property
def _subprotocol_body(self):
power = 0x01 if self.power else 0
dry = 0x10 if self.power and self.dry else 0
boost_mode = 0x20 if self.boost_mode else 0
aux_heating = 0x40 if self.aux_heating else 0x80
sleep_mode = 0x80 if self.sleep_mode else 0
try:
mode = 0 if self.mode == 0 else BB_AC_MODES[self.mode] - 1
except IndexError:
mode = 2 # set Auto if invalid mode
target_temperature = int(self.target_temperature * 2 + 30)
water_model_temperature_set = int((self.target_temperature - 1) * 2 + 50)
fan_speed = self.fan_speed
eco = 0x40 if self.eco_mode else 0
prompt_tone = 0x01 if self.prompt_tone else 0
timer = 0x04 if (self.sn8_flag and self.timer) else 0
return bytearray([
boost_mode | power | dry, aux_heating, sleep_mode, 0x00,
0x00, mode, target_temperature, fan_speed,
0x32, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x01, 0x00, 0x01, water_model_temperature_set,
prompt_tone, target_temperature, 0x32, 0x66,
0x00, eco | timer, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x08
])
class MessageGeneralSet(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x40)
self.power = False
self.prompt_tone = True
self.mode = 0
self.target_temperature = 20.0
self.fan_speed = 102
self.swing_vertical = False
self.swing_horizontal = False
self.boost_mode = False
self.smart_eye = False
self.dry = False
self.aux_heating = False
self.eco_mode = False
self.temp_fahrenheit = False
self.sleep_mode = False
self.natural_wind = False
self.frost_protect = False
self.comfort_mode = False
@property
def _body(self):
# Byte1, Power, prompt_tone
power = 0x01 if self.power else 0
prompt_tone = 0x40 if self.prompt_tone else 0
# Byte2, mode target_temperature
mode = (self.mode << 5) & 0xe0
target_temperature = (int(self.target_temperature) & 0xf) | \
(0x10 if int(round(self.target_temperature * 2)) % 2 != 0 else 0)
# Byte 3, fan_speed
fan_speed = self.fan_speed & 0x7f
# Byte 7, swing_mode
swing_mode = 0x30 | \
(0x0c if self.swing_vertical else 0) | \
(0x03 if self.swing_horizontal else 0)
# Byte 8, turbo
boost_mode = 0x20 if self.boost_mode else 0
# Byte 9 aux_heating eco_mode
smart_eye = 0x01 if self.smart_eye else 0
dry = 0x04 if self.dry else 0
aux_heating = 0x08 if self.aux_heating else 0
eco_mode = 0x80 if self.eco_mode else 0
# Byte 10 temp_fahrenheit
temp_fahrenheit = 0x04 if self.temp_fahrenheit else 0
sleep_mode = 0x01 if self.sleep_mode else 0
boost_mode_1 = 0x02 if self.boost_mode else 0
# Byte 17 natural_wind
natural_wind = 0x40 if self.natural_wind else 0
# Byte 21 frost_protect
frost_protect = 0x80 if self.frost_protect else 0
# Byte 22 comfort_mode
comfort_mode = 0x01 if self.comfort_mode else 0
return bytearray([
power | prompt_tone,
mode | target_temperature,
fan_speed,
0x00, 0x00, 0x00,
swing_mode,
boost_mode,
smart_eye | dry | aux_heating | eco_mode,
temp_fahrenheit | sleep_mode | boost_mode_1,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
natural_wind,
0x00, 0x00, 0x00,
frost_protect,
comfort_mode
])
class MessageNewProtocolSet(MessageACBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0xB0)
self.indirect_wind = None
self.prompt_tone = None
self.breezeless = None
self.screen_display = None
self.fresh_air_1 = None
self.fresh_air_2 = None
@property
def _body(self):
pack_count = 0
payload = bytearray([0x00])
if self.breezeless is not None:
pack_count += 1
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.breezeless,
value=bytearray([0x01 if self.breezeless else 0x00])
))
if self.indirect_wind is not None:
pack_count += 1
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.indirect_wind,
value=bytearray([0x02 if self.indirect_wind else 0x01])
))
if self.prompt_tone is not None:
pack_count += 1
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.prompt_tone,
value=bytearray([0x01 if self.prompt_tone else 0x00])
))
if self.screen_display is not None:
pack_count += 1
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.screen_display,
value=bytearray([0x64 if self.screen_display else 0x00])
))
if self.fresh_air_1 is not None and len(self.fresh_air_1) == 2:
pack_count += 1
fresh_air_power = 2 if self.fresh_air_1[0] > 0 else 1
fresh_air_fan_speed = self.fresh_air_1[1]
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.fresh_air_1,
value=bytearray([
fresh_air_power,
fresh_air_fan_speed,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
])
))
if self.fresh_air_2 is not None and len(self.fresh_air_2) == 2:
pack_count += 1
fresh_air_power = 1 if self.fresh_air_2[0] > 0 else 0
fresh_air_fan_speed = self.fresh_air_2[1]
payload.extend(
NewProtocolMessageBody.pack(
param=NewProtocolTags.fresh_air_2,
value=bytearray([
fresh_air_power,
fresh_air_fan_speed,
0xFF
])
))
payload[0] = pack_count
return payload
class XA0MessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[1] & 0x1) > 0
self.target_temperature = ((body[1] & 0x3E) >> 1) - 4 + 16.0 + (0.5 if body[1] & 0x40 > 0 else 0.0)
self.mode = (body[2] & 0xe0) >> 5
self.fan_speed = body[3] & 0x7f
self.swing_vertical = (body[7] & 0xC) > 0
self.swing_horizontal = (body[7] & 0x3) > 0
self.boost_mode = ((body[8] & 0x20) > 0) or ((body[10] & 0x2) > 0)
self.smart_eye = (body[9] & 0x01) > 0
self.dry = (body[9] & 0x04) > 0
self.aux_heating = (body[9] & 0x08) > 0
self.eco_mode = (body[9] & 0x10) > 0
self.sleep_mode = (body[10] & 0x01) > 0
self.natural_wind = (body[10] & 0x40) > 0
self.full_dust = (body[13] & 0x20) > 0
self.comfort_mode = (body[14] & 0x1) > 0 if len(body) > 16 else False
class XA1MessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
if body[13] != 0xFF:
temp_integer = int((body[13] - 50) / 2)
temp_decimal = ((body[18] & 0xF) * 0.1) if len(body) > 20 else 0
if body[13] > 49:
self.indoor_temperature = temp_integer + temp_decimal
else:
self.indoor_temperature = temp_integer - temp_decimal
if body[14] == 0xFF:
self.outdoor_temperature = None
else:
temp_integer = int((body[14] - 50) / 2)
temp_decimal = (((body[18] & 0xF0) >> 4) * 0.1) if len(body) > 20 else 0
if body[14] > 49:
self.outdoor_temperature = temp_integer + temp_decimal
else:
self.outdoor_temperature = temp_integer - temp_decimal
self.indoor_humidity = body[17]
class XBXMessageBody(NewProtocolMessageBody):
def __init__(self, body, bt):
super().__init__(body, bt)
params = self.parse()
if NewProtocolTags.indirect_wind in params:
self.indirect_wind = (params[NewProtocolTags.indirect_wind][0] == 0x02)
if NewProtocolTags.indoor_humidity in params:
self.indoor_humidity = params[NewProtocolTags.indoor_humidity][0]
if NewProtocolTags.breezeless in params:
self.breezeless = (params[NewProtocolTags.breezeless][0] == 1)
if NewProtocolTags.screen_display in params:
self.screen_display = (params[NewProtocolTags.screen_display][0] > 0)
self.screen_display_new = True
if NewProtocolTags.fresh_air_1 in params:
self.fresh_air_1 = True
data = params[NewProtocolTags.fresh_air_1]
self.fresh_air_power = data[0] == 0x02
self.fresh_air_fan_speed = data[1]
if NewProtocolTags.fresh_air_2 in params:
self.fresh_air_2 = True
data = params[NewProtocolTags.fresh_air_2]
self.fresh_air_power = data[0] > 0
self.fresh_air_fan_speed = data[1]
class XC0MessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[1] & 0x1) > 0
self.mode = (body[2] & 0xe0) >> 5
self.target_temperature = (body[2] & 0x0F) + 16.0 + (0.5 if body[0x02] & 0x10 > 0 else 0.0)
self.fan_speed = body[3] & 0x7F
self.swing_vertical = (body[7] & 0x0C) > 0
self.swing_horizontal = (body[7] & 0x03) > 0
self.boost_mode = ((body[8] & 0x20) > 0) or ((body[10] & 0x2) > 0)
self.smart_eye = (body[8] & 0x40) > 0
self.natural_wind = (body[9] & 0x2) > 0
self.dry = (body[9] & 0x4) > 0
self.eco_mode = (body[9] & 0x10) > 0
self.aux_heating = (body[9] & 0x08) > 0
self.temp_fahrenheit = (body[10] & 0x04) > 0
self.sleep_mode = (body[10] & 0x01) > 0
if body[11] != 0xFF:
temp_integer = int((body[11] - 50) / 2)
temp_decimal = (body[15] & 0x0F) * 0.1
if body[11] > 49:
self.indoor_temperature = temp_integer + temp_decimal
else:
self.indoor_temperature = temp_integer - temp_decimal
if body[12] == 0xFF:
self.outdoor_temperature = None
else:
temp_integer = int((body[12] - 50) / 2)
temp_decimal = ((body[15] & 0xF0) >> 4) * 0.1
if body[12] > 49:
self.outdoor_temperature = temp_integer + temp_decimal
else:
self.outdoor_temperature = temp_integer - temp_decimal
self.full_dust = (body[13] & 0x20) > 0
self.screen_display = ((body[14] >> 4 & 0x7) != 0x07) and self.power
self.frost_protect = (body[21] & 0x80) > 0 if len(body) > 23 else False
self.comfort_mode = (body[22] & 0x1) > 0 if len(body) > 24 else False
class XC1MessageBody(MessageBody):
def __init__(self, body, analysis_method=3):
super().__init__(body)
if body[3] == 0x44:
self.total_energy_consumption = XC1MessageBody.parse_consumption(
analysis_method,
body[4], body[5], body[6], body[7]
)
self.current_energy_consumption = XC1MessageBody.parse_consumption(
analysis_method,
body[12], body[13], body[14], body[15]
)
self.realtime_power = XC1MessageBody.parse_power(
analysis_method,
body[16], body[17], body[18]
)
elif body[3] == 0x40:
pass
@staticmethod
def parse_value(byte):
return (byte >> 4) * 10 + (byte & 0x0F)
@staticmethod
def parse_power(analysis_method, byte1, byte2, byte3):
if analysis_method == 1:
return float(XC1MessageBody.parse_value(byte1) * 10000 +
XC1MessageBody.parse_value(byte2) * 100 +
XC1MessageBody.parse_value(byte3)) / 10
elif analysis_method == 2:
return float((byte1 << 16) + (byte2 << 8) + byte3) / 10
else:
return float(byte1 * 10000 + byte2 * 100 + byte3) / 10
@staticmethod
def parse_consumption(analysis_method, byte1, byte2, byte3, byte4):
if analysis_method == 1:
return float(XC1MessageBody.parse_value(byte1) * 1000000 +
XC1MessageBody.parse_value(byte2) * 10000 +
XC1MessageBody.parse_value(byte3) * 100 +
XC1MessageBody.parse_value(byte4)) / 100
elif analysis_method == 2:
return float((byte1 << 32) + (byte2 << 16) + (byte3 << 8) + byte4) / 10
else:
return float(byte1 * 1000000 + byte2 * 10000 + byte3 * 100 + byte4) / 100
class XBBMessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
subprotocol_head = body[:6]
subprotocol_body = body[6:]
data_type = subprotocol_head[-1]
subprotocol_body_len = len(subprotocol_body)
if data_type == 0x20 or data_type == 0x11:
self.power = (subprotocol_body[0] & 0x1) > 0
self.dry = (subprotocol_body[0] & 0x10) > 0
self.boost_mode = (subprotocol_body[0] & 0x20) > 0
self.aux_heating = (subprotocol_body[1] & 0x40) > 0
self.sleep_mode = (subprotocol_body[2] & 0x80) > 0
try:
self.mode = BB_AC_MODES.index(subprotocol_body[5] + 1)
except ValueError:
self.mode = 0
self.target_temperature = (subprotocol_body[6] - 30) / 2
self.fan_speed = subprotocol_body[7]
self.timer = (subprotocol_body[25] & 0x04) > 0 if subprotocol_body_len > 27 else False
self.eco_mode = (subprotocol_body[25] & 0x40) > 0 if subprotocol_body_len > 27 else False
elif data_type == 0x10:
if subprotocol_body[8] & 0x80 == 0x80:
self.indoor_temperature = (0 - (~(subprotocol_body[7] + subprotocol_body[8] * 256) + 1) & 0xffff) / 100
else:
self.indoor_temperature = (subprotocol_body[7] + subprotocol_body[8] * 256) / 100
self.indoor_humidity = subprotocol_body[30]
self.sn8_flag = subprotocol_body[80] == 0x31
elif data_type == 0x12:
pass
elif data_type == 0x30:
if subprotocol_body[6] & 0x80 == 0x80:
self.outdoor_temperature = (0 - (~(subprotocol_body[5] + subprotocol_body[6] * 256) + 1) & 0xffff) / 100
else:
self.outdoor_temperature = (subprotocol_body[5] + subprotocol_body[6] * 256) / 100
elif data_type == 0x13 or data_type == 0x21:
pass
class MessageACResponse(MessageResponse):
def __init__(self, message, power_analysis_method=3):
super().__init__(message)
if self.message_type == MessageType.notify2 and self.body_type == 0xA0:
self.set_body(XA0MessageBody(super().body))
elif self.message_type == MessageType.notify1 and self.body_type == 0xA1:
self.set_body(XA1MessageBody(super().body))
elif self.message_type in [MessageType.query, MessageType.set, MessageType.notify2] and \
self.body_type in [0xB0, 0xB1, 0xB5]:
self.set_body(XBXMessageBody(super().body, self.body_type))
elif self.message_type in [MessageType.query, MessageType.set] and self.body_type == 0xC0:
self.set_body(XC0MessageBody(super().body))
elif self.message_type == MessageType.query and self.body_type == 0xC1:
self.set_body(XC1MessageBody(super().body, power_analysis_method))
elif self.message_type in [MessageType.set, MessageType.query, MessageType.notify2] and \
self.body_type == 0xBB and len(super().body) >= 21:
self.used_subprotocol = True
self.set_body(XBBMessageBody(super().body))
self.set_attr()