582 lines
22 KiB
Python
582 lines
22 KiB
Python
|
from enum import IntEnum
|
||
|
from ...core.message import (
|
||
|
MessageType,
|
||
|
MessageRequest,
|
||
|
MessageResponse,
|
||
|
MessageBody,
|
||
|
NewProtocolMessageBody
|
||
|
)
|
||
|
from ...core.crc8 import calculate
|
||
|
|
||
|
BB_AC_MODES = [0, 3, 1, 2, 4, 5]
|
||
|
|
||
|
|
||
|
class NewProtocolTags(IntEnum):
|
||
|
indoor_humidity = 0x0015
|
||
|
screen_display = 0x0017
|
||
|
breezeless = 0x0018
|
||
|
prompt_tone = 0x001A
|
||
|
indirect_wind = 0x0042
|
||
|
fresh_air_1 = 0x0233
|
||
|
fresh_air_2 = 0x004b
|
||
|
|
||
|
|
||
|
class MessageACBase(MessageRequest):
|
||
|
_message_serial = 0
|
||
|
|
||
|
def __init__(self, device_protocol_version, message_type, body_type):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
device_type=0xAC,
|
||
|
message_type=message_type,
|
||
|
body_type=body_type
|
||
|
)
|
||
|
MessageACBase._message_serial += 1
|
||
|
if MessageACBase._message_serial >= 254:
|
||
|
MessageACBase._message_serial = 1
|
||
|
self._message_id = MessageACBase._message_serial
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
|
@property
|
||
|
def body(self):
|
||
|
body = bytearray([self.body_type]) + self._body + bytearray([self._message_id])
|
||
|
body.append(calculate(body))
|
||
|
return body
|
||
|
|
||
|
|
||
|
class MessageQuery(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0x41)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
return bytearray([
|
||
|
0x81, 0x00, 0xFF, 0x03,
|
||
|
0xFF, 0x00, 0x02,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessagePowerQuery(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0x41)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
return bytearray([
|
||
|
0x21, 0x01, 0x44, 0x00, 0x01
|
||
|
])
|
||
|
|
||
|
@property
|
||
|
def body(self):
|
||
|
body = bytearray([self.body_type]) + self._body
|
||
|
body.append(calculate(body))
|
||
|
return body
|
||
|
|
||
|
|
||
|
class MessageSwitchDisplay(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0x41)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
return bytearray([
|
||
|
0x81, 0x00, 0xFF, 0x02,
|
||
|
0xFF, 0x02, 0x02,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessageNewProtocolQuery(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0xB1)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
query_params = [
|
||
|
NewProtocolTags.indirect_wind,
|
||
|
NewProtocolTags.breezeless,
|
||
|
NewProtocolTags.indoor_humidity,
|
||
|
NewProtocolTags.screen_display,
|
||
|
NewProtocolTags.fresh_air_1,
|
||
|
NewProtocolTags.fresh_air_2
|
||
|
]
|
||
|
|
||
|
_body = bytearray([len(query_params)])
|
||
|
for param in query_params:
|
||
|
_body.extend([param & 0xFF, param >> 8])
|
||
|
return _body
|
||
|
|
||
|
|
||
|
class MessageSubProtocol(MessageACBase):
|
||
|
def __init__(self, device_protocol_version, message_type, subprotocol_query_type):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=message_type,
|
||
|
body_type=0xAA)
|
||
|
self._subprotocol_query_type = subprotocol_query_type
|
||
|
|
||
|
@property
|
||
|
def _subprotocol_body(self):
|
||
|
return bytes([])
|
||
|
|
||
|
@property
|
||
|
def body(self):
|
||
|
body = bytearray([self.body_type]) + self._body
|
||
|
body.append(calculate(body))
|
||
|
body.append(self.checksum(body))
|
||
|
return body
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
_subprotocol_body = self._subprotocol_body
|
||
|
_body = bytearray([
|
||
|
6 + 2 + (len(_subprotocol_body) if _subprotocol_body is not None else 0),
|
||
|
0x00, 0xFF, 0xFF, self._subprotocol_query_type
|
||
|
])
|
||
|
if _subprotocol_body is not None:
|
||
|
_body.extend(_subprotocol_body)
|
||
|
return _body
|
||
|
|
||
|
|
||
|
class MessageSubProtocolQuery(MessageSubProtocol):
|
||
|
def __init__(self, device_protocol_version, subprotocol_query_type):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
subprotocol_query_type=subprotocol_query_type)
|
||
|
|
||
|
|
||
|
class MessageSubProtocolSet(MessageSubProtocol):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
subprotocol_query_type=0x20)
|
||
|
self.power = False
|
||
|
self.mode = 0
|
||
|
self.target_temperature = 20.0
|
||
|
self.fan_speed = 102
|
||
|
self.boost_mode = False
|
||
|
self.aux_heating = False
|
||
|
self.dry = False
|
||
|
self.eco_mode = False
|
||
|
self.sleep_mode = False
|
||
|
self.sn8_flag = False
|
||
|
self.timer = False
|
||
|
self.prompt_tone = False
|
||
|
|
||
|
@property
|
||
|
def _subprotocol_body(self):
|
||
|
power = 0x01 if self.power else 0
|
||
|
dry = 0x10 if self.power and self.dry else 0
|
||
|
boost_mode = 0x20 if self.boost_mode else 0
|
||
|
aux_heating = 0x40 if self.aux_heating else 0x80
|
||
|
sleep_mode = 0x80 if self.sleep_mode else 0
|
||
|
try:
|
||
|
mode = 0 if self.mode == 0 else BB_AC_MODES[self.mode] - 1
|
||
|
except IndexError:
|
||
|
mode = 2 # set Auto if invalid mode
|
||
|
target_temperature = int(self.target_temperature * 2 + 30)
|
||
|
water_model_temperature_set = int((self.target_temperature - 1) * 2 + 50)
|
||
|
fan_speed = self.fan_speed
|
||
|
eco = 0x40 if self.eco_mode else 0
|
||
|
|
||
|
prompt_tone = 0x01 if self.prompt_tone else 0
|
||
|
timer = 0x04 if (self.sn8_flag and self.timer) else 0
|
||
|
return bytearray([
|
||
|
boost_mode | power | dry, aux_heating, sleep_mode, 0x00,
|
||
|
0x00, mode, target_temperature, fan_speed,
|
||
|
0x32, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x01,
|
||
|
0x01, 0x00, 0x01, water_model_temperature_set,
|
||
|
prompt_tone, target_temperature, 0x32, 0x66,
|
||
|
0x00, eco | timer, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x08
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessageGeneralSet(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0x40)
|
||
|
self.power = False
|
||
|
self.prompt_tone = True
|
||
|
self.mode = 0
|
||
|
self.target_temperature = 20.0
|
||
|
self.fan_speed = 102
|
||
|
self.swing_vertical = False
|
||
|
self.swing_horizontal = False
|
||
|
self.boost_mode = False
|
||
|
self.smart_eye = False
|
||
|
self.dry = False
|
||
|
self.aux_heating = False
|
||
|
self.eco_mode = False
|
||
|
self.temp_fahrenheit = False
|
||
|
self.sleep_mode = False
|
||
|
self.natural_wind = False
|
||
|
self.frost_protect = False
|
||
|
self.comfort_mode = False
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
# Byte1, Power, prompt_tone
|
||
|
power = 0x01 if self.power else 0
|
||
|
prompt_tone = 0x40 if self.prompt_tone else 0
|
||
|
# Byte2, mode target_temperature
|
||
|
mode = (self.mode << 5) & 0xe0
|
||
|
target_temperature = (int(self.target_temperature) & 0xf) | \
|
||
|
(0x10 if int(round(self.target_temperature * 2)) % 2 != 0 else 0)
|
||
|
# Byte 3, fan_speed
|
||
|
fan_speed = self.fan_speed & 0x7f
|
||
|
# Byte 7, swing_mode
|
||
|
swing_mode = 0x30 | \
|
||
|
(0x0c if self.swing_vertical else 0) | \
|
||
|
(0x03 if self.swing_horizontal else 0)
|
||
|
# Byte 8, turbo
|
||
|
boost_mode = 0x20 if self.boost_mode else 0
|
||
|
# Byte 9 aux_heating eco_mode
|
||
|
smart_eye = 0x01 if self.smart_eye else 0
|
||
|
dry = 0x04 if self.dry else 0
|
||
|
aux_heating = 0x08 if self.aux_heating else 0
|
||
|
eco_mode = 0x80 if self.eco_mode else 0
|
||
|
# Byte 10 temp_fahrenheit
|
||
|
temp_fahrenheit = 0x04 if self.temp_fahrenheit else 0
|
||
|
sleep_mode = 0x01 if self.sleep_mode else 0
|
||
|
boost_mode_1 = 0x02 if self.boost_mode else 0
|
||
|
# Byte 17 natural_wind
|
||
|
natural_wind = 0x40 if self.natural_wind else 0
|
||
|
# Byte 21 frost_protect
|
||
|
frost_protect = 0x80 if self.frost_protect else 0
|
||
|
# Byte 22 comfort_mode
|
||
|
comfort_mode = 0x01 if self.comfort_mode else 0
|
||
|
|
||
|
return bytearray([
|
||
|
power | prompt_tone,
|
||
|
mode | target_temperature,
|
||
|
fan_speed,
|
||
|
0x00, 0x00, 0x00,
|
||
|
swing_mode,
|
||
|
boost_mode,
|
||
|
smart_eye | dry | aux_heating | eco_mode,
|
||
|
temp_fahrenheit | sleep_mode | boost_mode_1,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00,
|
||
|
natural_wind,
|
||
|
0x00, 0x00, 0x00,
|
||
|
frost_protect,
|
||
|
comfort_mode
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessageNewProtocolSet(MessageACBase):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0xB0)
|
||
|
self.indirect_wind = None
|
||
|
self.prompt_tone = None
|
||
|
self.breezeless = None
|
||
|
self.screen_display = None
|
||
|
self.fresh_air_1 = None
|
||
|
self.fresh_air_2 = None
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
pack_count = 0
|
||
|
payload = bytearray([0x00])
|
||
|
if self.breezeless is not None:
|
||
|
pack_count += 1
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.breezeless,
|
||
|
value=bytearray([0x01 if self.breezeless else 0x00])
|
||
|
))
|
||
|
if self.indirect_wind is not None:
|
||
|
pack_count += 1
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.indirect_wind,
|
||
|
value=bytearray([0x02 if self.indirect_wind else 0x01])
|
||
|
))
|
||
|
if self.prompt_tone is not None:
|
||
|
pack_count += 1
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.prompt_tone,
|
||
|
value=bytearray([0x01 if self.prompt_tone else 0x00])
|
||
|
))
|
||
|
if self.screen_display is not None:
|
||
|
pack_count += 1
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.screen_display,
|
||
|
value=bytearray([0x64 if self.screen_display else 0x00])
|
||
|
))
|
||
|
if self.fresh_air_1 is not None and len(self.fresh_air_1) == 2:
|
||
|
pack_count += 1
|
||
|
fresh_air_power = 2 if self.fresh_air_1[0] > 0 else 1
|
||
|
fresh_air_fan_speed = self.fresh_air_1[1]
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.fresh_air_1,
|
||
|
value=bytearray([
|
||
|
fresh_air_power,
|
||
|
fresh_air_fan_speed,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00
|
||
|
])
|
||
|
))
|
||
|
if self.fresh_air_2 is not None and len(self.fresh_air_2) == 2:
|
||
|
pack_count += 1
|
||
|
fresh_air_power = 1 if self.fresh_air_2[0] > 0 else 0
|
||
|
fresh_air_fan_speed = self.fresh_air_2[1]
|
||
|
payload.extend(
|
||
|
NewProtocolMessageBody.pack(
|
||
|
param=NewProtocolTags.fresh_air_2,
|
||
|
value=bytearray([
|
||
|
fresh_air_power,
|
||
|
fresh_air_fan_speed,
|
||
|
0xFF
|
||
|
])
|
||
|
))
|
||
|
payload[0] = pack_count
|
||
|
return payload
|
||
|
|
||
|
|
||
|
class XA0MessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
self.power = (body[1] & 0x1) > 0
|
||
|
self.target_temperature = ((body[1] & 0x3E) >> 1) - 4 + 16.0 + (0.5 if body[1] & 0x40 > 0 else 0.0)
|
||
|
self.mode = (body[2] & 0xe0) >> 5
|
||
|
self.fan_speed = body[3] & 0x7f
|
||
|
self.swing_vertical = (body[7] & 0xC) > 0
|
||
|
self.swing_horizontal = (body[7] & 0x3) > 0
|
||
|
self.boost_mode = ((body[8] & 0x20) > 0) or ((body[10] & 0x2) > 0)
|
||
|
self.smart_eye = (body[9] & 0x01) > 0
|
||
|
self.dry = (body[9] & 0x04) > 0
|
||
|
self.aux_heating = (body[9] & 0x08) > 0
|
||
|
self.eco_mode = (body[9] & 0x10) > 0
|
||
|
self.sleep_mode = (body[10] & 0x01) > 0
|
||
|
self.natural_wind = (body[10] & 0x40) > 0
|
||
|
self.full_dust = (body[13] & 0x20) > 0
|
||
|
self.comfort_mode = (body[14] & 0x1) > 0 if len(body) > 16 else False
|
||
|
|
||
|
|
||
|
class XA1MessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
if body[13] != 0xFF:
|
||
|
temp_integer = int((body[13] - 50) / 2)
|
||
|
temp_decimal = ((body[18] & 0xF) * 0.1) if len(body) > 20 else 0
|
||
|
if body[13] > 49:
|
||
|
self.indoor_temperature = temp_integer + temp_decimal
|
||
|
else:
|
||
|
self.indoor_temperature = temp_integer - temp_decimal
|
||
|
if body[14] == 0xFF:
|
||
|
self.outdoor_temperature = None
|
||
|
else:
|
||
|
temp_integer = int((body[14] - 50) / 2)
|
||
|
temp_decimal = (((body[18] & 0xF0) >> 4) * 0.1) if len(body) > 20 else 0
|
||
|
if body[14] > 49:
|
||
|
self.outdoor_temperature = temp_integer + temp_decimal
|
||
|
else:
|
||
|
self.outdoor_temperature = temp_integer - temp_decimal
|
||
|
self.indoor_humidity = body[17]
|
||
|
|
||
|
|
||
|
class XBXMessageBody(NewProtocolMessageBody):
|
||
|
def __init__(self, body, bt):
|
||
|
super().__init__(body, bt)
|
||
|
params = self.parse()
|
||
|
if NewProtocolTags.indirect_wind in params:
|
||
|
self.indirect_wind = (params[NewProtocolTags.indirect_wind][0] == 0x02)
|
||
|
if NewProtocolTags.indoor_humidity in params:
|
||
|
self.indoor_humidity = params[NewProtocolTags.indoor_humidity][0]
|
||
|
if NewProtocolTags.breezeless in params:
|
||
|
self.breezeless = (params[NewProtocolTags.breezeless][0] == 1)
|
||
|
if NewProtocolTags.screen_display in params:
|
||
|
self.screen_display = (params[NewProtocolTags.screen_display][0] > 0)
|
||
|
self.screen_display_new = True
|
||
|
if NewProtocolTags.fresh_air_1 in params:
|
||
|
self.fresh_air_1 = True
|
||
|
data = params[NewProtocolTags.fresh_air_1]
|
||
|
self.fresh_air_power = data[0] == 0x02
|
||
|
self.fresh_air_fan_speed = data[1]
|
||
|
if NewProtocolTags.fresh_air_2 in params:
|
||
|
self.fresh_air_2 = True
|
||
|
data = params[NewProtocolTags.fresh_air_2]
|
||
|
self.fresh_air_power = data[0] > 0
|
||
|
self.fresh_air_fan_speed = data[1]
|
||
|
|
||
|
|
||
|
class XC0MessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
self.power = (body[1] & 0x1) > 0
|
||
|
self.mode = (body[2] & 0xe0) >> 5
|
||
|
self.target_temperature = (body[2] & 0x0F) + 16.0 + (0.5 if body[0x02] & 0x10 > 0 else 0.0)
|
||
|
self.fan_speed = body[3] & 0x7F
|
||
|
self.swing_vertical = (body[7] & 0x0C) > 0
|
||
|
self.swing_horizontal = (body[7] & 0x03) > 0
|
||
|
self.boost_mode = ((body[8] & 0x20) > 0) or ((body[10] & 0x2) > 0)
|
||
|
self.smart_eye = (body[8] & 0x40) > 0
|
||
|
self.natural_wind = (body[9] & 0x2) > 0
|
||
|
self.dry = (body[9] & 0x4) > 0
|
||
|
self.eco_mode = (body[9] & 0x10) > 0
|
||
|
self.aux_heating = (body[9] & 0x08) > 0
|
||
|
self.temp_fahrenheit = (body[10] & 0x04) > 0
|
||
|
self.sleep_mode = (body[10] & 0x01) > 0
|
||
|
if body[11] != 0xFF:
|
||
|
temp_integer = int((body[11] - 50) / 2)
|
||
|
temp_decimal = (body[15] & 0x0F) * 0.1
|
||
|
if body[11] > 49:
|
||
|
self.indoor_temperature = temp_integer + temp_decimal
|
||
|
else:
|
||
|
self.indoor_temperature = temp_integer - temp_decimal
|
||
|
if body[12] == 0xFF:
|
||
|
self.outdoor_temperature = None
|
||
|
else:
|
||
|
temp_integer = int((body[12] - 50) / 2)
|
||
|
temp_decimal = ((body[15] & 0xF0) >> 4) * 0.1
|
||
|
if body[12] > 49:
|
||
|
self.outdoor_temperature = temp_integer + temp_decimal
|
||
|
else:
|
||
|
self.outdoor_temperature = temp_integer - temp_decimal
|
||
|
self.full_dust = (body[13] & 0x20) > 0
|
||
|
self.screen_display = ((body[14] >> 4 & 0x7) != 0x07) and self.power
|
||
|
self.frost_protect = (body[21] & 0x80) > 0 if len(body) > 23 else False
|
||
|
self.comfort_mode = (body[22] & 0x1) > 0 if len(body) > 24 else False
|
||
|
|
||
|
|
||
|
class XC1MessageBody(MessageBody):
|
||
|
def __init__(self, body, analysis_method=3):
|
||
|
super().__init__(body)
|
||
|
if body[3] == 0x44:
|
||
|
self.total_energy_consumption = XC1MessageBody.parse_consumption(
|
||
|
analysis_method,
|
||
|
body[4], body[5], body[6], body[7]
|
||
|
)
|
||
|
self.current_energy_consumption = XC1MessageBody.parse_consumption(
|
||
|
analysis_method,
|
||
|
body[12], body[13], body[14], body[15]
|
||
|
)
|
||
|
self.realtime_power = XC1MessageBody.parse_power(
|
||
|
analysis_method,
|
||
|
body[16], body[17], body[18]
|
||
|
)
|
||
|
elif body[3] == 0x40:
|
||
|
pass
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_value(byte):
|
||
|
return (byte >> 4) * 10 + (byte & 0x0F)
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_power(analysis_method, byte1, byte2, byte3):
|
||
|
if analysis_method == 1:
|
||
|
return float(XC1MessageBody.parse_value(byte1) * 10000 +
|
||
|
XC1MessageBody.parse_value(byte2) * 100 +
|
||
|
XC1MessageBody.parse_value(byte3)) / 10
|
||
|
elif analysis_method == 2:
|
||
|
return float((byte1 << 16) + (byte2 << 8) + byte3) / 10
|
||
|
else:
|
||
|
return float(byte1 * 10000 + byte2 * 100 + byte3) / 10
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_consumption(analysis_method, byte1, byte2, byte3, byte4):
|
||
|
if analysis_method == 1:
|
||
|
return float(XC1MessageBody.parse_value(byte1) * 1000000 +
|
||
|
XC1MessageBody.parse_value(byte2) * 10000 +
|
||
|
XC1MessageBody.parse_value(byte3) * 100 +
|
||
|
XC1MessageBody.parse_value(byte4)) / 100
|
||
|
elif analysis_method == 2:
|
||
|
return float((byte1 << 32) + (byte2 << 16) + (byte3 << 8) + byte4) / 10
|
||
|
else:
|
||
|
return float(byte1 * 1000000 + byte2 * 10000 + byte3 * 100 + byte4) / 100
|
||
|
|
||
|
|
||
|
class XBBMessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
subprotocol_head = body[:6]
|
||
|
subprotocol_body = body[6:]
|
||
|
data_type = subprotocol_head[-1]
|
||
|
subprotocol_body_len = len(subprotocol_body)
|
||
|
if data_type == 0x20 or data_type == 0x11:
|
||
|
self.power = (subprotocol_body[0] & 0x1) > 0
|
||
|
self.dry = (subprotocol_body[0] & 0x10) > 0
|
||
|
self.boost_mode = (subprotocol_body[0] & 0x20) > 0
|
||
|
self.aux_heating = (subprotocol_body[1] & 0x40) > 0
|
||
|
self.sleep_mode = (subprotocol_body[2] & 0x80) > 0
|
||
|
try:
|
||
|
self.mode = BB_AC_MODES.index(subprotocol_body[5] + 1)
|
||
|
except ValueError:
|
||
|
self.mode = 0
|
||
|
self.target_temperature = (subprotocol_body[6] - 30) / 2
|
||
|
self.fan_speed = subprotocol_body[7]
|
||
|
self.timer = (subprotocol_body[25] & 0x04) > 0 if subprotocol_body_len > 27 else False
|
||
|
self.eco_mode = (subprotocol_body[25] & 0x40) > 0 if subprotocol_body_len > 27 else False
|
||
|
elif data_type == 0x10:
|
||
|
if subprotocol_body[8] & 0x80 == 0x80:
|
||
|
self.indoor_temperature = (0 - (~(subprotocol_body[7] + subprotocol_body[8] * 256) + 1) & 0xffff) / 100
|
||
|
else:
|
||
|
self.indoor_temperature = (subprotocol_body[7] + subprotocol_body[8] * 256) / 100
|
||
|
self.indoor_humidity = subprotocol_body[30]
|
||
|
self.sn8_flag = subprotocol_body[80] == 0x31
|
||
|
elif data_type == 0x12:
|
||
|
pass
|
||
|
elif data_type == 0x30:
|
||
|
if subprotocol_body[6] & 0x80 == 0x80:
|
||
|
self.outdoor_temperature = (0 - (~(subprotocol_body[5] + subprotocol_body[6] * 256) + 1) & 0xffff) / 100
|
||
|
else:
|
||
|
self.outdoor_temperature = (subprotocol_body[5] + subprotocol_body[6] * 256) / 100
|
||
|
elif data_type == 0x13 or data_type == 0x21:
|
||
|
pass
|
||
|
|
||
|
|
||
|
class MessageACResponse(MessageResponse):
|
||
|
def __init__(self, message, power_analysis_method=3):
|
||
|
super().__init__(message)
|
||
|
if self.message_type == MessageType.notify2 and self.body_type == 0xA0:
|
||
|
self.set_body(XA0MessageBody(super().body))
|
||
|
elif self.message_type == MessageType.notify1 and self.body_type == 0xA1:
|
||
|
self.set_body(XA1MessageBody(super().body))
|
||
|
elif self.message_type in [MessageType.query, MessageType.set, MessageType.notify2] and \
|
||
|
self.body_type in [0xB0, 0xB1, 0xB5]:
|
||
|
self.set_body(XBXMessageBody(super().body, self.body_type))
|
||
|
elif self.message_type in [MessageType.query, MessageType.set] and self.body_type == 0xC0:
|
||
|
self.set_body(XC0MessageBody(super().body))
|
||
|
elif self.message_type == MessageType.query and self.body_type == 0xC1:
|
||
|
self.set_body(XC1MessageBody(super().body, power_analysis_method))
|
||
|
elif self.message_type in [MessageType.set, MessageType.query, MessageType.notify2] and \
|
||
|
self.body_type == 0xBB and len(super().body) >= 21:
|
||
|
self.used_subprotocol = True
|
||
|
self.set_body(XBBMessageBody(super().body))
|
||
|
self.set_attr()
|