diff --git a/src/raw_stream.rs b/src/raw_stream.rs index d68be58..acd2793 100644 --- a/src/raw_stream.rs +++ b/src/raw_stream.rs @@ -492,6 +492,12 @@ impl RawDevice { .map(|_| ()) .map_err(nix_err) } + + #[cfg(feature = "tokio")] + #[inline] + pub fn into_event_stream(self) -> io::Result { + EventStream::new(self) + } } impl AsRawFd for RawDevice { @@ -511,3 +517,96 @@ fn vec_spare_capacity_mut(v: &mut Vec) -> &mut [mem::MaybeUninit] { ) } } + +#[cfg(feature = "tokio")] +mod tokio_stream { + use super::*; + + use tokio_1 as tokio; + + use futures_core::{ready, Stream}; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::io::unix::AsyncFd; + + /// An asynchronous stream of input events. + /// + /// This can be used by calling [`stream.next_event().await?`](Self::next_event), or if you + /// need to pass it as a stream somewhere, the [`futures::Stream`](Stream) implementation. + /// There's also a lower-level [`poll_event`] function if you need to fetch an event from + /// inside a `Future::poll` impl. + pub struct EventStream { + device: AsyncFd, + index: usize, + } + impl Unpin for EventStream {} + + impl EventStream { + pub(crate) fn new(device: RawDevice) -> io::Result { + use nix::fcntl; + fcntl::fcntl(device.as_raw_fd(), fcntl::F_SETFL(fcntl::OFlag::O_NONBLOCK)) + .map_err(nix_err)?; + let device = AsyncFd::new(device)?; + Ok(Self { device, index: 0 }) + } + + /// Returns a reference to the underlying device + pub fn device(&self) -> &RawDevice { + self.device.get_ref() + } + + /// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e. + /// any calls afterwards will likely error as well. + pub async fn next_event(&mut self) -> io::Result { + poll_fn(|cx| self.poll_event(cx)).await + } + + /// A lower-level function for directly polling this stream. + pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll> { + 'outer: loop { + if let Some(&ev) = self.device.get_ref().event_buf.get(self.index) { + self.index += 1; + return Poll::Ready(Ok(InputEvent(ev))); + } + + self.device.get_mut().event_buf.clear(); + + loop { + let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + + let res = guard.try_io(|device| device.get_mut().fill_events()); + match res { + Ok(res) => { + let _ = res?; + continue 'outer; + } + Err(_would_block) => continue, + } + } + } + } + } + + impl Stream for EventStream { + type Item = io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().poll_event(cx).map(Some) + } + } + + // version of futures_util::future::poll_fn + pub(crate) fn poll_fn) -> Poll + Unpin>(f: F) -> PollFn { + PollFn(f) + } + pub(crate) struct PollFn(F); + impl) -> Poll + Unpin> std::future::Future for PollFn { + type Output = T; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (self.get_mut().0)(cx) + } + } +} +#[cfg(feature = "tokio")] +pub(crate) use tokio_stream::poll_fn; +#[cfg(feature = "tokio")] +pub use tokio_stream::EventStream; diff --git a/src/sync_stream.rs b/src/sync_stream.rs index 80e95d4..bbc3eae 100644 --- a/src/sync_stream.rs +++ b/src/sync_stream.rs @@ -252,12 +252,7 @@ impl Device { Ok(()) } - /// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED. - /// - /// By default this will block until events are available. Typically, users will want to call - /// this in a tight loop within a thread. - /// Will insert "fake" events. - pub fn fetch_events(&mut self) -> io::Result> { + fn fetch_events_inner(&mut self) -> io::Result> { let block_dropped = std::mem::take(&mut self.block_dropped); let sync = if block_dropped { self.prev_state.clone_from(&self.state); @@ -273,6 +268,17 @@ impl Device { self.raw.fill_events()?; + Ok(sync) + } + + /// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED. + /// + /// By default this will block until events are available. Typically, users will want to call + /// this in a tight loop within a thread. + /// Will insert "fake" events. + pub fn fetch_events(&mut self) -> io::Result> { + let sync = self.fetch_events_inner()?; + Ok(FetchEventsSynced { dev: self, range: 0..0, @@ -634,8 +640,8 @@ mod tokio_stream { use tokio_1 as tokio; use crate::nix_err; + use crate::raw_stream::poll_fn; use futures_core::{ready, Stream}; - use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::unix::AsyncFd; @@ -648,7 +654,9 @@ mod tokio_stream { /// inside a `Future::poll` impl. pub struct EventStream { device: AsyncFd, - events: VecDeque, + event_range: std::ops::Range, + consumed_to: usize, + sync: Option, } impl Unpin for EventStream {} @@ -660,7 +668,9 @@ mod tokio_stream { let device = AsyncFd::new(device)?; Ok(Self { device, - events: VecDeque::new(), + event_range: 0..0, + consumed_to: 0, + sync: None, }) } @@ -677,29 +687,41 @@ mod tokio_stream { /// A lower-level function for directly polling this stream. pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll> { - let Self { device, events } = self; - - if let Some(ev) = events.pop_front() { - return Poll::Ready(Ok(ev)); - } - - loop { - let mut guard = ready!(device.poll_read_ready_mut(cx))?; - - let res = guard.try_io(|device| { - events.extend(device.get_mut().fetch_events()?); - Ok(()) - }); + 'outer: loop { + let dev = self.device.get_mut(); + if let Some(ev) = compensate_events(&mut self.sync, dev) { + return Poll::Ready(Ok(ev)); + } + let state = &mut dev.state; + let (res, consumed_to) = + sync_events(&mut self.event_range, &dev.raw.event_buf, |ev| { + state.process_event(ev) + }); + if let Some(end) = consumed_to { + self.consumed_to = end + } match res { - Ok(res) => { - let () = res?; - let ret = match events.pop_front() { - Some(ev) => Poll::Ready(Ok(ev)), - None => Poll::Pending, - }; - return ret; + Ok(ev) => return Poll::Ready(Ok(InputEvent(ev))), + Err(requires_sync) => { + if requires_sync { + dev.block_dropped = true; + } + } + } + dev.raw.event_buf.drain(..self.consumed_to); + self.consumed_to = 0; + + loop { + let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + + let res = guard.try_io(|device| device.get_mut().fetch_events_inner()); + match res { + Ok(res) => { + self.sync = res?; + continue 'outer; + } + Err(_would_block) => continue, } - Err(_would_block) => continue, } } } @@ -711,18 +733,6 @@ mod tokio_stream { self.get_mut().poll_event(cx).map(Some) } } - - // version of futures_util::future::poll_fn - fn poll_fn) -> Poll + Unpin>(f: F) -> PollFn { - PollFn(f) - } - struct PollFn(F); - impl) -> Poll + Unpin> std::future::Future for PollFn { - type Output = T; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (self.get_mut().0)(cx) - } - } } #[cfg(feature = "tokio")] pub use tokio_stream::EventStream;