use std::{path::Path, str::FromStr}; use anyhow::Result; use rusqlite::{Connection, OptionalExtension, ToSql}; use crate::{ action::{Action, ActionID, ActionSet, ActionType}, devices::{DeviceWithName, Devices, DevicesWithName}, }; pub struct DataBase { sql: Connection, } impl DataBase { const VERSION_0_1_0: &'static str = "0.1.0"; pub async fn new(path: impl AsRef) -> Result { let me = Self { sql: Connection::open(path)?, }; me.generate_tables()?; me.init()?; Ok(me) } fn generate_tables(&self) -> Result<()> { self.sql.execute( "CREATE TABLE IF NOT EXISTS meta ( id INTEGER PRIMARY KEY, version INTEGER NOT NULL )", [], )?; self.sql.execute( "CREATE TABLE IF NOT EXISTS devices ( id INTEGER PRIMARY KEY, device VARCHAR(60) NOT NULL, type VARCHAR(30) NOT NULL, control INTEGER NOT NULL, name VARCHAR(80) )", [], )?; self.sql.execute( "CREATE TABLE IF NOT EXISTS data ( id INTEGER PRIMARY KEY, time BIGINT NOT NULL, name VARCHAR(30) NOT NULL, value REAL NOT NULL, device_id INTEGER NOT NULL, FOREIGN KEY(device_id) REFERENCES devices(id) )", [], )?; self.sql.execute( "CREATE TABLE IF NOT EXISTS credentials ( id INTEGER PRIMARY KEY, key VARCHAR(60) NOT NULL, device_id BIGINT NOT NULL, cred VARCHAR(256) NOT NULL )", [], )?; self.sql.execute( " CREATE TABLE IF NOT EXISTS actions ( id INTEGER PRIMARY KEY, device_id INTEGER NOT NULL, action VARCHAR(30) NOT NULL, parameter VARCHAR(60) NOT NULL, action_id INTEGER, FOREIGN KEY(action_id) REFERENCES actions(id), FOREIGN KEY(device_id) REFERENCES devices(id) ) ", [], )?; Ok(()) } fn device_id(&self, device_name: &str) -> Result { Ok(self .sql .prepare(&format!( " SELECT id FROM devices WHERE device=\"{}\" ", device_name ))? .query_row([], |row| Ok(row.get(0)?))?) } pub fn insert_action_set(&self, action_set: ActionSet) -> Result { let mut action_ids = Vec::new(); for (i, action) in action_set.iter().enumerate() { // get device id from device name let device_id = self.device_id(&action.device_id)?; // insert action to DB self.sql.execute( &format!( "INSERT INTO actions (device_id, action, parameter) VALUES (?1, \"{}\", \"{}\")", action.action_type, action.parameter ), &[&device_id], )?; action_ids.push(self.sql.last_insert_rowid()); if i > 0 { // chain actions self.sql.execute( &format!( " UPDATE actions SET action_id=?2 WHERE id=?1 " ), [&action_ids[i - 1], &action_ids[i]], )?; } } Ok(ActionID(action_ids[0])) } pub fn remove_action_set(&self, action_set: &ActionSet) -> Result<()> { if let Some(action_id) = action_set.first_id() { self.sql.execute( " DELETE FROM actions WHERE id=?1 ", &[&action_id.0], )?; } Ok(()) } pub fn action_set(&self, mut action_id: ActionID) -> Result { let mut action_set = ActionSet::default(); loop { let (device_id, action, parameter, next_action): (i64, String, String, Option) = self.sql.query_row( " SELECT device_id, action, parameter, action_id FROM actions WHERE id=?1 ", &[&action_id.0], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), )?; let device_name: String = self.sql.query_row( " SELECT device FROM devices WHERE id=?1 ", &[&device_id], |row| row.get(0), )?; let mut action = Action::new(device_name, ActionType::from_str(&action)?, parameter); action.id = Some(action_id); action_set.chain(action); match next_action { Some(id) => action_id.0 = id, None => break, } } Ok(action_set) } pub fn action_sets(&self, device_name: &str) -> Result> { let mut action_sets = Vec::new(); let device_id = self.device_id(device_name)?; let base_actions: Vec = self .sql .prepare( " SELECT id FROM actions WHERE device_id=?1 ", )? .query_map(&[&device_id], |row| row.get(0))? .map(|row| { let r: i64 = row?; Ok(r) }) .collect::>>()?; for mut action_id in base_actions { loop { match self .sql .query_row( " SELECT id FROM actions WHERE action_id=?1 ", &[&action_id], |row| row.get(0), ) .optional()? { Some(id) => action_id = id, None => { action_sets.push(self.action_set(ActionID(action_id))?); break; } } } } Ok(action_sets) } pub fn version(&self) -> Result { Ok(self .sql .query_row("SELECT version FROM meta WHERE id=1", [], |row| row.get(0))?) } fn init(&self) -> Result<()> { if self.version().is_err() { self.sql.execute( "INSERT INTO meta (version) VALUES (?1)", &[Self::VERSION_0_1_0], )?; } Ok(()) } pub fn register_devices(&self, devices: &Devices) -> Result<()> { for (device, control) in devices.plugs.iter() { self.sql.execute( &format!( "INSERT INTO devices (device, type, control) SELECT \"{device}\", \"plug\", ?1 WHERE NOT EXISTS ( SELECT device FROM devices WHERE device=\"{device}\" ) " ), &[control], )?; let ctl = if *control { 1 } else { 0 }; self.sql.execute( &format!( " UPDATE devices SET control=\"{ctl}\" WHERE device=\"{device}\" " ), [], )?; } for device in devices.thermostat.iter() { self.sql.execute( &format!( "INSERT INTO devices (device, type, control) SELECT \"{device}\", \"thermostat\", false WHERE NOT EXISTS ( SELECT device FROM devices WHERE device=\"{device}\" ) " ), [], )?; } for device in devices.thermometer.iter() { self.sql.execute( &format!( "INSERT INTO devices (device, type, control) SELECT \"{device}\", \"thermometer\", false WHERE NOT EXISTS ( SELECT device FROM devices WHERE device=\"{device}\" ) " ), [], )?; } Ok(()) } pub fn write(&self, device_name: &str, time: u64, name: &str, value: f32) -> Result<()> { let params: &[&dyn ToSql] = &[&time, &value]; self.sql.execute( &format!( "INSERT INTO data (time, name, value, device_id) VALUES (?1, \"{name}\", ?2, (SELECT id FROM devices WHERE device=\"{device_name}\") )" ), params, )?; Ok(()) } pub fn devices(&self) -> Result { let mut devices = DevicesWithName::default(); for row in self .sql .prepare(&format!( " SELECT device, type, name, control FROM devices " ))? .query_map([], |row| { Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)) })? { let (device, dev_type, name, control): (String, String, Option, i32) = row?; match dev_type.as_str() { "plug" => devices.plugs.push(DeviceWithName { id: device, desc: name, toggle: control != 0, }), "thermostat" => devices.thermostat.push(DeviceWithName { id: device, desc: name, toggle: control != 0, }), "thermometer" => devices.temperature_and_humidity.push(DeviceWithName { id: device, desc: name, toggle: control != 0, }), _ => panic!(), } } Ok(devices) } pub fn device_exists(&self, device_id: &str) -> Result { Ok(self .sql .prepare(&format!( " SELECT * FROM devices WHERE device=\"{device_id}\" " ))? .exists([])?) } pub fn change_device_name(&self, device: &str, description: &str) -> Result<()> { self.sql.execute( &format!( " UPDATE devices SET name=\"{description}\" WHERE device=\"{device}\" " ), [], )?; Ok(()) } pub fn read(&self, device: &str) -> Result> { self._read(&format!( " SELECT data.time, data.value FROM data INNER JOIN devices ON data.device_id=devices.id WHERE devices.device=\"{device}\" " )) } pub fn read_range(&self, device: &str, start: u64, end: u64) -> Result> { self._read(&format!( " SELECT data.time, data.value FROM data INNER JOIN devices ON data.device_id=devices.id WHERE devices.device=\"{device}\" AND data.time>={start} AND data.time<{end} " )) } fn _read(&self, query: &str) -> Result> { self.sql .prepare(query)? .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? .map(|row| { let (time, watts) = row?; Ok((time, watts)) }) .collect() } pub fn write_credential(&self, key: &str, device_id: u64, credential: &str) -> Result<()> { if self .sql .prepare(&format!( " SELECT cred FROM credentials WHERE key=\"{key}\" AND device_id={device_id} " ))? .exists([])? { self.sql.execute( &format!( " UPDATE credentials SET cred=\"{credential}\" WHERE key=\"{key}\" AND device_id={device_id} " ), [], )?; } else { self.sql.execute( &format!( "INSERT INTO crendetials (key, device_id, cred) VALUES (\"{key}\", {device_id}, \"{credential}\")" ), [], )?; } Ok(()) } pub fn read_credential(&self, key: &str, device_id: u64) -> Result> { Ok(self .sql .prepare(&format!( " SELECT cred FROM credentials WHERE key=\"{key}\" AND device_id={device_id} " ))? .query_row([], |row| Ok(row.get(0)?)) .optional()?) } } #[cfg(test)] mod test { use std::fs; use anyhow::Result; use crate::{ action::{Action, ActionSet, ActionType}, devices::Devices, }; use super::DataBase; #[tokio::test] async fn test_connection() -> Result<()> { let db = DataBase::new("connection_test.db").await?; assert_eq!(DataBase::VERSION_0_1_0, db.version()?); fs::remove_file("connection_test.db")?; Ok(()) } #[tokio::test] async fn test_startup() -> Result<()> { let db = DataBase::new("startup_test.db").await?; db.register_devices(&Devices { plugs: vec![("test".to_string(), true)], thermostat: Vec::new(), thermometer: Vec::new(), })?; fs::remove_file("startup_test.db")?; Ok(()) } #[tokio::test] async fn test_write() -> Result<()> { let db = DataBase::new("write_test.db").await?; let device_name = "test"; db.register_devices(&Devices { plugs: vec![(device_name.to_string(), true)], thermostat: Vec::new(), thermometer: Vec::new(), })?; db.write(device_name, 0, "watts", 5.5)?; let device_descriptor = "udo"; db.change_device_name(device_name, device_descriptor)?; let devices = db.devices()?; assert_eq!(devices.plugs[0].desc.as_ref().unwrap(), device_descriptor); assert_eq!(devices.plugs[0].id, device_name); fs::remove_file("write_test.db")?; Ok(()) } #[tokio::test] async fn action_set_test() -> Result<()> { let db = DataBase::new("action_set_test.db").await?; let thermometer = "shelly_plus_ht"; let thermostat = "shelly_trv"; db.register_devices(&Devices { plugs: Vec::new(), thermostat: vec![thermostat.to_string()], thermometer: vec![thermometer.to_string()], })?; let mut action_set = ActionSet::default(); action_set.chain(Action::new(thermometer, ActionType::Push, "temperature")); action_set.chain(Action::new(thermostat, ActionType::Receive, "temperature")); let action_id = db.insert_action_set(action_set.clone())?; let cmp_action_set = db.action_set(action_id)?; assert_eq!(action_set, cmp_action_set); let action_sets_thermometer = db.action_sets(thermometer)?; let action_sets_thermostat = db.action_sets(thermostat)?; assert_eq!(action_sets_thermometer, action_sets_thermostat); fs::remove_file("action_set_test.db")?; Ok(()) } }