HomeServer/src/task_scheduler.rs

127 lines
3.2 KiB
Rust

use anyhow::Result;
use futures::future::Shared;
use futures::FutureExt;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::thread;
use std::{
sync::Mutex,
time::{Duration, SystemTime},
};
/// Boxed, pinned, type-erased future with unit output; the payload type used by both
/// [`Callback`] variants.
type BoxedCallback = Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>>;

/// The future carried by a task, tagged by how it is scheduled.
enum Callback {
    /// Cloneable (`Shared`) handle, so a handle can be spawned while another is kept
    /// for re-queueing.
    Looping(Shared<BoxedCallback>),
    /// Uniquely owned future that is spawned a single time.
    Once(BoxedCallback),
}
/// A unit of scheduled work: a callback plus the timing data that determines when it is due.
pub struct Task {
// Reference point the delay is measured from: `SystemTime::now()` at construction, or the
// previous due time when a looping task is re-queued by the scheduler.
creation_time: SystemTime,
// Delay after `creation_time` at which the task becomes due; for looping tasks the same
// value is reused as the repeat interval.
time: Duration,
// The future that is spawned once the task is due.
callback: Callback,
}
impl Task {
    /// Builds a repeating task that first becomes due `interval` after now and is re-queued
    /// with the same `interval` every time the scheduler fires it.
    ///
    /// The future is boxed and wrapped in a `Shared` handle so it can be cloned for each run.
    ///
    /// NOTE(review): all clones of a `Shared` future resolve to the same result, so once the
    /// first spawned clone has completed, later clones presumably finish immediately without
    /// re-running the body of `f` — confirm this is the intended "looping" behaviour.
    pub fn looping<F>(interval: Duration, f: F) -> Self
    where
        F: Future<Output = ()> + Unpin + Send + 'static,
    {
        let boxed: Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>> = Box::pin(f);
        Self::recreate(SystemTime::now(), interval, boxed.shared())
    }

    /// Rebuilds a looping task around an existing shared future, measuring `interval` from
    /// `start` instead of from the current time.
    fn recreate(
        start: SystemTime,
        interval: Duration,
        f: Shared<Pin<Box<dyn Future<Output = ()> + Unpin + Send + 'static>>>,
    ) -> Self {
        let callback = Callback::Looping(f);
        Self {
            creation_time: start,
            time: interval,
            callback,
        }
    }

    /// Builds a task that becomes due once, `time` after now.
    pub fn one_shot<F>(time: Duration, f: F) -> Self
    where
        F: Future<Output = ()> + Unpin + Send + 'static,
    {
        let creation_time = SystemTime::now();
        let callback = Callback::Once(Box::pin(f));
        Self {
            creation_time,
            time,
            callback,
        }
    }

    /// Point in time at which this task becomes due: `creation_time + time`.
    fn execution_time(&self) -> SystemTime {
        self.creation_time + self.time
    }
}
/// Queue of pending [`Task`]s guarded by a mutex.
///
/// `Clone` is derived, so every clone shares the same underlying queue through the `Arc`.
#[derive(Default, Clone)]
pub struct Scheduler {
// Pending tasks; `add_task` inserts by execution time so the front is the next one due.
tasks: Arc<Mutex<VecDeque<Task>>>,
}
impl Scheduler {
    /// How long [`Scheduler::run`] waits between two polls of the queue.
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    /// Inserts `new_task` into the queue, keeping the queue sorted by execution time.
    ///
    /// Tasks due at the same instant keep their insertion order: the new task is placed
    /// after every queued task whose execution time is earlier or equal.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    pub fn add_task(&self, new_task: Task) {
        // Computed before taking the lock so the critical section is only the insert.
        let due_at = new_task.execution_time();
        let mut queue = self.tasks.lock().expect("task queue mutex poisoned");
        let pos = queue.partition_point(|queued| queued.execution_time() <= due_at);
        queue.insert(pos, new_task);
    }

    /// Returns a future that drives the scheduler.
    ///
    /// Each round spawns every task that is currently due onto the Tokio runtime, then
    /// waits [`Self::POLL_INTERVAL`] before checking again. Looping tasks are re-queued
    /// with their previous due time as the new reference point, so their schedule does not
    /// drift with the polling delay. The loop has no exit, so the returned future never
    /// resolves.
    ///
    /// The wait uses `tokio::time::sleep` (requires Tokio's `time` feature) rather than
    /// `std::thread::sleep`, so the executor thread stays free for the spawned callbacks.
    ///
    /// NOTE(review): a looping task with a zero interval is always due again immediately,
    /// which would keep the inner loop from ever reaching the sleep.
    ///
    /// # Panics
    /// `tokio::spawn` and `tokio::time::sleep` panic when used outside a Tokio runtime;
    /// also panics if the queue mutex is poisoned.
    pub fn run(self) -> impl Future<Output = Result<()>> {
        async move {
            loop {
                // Drain everything that is due right now.
                while let Some(first) = self.check_first() {
                    let execution_time = first.execution_time();
                    match first.callback {
                        Callback::Looping(callback) => {
                            // Spawn one handle, keep the other for the next round.
                            tokio::spawn(callback.clone());
                            self.add_task(Task::recreate(execution_time, first.time, callback));
                        }
                        Callback::Once(callback) => {
                            tokio::spawn(callback);
                        }
                    }
                }
                // Yield to the runtime instead of blocking the worker thread.
                tokio::time::sleep(Self::POLL_INTERVAL).await;
            }
        }
    }

    /// Removes and returns the front task if its execution time lies strictly in the past;
    /// returns `None` when the queue is empty or the front task is not due yet.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    pub fn check_first(&self) -> Option<Task> {
        let mut queue = self.tasks.lock().expect("task queue mutex poisoned");
        let is_due = match queue.front() {
            Some(first) => first.execution_time() < SystemTime::now(),
            None => false,
        };
        if is_due {
            queue.pop_front()
        } else {
            None
        }
    }
}