engine/Networking/src/state.rs
2024-08-23 13:22:09 +02:00

565 lines
17 KiB
Rust

use crate::{filter_by_content, prelude::*, pseudo_tcp_message::MessageState};
use config_handler::prelude::*;
use std::{
collections::HashMap,
net::SocketAddr,
path::Path,
str::FromStr,
time::{Duration, Instant},
};
use anyhow::Result;
pub const MESSAGE_STR: &str = "Message";
pub const ACKNOWLEDGE_STR: &str = "Acknowledge";
pub const FINISHED_STR: &str = "Finished";
const START_STATE: &str = "Start";
pub struct StateMachineBuilder {
states: HashMap<String, State>,
transition_time_out: Duration,
}
impl std::fmt::Debug for StateMachineBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StateMachineBuilder")
.field("states", &self.states)
.field("transition_time_out", &self.transition_time_out)
.finish()
}
}
impl StateMachineBuilder {
fn find_state(&mut self, state_name: &str) -> Option<&mut State> {
self.states.get_mut(state_name)
}
pub fn add_state(mut self, state_name: &str) -> Self {
assert!(
self.states
.insert(state_name.to_string(), State::empty())
.is_none(),
"state {} already present",
state_name
);
self
}
pub fn add_transition(
mut self,
state_name: &str,
key_word: &str,
transition: Transition,
) -> Self {
self.find_state(state_name)
.expect(&format!("state {} could not be found", state_name))
.transitions
.insert(key_word.to_string(), transition);
self
}
pub fn load(mut self, file_name: impl AsRef<Path>) -> Result<Self> {
let config = ConfigHandler::read_config(file_name)?;
for (state_name, parameter) in config.into_iter() {
let (key_word, transition) = Transition::from_file(parameter)?;
match self.find_state(&state_name) {
Some(state) => {
state.transitions.insert(key_word, transition);
}
None => {
let mut state = State::empty();
state.transitions.insert(key_word, transition);
self.states.insert(state_name.to_string(), state);
}
}
}
Ok(self)
}
pub fn set_transition_time_out(mut self, time_out: Duration) -> Self {
self.transition_time_out = time_out;
self
}
pub fn build(self, addr: impl Into<SocketAddr>) -> StateMachine {
StateMachine {
addr: addr.into(),
states: self.states,
current: START_STATE.to_string(),
transition_time_out: self.transition_time_out,
}
}
}
pub struct StateMachine {
addr: SocketAddr,
states: HashMap<String, State>,
current: String,
transition_time_out: Duration,
}
impl std::fmt::Debug for StateMachine {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StateMachine")
.field("add", &self.addr)
.field("states", &self.states)
.field("current_state", &self.current)
.field("transition_time_out", &self.transition_time_out)
.finish()
}
}
impl StateMachine {
pub fn builder() -> StateMachineBuilder {
let mut states = HashMap::new();
states.insert(START_STATE.to_string(), State::empty());
StateMachineBuilder {
states,
transition_time_out: Duration::from_secs(1),
}
}
pub fn current_state(&self) -> &str {
&self.current
}
fn state(&self, name: &str) -> &State {
self.states
.get(name)
.expect(&format!("current state {} not present", self.current))
}
pub fn possible_targets(&self) -> Vec<String> {
self.state(&self.current).possible_targets()
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
// # Arguments
// * state_change - callback arguments = (old state, new state, sender)
pub fn receive<'a, T, F>(
&mut self,
sender: &mut T,
content: &mut Vec<SocketContent>,
state_change: F,
) -> Result<()>
where
T: Sender,
F: FnOnce(&str, &str, &mut T) -> Result<()>,
{
if let Some(new_state) = self.check_for_state_change(sender, sender.is_server(), content)? {
let old_state = self.current.clone();
{
let handle = self
.states
.get_mut(&new_state)
.expect(&format!("new state {} not present", new_state));
// set previous state, for roll back
handle.previous = Some(old_state.clone());
// reset transition
handle.current_transition = None;
// reset transitions
for transition in handle.transitions.values_mut() {
transition.transition_state = MessageState::None;
}
}
self.current = new_state;
state_change(&old_state, &self.current, sender)?;
}
Ok(())
}
fn check_for_state_change<T>(
&mut self,
sender: &mut T,
is_server: bool,
content: &mut Vec<SocketContent>,
) -> Result<Option<String>>
where
T: Sender,
{
let addr = self.addr;
let now = Instant::now();
let filtered = filter_by_content(content, |incoming_addr, msg| {
if addr != incoming_addr {
return false;
}
let (state_id, rest) = split_matches(msg, ':');
let (info, _) = split_matches(rest, ':');
if state_id != self.current {
let current_state = self
.states
.get(&self.current)
.expect(&format!("current state {} not present", self.current));
match &current_state.previous {
Some(previous) => {
if state_id != previous {
return false;
} else {
self.current = previous.clone();
}
}
None => return false,
}
}
// request state multiple times, because it could be a different one after roll back
let state = self
.states
.get(&self.current)
.expect(&format!("current state {} not present", self.current));
for key_word in state.key_words().iter() {
if info == key_word {
return true;
}
}
false
});
let current = self.current.clone();
let state = self
.states
.get_mut(&self.current)
.expect(&format!("current state {} not present", self.current));
let mut new_transition_state = None;
for c in filtered.into_iter() {
match c {
SocketContent::Message(_, msg) => {
let (state_id, rest) = split_matches(&msg, ':');
debug_assert_eq!(state_id, current);
let (key, transition_state) = split_matches(&rest, ':');
for (key_word, transition) in state.transitions.iter_mut() {
if key == key_word {
match transition_state {
MESSAGE_STR => {
if let MessageState::None | MessageState::Message =
transition.transition_state
{
new_transition_state = Some(key_word.clone());
transition.transition_state = MessageState::Message;
sender.send(
addr,
&Self::build_acknowledge(&current, key_word),
)?;
}
}
ACKNOWLEDGE_STR => {
if let MessageState::Message | MessageState::Acknowledge =
transition.transition_state
{
transition.transition_state = MessageState::Acknowledge;
sender.send(
addr,
&Self::build_finished(&current, key_word),
)?;
return Ok(Some(transition.target_state.clone()));
}
}
FINISHED_STR => {
if transition.transition_state == MessageState::Message {
return Ok(Some(transition.target_state.clone()));
}
}
_ => unreachable!(),
}
}
}
}
SocketContent::TimeOut(_) => unreachable!(),
SocketContent::NewConnection(_) => unreachable!(),
}
}
if let Some(new_transition_state) = new_transition_state {
if let Some(old_transition) = match &mut state.current_transition {
Some(transition_info) => {
let old = transition_info.current_transition.clone();
transition_info.current_transition = new_transition_state;
transition_info.last_action = now;
Some(old)
}
None => {
state.current_transition = Some(TransitionInfo {
current_transition: new_transition_state,
last_action: now,
});
None
}
} {
state.transition(&old_transition).transition_state = MessageState::None;
}
}
if let Some(transition_info) = state.current_transition.clone() {
// if time out is reached, resend last info
if (now - transition_info.last_action) > self.transition_time_out {
let transition = state.transition(&transition_info.current_transition);
let initiator = transition.initiator;
match transition.transition_state {
MessageState::None => (),
MessageState::Message => {
let answer = Self::build_acknowledge(
&self.current,
&transition_info.current_transition,
);
let message =
Self::build_message(&self.current, &transition_info.current_transition);
match (initiator, is_server) {
(Initiator::Client, false) => {
sender.send(self.addr, &message)?;
}
(Initiator::Server, true) => {
sender.send(self.addr, &message)?;
}
(Initiator::Client, true) => {
sender.send(self.addr, &answer)?;
}
(Initiator::Server, false) => {
sender.send(self.addr, &answer)?;
}
}
}
MessageState::Acknowledge => {
let answer = Self::build_finished(
&self.current,
&transition_info.current_transition,
);
match initiator {
Initiator::Client => {
sender.send(self.addr, &answer)?;
}
Initiator::Server => {
sender.send(self.addr, &answer)?;
}
}
}
MessageState::Finished => {
unreachable!()
}
}
}
}
Ok(None)
}
pub fn trigger_state_change<'a, T: Sender>(
&mut self,
target_state: &str,
sender: &mut T,
) -> Result<()> {
let addr = self.addr;
let now = Instant::now();
let state = self
.states
.get_mut(&self.current)
.expect(&format!("current state {} not present", self.current));
if let Some((key_word, transaction)) = state.transaction_by_target(target_state) {
let message = Self::build_message(&self.current, key_word);
transaction.transition_state = MessageState::Message;
sender.send(addr, &message)?;
state.current_transition = Some(TransitionInfo {
current_transition: key_word.clone(),
last_action: now,
});
}
Ok(())
}
#[inline]
fn build_request(current: &str, key_word: &str, action: &str) -> String {
format!("{}:{}:{}", current, key_word, action)
}
#[inline]
fn build_message(current: &str, key_word: &str) -> String {
Self::build_request(current, key_word, MESSAGE_STR)
}
#[inline]
fn build_acknowledge(current: &str, key_word: &str) -> String {
Self::build_request(current, key_word, ACKNOWLEDGE_STR)
}
#[inline]
fn build_finished(current: &str, key_word: &str) -> String {
Self::build_request(current, key_word, FINISHED_STR)
}
}
#[derive(Clone, Debug)]
struct TransitionInfo {
current_transition: String,
last_action: Instant,
}
#[derive(Debug)]
struct State {
// in case that the other side did not receive the acknowledge
// we can always go back
previous: Option<String>,
transitions: HashMap<String, Transition>,
current_transition: Option<TransitionInfo>,
}
impl State {
fn empty() -> Self {
Self {
previous: None,
transitions: HashMap::new(),
current_transition: None,
}
}
fn transition(&mut self, key_word: &str) -> &mut Transition {
self.transitions
.get_mut(key_word)
.expect(&format!("unknown key_word {}", key_word))
}
fn key_words(&self) -> Vec<String> {
self.transitions
.keys()
.map(|transition| transition.clone())
.collect()
}
fn possible_targets(&self) -> Vec<String> {
self.transitions
.values()
.map(|transition| transition.target_state.clone())
.collect()
}
fn transaction_by_target(&mut self, target: &str) -> Option<(&String, &mut Transition)> {
for (key_word, transition) in self.transitions.iter_mut() {
if transition.target_state == target {
return Some((key_word, transition));
}
}
None
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Initiator {
Server,
Client,
}
impl<T: Sender> PartialEq<T> for Initiator {
fn eq(&self, other: &T) -> bool {
other.is_server() && *self == Self::Server || !other.is_server() && *self == Self::Client
}
}
impl FromStr for Initiator {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self> {
match s {
"Server" => Ok(Self::Server),
"Client" => Ok(Self::Client),
_ => Err(anyhow::Error::msg(format!(
"Failed parsing Initiator from {}",
s
))),
}
}
}
#[derive(Debug)]
pub struct Transition {
initiator: Initiator,
transition_state: MessageState,
target_state: String,
}
impl Transition {
fn from_file(parameter: HashMap<String, Value>) -> Result<(String, Self)> {
let initiator = parameter
.get("initiator")
.ok_or(anyhow::Error::msg(
"Failed parsing Transition: \"initiator\" not found",
))?
.to_value()?;
let target: String = parameter
.get("target")
.ok_or(anyhow::Error::msg(
"Failed parsing Transition: \"target\" not found",
))?
.to_value()?;
let key_word: String = parameter
.get("key_word")
.ok_or(anyhow::Error::msg(
"Failed parsing Transition: \"key_word\" not found",
))?
.to_value()?;
Ok((key_word, Self::new(initiator, &target)))
}
pub fn new(initiator: Initiator, target_state: &str) -> Self {
Self {
initiator,
transition_state: MessageState::None,
target_state: target_state.to_string(),
}
}
}