Midea/midea_ac_lan/midea/devices/ed/message.py

195 lines
6.5 KiB
Python
Raw Normal View History

2023-09-22 05:45:40 +00:00
from enum import IntEnum
from ...core.message import (
MessageType,
MessageRequest,
MessageResponse,
MessageBody,
)
class NewSetTags(IntEnum):
power = 0x0100
lock = 0x0201
class EDNewSetParamPack:
@staticmethod
def pack(param, value, addition=0):
return bytearray([param & 0xFF, param >> 8, value, addition & 0xFF, addition >> 8])
class MessageEDBase(MessageRequest):
def __init__(self, device_protocol_version, message_type, body_type):
super().__init__(
device_protocol_version=device_protocol_version,
device_type=0xED,
message_type=message_type,
body_type=body_type
)
@property
def _body(self):
raise NotImplementedError
class MessageQuery(MessageEDBase):
def __init__(self, device_protocol_version, device_class):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=device_class)
@property
def _body(self):
return bytearray([0x01])
class MessageNewSet(MessageEDBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x15)
self.power = None
self.lock = None
@property
def _body(self):
pack_count = 0
payload = bytearray([0x01, 0x00])
if self.power is not None:
pack_count += 1
payload.extend(
EDNewSetParamPack.pack(
param=NewSetTags.power, # power
value=0x01 if self.power else 0x00
)
)
if self.lock is not None:
pack_count += 1
payload.extend(
EDNewSetParamPack.pack(
param=NewSetTags.lock, # lock
value=0x01 if self.lock else 0x00
)
)
payload[1] = pack_count
return payload
class MessageOldSet(MessageEDBase):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=None)
@property
def body(self):
return bytearray([])
@property
def _body(self):
return bytearray([])
class EDMessageBody01(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[2] & 0x01) > 0
self.water_consumption = body[7] + (body[8] << 8)
self.in_tds = body[36] + (body[37] << 8)
self.out_tds = body[38] + (body[39] << 8)
self.child_lock = body[15] > 0
self.filter1 = round((body[25] + (body[26] << 8)) / 24)
self.filter2 = round((body[27] + (body[28] << 8)) / 24)
self.filter3 = round((body[29] + (body[30] << 8)) / 24)
self.life1 = body[16]
self.life2 = body[17]
self.life3 = body[18]
class EDMessageBody03(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[51] & 0x01) > 0
self.child_lock = (body[51] & 0x08) > 0
self.water_consumption = body[20] + (body[21] << 8)
self.life1 = body[22]
self.life2 = body[23]
self.life3 = body[24]
self.in_tds = body[27] + (body[28] << 8)
self.out_tds = body[29] + (body[30] << 8)
class EDMessageBody05(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[51] & 0x01) > 0
self.child_lock = (body[51] & 0x08) > 0
self.water_consumption = body[20] + (body[21] << 8)
class EDMessageBody06(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = (body[51] & 0x01) > 0
self.child_lock = (body[51] & 0x08) > 0
self.water_consumption = body[25] + (body[26] << 8)
class EDMessageBody07(MessageBody):
def __init__(self, body):
super().__init__(body)
self.water_consumption = (body[21] << 8) + body[20]
self.power = (body[51] & 0x01) > 0
self.child_lock = (body[51] & 0x08) > 0
class EDMessageBodyFF(MessageBody):
def __init__(self, body):
super().__init__(body)
data_offset = 2
while True:
length = (body[data_offset + 2] >> 4) + 2
attr = ((body[data_offset + 2] % 16) << 8) + body[data_offset + 1]
if attr == 0x000:
self.child_lock = (body[data_offset + 5] & 0x01) > 0
self.power = (body[data_offset + 6] & 0x01) > 0
elif attr == 0x011:
self.water_consumption = float((body[data_offset + 3] +
(body[data_offset + 4] << 8) +
(body[data_offset + 5] << 16) +
(body[data_offset + 6] << 24))) / 1000
elif attr == 0x013:
self.in_tds = body[data_offset + 3] + (body[data_offset + 4] << 8)
self.out_tds = body[data_offset + 5] + (body[data_offset + 6] << 8)
elif attr == 0x10:
self.life1 = body[data_offset + 3]
self.life2 = body[data_offset + 4]
self.life3 = body[data_offset + 5]
# fix index out of range error
if data_offset + length + 6 > len(body):
break
data_offset += length
class MessageEDResponse(MessageResponse):
def __init__(self, message):
super().__init__(message)
if self._message_type in [MessageType.query, MessageType.notify1]:
self.device_class = self._body_type
if self._body_type in [0x00, 0xFF]:
self.set_body(EDMessageBodyFF(super().body))
if self.body_type == 0x01:
self.set_body(EDMessageBody01(super().body))
elif self.body_type in [0x03, 0x04]:
self.set_body(EDMessageBody03(super().body))
elif self.body_type == 0x05:
self.set_body(EDMessageBody05(super().body))
elif self.body_type == 0x06:
self.set_body(EDMessageBody06(super().body))
elif self.body_type == 0x07:
self.set_body(EDMessageBody07(super().body))
self.set_attr()