EventStream improvements (#40)

* Add a non-syncing async stream

* Optimize sync_stream::EventStream
This commit is contained in:
Noah 2021-03-19 17:56:39 -05:00 committed by GitHub
parent 6c1add8b73
commit 0d964c60c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 151 additions and 42 deletions

View file

@ -492,6 +492,12 @@ impl RawDevice {
.map(|_| ()) .map(|_| ())
.map_err(nix_err) .map_err(nix_err)
} }
#[cfg(feature = "tokio")]
#[inline]
pub fn into_event_stream(self) -> io::Result<EventStream> {
EventStream::new(self)
}
} }
impl AsRawFd for RawDevice { impl AsRawFd for RawDevice {
@ -511,3 +517,96 @@ fn vec_spare_capacity_mut<T>(v: &mut Vec<T>) -> &mut [mem::MaybeUninit<T>] {
) )
} }
} }
#[cfg(feature = "tokio")]
mod tokio_stream {
use super::*;
use tokio_1 as tokio;
use futures_core::{ready, Stream};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::unix::AsyncFd;
/// An asynchronous stream of input events.
///
/// This can be used by calling [`stream.next_event().await?`](Self::next_event), or if you
/// need to pass it as a stream somewhere, the [`futures::Stream`](Stream) implementation.
/// There's also a lower-level [`poll_event`] function if you need to fetch an event from
/// inside a `Future::poll` impl.
pub struct EventStream {
device: AsyncFd<RawDevice>,
index: usize,
}
impl Unpin for EventStream {}
impl EventStream {
pub(crate) fn new(device: RawDevice) -> io::Result<Self> {
use nix::fcntl;
fcntl::fcntl(device.as_raw_fd(), fcntl::F_SETFL(fcntl::OFlag::O_NONBLOCK))
.map_err(nix_err)?;
let device = AsyncFd::new(device)?;
Ok(Self { device, index: 0 })
}
/// Returns a reference to the underlying device
pub fn device(&self) -> &RawDevice {
self.device.get_ref()
}
/// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e.
/// any calls afterwards will likely error as well.
pub async fn next_event(&mut self) -> io::Result<InputEvent> {
poll_fn(|cx| self.poll_event(cx)).await
}
/// A lower-level function for directly polling this stream.
pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<InputEvent>> {
'outer: loop {
if let Some(&ev) = self.device.get_ref().event_buf.get(self.index) {
self.index += 1;
return Poll::Ready(Ok(InputEvent(ev)));
}
self.device.get_mut().event_buf.clear();
loop {
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
let res = guard.try_io(|device| device.get_mut().fill_events());
match res {
Ok(res) => {
let _ = res?;
continue 'outer;
}
Err(_would_block) => continue,
}
}
}
}
}
impl Stream for EventStream {
type Item = io::Result<InputEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().poll_event(cx).map(Some)
}
}
// version of futures_util::future::poll_fn
pub(crate) fn poll_fn<T, F: FnMut(&mut Context<'_>) -> Poll<T> + Unpin>(f: F) -> PollFn<F> {
PollFn(f)
}
pub(crate) struct PollFn<F>(F);
impl<T, F: FnMut(&mut Context<'_>) -> Poll<T> + Unpin> std::future::Future for PollFn<F> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(self.get_mut().0)(cx)
}
}
}
#[cfg(feature = "tokio")]
pub(crate) use tokio_stream::poll_fn;
#[cfg(feature = "tokio")]
pub use tokio_stream::EventStream;

View file

