use std::{ io::{Read, Write}, net::TcpStream, sync::Mutex, thread, time::Duration, }; use anyhow::{bail, Context, Error, Result}; use serde_json::to_string; use crate::{ command::{CommandQuerySubtype, CommandRequest, CommandSubtypeResponse, MessageType}, devices::{e1::E1, DeviceBackend}, hex, packet_builder::PacketBuilder, security::{MsgType, Security}, DeviceInfo, }; enum ParseMessage { Success, Padding, } pub struct Device { info: DeviceInfo, socket: Mutex, security: Security, device_backend: Box, buffer: Vec, sub_type: u16, device_protocol_version: u8, updates: Vec Result<()>>>, token: [u8; 64], key: [u8; 32], } impl Device { pub fn connect(info: DeviceInfo, token: &str, key: &str) -> Result { let mut socket = Err(Error::msg("")); for _ in 0..10 { socket = TcpStream::connect(info.addr).context(info.addr); if socket.is_ok() { break; } thread::sleep(Duration::from_millis(500)); } let socket = socket?; socket.set_write_timeout(Some(Duration::from_secs(10)))?; socket.set_read_timeout(Some(Duration::from_secs(10)))?; let mut me = Self { device_backend: Box::new(match info.device_type { 0xE1 => E1::new()?, _ => bail!("unsupported device type: {:02X}", info.device_type), }), info, socket: Mutex::new(socket), security: Security::default(), buffer: Vec::new(), sub_type: 0, device_protocol_version: 0, updates: Vec::new(), token: hex(token)?.try_into().unwrap(), key: hex(key)?.try_into().unwrap(), }; if me.info.protocol == 3 { me.authenticate()?; } me.refresh_status()?; Ok(me) } fn authenticate(&mut self) -> Result<()> { let request = self .security .encode_8370(&self.token, MsgType::HANDSHAKE_REQUEST)?; let mut socket = self.socket.lock().unwrap(); socket.write(&request)?; let mut buffer = [0; 512]; let bytes_read = socket.read(&mut buffer)?; if bytes_read < 20 { bail!( "Authentication failed! (answer too short) {:?}", &buffer[..bytes_read] ); } self.security.tcp_key(&buffer[8..72], &self.key)?; Ok(()) } pub fn refresh_status(&mut self) -> Result<()> { let mut cmds = vec![self.device_backend.build_query()]; if self.sub_type == 0 { cmds.insert(0, CommandQuerySubtype::new(self.info.device_type).request()); } for cmd in cmds { self.build_send(cmd)?; loop { let mut buf = [0; 512]; let bytes_read = self.socket.lock().unwrap().read(&mut buf)?; if bytes_read == 0 { bail!("socket error"); } match self.parse_message(&buf[..bytes_read])? { ParseMessage::Success => break, ParseMessage::Padding => continue, } } } Ok(()) } pub fn register_update(&mut self, f: F) where F: Fn(&[u8]) -> Result<()> + 'static, { self.updates.push(Box::new(f)); } fn parse_message(&mut self, msg: &[u8]) -> Result { let (messages, buffer) = self.security.decode_8370(&self.buffer, msg)?; self.buffer = buffer; if messages.is_empty() { return Ok(ParseMessage::Padding); } for mut message in messages { if message == b"ERROR" { bail!("parse message error"); } let payload_len = message[4] as u16 + ((message[5] as u16) << 8) - 56; let payload_type = message[2] as u16 + ((message[3] as u16) << 8); // heartbeat if payload_type == 0x1001 || payload_type == 0x0001 { continue; } if message.len() > 56 && payload_len % 16 == 0 { let len = message.len(); let crypt = &mut message[40..len - 16]; Security::aes_decrypt(crypt); if self.pre_process_message(crypt) { let status = self.device_backend.process_message(crypt)?; if status.len() > 0 { for update in self.updates.iter() { update(&status)?; } } } } } Ok(ParseMessage::Success) } fn pre_process_message(&mut self, msg: &[u8]) -> bool { if msg[9] == MessageType::QuerySubtype as u8 { let message = CommandSubtypeResponse::new(msg); self.sub_type = message.sub_type; self.device_protocol_version = message.header().device_protocol_version(); false } else { true } } fn send_message(&self, msg: &[u8]) -> Result<()> { let data = self.security.encode_8370(msg, MsgType::ENCRYPTED_REQUEST)?; self.socket.lock().unwrap().write(&data)?; Ok(()) } fn send_heartbeat(&self) -> Result<()> { let msg = PacketBuilder::builder(self.info.id, &[0x00]).finalize(0); self.send_message(&msg) } fn build_send(&self, cmd: CommandRequest) -> Result<()> { let data = PacketBuilder::builder(self.info.id, to_string(&cmd.serialize())?.as_bytes()) .finalize(1); self.send_message(&data) } } #[cfg(test)] mod test { use anyhow::{Context, Result}; use futures::future::try_join; use serial_test::serial; use crate::{device::Device, Cloud, Startup}; #[tokio::test] async fn verify_hex() -> Result<()> { let devices = Startup::discover().await?; const PY_TOKEN: &str = "06df24fc4e8e950c6d9783051b8e38d971e5fbc617da259459d30d5e7d7fc05b4ccb708fe3a085f6f0af0f8cc961fa39dabfd0746f7bbcfbf7404d9cc5c2b077"; const PY_KEY: &str = "2a5b5200c2c04d4c811d0550e1dc5b31435436b95b774d2a88d7e46d61fd9669"; let token_hex = b"\x06\xdf$\xfcN\x8e\x95\x0cm\x97\x83\x05\x1b\x8e8\xd9q\xe5\xfb\xc6\x17\xda%\x94Y\xd3\r^}\x7f\xc0[L\xcbp\x8f\xe3\xa0\x85\xf6\xf0\xaf\x0f\x8c\xc9a\xfa9\xda\xbf\xd0to{\xbc\xfb\xf7@M\x9c\xc5\xc2\xb0w"; let key_hex = b"*[R\x00\xc2\xc0ML\x81\x1d\x05P\xe1\xdc[1CT6\xb9[wM*\x88\xd7\xe4ma\xfd\x96i"; for device_info in devices { let device = Device::connect(device_info, PY_TOKEN, PY_KEY)?; assert_eq!(&device.token, token_hex); assert_eq!(&device.key, key_hex); } Ok(()) } #[tokio::test] async fn connect_py_token() -> Result<()> { let devices = Startup::discover().await?; const PY_TOKEN: &str = "18a821cb88293c6552dc576f0672d8b9445205f74b636764929de5e8badfa48a24caa9d741f632a18e1a9fee67c40b0b40edc21ac7c4c40b6352181cd4000203"; const PY_KEY: &str = "0fc0c56ea8124414a362e6449ee45ba92558a54f159d4937af697e405f2326b9"; for device_info in devices { Device::connect(device_info, PY_TOKEN, PY_KEY)?; } Ok(()) } #[tokio::test] #[serial] async fn full_flow() -> Result<()> { let mut cloud = Cloud::new("michaelh.95@t-online.de", "Hoda.semi1")?; let (_, devices) = try_join(cloud.login(), Startup::discover()).await?; for device_info in devices { let (token, key) = cloud.keys(device_info.id).await?; Device::connect(device_info, &token, &key) .context(format!("\ntoken: {token}\nkey: {key}"))?; } Ok(()) } }