Remove pycache

This commit is contained in:
hodasemi 2023-09-22 18:06:37 +02:00
parent 383bcb4383
commit 223b7148af
13 changed files with 1094 additions and 19 deletions

5
.gitignore vendored
View file

@ -1,2 +1,5 @@
/target
Cargo.lock
Cargo.lock
*.pyc
__pycache__/

Binary file not shown.

Binary file not shown.

316
cloud.py Normal file
View file

@ -0,0 +1,316 @@
import logging
import time
import datetime
import json
import base64
from threading import Lock
from aiohttp import ClientSession
from secrets import token_hex
from security import CloudSecurity, MeijuCloudSecurity, MSmartCloudSecurity
_LOGGER = logging.getLogger(__name__)
clouds = {
"美的美居": {
"class_name": "MeijuCloud",
"app_id": "900",
"app_key": "46579c15",
"login_key": "ad0ee21d48a64bf49f4fb583ab76e799",
"iot_key": bytes.fromhex(format(9795516279659324117647275084689641883661667, 'x')).decode(),
"hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(),
"api_url": "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias=",
},
"MSmartHome": {
"class_name": "MSmartHomeCloud",
"app_id": "1010",
"app_key": "ac21b9f9cbfe4ca5a88562ef25e2b768",
"iot_key": bytes.fromhex(format(7882822598523843940, 'x')).decode(),
"hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(),
"api_url": "https://mp-prod.appsmb.com/mas/v5/app/proxy?alias=",
}
}
default_keys = {
99: {
"token": "ee755a84a115703768bcc7c6c13d3d629aa416f1e2fd798beb9f78cbb1381d09"
"1cc245d7b063aad2a900e5b498fbd936c811f5d504b2e656d4f33b3bbc6d1da3",
"key": "ed37bd31558a4b039aaf4e7a7a59aa7a75fd9101682045f69baf45d28380ae5c"
}
}
class MideaCloud:
def __init__(
self,
session: ClientSession,
security: CloudSecurity,
app_key: str,
account: str,
password: str,
api_url: str
):
self._device_id = CloudSecurity.get_deviceid(account)
self._session = session
self._security = security
self._api_lock = Lock()
self._app_key = app_key
self._account = account
self._password = password
self._api_url = api_url
self._access_token = None
self._login_id = None
def _make_general_data(self):
return {}
async def _api_request(self, endpoint: str, data: dict, header=None) -> dict | None:
header = header or {}
if not data.get("reqId"):
data.update({
"reqId": token_hex(16)
})
if not data.get("stamp"):
data.update({
"stamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S")
})
random = str(int(time.time()))
url = self._api_url + endpoint
dump_data = json.dumps(data)
sign = self._security.sign(dump_data, random)
header.update({
"content-type": "application/json; charset=utf-8",
"secretVersion": "1",
"sign": sign,
"random": random,
})
if self._access_token is not None:
header.update({
"accesstoken": self._access_token
})
response:dict = {"code": -1}
for i in range(0, 3):
try:
with self._api_lock:
r = await self._session.request("POST", url, headers=header, data=dump_data, timeout=10)
raw = await r.read()
_LOGGER.debug(f"Midea cloud API url: {url}, data: {data}, response: {raw}")
response = json.loads(raw)
break
except Exception as e:
pass
if int(response["code"]) == 0 and "data" in response:
return response["data"]
print(response)
return None
async def _get_login_id(self) -> str | None:
data = self._make_general_data()
data.update({
"loginAccount": f"{self._account}"
})
if response := await self._api_request(
endpoint="/v1/user/login/id/get",
data=data
):
return response.get("loginId")
return None
async def login(self) -> bool:
raise NotImplementedError()
async def get_keys(self, appliance_id: int):
result = {}
for method in [1, 2]:
udp_id = self._security.get_udp_id(appliance_id, method)
data = self._make_general_data()
data.update({
"udpid": udp_id
})
response = await self._api_request(
endpoint="/v1/iot/secure/getToken",
data=data
)
if response and "tokenlist" in response:
for token in response["tokenlist"]:
if token["udpId"] == udp_id:
result[method] = {
"token": token["token"].lower(),
"key": token["key"].lower()
}
result.update(default_keys)
return result
class MeijuCloud(MideaCloud):
APP_ID = "900"
APP_VERSION = "8.20.0.2"
def __init__(
self,
cloud_name: str,
session: ClientSession,
account: str,
password: str,
):
super().__init__(
session=session,
security=MeijuCloudSecurity(
login_key=clouds[cloud_name].get("login_key"),
iot_key=clouds[cloud_name].get("iot_key"),
hmac_key=clouds[cloud_name].get("hmac_key"),
),
app_key=clouds[cloud_name]["app_key"],
account=account,
password=password,
api_url=clouds[cloud_name]["api_url"]
)
async def login(self) -> bool:
if login_id := await self._get_login_id():
self._login_id = login_id
stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
data = {
"iotData": {
"clientType": 1,
"deviceId": self._device_id,
"iampwd": self._security.encrypt_iam_password(self._login_id, self._password),
"iotAppId": self.APP_ID,
"loginAccount": self._account,
"password": self._security.encrypt_password(self._login_id, self._password),
"reqId": token_hex(16),
"stamp": stamp
},
"data": {
"appKey": self._app_key,
"deviceId": self._device_id,
"platform": 2
},
"timestamp": stamp,
"stamp": stamp
}
if response := await self._api_request(
endpoint="/mj/user/login",
data=data
):
self._access_token = response["mdata"]["accessToken"]
self._security.set_aes_keys(
self._security.aes_decrypt_with_fixed_key(
response["key"]
), None
)
return True
return False
class MSmartHomeCloud(MideaCloud):
APP_ID = "1010"
SRC = "10"
APP_VERSION = "3.0.2"
def __init__(
self,
cloud_name: str,
session: ClientSession,
account: str,
password: str,
):
super().__init__(
session=session,
security=MSmartCloudSecurity(
login_key=clouds[cloud_name].get("app_key"),
iot_key=clouds[cloud_name].get("iot_key"),
hmac_key=clouds[cloud_name].get("hmac_key"),
),
app_key=clouds[cloud_name]["app_key"],
account=account,
password=password,
api_url=clouds[cloud_name]["api_url"]
)
self._auth_base = base64.b64encode(
f"{self._app_key}:{clouds['MSmartHome']['iot_key']}".encode("ascii")
).decode("ascii")
self._uid = ""
def _make_general_data(self):
return {
"appVersion": self.APP_VERSION,
"src": self.SRC,
"format": "2",
"stamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
"platformId": "1",
"deviceId": self._device_id,
"reqId": token_hex(16),
"uid": self._uid,
"clientType": "1",
"appId": self.APP_ID,
}
async def _api_request(self, endpoint: str, data: dict, header=None) -> dict | None:
header = header or {}
header.update({
"x-recipe-app": self.APP_ID,
"authorization": f"Basic {self._auth_base}"
})
if len(self._uid) > 0:
header.update({
"uid": self._uid
})
return await super()._api_request(endpoint, data, header)
async def _re_route(self):
data = self._make_general_data()
data.update({
"userType": "0",
"userName": f"{self._account}"
})
if response := await self._api_request(
endpoint="/v1/multicloud/platform/user/route",
data=data
):
if api_url := response.get("masUrl"):
self._api_url = api_url
async def login(self) -> bool:
await self._re_route()
if login_id := await self._get_login_id():
self._login_id = login_id
iot_data = self._make_general_data()
iot_data.pop("uid")
stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
iot_data.update({
"iampwd": self._security.encrypt_iam_password(self._login_id, self._password),
"loginAccount": self._account,
"password": self._security.encrypt_password(self._login_id, self._password),
"stamp": stamp
})
data = {
"iotData": iot_data,
"data": {
"appKey": self._app_key,
"deviceId": self._device_id,
"platform": "2"
},
"stamp": stamp
}
if response := await self._api_request(
endpoint="/mj/user/login",
data=data
):
self._uid = response["uid"]
self._access_token = response["mdata"]["accessToken"]
self._security.set_aes_keys(response["accessToken"], response["randomData"])
return True
return False
def get_midea_cloud(cloud_name: str, session: ClientSession, account: str, password: str) -> MideaCloud | None:
cloud = None
if cloud_name in clouds.keys():
cloud = globals()[clouds[cloud_name]["class_name"]](
cloud_name=cloud_name,
session=session,
account=account,
password=password
)
return cloud

