Add function to fine grain data request
This commit is contained in:
parent
ad23b6ecee
commit
18c80b8f99
3 changed files with 133 additions and 14 deletions
24
src/db.rs
24
src/db.rs
|
@ -166,8 +166,7 @@ impl DataBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read(&self, device: &str) -> Result<Vec<(u64, f32)>> {
|
pub fn read(&self, device: &str) -> Result<Vec<(u64, f32)>> {
|
||||||
self.sql
|
self._read(&format!(
|
||||||
.prepare(&format!(
|
|
||||||
"
|
"
|
||||||
SELECT data.time, data.watts
|
SELECT data.time, data.watts
|
||||||
FROM data
|
FROM data
|
||||||
|
@ -175,7 +174,26 @@ impl DataBase {
|
||||||
ON data.device_id=devices.id
|
ON data.device_id=devices.id
|
||||||
WHERE devices.device=\"{device}\"
|
WHERE devices.device=\"{device}\"
|
||||||
"
|
"
|
||||||
))?
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read_range(&self, device: &str, start: u64, end: u64) -> Result<Vec<(u64, f32)>> {
|
||||||
|
self._read(&format!(
|
||||||
|
"
|
||||||
|
SELECT data.time, data.watts
|
||||||
|
FROM data
|
||||||
|
INNER JOIN devices
|
||||||
|
ON data.device_id=devices.id
|
||||||
|
WHERE devices.device=\"{device}\"
|
||||||
|
AND data.time>={start}
|
||||||
|
AND data.time<{end}
|
||||||
|
"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _read(&self, query: &str) -> Result<Vec<(u64, f32)>> {
|
||||||
|
self.sql
|
||||||
|
.prepare(query)?
|
||||||
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
|
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
|
||||||
.map(|row| {
|
.map(|row| {
|
||||||
let (time, watts) = row?;
|
let (time, watts) = row?;
|
||||||
|
|
14
src/main.rs
14
src/main.rs
|
@ -4,7 +4,7 @@ use std::{
|
||||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::db::DataBase;
|
use crate::{db::DataBase, web_server::plug_data_range};
|
||||||
|
|
||||||
mod data;
|
mod data;
|
||||||
mod db;
|
mod db;
|
||||||
|
@ -18,9 +18,7 @@ use anyhow::Result;
|
||||||
use devices::Devices;
|
use devices::Devices;
|
||||||
use futures::{future::try_join_all, try_join, Future};
|
use futures::{future::try_join_all, try_join, Future};
|
||||||
use tasmota::Tasmota;
|
use tasmota::Tasmota;
|
||||||
use web_server::{
|
use web_server::*;
|
||||||
change_device_name, change_plug_state, device_query, index, plug_data, plug_state,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn since_epoch() -> Result<u64> {
|
fn since_epoch() -> Result<u64> {
|
||||||
Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
|
Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
|
||||||
|
@ -53,6 +51,11 @@ async fn run_web_server(
|
||||||
plugs: Vec<Tasmota>,
|
plugs: Vec<Tasmota>,
|
||||||
db: Arc<Mutex<DataBase>>,
|
db: Arc<Mutex<DataBase>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
const IP: &str = "0.0.0.0";
|
||||||
|
const PORT: u16 = 8062;
|
||||||
|
|
||||||
|
println!("Starting server on {IP}:{PORT}");
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
App::new()
|
App::new()
|
||||||
.app_data(Data::new(devices.clone()))
|
.app_data(Data::new(devices.clone()))
|
||||||
|
@ -67,8 +70,9 @@ async fn run_web_server(
|
||||||
.service(change_plug_state)
|
.service(change_plug_state)
|
||||||
.service(change_device_name)
|
.service(change_device_name)
|
||||||
.service(plug_data)
|
.service(plug_data)
|
||||||
|
.service(plug_data_range)
|
||||||
})
|
})
|
||||||
.bind(("0.0.0.0", 8062))
|
.bind((IP, PORT))
|
||||||
.map_err(|err| anyhow::Error::msg(format!("failed binding to address: {err:#?}")))?
|
.map_err(|err| anyhow::Error::msg(format!("failed binding to address: {err:#?}")))?
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -4,12 +4,14 @@ use actix_web::{
|
||||||
web::{Data, Json, Path},
|
web::{Data, Json, Path},
|
||||||
Error, Responder, ResponseError,
|
Error, Responder, ResponseError,
|
||||||
};
|
};
|
||||||
|
use chrono::{Datelike, NaiveDateTime, Timelike};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::to_string;
|
use serde_json::to_string;
|
||||||
|
|
||||||
use crate::{db::DataBase, tasmota::Tasmota};
|
use crate::{db::DataBase, tasmota::Tasmota};
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
fmt::{Display, Formatter, Result as FmtResult},
|
fmt::{Display, Formatter, Result as FmtResult},
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
@ -43,6 +45,26 @@ impl From<anyhow::Error> for MyError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum Resolution {
|
||||||
|
Raw,
|
||||||
|
Hourly,
|
||||||
|
Daily,
|
||||||
|
Monthly,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resolution {
|
||||||
|
fn from_str(s: &str) -> anyhow::Result<Self> {
|
||||||
|
Ok(match s {
|
||||||
|
"raw" | "Raw" => Self::Raw,
|
||||||
|
"hourly" | "Hourly" => Self::Hourly,
|
||||||
|
"daily" | "Daily" => Self::Daily,
|
||||||
|
"monthly" | "Monthly" => Self::Monthly,
|
||||||
|
|
||||||
|
_ => anyhow::bail!("failed to parse {s}"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[get("/")]
|
#[get("/")]
|
||||||
async fn index() -> Result<NamedFile, impl ResponseError> {
|
async fn index() -> Result<NamedFile, impl ResponseError> {
|
||||||
NamedFile::open("resources/static/index.html")
|
NamedFile::open("resources/static/index.html")
|
||||||
|
@ -167,6 +189,81 @@ async fn plug_data(
|
||||||
Ok(Json(to_string(&data)?))
|
Ok(Json(to_string(&data)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[get("/plug_data/{plug}/{start_time}/{end_time}/{resolution}")]
|
||||||
|
async fn plug_data_range(
|
||||||
|
param: Path<(String, u64, u64, String)>,
|
||||||
|
db: Data<Arc<Mutex<DataBase>>>,
|
||||||
|
) -> Result<impl Responder, Error> {
|
||||||
|
let (plug, start, end, resolution) = param.into_inner();
|
||||||
|
|
||||||
|
let data = db
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.read_range(&plug, start, end)
|
||||||
|
.map_err(|err| MyError::from(err))?;
|
||||||
|
|
||||||
|
Ok(Json(to_string(
|
||||||
|
&match Resolution::from_str(&resolution).map_err(|err| MyError::from(err))? {
|
||||||
|
Resolution::Raw => data,
|
||||||
|
Resolution::Hourly => collapse_data(data, |datetime| {
|
||||||
|
datetime.with_minute(0).unwrap().with_second(0).unwrap()
|
||||||
|
}),
|
||||||
|
Resolution::Daily => collapse_data(data, |datetime| {
|
||||||
|
datetime
|
||||||
|
.with_hour(0)
|
||||||
|
.unwrap()
|
||||||
|
.with_minute(0)
|
||||||
|
.unwrap()
|
||||||
|
.with_second(0)
|
||||||
|
.unwrap()
|
||||||
|
}),
|
||||||
|
Resolution::Monthly => collapse_data(data, |datetime| {
|
||||||
|
datetime
|
||||||
|
.with_day(1)
|
||||||
|
.unwrap()
|
||||||
|
.with_hour(0)
|
||||||
|
.unwrap()
|
||||||
|
.with_minute(0)
|
||||||
|
.unwrap()
|
||||||
|
.with_second(0)
|
||||||
|
.unwrap()
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collapse_data<F>(data: Vec<(u64, f32)>, f: F) -> Vec<(u64, f32)>
|
||||||
|
where
|
||||||
|
F: Fn(NaiveDateTime) -> NaiveDateTime,
|
||||||
|
{
|
||||||
|
let mut frames: HashMap<NaiveDateTime, Vec<f32>> = HashMap::new();
|
||||||
|
|
||||||
|
for (timestamp, watts) in data {
|
||||||
|
let date_time = f(NaiveDateTime::from_timestamp_opt(timestamp as i64, 0).unwrap());
|
||||||
|
|
||||||
|
match frames.get_mut(&date_time) {
|
||||||
|
Some(entries) => entries.push(watts),
|
||||||
|
None => {
|
||||||
|
frames.insert(date_time, vec![watts]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut v: Vec<(u64, f32)> = frames
|
||||||
|
.into_iter()
|
||||||
|
.map(|(date_time, entries)| {
|
||||||
|
let length = entries.len();
|
||||||
|
let sum: f32 = entries.into_iter().sum();
|
||||||
|
|
||||||
|
(date_time.timestamp() as u64, sum / length as f32)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
v.sort_by_key(|(timestamp, _)| *timestamp);
|
||||||
|
|
||||||
|
v
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use actix_web::{http::header::ContentType, test, App};
|
use actix_web::{http::header::ContentType, test, App};
|
||||||
|
|
Loading…
Reference in a new issue