use crate::{ filter_by_content, prelude::*, state::{ACKNOWLEDGE_STR, FINISHED_STR, MESSAGE_STR}, }; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, mem::swap, net::SocketAddr, time::{Duration, Instant}, }; use anyhow::Result; const PREFIX: &str = "PSEUDOTCP"; const RESPONSE_TIME_OUT: Duration = Duration::from_millis(250); const FINISHED_TIME_OUT: Duration = Duration::from_secs(1); #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum MessageState { None, Message, Acknowledge, Finished, } pub struct PseudoTcpMessage { pay_load: String, hash: u64, state: MessageState, initiator: Initiator, receiver: SocketAddr, last_change: Instant, } impl PseudoTcpMessage { /// Creates a PseudoTcpMessage pub fn initiate( pay_load: impl Into, receiver: SocketAddr, sender: &mut T, ) -> Result where T: Sender, { let pay_load = pay_load.into(); let mut hasher = DefaultHasher::new(); pay_load.hash(&mut hasher); let hash = hasher.finish(); sender.send( receiver, &Self::build_message(MESSAGE_STR, hash, Some(&pay_load)), )?; Ok(Self { pay_load, hash, state: MessageState::Message, initiator: if sender.is_server() { Initiator::Server } else { Initiator::Client }, receiver, last_change: Instant::now(), }) } fn create_response( pay_load: &str, hash: u64, receiver: SocketAddr, sender: &mut T, state_str: &str, ) -> Result where T: Sender, { sender.send(receiver, &Self::build_message(state_str, hash, None))?; Ok(Self { pay_load: pay_load.to_string(), hash, state: MessageState::Acknowledge, initiator: if sender.is_server() { Initiator::Server } else { Initiator::Client }, receiver, last_change: Instant::now(), }) } pub fn pay_load(&self) -> &str { &self.pay_load } pub fn receiver(&self) -> SocketAddr { self.receiver } pub fn is_initiator<'a>(&self, is_server: bool) -> bool { is_server && self.initiator == Initiator::Server } pub fn filter_messages<'a, T, F>( sender: &mut T, content: &mut Vec, pseudo_tcp_messages: &mut Vec, mut on_new: F, ) -> Vec<(SocketAddr, String)> where F: FnMut(SocketAddr, &str) -> Result<()>, T: Sender, { let filtered = filter_by_content(content, |_, msg| { let (prefix, _) = split_matches(msg, ':'); if prefix == PREFIX { return true; } false }); let mut converted = filtered .into_iter() .map(|c| match c { SocketContent::Message(socket, msg) => (socket, msg), _ => unreachable!(), }) .collect(); Self::filter_for_new_ones(&mut converted, |socket, msg| { let (_, rest) = split_matches(&msg, ':'); let (state, rest) = split_matches(rest, ':'); let (hash_str, pay_load) = split_matches(rest, ':'); let hash = hash_str.parse()?; if pseudo_tcp_messages .iter() .find(|message| *socket == message.receiver && hash == message.hash) .is_none() { match state { MESSAGE_STR => { on_new(*socket, pay_load)?; pseudo_tcp_messages.push(Self::create_response( pay_load, hash, *socket, sender, ACKNOWLEDGE_STR, )?); return Ok(true); } ACKNOWLEDGE_STR => { on_new(*socket, pay_load)?; pseudo_tcp_messages.push(Self::create_response( pay_load, hash, *socket, sender, FINISHED_STR, )?); return Ok(true); } _ => (), } } Ok(false) }); converted } pub fn check_messages( sender: &mut T, pseudo_tcp_messages: &mut Vec, unhandled: &mut Vec<(SocketAddr, String)>, ) -> Result<()> where T: Sender, { let mut indices_to_remove = Vec::new(); for (index, pseudo_tcp_message) in pseudo_tcp_messages.iter_mut().enumerate() { if pseudo_tcp_message.check(unhandled, sender)? { indices_to_remove.push(index); } } for i in (0..pseudo_tcp_messages.len()).rev() { if indices_to_remove.contains(&i) { pseudo_tcp_messages.remove(i); } } Ok(()) } /// Returns `true` if this message has been finished fn check<'a, T>( &mut self, messages: &mut Vec<(SocketAddr, String)>, sender: &mut T, ) -> Result where T: Sender, { let now = Instant::now(); // get all messages regarding me (matching hash) let filtered = self.filter_for_self(messages); for msg in filtered.into_iter() { let (_, rest) = split_matches(&msg, ':'); let (state, _) = split_matches(rest, ':'); self.last_change = now; // respond accordingly to the received message match state { MESSAGE_STR => { self.send_answer(ACKNOWLEDGE_STR, sender)?; } ACKNOWLEDGE_STR => { self.state = MessageState::Finished; self.send_answer(FINISHED_STR, sender)?; } FINISHED_STR => { self.state = MessageState::Finished; } _ => unreachable!(), } } let time_difference = now - self.last_change; // check if a timeout is reached and resend the message if needed match self.state { MessageState::Message => { if time_difference > RESPONSE_TIME_OUT { if self.initiator == *sender { self.send_initial_message(sender)?; // update last change to not resend this message every frame // and give the other side a bit of time to answer self.last_change = now; } } } MessageState::Acknowledge => { if time_difference > RESPONSE_TIME_OUT { if self.initiator != *sender { self.send_answer(ACKNOWLEDGE_STR, sender)?; // update last change to not resend this message every frame // and give the other side a bit of time to answer self.last_change = now; } } } MessageState::Finished => { if time_difference > FINISHED_TIME_OUT { return Ok(true); } } _ => unreachable!(), } Ok(false) } /// Filters all messages regarding this message (matching the hash of this message) /// out the pool of messages fn filter_for_self(&self, messages: &mut Vec<(SocketAddr, String)>) -> Vec { let mut v = Vec::new(); swap(messages, &mut v); let (filtered, others): (Vec<(SocketAddr, String)>, Vec<(SocketAddr, String)>) = v.into_iter().partition(|(addr, msg)| { if *addr != self.receiver { false } else { let (prefix, rest) = split_matches(msg, ':'); let (_, rest) = split_matches(rest, ':'); let (hash_str, _) = split_matches(rest, ':'); let hash: u64 = hash_str.parse().unwrap(); // check that the message is a pseudo tcp message // and the hash matches with this message prefix == PREFIX && hash == self.hash } }); *messages = others; filtered.into_iter().map(|(_, s)| s).collect() } fn filter_for_new_ones( messages: &mut Vec<(SocketAddr, String)>, mut filter: F, ) -> Vec<(SocketAddr, String)> where F: FnMut(&SocketAddr, &String) -> Result, { let mut v = Vec::new(); swap(messages, &mut v); let (filtered, others): (Vec<(SocketAddr, String)>, Vec<(SocketAddr, String)>) = v .into_iter() .partition(|(addr, msg)| filter(addr, msg).unwrap()); *messages = others; filtered } /// Sends an answer (Acknowledge or Finished) fn send_answer(&mut self, state_str: &str, sender: &mut T) -> Result<()> where T: Sender, { debug_assert!(state_str == ACKNOWLEDGE_STR || state_str == FINISHED_STR); sender.send( self.receiver, &Self::build_message(state_str, self.hash, None), )?; Ok(()) } /// Sends initial message with message information fn send_initial_message(&mut self, sender: &mut T) -> Result<()> where T: Sender, { sender.send( self.receiver, &Self::build_message(MESSAGE_STR, self.hash, Some(&self.pay_load)), )?; Ok(()) } /// Builds the network message out of the given information fn build_message(state_str: &str, hash: u64, pay_load: Option<&str>) -> String { match pay_load { Some(pay_load) => { format!("{}:{}:{}:{}", PREFIX, state_str, hash, pay_load) } None => { format!("{}:{}:{}", PREFIX, state_str, hash) } } } }