390
device.py Normal file
View file

@ -0,0 +1,390 @@
import threading
try:
from enum import StrEnum
except ImportError:
from ..backports.enum import StrEnum
from enum import IntEnum
from security import LocalSecurity, MSGTYPE_HANDSHAKE_REQUEST, MSGTYPE_ENCRYPTED_REQUEST
from packet_builder import PacketBuilder
from message import MessageType, MessageQuerySubtype, MessageSubtypeResponse, MessageQuestCustom
import socket
import logging
import time
_LOGGER = logging.getLogger(__name__)
class AuthException(Exception):
pass
class ResponseException(Exception):
pass
class RefreshFailed(Exception):
pass
class DeviceAttributes(StrEnum):
pass
class ParseMessageResult(IntEnum):
SUCCESS = 0
PADDING = 1
ERROR = 99
class MiedaDevice(threading.Thread):
def __init__(self,
name: str,
device_id: int,
device_type: int,
ip_address: str,
port: int,
token: str,
key: str,
protocol: int,
model: str,
attributes: dict):
threading.Thread.__init__(self)
self._attributes = attributes if attributes else {}
self._socket = None
self._ip_address = ip_address
self._port = port
self._security = LocalSecurity()
self._token = bytes.fromhex(token) if token else None
self._key = bytes.fromhex(key) if key else None
self._buffer = b""
self._device_name = name
self._device_id = device_id
self._device_type = device_type
self._protocol = protocol
self._model = model
self._updates = []
self._unsupported_protocol = []
self._is_run = False
self._available = True
self._device_protocol_version = 0
self._sub_type = None
self._sn = None
self._refresh_interval = 30
self._heartbeat_interval = 10
self._default_refresh_interval = 30
@property
def name(self):
return self._device_name
@property
def available(self):
return self._available
@property
def device_id(self):
return self._device_id
@property
def device_type(self):
return self._device_type
@property
def model(self):
return self._model
@property
def sub_type(self):
return self._sub_type if self._sub_type else 0
@staticmethod
def fetch_v2_message(msg):
result = []
while len(msg) > 0:
factual_msg_len = len(msg)
if factual_msg_len < 6:
break
alleged_msg_len = msg[4] + (msg[5] << 8)
if factual_msg_len >= alleged_msg_len:
result.append(msg[:alleged_msg_len])
msg = msg[alleged_msg_len:]
else:
break
return result, msg
def connect(self, refresh_status=True):
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(10)
_LOGGER.debug(f"[{self._device_id}] Connecting to {self._ip_address}:{self._port}")
self._socket.connect((self._ip_address, self._port))
_LOGGER.debug(f"[{self._device_id}] Connected")
if self._protocol == 3:
self.authenticate()
_LOGGER.debug(f"[{self._device_id}] Authentication success")
if refresh_status:
self.refresh_status(wait_response=True)
self.enable_device(True)
return True
except socket.timeout:
_LOGGER.debug(f"[{self._device_id}] Connection timed out")
except socket.error:
_LOGGER.debug(f"[{self._device_id}] Connection error")
except AuthException:
_LOGGER.debug(f"[{self._device_id}] Authentication failed")
except ResponseException:
_LOGGER.debug(f"[{self._device_id}] Unexpected response received")
except RefreshFailed:
_LOGGER.debug(f"[{self._device_id}] Refresh status is timed out")
except Exception as e:
_LOGGER.error(f"[{self._device_id}] Unknown error: {e.__traceback__.tb_frame.f_globals['__file__']}, "
f"{e.__traceback__.tb_lineno}, {repr(e)}")
self.enable_device(False)
return False
def authenticate(self):
request = self._security.encode_8370(
self._token, MSGTYPE_HANDSHAKE_REQUEST)
_LOGGER.debug(f"[{self._device_id}] Handshaking")
self._socket.send(request)
response = self._socket.recv(512)
if len(response) < 20:
raise AuthException()
response = response[8: 72]
self._security.tcp_key(response, self._key)
def send_message(self, data):
if self._protocol == 3:
self.send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST)
else:
self.send_message_v2(data)
def send_message_v2(self, data):
if self._socket is not None:
self._socket.send(data)
else:
_LOGGER.debug(f"[{self._device_id}] Send failure, device disconnected, data: {data.hex()}")
def send_message_v3(self, data, msg_type=MSGTYPE_ENCRYPTED_REQUEST):
data = self._security.encode_8370(data, msg_type)
self.send_message_v2(data)
def build_send(self, cmd):
data = cmd.serialize()
_LOGGER.debug(f"[{self._device_id}] Sending: {cmd}")
msg = PacketBuilder(self._device_id, data).finalize()
self.send_message(msg)
def refresh_status(self, wait_response=False):
cmds = self.build_query()
if self._sub_type is None:
cmds = [MessageQuerySubtype(self.device_type)] + cmds
error_count = 0
for cmd in cmds:
if cmd.__class__.__name__ not in self._unsupported_protocol:
self.build_send(cmd)
if wait_response:
try:
while True:
msg = self._socket.recv(512)
if len(msg) == 0:
raise socket.error
result = self.parse_message(msg)
if result == ParseMessageResult.SUCCESS:
break
elif result == ParseMessageResult.PADDING:
continue
else:
raise ResponseException
except socket.timeout:
error_count += 1
self._unsupported_protocol.append(cmd.__class__.__name__)
_LOGGER.debug(f"[{self._device_id}] Does not supports "
f"the protocol {cmd.__class__.__name__}, ignored")
except ResponseException:
error_count += 1
else:
error_count += 1
if error_count == len(cmds):
raise RefreshFailed
def set_subtype(self):
pass
def pre_process_message(self, msg):
if msg[9] == MessageType.querySubtype:
message = MessageSubtypeResponse(msg)
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
self._sub_type = message.sub_type
self.set_subtype()
self._device_protocol_version = message.device_protocol_version
_LOGGER.debug(f"[{self._device_id}] Subtype: {self._sub_type}. "
f"Device protocol version: {self._device_protocol_version}")
return False
return True
def parse_message(self, msg):
if self._protocol == 3:
messages, self._buffer = self._security.decode_8370(self._buffer + msg)
else:
messages, self._buffer = self.fetch_v2_message(self._buffer + msg)
if len(messages) == 0:
return ParseMessageResult.PADDING
for message in messages:
if message == b"ERROR":
return ParseMessageResult.ERROR
payload_len = message[4] + (message[5] << 8) - 56
payload_type = message[2] + (message[3] << 8)
if payload_type in [0x1001, 0x0001]:
# Heartbeat detected
pass
elif len(message) > 56:
cryptographic = message[40:-16]
if payload_len % 16 == 0:
decrypted = self._security.aes_decrypt(cryptographic)
if self.pre_process_message(decrypted):
try:
status = self.process_message(decrypted)
if len(status) > 0:
self.update_all(status)
else:
_LOGGER.debug(f"[{self._device_id}] Unidentified protocol")
except Exception as e:
_LOGGER.error(f"[{self._device_id}] Error in process message, msg = {decrypted.hex()}")
else:
_LOGGER.warning(
f"[{self._device_id}] Illegal payload, "
f"original message = {msg.hex()}, buffer = {self._buffer.hex()}, "
f"8370 decoded = {message.hex()}, payload type = {payload_type}, "
f"alleged payload length = {payload_len}, factual payload length = {len(cryptographic)}"
)
else:
_LOGGER.warning(
f"[{self._device_id}] Illegal message, "
f"original message = {msg.hex()}, buffer = {self._buffer.hex()}, "
f"8370 decoded = {message.hex()}, payload type = {payload_type}, "
f"alleged payload length = {payload_len}, message length = {len(message)}, "
)
return ParseMessageResult.SUCCESS
def build_query(self):
raise NotImplementedError
def process_message(self, msg):
raise NotImplementedError
def send_command(self, cmd_type, cmd_body: bytearray):
cmd = MessageQuestCustom(self._device_type, cmd_type, cmd_body)
try:
self.build_send(cmd)
except socket.error as e:
_LOGGER.debug(f"[{self._device_id}] Interface send_command failure, {repr(e)}, "
f"cmd_type: {cmd_type}, cmd_body: {cmd_body.hex()}")
def send_heartbeat(self):
msg = PacketBuilder(self._device_id, bytearray([0x00])).finalize(msg_type=0)
self.send_message(msg)
def register_update(self, update):
self._updates.append(update)
def update_all(self, status):
_LOGGER.debug(f"[{self._device_id}] Status update: {status}")
for update in self._updates:
update(status)
def enable_device(self, available=True):
self._available = available
status = {"available": available}
self.update_all(status)
def open(self):
if not self._is_run:
self._is_run = True
threading.Thread.start(self)
def close(self):
if self._is_run:
self._is_run = False
self.close_socket()
def close_socket(self):
self._unsupported_protocol = []
self._buffer = b""
if self._socket:
self._socket.close()
self._socket = None
def set_ip_address(self, ip_address):
if self._ip_address != ip_address:
_LOGGER.debug(f"[{self._device_id}] Update IP address to {ip_address}")
self._ip_address = ip_address
self.close_socket()
def set_refresh_interval(self, refresh_interval):
self._refresh_interval = refresh_interval
def run(self):
while self._is_run:
while self._socket is None:
if self.connect(refresh_status=True) is False:
if not self._is_run:
return
self.close_socket()
time.sleep(5)
timeout_counter = 0
start = time.time()
previous_refresh = start
previous_heartbeat = start
self._socket.settimeout(1)
while True:
try:
now = time.time()
if 0 < self._refresh_interval <= now - previous_refresh:
self.refresh_status()
previous_refresh = now
if now - previous_heartbeat >= self._heartbeat_interval:
self.send_heartbeat()
previous_heartbeat = now
msg = self._socket.recv(512)
msg_len = len(msg)
if msg_len == 0:
raise socket.error("Connection closed by peer")
result = self.parse_message(msg)
if result == ParseMessageResult.ERROR:
_LOGGER.debug(f"[{self._device_id}] Message 'ERROR' received")
self.close_socket()
break
elif result == ParseMessageResult.SUCCESS:
timeout_counter = 0
except socket.timeout:
timeout_counter = timeout_counter + 1
if timeout_counter >= 120:
_LOGGER.debug(f"[{self._device_id}] Heartbeat timed out")
self.close_socket()
break
except socket.error as e:
_LOGGER.debug(f"[{self._device_id}] Socket error {repr(e)}")
self.close_socket()
break
except Exception as e:
_LOGGER.error(f"[{self._device_id}] Unknown error :{e.__traceback__.tb_frame.f_globals['__file__']}, "
f"{e.__traceback__.tb_lineno}, {repr(e)}")
self.close_socket()
break
# def set_attribute(self, attr, value):
# raise NotImplementedError
def get_attribute(self, attr):
return self._attributes.get(attr)
def set_customize(self, customize):
pass
@property
def attributes(self):
ret = {}
for status in self._attributes.keys():
ret[str(status)] = self._attributes[status]
return ret

