136 lines
4.4 KiB
Python
136 lines
4.4 KiB
Python
from ...core.message import (
|
|
MessageType,
|
|
MessageRequest,
|
|
MessageResponse,
|
|
MessageBody,
|
|
)
|
|
|
|
|
|
class MessageE2Base(MessageRequest):
|
|
def __init__(self, device_protocol_version, message_type, body_type):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
device_type=0xE2,
|
|
message_type=message_type,
|
|
body_type=body_type
|
|
)
|
|
|
|
@property
|
|
def _body(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class MessageQuery(MessageE2Base):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.query,
|
|
body_type=0x01)
|
|
|
|
@property
|
|
def _body(self):
|
|
return bytearray([0x01])
|
|
|
|
|
|
class MessagePower(MessageE2Base):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.set,
|
|
body_type=0x02)
|
|
self.power = False
|
|
|
|
@property
|
|
def _body(self):
|
|
if self.power:
|
|
self.body_type = 0x01
|
|
else:
|
|
self.body_type = 0x02
|
|
return bytearray([0x01])
|
|
|
|
|
|
class MessageNewProtocolSet(MessageE2Base):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.set,
|
|
body_type=0x14)
|
|
self.target_temperature = None
|
|
self.variable_heating = None
|
|
self.whole_tank_heating = None
|
|
|
|
@property
|
|
def _body(self):
|
|
byte1 = 0x00
|
|
byte2 = 0x00
|
|
if self.target_temperature is not None:
|
|
byte1 = 0x07
|
|
byte2 = int(self.target_temperature) & 0xFF
|
|
elif self.whole_tank_heating is not None:
|
|
byte1 = 0x04
|
|
byte2 = 0x02 if self.whole_tank_heating else 0x01
|
|
elif self.variable_heating is not None:
|
|
byte1 = 0x10
|
|
byte2 = 0x01 if self.variable_heating else 0x00
|
|
return bytearray([byte1, byte2])
|
|
|
|
|
|
class MessageSet(MessageE2Base):
|
|
def __init__(self, device_protocol_version):
|
|
super().__init__(
|
|
device_protocol_version=device_protocol_version,
|
|
message_type=MessageType.set,
|
|
body_type=0x04)
|
|
self.target_temperature = 0
|
|
self.variable_heating = False
|
|
self.whole_tank_heating = False
|
|
self.protection = False
|
|
|
|
@property
|
|
def _body(self):
|
|
# Byte 4 whole_tank_heating, protection
|
|
protection = 0x04 if self.protection else 0x00
|
|
whole_tank_heating = 0x02 if self.whole_tank_heating else 0x01
|
|
# Byte 5 target_temperature
|
|
target_temperature = self.target_temperature & 0xFF
|
|
# Byte 9 variable_heating
|
|
variable_heating = 0x10 if self.variable_heating else 0x00
|
|
return bytearray([
|
|
0x01,
|
|
0x00,
|
|
0x80,
|
|
whole_tank_heating | protection,
|
|
target_temperature,
|
|
0x00, 0x00, 0x00,
|
|
variable_heating,
|
|
0x00, 0x00, 0x00, 0x00,
|
|
0x00, 0x00, 0x00, 0x00,
|
|
0x00
|
|
])
|
|
|
|
|
|
class E2GeneralMessageBody(MessageBody):
|
|
def __init__(self, body):
|
|
super().__init__(body)
|
|
self.power = (body[2] & 0x01) > 0
|
|
self.heating = (body[2] & 0x04) > 0
|
|
self.keep_warm = (body[2] & 0x08) > 0
|
|
self.variable_heating = (body[2] & 0x80) > 0
|
|
self.current_temperature = body[4]
|
|
self.whole_tank_heating = (body[7] & 0x08) > 0
|
|
self.heating_time_remaining = body[9] * 60 + body[10]
|
|
self.target_temperature = body[11]
|
|
self.protection = ((body[22] & 0x02) > 0) if len(body) > 22 else False
|
|
if len(body) > 25:
|
|
self.water_consumption = body[24] + (body[25] << 8)
|
|
if len(body) > 34:
|
|
self.heating_power = body[34] * 100
|
|
|
|
|
|
class MessageE2Response(MessageResponse):
|
|
def __init__(self, message):
|
|
super().__init__(message)
|
|
if (self.message_type in [MessageType.query, MessageType.notify1] and self.body_type == 0x01) or \
|
|
(self.message_type == MessageType.set and self.body_type in [0x01, 0x02, 0x04, 0x14]):
|
|
self.set_body(E2GeneralMessageBody(super().body))
|
|
self.set_attr()
|