Start building commands

This commit is contained in:
hodasemi 2023-09-26 19:42:50 +02:00
parent e0befff868
commit c184fc603e
7 changed files with 376 additions and 14 deletions

163
devicee1.py Normal file
View file

@ -0,0 +1,163 @@
import logging
from messagee1 import (
MessageQuery,
MessagePower,
MessageStorage,
MessageLock,
MessageE1Response
)
from enum import StrEnum
from device import MiedaDevice
_LOGGER = logging.getLogger(__name__)
class DeviceAttributes(StrEnum):
power = "power"
status = "status"
mode = "mode"
additional = "additional"
door = "door"
rinse_aid = "rinse_aid"
salt = "salt"
child_lock = "child_lock"
uv = "uv"
dry = "dry"
dry_status = "dry_status"
storage = "storage"
storage_status = "storage_status"
time_remaining = "time_remaining"
progress = "progress"
storage_remaining = "storage_remaining"
temperature = "temperature"
humidity = "humidity"
waterswitch = "waterswitch"
water_lack = "water_lack"
error_code = "error_code"
softwater = "softwater"
wrong_operation = "wrong_operation"
bright = "bright"
class MideaE1Device(MiedaDevice):
def __init__(
self,
name: str,
device_id: int,
ip_address: str,
port: int,
token: str,
key: str,
protocol: int,
model: str,
customize: str
):
super().__init__(
name=name,
device_id=device_id,
device_type=0xE1,
ip_address=ip_address,
port=port,
token=token,
key=key,
protocol=protocol,
model=model,
attributes={
DeviceAttributes.power: False,
DeviceAttributes.status: None,
DeviceAttributes.mode: 0,
DeviceAttributes.additional: 0,
DeviceAttributes.uv: False,
DeviceAttributes.dry: False,
DeviceAttributes.dry_status: False,
DeviceAttributes.door: False,
DeviceAttributes.rinse_aid: False,
DeviceAttributes.salt: False,
DeviceAttributes.child_lock: False,
DeviceAttributes.storage: False,
DeviceAttributes.storage_status: False,
DeviceAttributes.time_remaining: None,
DeviceAttributes.progress: None,
DeviceAttributes.storage_remaining: None,
DeviceAttributes.temperature: None,
DeviceAttributes.humidity: None,
DeviceAttributes.waterswitch: False,
DeviceAttributes.water_lack: False,
DeviceAttributes.error_code: None,
DeviceAttributes.softwater: 0,
DeviceAttributes.wrong_operation: None,
DeviceAttributes.bright: 0
})
self._modes = {
0x0: "Neutral Gear", # BYTE_MODE_NEUTRAL_GEAR
0x1: "Auto", # BYTE_MODE_AUTO_WASH
0x2: "Heavy", # BYTE_MODE_STRONG_WASH
0x3: "Normal", # BYTE_MODE_STANDARD_WASH
0x4: "Energy Saving", # BYTE_MODE_ECO_WASH
0x5: "Delicate", # BYTE_MODE_GLASS_WASH
0x6: "Hour", # BYTE_MODE_HOUR_WASH
0x7: "Quick", # BYTE_MODE_FAST_WASH
0x8: "Rinse", # BYTE_MODE_SOAK_WASH
0x9: "90min", # BYTE_MODE_90MIN_WASH
0xA: "Self Clean", # BYTE_MODE_SELF_CLEAN
0xB: "Fruit Wash", # BYTE_MODE_FRUIT_WASH
0xC: "Self Define", # BYTE_MODE_SELF_DEFINE
0xD: "Germ", # BYTE_MODE_GERM ???
0xE: "Bowl Wash", # BYTE_MODE_BOWL_WASH
0xF: "Kill Germ", # BYTE_MODE_KILL_GERM
0x10: "Sea Food Wash", # BYTE_MODE_SEA_FOOD_WASH
0x12: "Hot Pot Wash", # BYTE_MODE_HOT_POT_WASH
0x13: "Quiet", # BYTE_MODE_QUIET_NIGHT_WASH
0x14: "Less Wash", # BYTE_MODE_LESS_WASH
0x16: "Oil Net Wash", # BYTE_MODE_OIL_NET_WASH
0x19: "Cloud Wash" # BYTE_MODE_CLOUD_WASH
}
self._status = ["Off", "Idle", "Delay", "Running", "Error"]
self._progress = ["Idle", "Pre-wash", "Wash", "Rinse", "Dry", "Complete"]
def build_query(self):
return [MessageQuery(self._device_protocol_version)]
def process_message(self, msg):
message = MessageE1Response(msg)
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
new_status = {}
for status in self._attributes.keys():
if hasattr(message, str(status)):
if status == DeviceAttributes.status:
v = getattr(message, str(status))
if v < len(self._status):
self._attributes[status] = self._status[v]
else:
self._attributes[status] = None
elif status == DeviceAttributes.progress:
v = getattr(message, str(status))
if v < len(self._progress):
self._attributes[status] = self._progress[v]
else:
self._attributes[status] = None
elif status == DeviceAttributes.mode:
v = getattr(message, str(status))
self._attributes[status] = self._modes[v]
else:
self._attributes[status] = getattr(message, str(status))
new_status[str(status)] = self._attributes[status]
return new_status
def set_attribute(self, attr, value):
if attr == DeviceAttributes.power:
message = MessagePower(self._device_protocol_version)
message.power = value
self.build_send(message)
elif attr == DeviceAttributes.child_lock:
message = MessageLock(self._device_protocol_version)
message.lock = value
self.build_send(message)
elif attr == DeviceAttributes.storage:
message = MessageStorage(self._device_protocol_version)
message.storage = value
self.build_send(message)
class MideaAppliance(MideaE1Device):
pass