View file

@ -65,10 +65,6 @@ def discover(discover_type=None, ip_address=None):
else:
continue
print(data[20:26])
print(data[20:26].hex())
print(bytearray.fromhex(data[20:26].hex()))
device_id = int.from_bytes(bytearray.fromhex(data[20:26].hex()), "little")
if device_id in found_devices:
continue

266
message.py Normal file
View file

@ -0,0 +1,266 @@
import logging
from abc import ABC
from enum import IntEnum
_LOGGER = logging.getLogger(__name__)
class MessageLenError(Exception):
pass
class MessageBodyError(Exception):
pass
class MessageCheckSumError(Exception):
pass
class MessageType(IntEnum):
set = 0x02,
query = 0x03,
notify1 = 0x04,
notify2 = 0x05,
exception = 0x06,
querySN = 0x07,
exception2 = 0x0A,
querySubtype = 0xA0
class MessageBase(ABC):
HEADER_LENGTH = 10
def __init__(self):
self._device_type = 0x00
self._message_type = 0x00
self._body_type = 0x00
self._device_protocol_version = 0
@staticmethod
def checksum(data):
return (~ sum(data) + 1) & 0xff
# @property
# def header(self):
# raise NotImplementedError
# @property
# def body(self):
# raise NotImplementedError
@property
def message_type(self):
return self._message_type
@message_type.setter
def message_type(self, value):
self._message_type = value
@property
def device_type(self):
return self._device_type
@device_type.setter
def device_type(self, value):
self._device_type = value
@property
def body_type(self):
return self._body_type
@body_type.setter
def body_type(self, value):
self._body_type = value
@property
def device_protocol_version(self):
return self._device_protocol_version
@device_protocol_version.setter
def device_protocol_version(self, value):
self._device_protocol_version = value
def __str__(self) -> str:
output = {
"header": self.header.hex(),
"body": self.body.hex(),
"message type": "%02x" % self._message_type,
"body type": ("%02x" % self._body_type) if self._body_type is not None else "None"
}
return str(output)
class MessageRequest(MessageBase):
def __init__(self, device_protocol_version, device_type, message_type, body_type):
super().__init__()
self.device_protocol_version = device_protocol_version
self.device_type = device_type
self.message_type = message_type
self.body_type = body_type
@property
def header(self):
length = self.HEADER_LENGTH + len(self.body)
return bytearray([
# flag
0xAA,
# length
length,
# device type
self._device_type,
# frame checksum
0x00, # self._device_type ^ length,
# unused
0x00, 0x00,
# frame ID
0x00,
# frame protocol version
0x00,
# device protocol version
self._device_protocol_version,
# frame type
self._message_type
])
@property
def _body(self):
raise NotImplementedError
@property
def body(self):
body = bytearray([])
if self.body_type is not None:
body.append(self.body_type)
if self._body is not None:
body.extend(self._body)
return body
def serialize(self):
stream = self.header + self.body
stream.append(MessageBase.checksum(stream[1:]))
return stream
class MessageQuerySubtype(MessageRequest):
def __init__(self, device_type):
super().__init__(
device_protocol_version=0,
device_type=device_type,
message_type=MessageType.querySubtype,
body_type=0x00)
@property
def _body(self):
return bytearray([0x00] * 18)
class MessageQuestCustom(MessageRequest):
def __init__(self, device_type, cmd_type, cmd_body):
super().__init__(
device_protocol_version=0,
device_type=device_type,
message_type=cmd_type,
body_type=None)
self._cmd_body = cmd_body
@property
def _body(self):
return bytearray([])
@property
def body(self):
return self._cmd_body
class MessageBody:
def __init__(self, body):
self._data = body
@property
def data(self):
return self._data
@property
def body_type(self):
return self._data[0]
@staticmethod
def read_byte(body, byte, default_value=0):
return body[byte] if len(body) > byte else default_value
class NewProtocolMessageBody(MessageBody):
def __init__(self, body, bt):
super().__init__(body)
if bt == 0xb5:
self._pack_len = 4
else:
self._pack_len = 5
@staticmethod
def pack(param, value: bytearray, pack_len=4):
length = len(value)
if pack_len == 4:
stream = bytearray([param & 0xFF, param >> 8, length]) + value
else:
stream = bytearray([param & 0xFF, param >> 8, 0x00, length]) + value
return stream
def parse(self):
result = {}
try:
pos = 2
for pack in range(0, self.data[1]):
param = self.data[pos] + (self.data[pos + 1] << 8)
if self._pack_len == 5:
pos += 1
length = self.data[pos + 2]
if length > 0:
value = self.data[pos + 3: pos + 3 + length]
result[param] = value
pos += (3 + length)
except IndexError:
# Some device used non-standard new-protocol(美的乐享三代中央空调?)
_LOGGER.debug(f"Non-standard new-protocol {self.data.hex()}")
return result
class MessageResponse(MessageBase):
def __init__(self, message):
super().__init__()
if message is None or len(message) < self.HEADER_LENGTH + 1:
raise MessageLenError
self._header = message[:self.HEADER_LENGTH]
self.device_protocol_version = self._header[8]
self.message_type = self._header[-1]
self.device_type = self._header[2]
body = message[self.HEADER_LENGTH: -1]
self._body = MessageBody(body)
self.body_type = self._body.body_type
@property
def header(self):
return self._header
@property
def body(self):
return self._body.data
def set_body(self, body: MessageBody):
self._body = body
def set_attr(self):
for key in vars(self._body).keys():
if key != "data":
value = getattr(self._body, key, None)
setattr(self, key, value)
class MessageSubtypeResponse(MessageResponse):
def __init__(self, message):
super().__init__(message)
if self._message_type == MessageType.querySubtype:
body = message[self.HEADER_LENGTH: -1]
self.sub_type = (body[2] if len(body) > 2 else 0) + ((body[3] << 8) if len(body) > 3 else 0)

