Implement actions and tasks

This commit is contained in:
hodasemi 2023-10-23 10:03:16 +02:00
parent e77e0275fd
commit 0650270b22
4 changed files with 484 additions and 20 deletions

113
src/action.rs Normal file
View file

@ -0,0 +1,113 @@
use core::slice::Iter;
use std::{fmt::Display, str::FromStr};
use anyhow::{bail, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActionType {
GreaterThan,
LessThan,
Push,
Receive,
Update,
}
impl Display for ActionType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::GreaterThan => write!(f, "GreaterThan"),
Self::LessThan => write!(f, "LessThan"),
Self::Push => write!(f, "Push"),
Self::Receive => write!(f, "Receive"),
Self::Update => write!(f, "Update"),
}
}
}
impl FromStr for ActionType {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"GreaterThan" => Ok(Self::GreaterThan),
"LessThan" => Ok(Self::LessThan),
"Push" => Ok(Self::Push),
"Receive" => Ok(Self::Receive),
"Update" => Ok(Self::Update),
_ => bail!("could not parse ActionType from {s}"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Action {
pub device_id: String,
pub action_type: ActionType,
pub parameter: String,
}
impl Action {
pub fn new(
device_id: impl ToString,
action_type: ActionType,
parameter: impl ToString,
) -> Self {
Self {
device_id: device_id.to_string(),
action_type,
parameter: parameter.to_string(),
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ActionSet {
actions: Vec<Action>,
}
impl ActionSet {
pub fn chain(&mut self, action: Action) {
self.actions.push(action);
}
pub fn iter(&self) -> Iter<'_, Action> {
self.actions.iter()
}
}
impl<I> From<I> for ActionSet
where
I: IntoIterator<Item = Action>,
{
fn from(value: I) -> Self {
Self {
actions: value.into_iter().collect(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use anyhow::Result;
#[test]
fn example_chain() -> Result<()> {
let mut action_set = ActionSet::default();
action_set.chain(Action::new(
"shelly_plus_ht",
ActionType::Push,
"temperature",
));
action_set.chain(Action::new(
"shelly_trv",
ActionType::Receive,
"temperature",
));
Ok(())
}
}

196
src/db.rs
View file

@ -1,9 +1,12 @@
use std::path::Path; use std::{path::Path, str::FromStr};
use anyhow::Result; use anyhow::Result;
use rusqlite::{Connection, OptionalExtension, ToSql}; use rusqlite::{Connection, OptionalExtension, ToSql};
use crate::devices::{DeviceWithName, Devices, DevicesWithName}; use crate::{
action::{Action, ActionSet, ActionType},
devices::{DeviceWithName, Devices, DevicesWithName},
};
pub struct DataBase { pub struct DataBase {
sql: Connection, sql: Connection,
@ -65,9 +68,161 @@ impl DataBase {
[], [],
)?; )?;
self.sql.execute(
"
CREATE TABLE IF NOT EXISTS actions (
id INTEGER PRIMARY KEY,
device_id INTEGER NOT NULL,
action VARCHAR(30) NOT NULL,
parameter VARCHAR(60) NOT NULL,
action_id INTEGER,
FOREIGN KEY(action_id) REFERENCES actions(id),
FOREIGN KEY(device_id) REFERENCES devices(id)
)
",
[],
)?;
Ok(()) Ok(())
} }
fn device_id(&self, device_name: &str) -> Result<i64> {
Ok(self
.sql
.prepare(&format!(
"
SELECT id
FROM devices
WHERE device=\"{}\"
",
device_name
))?
.query_row([], |row| Ok(row.get(0)?))?)
}
pub fn insert_action_set(&self, action_set: ActionSet) -> Result<i64> {
let mut action_ids = Vec::new();
for (i, action) in action_set.iter().enumerate() {
// get device id from device name
let device_id = self.device_id(&action.device_id)?;
// insert action to DB
self.sql.execute(
&format!(
"INSERT INTO actions (device_id, action, parameter)
VALUES (?1, \"{}\", \"{}\")",
action.action_type, action.parameter
),
&[&device_id],
)?;
action_ids.push(self.sql.last_insert_rowid());
if i > 0 {
// chain actions
self.sql.execute(
&format!(
"
UPDATE actions
SET action_id=?2
WHERE id=?1
"
),
[&action_ids[i - 1], &action_ids[i]],
)?;
}
}
Ok(action_ids[0])
}
pub fn action_set(&self, mut action_id: i64) -> Result<ActionSet> {
let mut action_set = ActionSet::default();
loop {
let (device_id, action, parameter, next_action): (i64, String, String, Option<i64>) =
self.sql.query_row(
"
SELECT device_id, action, parameter, action_id
FROM actions
WHERE id=?1
",
&[&action_id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)?;
let device_name: String = self.sql.query_row(
"
SELECT device
FROM devices
WHERE id=?1
",
&[&device_id],
|row| row.get(0),
)?;
let action = Action::new(device_name, ActionType::from_str(&action)?, parameter);
action_set.chain(action);
match next_action {
Some(id) => action_id = id,
None => break,
}
}
Ok(action_set)
}
pub fn action_sets(&self, device_name: &str) -> Result<Vec<ActionSet>> {
let mut action_sets = Vec::new();
let device_id = self.device_id(device_name)?;
let base_actions: Vec<i64> = self
.sql
.prepare(
"
SELECT id
FROM actions
WHERE device_id=?1
",
)?
.query_map(&[&device_id], |row| row.get(0))?
.map(|row| {
let r: i64 = row?;
Ok(r)
})
.collect::<Result<Vec<i64>>>()?;
for mut action_id in base_actions {
loop {
match self
.sql
.query_row(
"
SELECT id
FROM actions
WHERE action_id=?1
",
&[&action_id],
|row| row.get(0),
)
.optional()?
{
Some(id) => action_id = id,
None => {
action_sets.push(self.action_set(action_id)?);
break;
}
}
}
}
Ok(action_sets)
}
pub fn version(&self) -> Result<String> { pub fn version(&self) -> Result<String> {
Ok(self Ok(self
.sql .sql
@ -331,7 +486,10 @@ mod test {
use anyhow::Result; use anyhow::Result;
use crate::devices::Devices; use crate::{
action::{Action, ActionSet, ActionType},
devices::Devices,
};
use super::DataBase; use super::DataBase;
@ -386,4 +544,36 @@ mod test {
Ok(()) Ok(())
} }
#[tokio::test]
async fn action_set_test() -> Result<()> {
let db = DataBase::new("action_set_test.db").await?;
let thermometer = "shelly_plus_ht";
let thermostat = "shelly_trv";
db.register_devices(&Devices {
plugs: Vec::new(),
thermostat: vec![thermostat.to_string()],
thermometer: vec![thermometer.to_string()],
})?;
let mut action_set = ActionSet::default();
action_set.chain(Action::new(thermometer, ActionType::Push, "temperature"));
action_set.chain(Action::new(thermostat, ActionType::Receive, "temperature"));
let action_id = db.insert_action_set(action_set.clone())?;
let cmp_action_set = db.action_set(action_id)?;
assert_eq!(action_set, cmp_action_set);
let action_sets_thermometer = db.action_sets(thermometer)?;
let action_sets_thermostat = db.action_sets(thermostat)?;
assert_eq!(action_sets_thermometer, action_sets_thermostat);
fs::remove_file("action_set_test.db")?;
Ok(())
}
} }

View file

@ -1,5 +1,6 @@
use std::{ use std::{
fs, fs,
pin::pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread, thread,
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
@ -7,10 +8,12 @@ use std::{
use crate::{db::DataBase, midea_helper::MideaDiscovery, web_server::plug_data_range}; use crate::{db::DataBase, midea_helper::MideaDiscovery, web_server::plug_data_range};
mod action;
mod data; mod data;
mod db; mod db;
mod devices; mod devices;
mod midea_helper; mod midea_helper;
mod task_scheduler;
mod tasmota; mod tasmota;
mod temperature; mod temperature;
mod tibber_handler; mod tibber_handler;
@ -22,6 +25,7 @@ use anyhow::Result;
use devices::Devices; use devices::Devices;
use futures::{future::try_join_all, try_join, Future}; use futures::{future::try_join_all, try_join, Future};
use midea_helper::MideaDishwasher; use midea_helper::MideaDishwasher;
use task_scheduler::{Scheduler, Task};
use tasmota::Tasmota; use tasmota::Tasmota;
use tibber::TimeResolution::Daily; use tibber::TimeResolution::Daily;
use tibber_handler::TibberHandler; use tibber_handler::TibberHandler;
@ -31,25 +35,36 @@ fn since_epoch() -> Result<u64> {
Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
} }
fn read_power_usage( fn handle_error(
f: impl Future<Output = Result<()>> + Send + 'static,
) -> impl Future<Output = ()> + Unpin + Send + 'static {
Box::pin(async move {
if let Err(err) = f.await {
println!("{err}:?");
}
})
}
fn setup_tasmota_tasks(
scheduler: &Scheduler,
tasmota_plugs: Vec<Tasmota>, tasmota_plugs: Vec<Tasmota>,
db: Arc<Mutex<DataBase>>, db: Arc<Mutex<DataBase>>,
) -> impl Future<Output = Result<()>> { ) {
async move { for plug in tasmota_plugs.into_iter() {
loop { let db_clone = db.clone();
try_join_all(tasmota_plugs.iter().map(|plug| async {
if let Ok(usage) = plug.read_power_usage().await {
db.lock()
.unwrap()
.write(plug.name(), since_epoch()?, "watts", usage)?;
}
Ok::<(), anyhow::Error>(()) let fut = async move {
})) if let Ok(usage) = plug.read_power_usage().await {
.await?; db_clone
.lock()
.unwrap()
.write(plug.name(), since_epoch()?, "watts", usage)?;
}
thread::sleep(Duration::from_secs(3)); Ok(())
} };
scheduler.add_task(Task::looping(Duration::from_secs(3), handle_error(fut)));
} }
} }
@ -58,6 +73,7 @@ async fn run_web_server(
plugs: Vec<Tasmota>, plugs: Vec<Tasmota>,
db: Arc<Mutex<DataBase>>, db: Arc<Mutex<DataBase>>,
dishwasher: Vec<Arc<MideaDishwasher>>, dishwasher: Vec<Arc<MideaDishwasher>>,
scheduler: Scheduler,
) -> Result<()> { ) -> Result<()> {
const IP: &str = "0.0.0.0"; const IP: &str = "0.0.0.0";
const PORT: u16 = 8062; const PORT: u16 = 8062;
@ -76,6 +92,7 @@ async fn run_web_server(
.app_data(Data::new(db.clone())) .app_data(Data::new(db.clone()))
.app_data(Data::new(plugs.clone())) .app_data(Data::new(plugs.clone()))
.app_data(Data::new(dishwasher.clone())) .app_data(Data::new(dishwasher.clone()))
.app_data(Data::new(scheduler.clone()))
.service(device_query) .service(device_query)
.service(plug_state) .service(plug_state)
.service(change_plug_state) .service(change_plug_state)
@ -125,9 +142,19 @@ async fn main() -> Result<()> {
.map(|d| Arc::new(d)) .map(|d| Arc::new(d))
.collect(); .collect();
let scheduler = Scheduler::default();
setup_tasmota_tasks(&scheduler, tasmota_plugs.clone(), shared_db.clone());
let scheduler_clone = scheduler.clone();
try_join!( try_join!(
read_power_usage(tasmota_plugs.clone(), shared_db.clone()), scheduler.run(),
run_web_server(devices, tasmota_plugs, shared_db, dishwasher) run_web_server(
devices,
tasmota_plugs,
shared_db,
dishwasher,
scheduler_clone
)
)?; )?;
Ok(()) Ok(())

134
src/task_scheduler.rs Normal file
View file

@ -0,0 +1,134 @@
use anyhow::Result;
use futures::future::Shared;
use futures::FutureExt;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::thread;
use std::{
sync::Mutex,
time::{Duration, SystemTime},
};
enum Callback {
Looping(Shared<Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>>>),
Once(Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>>),
}
pub struct Task {
creation_time: SystemTime,
time: Duration,
callback: Callback,
}
impl Task {
pub fn looping<F>(interval: Duration, f: F) -> Self
where
F: Future<Output = ()> + Unpin + Send + 'static,
{
let c: Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>> = Box::pin(f);
Self {
creation_time: SystemTime::now(),
time: interval,
callback: Callback::Looping(c.shared()),
}
}
fn recreate(
start: SystemTime,
interval: Duration,
f: Shared<Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>>>,
) -> Self {
Self {
creation_time: start,
time: interval,
callback: Callback::Looping(f),
}
}
pub fn one_shot<F>(time: Duration, f: F) -> Self
where
F: Future<Output = ()> + Unpin + Send + 'static,
{
Self {
creation_time: SystemTime::now(),
time,
callback: Callback::Once(Box::pin(f)),
}
}
fn execution_time(&self) -> SystemTime {
self.creation_time + self.time
}
fn reschedule(&self) -> bool {
match self.callback {
Callback::Looping(_) => true,
Callback::Once(_) => false,
}
}
}
#[derive(Default, Clone)]
pub struct Scheduler {
tasks: Arc<Mutex<VecDeque<Task>>>,
}
impl Scheduler {
pub fn add_task(&self, new_task: Task) {
let mut task_lock = self.tasks.lock().unwrap();
let pos = task_lock
.binary_search_by_key(&new_task.execution_time(), |task| task.execution_time())
.unwrap_or_else(|e| e);
task_lock.insert(pos, new_task);
}
pub fn run(self) -> impl Future<Output = Result<()>> {
async move {
loop {
// exec first if time is up
while let Some(first) = self.check_first() {
let execution_time = first.execution_time();
match first.callback {
Callback::Looping(callback) => {
let callback_clone = callback.clone();
tokio::spawn(callback_clone);
self.add_task(Task::recreate(execution_time, first.time, callback));
}
Callback::Once(callback) => {
tokio::spawn(callback);
}
}
}
thread::sleep(Duration::from_millis(500));
}
}
}
pub fn check_first(&self) -> Option<Task> {
let mut task_lock = self.tasks.lock().unwrap();
// get first element
if let Some(first) = task_lock.front() {
// check if execution time is reached
if first.execution_time() < SystemTime::now() {
return task_lock.pop_front();
}
}
return None;
}
}