146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
|
from ...core.message import (
|
||
|
MessageType,
|
||
|
MessageRequest,
|
||
|
MessageResponse,
|
||
|
MessageBody,
|
||
|
)
|
||
|
|
||
|
|
||
|
NEW_PROTOCOL_PARAMS = {
|
||
|
"zero_cold_water": 0x03,
|
||
|
# "zero_cold_master": 0x12,
|
||
|
"zero_cold_pulse": 0x04,
|
||
|
"smart_volume": 0x07,
|
||
|
"target_temperature": 0x08
|
||
|
}
|
||
|
|
||
|
|
||
|
class MessageE3Base(MessageRequest):
|
||
|
def __init__(self, device_protocol_version, message_type, body_type):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
device_type=0xE3,
|
||
|
message_type=message_type,
|
||
|
body_type=body_type
|
||
|
)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
|
|
||
|
class MessageQuery(MessageE3Base):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.query,
|
||
|
body_type=0x01)
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
return bytearray([0x01])
|
||
|
|
||
|
|
||
|
class MessagePower(MessageE3Base):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0x02)
|
||
|
self.power = False
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
if self.power:
|
||
|
self.body_type = 0x01
|
||
|
else:
|
||
|
self.body_type = 0x02
|
||
|
return bytearray([0x01])
|
||
|
|
||
|
|
||
|
class MessageSet(MessageE3Base):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0x04)
|
||
|
|
||
|
self.target_temperature = 0
|
||
|
self.zero_cold_water = False
|
||
|
self.bathtub_volume = 0
|
||
|
self.protection = False
|
||
|
self.zero_cold_pulse = False
|
||
|
self.smart_volume = False
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
# Byte 2 zero_cold_water mode
|
||
|
zero_cold_water = 0x01 if self.zero_cold_water else 0x00
|
||
|
# Byte 3
|
||
|
protection = 0x08 if self.protection else 0x00
|
||
|
zero_cold_pulse = 0x10 if self.zero_cold_pulse else 0x00
|
||
|
smart_volume = 0x20 if self.smart_volume else 0x00
|
||
|
# Byte 5 target_temperature
|
||
|
target_temperature = self.target_temperature & 0xFF
|
||
|
|
||
|
return bytearray([
|
||
|
0x01,
|
||
|
zero_cold_water | 0x02,
|
||
|
protection | zero_cold_pulse | smart_volume,
|
||
|
0x00,
|
||
|
target_temperature,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00,
|
||
|
|
||
|
])
|
||
|
|
||
|
|
||
|
class MessageNewProtocolSet(MessageE3Base):
|
||
|
def __init__(self, device_protocol_version):
|
||
|
super().__init__(
|
||
|
device_protocol_version=device_protocol_version,
|
||
|
message_type=MessageType.set,
|
||
|
body_type=0x14)
|
||
|
self.key = None
|
||
|
self.value = None
|
||
|
|
||
|
@property
|
||
|
def _body(self):
|
||
|
key = NEW_PROTOCOL_PARAMS.get(self.key)
|
||
|
if self.key == "target_temperature":
|
||
|
value = self.value
|
||
|
else:
|
||
|
value = 0x01 if self.value else 0x00
|
||
|
return bytearray([
|
||
|
key, value,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00, 0x00, 0x00, 0x00,
|
||
|
0x00
|
||
|
])
|
||
|
|
||
|
|
||
|
class E3GeneralMessageBody(MessageBody):
|
||
|
def __init__(self, body):
|
||
|
super().__init__(body)
|
||
|
self.power = (body[2] & 0x01) > 0
|
||
|
self.burning_state = (body[2] & 0x02) > 0
|
||
|
self.zero_cold_water = (body[2] & 0x04) > 0
|
||
|
self.current_temperature = body[5]
|
||
|
self.target_temperature = body[6]
|
||
|
self.protection = (body[8] & 0x08) > 0
|
||
|
self.zero_cold_pulse = (body[20] & 0x01) > 0 if len(body) > 20 else False
|
||
|
self.smart_volume = (body[20] & 0x02) > 0 if len(body) > 20 else False
|
||
|
|
||
|
|
||
|
class MessageE3Response(MessageResponse):
|
||
|
def __init__(self, message):
|
||
|
super().__init__(message)
|
||
|
if (self.message_type == MessageType.query and self.body_type == 0x01) or \
|
||
|
(self.message_type == MessageType.set and self.body_type in [0x01, 0x02, 0x04, 0x14]) or \
|
||
|
(self.message_type == MessageType.notify1 and self.body_type in [0x00, 0x01]):
|
||
|
self.set_body(E3GeneralMessageBody(super().body))
|
||
|
self.set_attr()
|