View file

@ -1,16 +1,47 @@
# import midea_ac_lan.midea.core as midea_core
# devices = midea_core,discover()
import security;
import cloud;
import aiohttp;
import asyncio;
import discover;
import device;
async def test():
devices = discover.discover()
for device_id in devices:
cl = cloud.MSmartHomeCloud(
"MSmartHome",
aiohttp.ClientSession(),
"michaelh.95@t-online.de",
"Hoda.semi1"
)
secure = security.LocalSecurity()
if await cl.login():
keys = await cl.get_keys(device_id)
result = secure.encode_8370(None, security.MSGTYPE_HANDSHAKE_REQUEST)
for k in keys:
token = keys[k]['token']
key = keys[k]['key']
device_info = devices[device_id]
dev = device.MiedaDevice(
name="",
device_id=device_id,
device_type=225,
ip_address=device_info['ip_address'],
port=device_info['port'],
token=token,
key=key,
protocol=3,
model=device_info['model'],
attributes={}
)
if dev.connect(False):
return dev
dev = asyncio.run(test())
print(dev)
print(result)
print(devices)

60
packet_builder.py Normal file
View file

@ -0,0 +1,60 @@
from security import LocalSecurity
import datetime
class PacketBuilder:
def __init__(self, device_id: int, command):
self.command = None
self.security = LocalSecurity()
# aa20ac00000000000003418100ff03ff000200000000000000000000000006f274
# Init the packet with the header data.
self.packet = bytearray([
# 2 bytes - StaicHeader
0x5a, 0x5a,
# 2 bytes - mMessageType
0x01, 0x11,
# 2 bytes - PacketLenght
0x00, 0x00,
# 2 bytes
0x20, 0x00,
# 4 bytes - MessageId
0x00, 0x00, 0x00, 0x00,
# 8 bytes - Date&Time
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
# 6 bytes - mDeviceID
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
# 12 bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
])
self.packet[12:20] = self.packet_time()
self.packet[20:28] = device_id.to_bytes(8, "little")
self.command = command
def finalize(self, msg_type=1):
if msg_type != 1:
self.packet[3] = 0x10
self.packet[6] = 0x7b
else:
self.packet.extend(self.security.aes_encrypt(self.command))
# PacketLenght
self.packet[4:6] = (len(self.packet) + 16).to_bytes(2, "little")
# Append a basic checksum data(16 bytes) to the packet
self.packet.extend(self.encode32(self.packet))
return self.packet
def encode32(self, data: bytearray):
return self.security.encode32_data(data)
@staticmethod
def checksum(data):
return (~ sum(data) + 1) & 0xff
@staticmethod
def packet_time():
t = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[
:16]
b = bytearray()
for i in range(0, len(t), 2):
d = int(t[i:i+2])
b.insert(0, d)
return b