34
enum.py Normal file
View file

@ -0,0 +1,34 @@
"""Enum backports from standard lib."""
from __future__ import annotations
from enum import Enum
from typing import Any, TypeVar
_StrEnumSelfT = TypeVar("_StrEnumSelfT", bound="StrEnum")
class StrEnum(str, Enum):
"""Partial backport of Python 3.11's StrEnum for our basic use cases."""
def __new__(
cls: type[_StrEnumSelfT], value: str, *args: Any, **kwargs: Any
) -> _StrEnumSelfT:
"""Create a new StrEnum instance."""
if not isinstance(value, str):
raise TypeError(f"{value!r} is not a string")
return super().__new__(cls, value, *args, **kwargs)
def __str__(self) -> str:
"""Return self.value."""
return str(self.value)
@staticmethod
def _generate_next_value_(
name: str, start: int, count: int, last_values: list[Any]
) -> Any:
"""
Make `auto()` explicitly unsupported.
We may revisit this when it's very clear that Python 3.11's
`StrEnum.auto()` behavior will no longer change.
"""
raise TypeError("auto() is not supported by this implementation")

121
messagee1.py Normal file
View file

@ -0,0 +1,121 @@
from message import (
MessageType,
MessageRequest,
MessageResponse,
MessageBody,
)
class MessageE1Base(MessageRequest):
def __init__(self, device_protocol_version, message_type, body_type):
super().__init__(
device_protocol_version=device_protocol_version,
device_type=0xE1,
message_type=message_type,
body_type=body_type
)
@property
def _body(self):
raise NotImplementedError
class MessagePower(MessageE1Base):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x08)
self.power = False
@property
def _body(self):
power = 0x01 if self.power else 0x00
return bytearray([
power,
0x00, 0x00, 0x00
])
class MessageLock(MessageE1Base):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x83)
self.lock = False
@property
def _body(self):
lock = 0x03 if self.lock else 0x04
return bytearray([lock]) + bytearray([0x00] * 36)
class MessageStorage(MessageE1Base):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.set,
body_type=0x81)
self.storage = False
@property
def _body(self):
storage = 0x01 if self.storage else 0x00
return bytearray([0x00, 0x00, 0x00, storage]) + \
bytearray([0xff] * 6) + bytearray([0x00] * 27)
class MessageQuery(MessageE1Base):
def __init__(self, device_protocol_version):
super().__init__(
device_protocol_version=device_protocol_version,
message_type=MessageType.query,
body_type=0x00)
@property
def _body(self):
return bytearray([])
class E1GeneralMessageBody(MessageBody):
def __init__(self, body):
super().__init__(body)
self.power = body[1] > 0
self.status = body[1]
self.mode = body[2]
self.additional = body[3]
self.door = (body[5] & 0x01) == 0 # 0 - open, 1 - close
self.rinse_aid = (body[5] & 0x02) > 0 # 0 - enough, 1 - shortage
self.salt = (body[5] & 0x04) > 0 # 0 - enough, 1 - shortage
start_pause = (body[5] & 0x08) > 0
if start_pause:
self.start = True
elif self.status in [2, 3]:
self.start = False
self.child_lock = (body[5] & 0x10) > 0
self.uv = (body[4] & 0x2) > 0
self.dry = (body[4] & 0x10) > 0
self.dry_status = (body[4] & 0x20) > 0
self.storage = (body[5] & 0x20) > 0
self.storage_status = (body[5] & 0x40) > 0
self.time_remaining = body[6]
self.progress = body[9]
self.storage_remaining = body[18] if len(body) > 18 else False
self.temperature = body[11]
self.humidity = body[33] if len(body) > 33 else None
self.waterswitch = (body[4] & 0x4) > 0
self.water_lack = (body[5] & 0x80) > 0
self.error_code = body[10]
self.softwater = body[13]
self.wrong_operation = body[16]
self.bright = body[24] if len(body) > 24 else None
class MessageE1Response(MessageResponse):
def __init__(self, message):
super().__init__(message)
if (self.message_type == MessageType.set and 0 <= self.body_type <= 7) or \
(self.message_type in [MessageType.query, MessageType.notify1] and self.body_type == 0):
self.set_body(E1GeneralMessageBody(super().body))
self.set_attr()

