From 0650270b22e2f39141f4ac54775b9b1a074577b3 Mon Sep 17 00:00:00 2001 From: hodasemi Date: Mon, 23 Oct 2023 10:03:16 +0200 Subject: [PATCH] Implement actions and tasks --- src/action.rs | 113 ++++++++++++++++++++++++ src/db.rs | 196 +++++++++++++++++++++++++++++++++++++++++- src/main.rs | 61 +++++++++---- src/task_scheduler.rs | 134 +++++++++++++++++++++++++++++ 4 files changed, 484 insertions(+), 20 deletions(-) create mode 100644 src/action.rs create mode 100644 src/task_scheduler.rs diff --git a/src/action.rs b/src/action.rs new file mode 100644 index 0000000..f1e000e --- /dev/null +++ b/src/action.rs @@ -0,0 +1,113 @@ +use core::slice::Iter; +use std::{fmt::Display, str::FromStr}; + +use anyhow::{bail, Result}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum ActionType { + GreaterThan, + LessThan, + Push, + Receive, + Update, +} + +impl Display for ActionType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::GreaterThan => write!(f, "GreaterThan"), + Self::LessThan => write!(f, "LessThan"), + Self::Push => write!(f, "Push"), + Self::Receive => write!(f, "Receive"), + Self::Update => write!(f, "Update"), + } + } +} + +impl FromStr for ActionType { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "GreaterThan" => Ok(Self::GreaterThan), + "LessThan" => Ok(Self::LessThan), + "Push" => Ok(Self::Push), + "Receive" => Ok(Self::Receive), + "Update" => Ok(Self::Update), + + _ => bail!("could not parse ActionType from {s}"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Action { + pub device_id: String, + pub action_type: ActionType, + pub parameter: String, +} + +impl Action { + pub fn new( + device_id: impl ToString, + action_type: ActionType, + parameter: impl ToString, + ) -> Self { + Self { + device_id: device_id.to_string(), + action_type, + parameter: parameter.to_string(), + } + } +} + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct ActionSet { + actions: Vec, +} + +impl ActionSet { + pub fn chain(&mut self, action: Action) { + self.actions.push(action); + } + + pub fn iter(&self) -> Iter<'_, Action> { + self.actions.iter() + } +} + +impl From for ActionSet +where + I: IntoIterator, +{ + fn from(value: I) -> Self { + Self { + actions: value.into_iter().collect(), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use anyhow::Result; + + #[test] + fn example_chain() -> Result<()> { + let mut action_set = ActionSet::default(); + + action_set.chain(Action::new( + "shelly_plus_ht", + ActionType::Push, + "temperature", + )); + + action_set.chain(Action::new( + "shelly_trv", + ActionType::Receive, + "temperature", + )); + + Ok(()) + } +} diff --git a/src/db.rs b/src/db.rs index eb66a6a..f37b697 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,12 @@ -use std::path::Path; +use std::{path::Path, str::FromStr}; use anyhow::Result; use rusqlite::{Connection, OptionalExtension, ToSql}; -use crate::devices::{DeviceWithName, Devices, DevicesWithName}; +use crate::{ + action::{Action, ActionSet, ActionType}, + devices::{DeviceWithName, Devices, DevicesWithName}, +}; pub struct DataBase { sql: Connection, @@ -65,9 +68,161 @@ impl DataBase { [], )?; + self.sql.execute( + " + CREATE TABLE IF NOT EXISTS actions ( + id INTEGER PRIMARY KEY, + device_id INTEGER NOT NULL, + action VARCHAR(30) NOT NULL, + parameter VARCHAR(60) NOT NULL, + action_id INTEGER, + FOREIGN KEY(action_id) REFERENCES actions(id), + FOREIGN KEY(device_id) REFERENCES devices(id) + ) + ", + [], + )?; + Ok(()) } + fn device_id(&self, device_name: &str) -> Result { + Ok(self + .sql + .prepare(&format!( + " + SELECT id + FROM devices + WHERE device=\"{}\" + ", + device_name + ))? + .query_row([], |row| Ok(row.get(0)?))?) + } + + pub fn insert_action_set(&self, action_set: ActionSet) -> Result { + let mut action_ids = Vec::new(); + + for (i, action) in action_set.iter().enumerate() { + // get device id from device name + let device_id = self.device_id(&action.device_id)?; + + // insert action to DB + self.sql.execute( + &format!( + "INSERT INTO actions (device_id, action, parameter) + VALUES (?1, \"{}\", \"{}\")", + action.action_type, action.parameter + ), + &[&device_id], + )?; + + action_ids.push(self.sql.last_insert_rowid()); + + if i > 0 { + // chain actions + self.sql.execute( + &format!( + " + UPDATE actions + SET action_id=?2 + WHERE id=?1 + " + ), + [&action_ids[i - 1], &action_ids[i]], + )?; + } + } + + Ok(action_ids[0]) + } + + pub fn action_set(&self, mut action_id: i64) -> Result { + let mut action_set = ActionSet::default(); + + loop { + let (device_id, action, parameter, next_action): (i64, String, String, Option) = + self.sql.query_row( + " + SELECT device_id, action, parameter, action_id + FROM actions + WHERE id=?1 + ", + &[&action_id], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + )?; + + let device_name: String = self.sql.query_row( + " + SELECT device + FROM devices + WHERE id=?1 + ", + &[&device_id], + |row| row.get(0), + )?; + + let action = Action::new(device_name, ActionType::from_str(&action)?, parameter); + + action_set.chain(action); + + match next_action { + Some(id) => action_id = id, + None => break, + } + } + + Ok(action_set) + } + + pub fn action_sets(&self, device_name: &str) -> Result> { + let mut action_sets = Vec::new(); + + let device_id = self.device_id(device_name)?; + + let base_actions: Vec = self + .sql + .prepare( + " + SELECT id + FROM actions + WHERE device_id=?1 + ", + )? + .query_map(&[&device_id], |row| row.get(0))? + .map(|row| { + let r: i64 = row?; + Ok(r) + }) + .collect::>>()?; + + for mut action_id in base_actions { + loop { + match self + .sql + .query_row( + " + SELECT id + FROM actions + WHERE action_id=?1 + ", + &[&action_id], + |row| row.get(0), + ) + .optional()? + { + Some(id) => action_id = id, + None => { + action_sets.push(self.action_set(action_id)?); + break; + } + } + } + } + + Ok(action_sets) + } + pub fn version(&self) -> Result { Ok(self .sql @@ -331,7 +486,10 @@ mod test { use anyhow::Result; - use crate::devices::Devices; + use crate::{ + action::{Action, ActionSet, ActionType}, + devices::Devices, + }; use super::DataBase; @@ -386,4 +544,36 @@ mod test { Ok(()) } + + #[tokio::test] + async fn action_set_test() -> Result<()> { + let db = DataBase::new("action_set_test.db").await?; + + let thermometer = "shelly_plus_ht"; + let thermostat = "shelly_trv"; + + db.register_devices(&Devices { + plugs: Vec::new(), + thermostat: vec![thermostat.to_string()], + thermometer: vec![thermometer.to_string()], + })?; + + let mut action_set = ActionSet::default(); + action_set.chain(Action::new(thermometer, ActionType::Push, "temperature")); + action_set.chain(Action::new(thermostat, ActionType::Receive, "temperature")); + + let action_id = db.insert_action_set(action_set.clone())?; + let cmp_action_set = db.action_set(action_id)?; + + assert_eq!(action_set, cmp_action_set); + + let action_sets_thermometer = db.action_sets(thermometer)?; + let action_sets_thermostat = db.action_sets(thermostat)?; + + assert_eq!(action_sets_thermometer, action_sets_thermostat); + + fs::remove_file("action_set_test.db")?; + + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index 5cc7274..33502c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::{ fs, + pin::pin, sync::{Arc, Mutex}, thread, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -7,10 +8,12 @@ use std::{ use crate::{db::DataBase, midea_helper::MideaDiscovery, web_server::plug_data_range}; +mod action; mod data; mod db; mod devices; mod midea_helper; +mod task_scheduler; mod tasmota; mod temperature; mod tibber_handler; @@ -22,6 +25,7 @@ use anyhow::Result; use devices::Devices; use futures::{future::try_join_all, try_join, Future}; use midea_helper::MideaDishwasher; +use task_scheduler::{Scheduler, Task}; use tasmota::Tasmota; use tibber::TimeResolution::Daily; use tibber_handler::TibberHandler; @@ -31,25 +35,36 @@ fn since_epoch() -> Result { Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) } -fn read_power_usage( +fn handle_error( + f: impl Future> + Send + 'static, +) -> impl Future + Unpin + Send + 'static { + Box::pin(async move { + if let Err(err) = f.await { + println!("{err}:?"); + } + }) +} + +fn setup_tasmota_tasks( + scheduler: &Scheduler, tasmota_plugs: Vec, db: Arc>, -) -> impl Future> { - async move { - loop { - try_join_all(tasmota_plugs.iter().map(|plug| async { - if let Ok(usage) = plug.read_power_usage().await { - db.lock() - .unwrap() - .write(plug.name(), since_epoch()?, "watts", usage)?; - } +) { + for plug in tasmota_plugs.into_iter() { + let db_clone = db.clone(); - Ok::<(), anyhow::Error>(()) - })) - .await?; + let fut = async move { + if let Ok(usage) = plug.read_power_usage().await { + db_clone + .lock() + .unwrap() + .write(plug.name(), since_epoch()?, "watts", usage)?; + } - thread::sleep(Duration::from_secs(3)); - } + Ok(()) + }; + + scheduler.add_task(Task::looping(Duration::from_secs(3), handle_error(fut))); } } @@ -58,6 +73,7 @@ async fn run_web_server( plugs: Vec, db: Arc>, dishwasher: Vec>, + scheduler: Scheduler, ) -> Result<()> { const IP: &str = "0.0.0.0"; const PORT: u16 = 8062; @@ -76,6 +92,7 @@ async fn run_web_server( .app_data(Data::new(db.clone())) .app_data(Data::new(plugs.clone())) .app_data(Data::new(dishwasher.clone())) + .app_data(Data::new(scheduler.clone())) .service(device_query) .service(plug_state) .service(change_plug_state) @@ -125,9 +142,19 @@ async fn main() -> Result<()> { .map(|d| Arc::new(d)) .collect(); + let scheduler = Scheduler::default(); + setup_tasmota_tasks(&scheduler, tasmota_plugs.clone(), shared_db.clone()); + let scheduler_clone = scheduler.clone(); + try_join!( - read_power_usage(tasmota_plugs.clone(), shared_db.clone()), - run_web_server(devices, tasmota_plugs, shared_db, dishwasher) + scheduler.run(), + run_web_server( + devices, + tasmota_plugs, + shared_db, + dishwasher, + scheduler_clone + ) )?; Ok(()) diff --git a/src/task_scheduler.rs b/src/task_scheduler.rs new file mode 100644 index 0000000..28b4bc0 --- /dev/null +++ b/src/task_scheduler.rs @@ -0,0 +1,134 @@ +use anyhow::Result; +use futures::future::Shared; +use futures::FutureExt; + +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::thread; +use std::{ + sync::Mutex, + time::{Duration, SystemTime}, +}; + +enum Callback { + Looping(Shared + Unpin + Send + 'static>>>), + Once(Pin + Unpin + Send + 'static>>), +} + +pub struct Task { + creation_time: SystemTime, + time: Duration, + + callback: Callback, +} + +impl Task { + pub fn looping(interval: Duration, f: F) -> Self + where + F: Future + Unpin + Send + 'static, + { + let c: Pin + Unpin + Send + 'static>> = Box::pin(f); + + Self { + creation_time: SystemTime::now(), + time: interval, + + callback: Callback::Looping(c.shared()), + } + } + + fn recreate( + start: SystemTime, + interval: Duration, + f: Shared + Unpin + Send + 'static>>>, + ) -> Self { + Self { + creation_time: start, + time: interval, + + callback: Callback::Looping(f), + } + } + + pub fn one_shot(time: Duration, f: F) -> Self + where + F: Future + Unpin + Send + 'static, + { + Self { + creation_time: SystemTime::now(), + time, + + callback: Callback::Once(Box::pin(f)), + } + } + + fn execution_time(&self) -> SystemTime { + self.creation_time + self.time + } + + fn reschedule(&self) -> bool { + match self.callback { + Callback::Looping(_) => true, + Callback::Once(_) => false, + } + } +} + +#[derive(Default, Clone)] +pub struct Scheduler { + tasks: Arc>>, +} + +impl Scheduler { + pub fn add_task(&self, new_task: Task) { + let mut task_lock = self.tasks.lock().unwrap(); + + let pos = task_lock + .binary_search_by_key(&new_task.execution_time(), |task| task.execution_time()) + .unwrap_or_else(|e| e); + + task_lock.insert(pos, new_task); + } + + pub fn run(self) -> impl Future> { + async move { + loop { + // exec first if time is up + while let Some(first) = self.check_first() { + let execution_time = first.execution_time(); + + match first.callback { + Callback::Looping(callback) => { + let callback_clone = callback.clone(); + + tokio::spawn(callback_clone); + + self.add_task(Task::recreate(execution_time, first.time, callback)); + } + Callback::Once(callback) => { + tokio::spawn(callback); + } + } + } + + thread::sleep(Duration::from_millis(500)); + } + } + } + + pub fn check_first(&self) -> Option { + let mut task_lock = self.tasks.lock().unwrap(); + + // get first element + if let Some(first) = task_lock.front() { + // check if execution time is reached + if first.execution_time() < SystemTime::now() { + return task_lock.pop_front(); + } + } + + return None; + } +}