use crate::{filter_by_content, prelude::*, pseudo_tcp_message::MessageState}; use config_handler::prelude::*; use std::{ collections::HashMap, net::SocketAddr, path::Path, str::FromStr, time::{Duration, Instant}, }; use anyhow::Result; pub const MESSAGE_STR: &str = "Message"; pub const ACKNOWLEDGE_STR: &str = "Acknowledge"; pub const FINISHED_STR: &str = "Finished"; const START_STATE: &str = "Start"; pub struct StateMachineBuilder { states: HashMap, transition_time_out: Duration, } impl std::fmt::Debug for StateMachineBuilder { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StateMachineBuilder") .field("states", &self.states) .field("transition_time_out", &self.transition_time_out) .finish() } } impl StateMachineBuilder { fn find_state(&mut self, state_name: &str) -> Option<&mut State> { self.states.get_mut(state_name) } pub fn add_state(mut self, state_name: &str) -> Self { assert!( self.states .insert(state_name.to_string(), State::empty()) .is_none(), "state {} already present", state_name ); self } pub fn add_transition( mut self, state_name: &str, key_word: &str, transition: Transition, ) -> Self { self.find_state(state_name) .expect(&format!("state {} could not be found", state_name)) .transitions .insert(key_word.to_string(), transition); self } pub fn load(mut self, file_name: impl AsRef) -> Result { let config = ConfigHandler::read_config(file_name)?; for (state_name, parameter) in config.into_iter() { let (key_word, transition) = Transition::from_file(parameter)?; match self.find_state(&state_name) { Some(state) => { state.transitions.insert(key_word, transition); } None => { let mut state = State::empty(); state.transitions.insert(key_word, transition); self.states.insert(state_name.to_string(), state); } } } Ok(self) } pub fn set_transition_time_out(mut self, time_out: Duration) -> Self { self.transition_time_out = time_out; self } pub fn build(self, addr: impl Into) -> StateMachine { StateMachine { addr: addr.into(), states: self.states, current: START_STATE.to_string(), transition_time_out: self.transition_time_out, } } } pub struct StateMachine { addr: SocketAddr, states: HashMap, current: String, transition_time_out: Duration, } impl std::fmt::Debug for StateMachine { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StateMachine") .field("add", &self.addr) .field("states", &self.states) .field("current_state", &self.current) .field("transition_time_out", &self.transition_time_out) .finish() } } impl StateMachine { pub fn builder() -> StateMachineBuilder { let mut states = HashMap::new(); states.insert(START_STATE.to_string(), State::empty()); StateMachineBuilder { states, transition_time_out: Duration::from_secs(1), } } pub fn current_state(&self) -> &str { &self.current } fn state(&self, name: &str) -> &State { self.states .get(name) .expect(&format!("current state {} not present", self.current)) } pub fn possible_targets(&self) -> Vec { self.state(&self.current).possible_targets() } pub fn addr(&self) -> SocketAddr { self.addr } // # Arguments // * state_change - callback arguments = (old state, new state, sender) pub fn receive<'a, T, F>( &mut self, sender: &mut T, content: &mut Vec, state_change: F, ) -> Result<()> where T: Sender, F: FnOnce(&str, &str, &mut T) -> Result<()>, { if let Some(new_state) = self.check_for_state_change(sender, sender.is_server(), content)? { let old_state = self.current.clone(); { let handle = self .states .get_mut(&new_state) .expect(&format!("new state {} not present", new_state)); // set previous state, for roll back handle.previous = Some(old_state.clone()); // reset transition handle.current_transition = None; // reset transitions for transition in handle.transitions.values_mut() { transition.transition_state = MessageState::None; } } self.current = new_state; state_change(&old_state, &self.current, sender)?; } Ok(()) } fn check_for_state_change( &mut self, sender: &mut T, is_server: bool, content: &mut Vec, ) -> Result> where T: Sender, { let addr = self.addr; let now = Instant::now(); let filtered = filter_by_content(content, |incoming_addr, msg| { if addr != incoming_addr { return false; } let (state_id, rest) = split_matches(msg, ':'); let (info, _) = split_matches(rest, ':'); if state_id != self.current { let current_state = self .states .get(&self.current) .expect(&format!("current state {} not present", self.current)); match ¤t_state.previous { Some(previous) => { if state_id != previous { return false; } else { self.current = previous.clone(); } } None => return false, } } // request state multiple times, because it could be a different one after roll back let state = self .states .get(&self.current) .expect(&format!("current state {} not present", self.current)); for key_word in state.key_words().iter() { if info == key_word { return true; } } false }); let current = self.current.clone(); let state = self .states .get_mut(&self.current) .expect(&format!("current state {} not present", self.current)); let mut new_transition_state = None; for c in filtered.into_iter() { match c { SocketContent::Message(_, msg) => { let (state_id, rest) = split_matches(&msg, ':'); debug_assert_eq!(state_id, current); let (key, transition_state) = split_matches(&rest, ':'); for (key_word, transition) in state.transitions.iter_mut() { if key == key_word { match transition_state { MESSAGE_STR => { if let MessageState::None | MessageState::Message = transition.transition_state { new_transition_state = Some(key_word.clone()); transition.transition_state = MessageState::Message; sender.send( addr, &Self::build_acknowledge(¤t, key_word), )?; } } ACKNOWLEDGE_STR => { if let MessageState::Message | MessageState::Acknowledge = transition.transition_state { transition.transition_state = MessageState::Acknowledge; sender.send( addr, &Self::build_finished(¤t, key_word), )?; return Ok(Some(transition.target_state.clone())); } } FINISHED_STR => { if transition.transition_state == MessageState::Message { return Ok(Some(transition.target_state.clone())); } } _ => unreachable!(), } } } } SocketContent::TimeOut(_) => unreachable!(), SocketContent::NewConnection(_) => unreachable!(), } } if let Some(new_transition_state) = new_transition_state { if let Some(old_transition) = match &mut state.current_transition { Some(transition_info) => { let old = transition_info.current_transition.clone(); transition_info.current_transition = new_transition_state; transition_info.last_action = now; Some(old) } None => { state.current_transition = Some(TransitionInfo { current_transition: new_transition_state, last_action: now, }); None } } { state.transition(&old_transition).transition_state = MessageState::None; } } if let Some(transition_info) = state.current_transition.clone() { // if time out is reached, resend last info if (now - transition_info.last_action) > self.transition_time_out { let transition = state.transition(&transition_info.current_transition); let initiator = transition.initiator; match transition.transition_state { MessageState::None => (), MessageState::Message => { let answer = Self::build_acknowledge( &self.current, &transition_info.current_transition, ); let message = Self::build_message(&self.current, &transition_info.current_transition); match (initiator, is_server) { (Initiator::Client, false) => { sender.send(self.addr, &message)?; } (Initiator::Server, true) => { sender.send(self.addr, &message)?; } (Initiator::Client, true) => { sender.send(self.addr, &answer)?; } (Initiator::Server, false) => { sender.send(self.addr, &answer)?; } } } MessageState::Acknowledge => { let answer = Self::build_finished( &self.current, &transition_info.current_transition, ); match initiator { Initiator::Client => { sender.send(self.addr, &answer)?; } Initiator::Server => { sender.send(self.addr, &answer)?; } } } MessageState::Finished => { unreachable!() } } } } Ok(None) } pub fn trigger_state_change<'a, T: Sender>( &mut self, target_state: &str, sender: &mut T, ) -> Result<()> { let addr = self.addr; let now = Instant::now(); let state = self .states .get_mut(&self.current) .expect(&format!("current state {} not present", self.current)); if let Some((key_word, transaction)) = state.transaction_by_target(target_state) { let message = Self::build_message(&self.current, key_word); transaction.transition_state = MessageState::Message; sender.send(addr, &message)?; state.current_transition = Some(TransitionInfo { current_transition: key_word.clone(), last_action: now, }); } Ok(()) } #[inline] fn build_request(current: &str, key_word: &str, action: &str) -> String { format!("{}:{}:{}", current, key_word, action) } #[inline] fn build_message(current: &str, key_word: &str) -> String { Self::build_request(current, key_word, MESSAGE_STR) } #[inline] fn build_acknowledge(current: &str, key_word: &str) -> String { Self::build_request(current, key_word, ACKNOWLEDGE_STR) } #[inline] fn build_finished(current: &str, key_word: &str) -> String { Self::build_request(current, key_word, FINISHED_STR) } } #[derive(Clone, Debug)] struct TransitionInfo { current_transition: String, last_action: Instant, } #[derive(Debug)] struct State { // in case that the other side did not receive the acknowledge // we can always go back previous: Option, transitions: HashMap, current_transition: Option, } impl State { fn empty() -> Self { Self { previous: None, transitions: HashMap::new(), current_transition: None, } } fn transition(&mut self, key_word: &str) -> &mut Transition { self.transitions .get_mut(key_word) .expect(&format!("unknown key_word {}", key_word)) } fn key_words(&self) -> Vec { self.transitions .keys() .map(|transition| transition.clone()) .collect() } fn possible_targets(&self) -> Vec { self.transitions .values() .map(|transition| transition.target_state.clone()) .collect() } fn transaction_by_target(&mut self, target: &str) -> Option<(&String, &mut Transition)> { for (key_word, transition) in self.transitions.iter_mut() { if transition.target_state == target { return Some((key_word, transition)); } } None } } #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Initiator { Server, Client, } impl PartialEq for Initiator { fn eq(&self, other: &T) -> bool { other.is_server() && *self == Self::Server || !other.is_server() && *self == Self::Client } } impl FromStr for Initiator { type Err = anyhow::Error; fn from_str(s: &str) -> Result { match s { "Server" => Ok(Self::Server), "Client" => Ok(Self::Client), _ => Err(anyhow::Error::msg(format!( "Failed parsing Initiator from {}", s ))), } } } #[derive(Debug)] pub struct Transition { initiator: Initiator, transition_state: MessageState, target_state: String, } impl Transition { fn from_file(parameter: HashMap) -> Result<(String, Self)> { let initiator = parameter .get("initiator") .ok_or(anyhow::Error::msg( "Failed parsing Transition: \"initiator\" not found", ))? .to_value()?; let target: String = parameter .get("target") .ok_or(anyhow::Error::msg( "Failed parsing Transition: \"target\" not found", ))? .to_value()?; let key_word: String = parameter .get("key_word") .ok_or(anyhow::Error::msg( "Failed parsing Transition: \"key_word\" not found", ))? .to_value()?; Ok((key_word, Self::new(initiator, &target))) } pub fn new(initiator: Initiator, target_state: &str) -> Self { Self { initiator, transition_state: MessageState::None, target_state: target_state.to_string(), } } }