View file

@ -4,6 +4,7 @@ import aiohttp;
import asyncio; import asyncio;
import discover; import discover;
import device; import device;
import devicee1;
async def test(): async def test():
cl = cloud.MSmartHomeCloud( cl = cloud.MSmartHomeCloud(
@ -52,17 +53,16 @@ async def test():
device_info = devices[device_id] device_info = devices[device_id]
dev = device.MiedaDevice( dev = devicee1.MideaE1Device(
name="", name="",
device_id=device_id, device_id=device_id,
device_type=225,
ip_address=device_info['ip_address'], ip_address=device_info['ip_address'],
port=device_info['port'], port=device_info['port'],
token=token, token=token,
key=key, key=key,
protocol=3, protocol=3,
model=device_info['model'], model=device_info['model'],
attributes={} customize=""
) )
if dev.connect(True): if dev.connect(True):

View file

@ -1,21 +1,65 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[repr(u8)]
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MessageType {
Query,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Command { pub struct Command {
// device_protocol_version: u8,
device_type: u8,
message_type: MessageType,
body_type: u8,
} }
impl Command { impl Command {
pub fn sub_type(device_type: u32) -> Self { pub fn sub_type(device_type: u32) -> Self {
Self {} // Self {}
todo!()
}
pub fn query(device_type: u8, device_protocol_version: u8) -> Self {
Self {
device_protocol_version,
device_type,
message_type: MessageType::Query,
body_type: 0x00,
}
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)] impl Command {
pub struct CommandResponse { const HEADER_LENGTH: u8 = 10;
//
pub fn header(&self) -> Vec<u8> {
let length = Self::HEADER_LENGTH + self.body().len() as u8;
vec![
// flag
0xAA,
// length
length,
// device type
self.device_type,
// frame checksum
0x00, // self._device_type ^ length,
// unused
0x00,
0x00,
// frame ID
0x00,
// frame protocol version
0x00,
// device protocol version
self.device_protocol_version,
// frame type
self.message_type as u8,
]
} }
impl CommandResponse { pub fn body(&self) -> Vec<u8> {
// todo!()
}
} }

View file

@ -191,8 +191,8 @@ impl E1 {
} }
impl DeviceBackend for E1 { impl DeviceBackend for E1 {
fn build_query(&self) -> Result<Vec<Command>> { fn build_query(&self, device_protocol_version: u32) -> Result<Vec<Command>> {
todo!() Ok(vec![Command::query(device_protocol_version)])
} }
fn process_message(&self, msg: &[u8]) -> Result<Vec<u8>> { fn process_message(&self, msg: &[u8]) -> Result<Vec<u8>> {

View file

@ -3,7 +3,7 @@ use crate::command::Command;
pub mod e1; pub mod e1;
pub trait DeviceBackend { pub trait DeviceBackend {
fn build_query(&self) -> anyhow::Result<Vec<Command>>; fn build_query(&self, device_protocol_version: u32) -> anyhow::Result<Vec<Command>>;
fn process_message(&self, msg: &[u8]) -> anyhow::Result<Vec<u8>>; fn process_message(&self, msg: &[u8]) -> anyhow::Result<Vec<u8>>;
fn set_attribute(&self, attribute: &str, value: &str) -> (); fn set_attribute(&self, attribute: &str, value: &str) -> ();
} }