use actix_web::{ get, post, web::{Data, Json, Path}, Error, HttpRequest, Responder, ResponseError, }; use chrono::{Datelike, NaiveDateTime, Timelike}; use serde::Serialize; use serde_json::to_string; use crate::{ action::{Action, ActionSet, ActionType}, db::DataBase, task_scheduler::Scheduler, tasmota::Tasmota, temperature::{Thermometer, ThermometerChange}, }; use std::{ collections::HashMap, fmt::{Display, Formatter, Result as FmtResult}, sync::{Arc, Mutex}, }; #[derive(Serialize)] struct DeviceState { name: String, power: bool, led: bool, power_draw: f32, } #[derive(Debug)] struct MyError { msg: String, } impl Display for MyError { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "Error: {}", self.msg) } } impl ResponseError for MyError {} impl From for MyError { fn from(value: anyhow::Error) -> Self { MyError { msg: value.to_string(), } } } enum Resolution { Raw, Hourly, Daily, Monthly, } impl Resolution { fn from_str(s: &str) -> anyhow::Result { Ok(match s { "raw" | "Raw" => Self::Raw, "hourly" | "Hourly" => Self::Hourly, "daily" | "Daily" => Self::Daily, "monthly" | "Monthly" => Self::Monthly, _ => anyhow::bail!("failed to parse {s}"), }) } } #[get("/devices")] async fn device_query( db: Data>>, ) -> Result { db.lock() .unwrap() .devices() .map_err(|err| { println!("{err:?}"); MyError::from(err) })? .to_json() .map(|json| Json(json)) .map_err(|err| { println!("{err:?}"); MyError::from(err) }) } #[post("/device_name/{device}/{name}")] async fn change_device_name( param: Path<(String, String)>, db: Data>>, ) -> Result { let (device, name) = param.into_inner(); db.lock() .unwrap() .change_device_name(&device, &name) .map_err(|err| MyError::from(err))?; return Ok("Ok"); } async fn tasmota_info(tasmota: &Tasmota) -> anyhow::Result { let led = tasmota.led_state().await?; let power = tasmota.power_state().await?; let power_draw = tasmota.read_power_usage().await?; Ok(to_string(&DeviceState { name: tasmota.name().to_string(), power, led, power_draw, })?) } #[get("/plug_state/{plug}")] async fn plug_state( plug: Path, plugs: Data>, ) -> Result { let plug_name = plug.into_inner(); if let Some(tasmota) = plugs.iter().find(|tasmota| tasmota.name() == plug_name) { return Ok(tasmota_info(tasmota) .await .map(|s| Json(s)) .map_err(|err| MyError::from(err))?); } Err(MyError { msg: format!("plug ({plug_name}) not found"), }) } #[post("/plug/{plug}/{action}")] async fn change_plug_state( param: Path<(String, String)>, plugs: Data>, ) -> Result { let (plug_name, action_type) = param.into_inner(); if let Some(tasmota) = plugs.iter().find(|tasmota| tasmota.name() == plug_name) { match action_type.as_str() { "led_on" => tasmota .turn_on_led() .await .map_err(|err| MyError::from(err))?, "led_off" => tasmota .turn_off_led() .await .map_err(|err| MyError::from(err))?, "power_on" => tasmota .switch_on() .await .map_err(|err| MyError::from(err))?, "power_off" => tasmota .switch_off() .await .map_err(|err| MyError::from(err))?, _ => (), } return Ok("Ok"); } Err(MyError { msg: format!("plug ({plug_name}) not found"), }) } #[get("/plug_data/{plug}")] async fn plug_data( param: Path, db: Data>>, ) -> Result { let plug = param.into_inner(); let data = db .lock() .unwrap() .read(&plug) .map_err(|err| MyError::from(err))?; Ok(Json(to_string(&data)?)) } #[get("/plug_data/{plug}/{start_time}/{end_time}/{resolution}")] async fn plug_data_range( param: Path<(String, u64, u64, String)>, db: Data>>, ) -> Result { let (plug, start, end, resolution) = param.into_inner(); let data = db .lock() .unwrap() .read_range(&plug, start, end) .map_err(|err| MyError::from(err))?; Ok(Json(to_string( &match Resolution::from_str(&resolution).map_err(|err| MyError::from(err))? { Resolution::Raw => data, Resolution::Hourly => collapse_data(data, |datetime| { datetime.with_minute(0).unwrap().with_second(0).unwrap() }), Resolution::Daily => collapse_data(data, |datetime| { datetime .with_hour(0) .unwrap() .with_minute(0) .unwrap() .with_second(0) .unwrap() }), Resolution::Monthly => collapse_data(data, |datetime| { datetime .with_day(1) .unwrap() .with_hour(0) .unwrap() .with_minute(0) .unwrap() .with_second(0) .unwrap() }), }, )?)) } #[get("/push_temp/{temperature}")] async fn push_temperature( param: Path, req: HttpRequest, db: Data>>, scheduler: Data, ) -> Result { if let Some(val) = req.peer_addr() { Thermometer::push_change( ThermometerChange::Temperature(param.into_inner()), val.ip(), db, scheduler, ) .map_err(|err| MyError::from(err))?; } Ok("Ok") } #[get("/push_humid/{humidity}")] async fn push_humidity( param: Path, req: HttpRequest, db: Data>>, scheduler: Data, ) -> Result { if let Some(val) = req.peer_addr() { Thermometer::push_change( ThermometerChange::Humidity(param.into_inner()), val.ip(), db, scheduler, ) .map_err(|err| MyError::from(err))?; } Ok("Ok") } #[post("/update_push_action/{source_device}/{parameter}/{destination_device}")] async fn update_push_action( param: Path<(String, String, String)>, db: Data>>, ) -> Result { let (source_device, parameter, destination_device) = param.into_inner(); let db_lock = db.lock().unwrap(); let action_sets = db_lock .action_sets(&source_device) .map_err(|err| MyError::from(err))?; // check if action set is already present if let Some(old_action_set) = action_sets.iter().find(|action_set| { action_set.push_device() == Some(source_device.clone()) && action_set.receive_device() == Some(destination_device.clone()) && action_set.parameter(¶meter) }) { // remove old action set db_lock .remove_action_set(old_action_set) .map_err(|err| MyError::from(err))?; } let new_action_set = ActionSet::from(vec![ Action::new(source_device, ActionType::Push, parameter.clone()), Action::new(destination_device, ActionType::Receive, parameter), ]); db_lock .insert_action_set(new_action_set) .map_err(|err| MyError::from(err))?; Ok("Ok") } fn collapse_data(data: Vec<(u64, f32)>, f: F) -> Vec<(u64, f32)> where F: Fn(NaiveDateTime) -> NaiveDateTime, { let mut frames: HashMap> = HashMap::new(); for (timestamp, watts) in data { let date_time = f(NaiveDateTime::from_timestamp_opt(timestamp as i64, 0).unwrap()); match frames.get_mut(&date_time) { Some(entries) => entries.push(watts), None => { frames.insert(date_time, vec![watts]); } } } let mut v: Vec<(u64, f32)> = frames .into_iter() .map(|(date_time, entries)| { let length = entries.len(); let sum: f32 = entries.into_iter().sum(); (date_time.timestamp() as u64, sum / length as f32) }) .collect(); v.sort_by_key(|(timestamp, _)| *timestamp); v } #[cfg(test)] mod test { use actix_web::{http::header::ContentType, test, App}; use reqwest::Method; use std::{thread, time::Duration}; use super::*; #[actix_web::test] async fn test_led_on_off() { let app = test::init_service( App::new() .service(change_plug_state) .app_data(Data::new(vec![Tasmota::new("Tasmota-Plug-3")])), ) .await; { let req = test::TestRequest::default() .uri("/plug/Tasmota-Plug-3/led_off") .insert_header(ContentType::plaintext()) .method(Method::POST) .to_request(); let resp = test::call_service(&app, req).await; let status = resp.status(); let body = resp.into_body(); assert!( status.is_success(), "status: {:?}, error: {:?}", status, body ); } thread::sleep(Duration::from_secs(5)); { let req = test::TestRequest::default() .uri("/plug/Tasmota-Plug-3/led_on") .insert_header(ContentType::plaintext()) .method(Method::POST) .to_request(); let resp = test::call_service(&app, req).await; let status = resp.status(); let body = resp.into_body(); assert!( status.is_success(), "status: {:?}, error: {:?}", status, body ); } } }