7
src/cloud.rs Normal file
View file

@ -0,0 +1,7 @@
pub struct Cloud {
//
}
impl Cloud {
pub const APP_KEY: &str = "ac21b9f9cbfe4ca5a88562ef25e2b768";
}

View file

@ -26,6 +26,7 @@ pub struct Device {
info: DeviceInfo,
socket: UdpSocket,
security: Security,
}
impl Device {
@ -36,7 +37,11 @@ impl Device {
socket.connect(info.addr)?;
let me = Self { info, socket };
let mut me = Self {
info,
socket,
security: Security::default(),
};
if me.info.protocol == 3 {
me.authenticate()?;
@ -47,8 +52,8 @@ impl Device {
Ok(me)
}
fn authenticate(&self) -> Result<()> {
let request = Security::encode_8370(MsgType::HANDSHAKE_REQUEST)?;
fn authenticate(&mut self) -> Result<()> {
let request = self.security.encode_8370(MsgType::HANDSHAKE_REQUEST)?;
Ok(())
}

View file

@ -1,5 +1,6 @@
use std::num::ParseIntError;
mod cloud;
mod discover;
mod security;

View file

@ -27,7 +27,7 @@ impl Security {
const N: u128 = 141661095494369103254425781617665632877;
const KEY: [u8; 16] = Self::N.to_be_bytes();
pub fn decrypt(&self, data: &mut [u8]) -> &[u8] {
pub fn decrypt(data: &mut [u8]) -> &[u8] {
let array = GenericArray::from(Self::KEY);
let cipher = Aes128::new(&array);