@ -252,12 +252,7 @@ impl Device {
Ok(()) Ok(())
} }
/// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED. fn fetch_events_inner(&mut self) -> io::Result<Option<SyncState>> {
///
/// By default this will block until events are available. Typically, users will want to call
/// this in a tight loop within a thread.
/// Will insert "fake" events.
pub fn fetch_events(&mut self) -> io::Result<FetchEventsSynced<'_>> {
let block_dropped = std::mem::take(&mut self.block_dropped); let block_dropped = std::mem::take(&mut self.block_dropped);
let sync = if block_dropped { let sync = if block_dropped {
self.prev_state.clone_from(&self.state); self.prev_state.clone_from(&self.state);
@ -273,6 +268,17 @@ impl Device {
self.raw.fill_events()?; self.raw.fill_events()?;
Ok(sync)
}
/// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED.
///
/// By default this will block until events are available. Typically, users will want to call
/// this in a tight loop within a thread.
/// Will insert "fake" events.
pub fn fetch_events(&mut self) -> io::Result<FetchEventsSynced<'_>> {
let sync = self.fetch_events_inner()?;
Ok(FetchEventsSynced { Ok(FetchEventsSynced {
dev: self, dev: self,
range: 0..0, range: 0..0,
@ -634,8 +640,8 @@ mod tokio_stream {
use tokio_1 as tokio; use tokio_1 as tokio;
use crate::nix_err; use crate::nix_err;
use crate::raw_stream::poll_fn;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::unix::AsyncFd; use tokio::io::unix::AsyncFd;
@ -648,7 +654,9 @@ mod tokio_stream {
/// inside a `Future::poll` impl. /// inside a `Future::poll` impl.
pub struct EventStream { pub struct EventStream {
device: AsyncFd<Device>, device: AsyncFd<Device>,
events: VecDeque<InputEvent>, event_range: std::ops::Range<usize>,
consumed_to: usize,
sync: Option<SyncState>,
} }
impl Unpin for EventStream {} impl Unpin for EventStream {}
@ -660,7 +668,9 @@ mod tokio_stream {
let device = AsyncFd::new(device)?; let device = AsyncFd::new(device)?;
Ok(Self { Ok(Self {
device, device,
events: VecDeque::new(), event_range: 0..0,
consumed_to: 0,
sync: None,
}) })
} }
@ -677,29 +687,41 @@ mod tokio_stream {
/// A lower-level function for directly polling this stream. /// A lower-level function for directly polling this stream.
pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<InputEvent>> { pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<InputEvent>> {
let Self { device, events } = self; 'outer: loop {
let dev = self.device.get_mut();
if let Some(ev) = events.pop_front() { if let Some(ev) = compensate_events(&mut self.sync, dev) {
return Poll::Ready(Ok(ev)); return Poll::Ready(Ok(ev));
} }
let state = &mut dev.state;
loop { let (res, consumed_to) =
let mut guard = ready!(device.poll_read_ready_mut(cx))?; sync_events(&mut self.event_range, &dev.raw.event_buf, |ev| {
state.process_event(ev)
let res = guard.try_io(|device| { });
events.extend(device.get_mut().fetch_events()?); if let Some(end) = consumed_to {
Ok(()) self.consumed_to = end
}); }
match res { match res {
Ok(res) => { Ok(ev) => return Poll::Ready(Ok(InputEvent(ev))),
let () = res?; Err(requires_sync) => {
let ret = match events.pop_front() { if requires_sync {
Some(ev) => Poll::Ready(Ok(ev)), dev.block_dropped = true;
None => Poll::Pending, }
}; }
return ret; }
dev.raw.event_buf.drain(..self.consumed_to);
self.consumed_to = 0;
loop {
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
let res = guard.try_io(|device| device.get_mut().fetch_events_inner());
match res {
Ok(res) => {
self.sync = res?;
continue 'outer;
}
Err(_would_block) => continue,
} }
Err(_would_block) => continue,
} }
} }
} }
@ -711,18 +733,6 @@ mod tokio_stream {
self.get_mut().poll_event(cx).map(Some) self.get_mut().poll_event(cx).map(Some)
} }
} }
// version of futures_util::future::poll_fn
fn poll_fn<T, F: FnMut(&mut Context<'_>) -> Poll<T> + Unpin>(f: F) -> PollFn<F> {
PollFn(f)
}
struct PollFn<F>(F);
impl<T, F: FnMut(&mut Context<'_>) -> Poll<T> + Unpin> std::future::Future for PollFn<F> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(self.get_mut().0)(cx)
}
}
} }
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
pub use tokio_stream::EventStream; pub use tokio_stream::EventStream;