From cb2f5ef87acdf7bc9c7308f27f9f15c91c24b4a7 Mon Sep 17 00:00:00 2001 From: Jeff Hiner <37913568+jeff-hiner@users.noreply.github.com> Date: Mon, 1 Mar 2021 10:09:11 -0700 Subject: [PATCH] Separate out sane blocking and async (nonblocking) implementations (#33) * Default impl should block. Renaming it to avoid API confusion from previous users. * Can't get rid of libc::read because we have MaybeUninit, but we can clean up the match * Remove extra include * Switch everything over to io::Error * Add initial tokio impl+example * Move evtest_tokio to normal example * Add documentation and clarify * Use a VecDeque (ring buffer) instead of repeatedly popping things off the front of a Vec * Looks like we are not using thiserror anymore; removing * Store read buf between calls * Add nonblocking example with epoll Co-authored-by: Jeff Hiner Co-authored-by: Noah <33094578+coolreader18@users.noreply.github.com> --- Cargo.toml | 17 +- README.md | 6 +- examples/evtest.rs | 3 +- examples/evtest_nonblocking.rs | 92 ++++++++ examples/evtest_tokio.rs | 29 +++ src/lib.rs | 379 ++++++++++++++++++++++----------- src/tokio_stream.rs | 57 +++++ 7 files changed, 450 insertions(+), 133 deletions(-) create mode 100644 examples/evtest_nonblocking.rs create mode 100644 examples/evtest_tokio.rs create mode 100644 src/tokio_stream.rs diff --git a/Cargo.toml b/Cargo.toml index 22c8e04..e9b00c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "evdev" -version = "0.11.0-alpha.4" +version = "0.11.0-alpha.5" authors = ["Corey Richardson "] description = "evdev interface for Linux" license = "Apache-2.0 OR MIT" @@ -8,8 +8,21 @@ repository = "https://github.com/cmr/evdev" documentation = "https://docs.rs/evdev" edition = "2018" +[features] +tokio = ["tokio_1", "futures-core"] + [dependencies] libc = "0.2.22" bitvec = "0.21" nix = "0.19.0" -thiserror = "1.0.24" + +tokio_1 = { package = "tokio", version = "1.0", features = ["net"], optional = true } +futures-core = { version = "0.3", optional = true } + +[dev-dependencies] +tokio_1 = { package = "tokio", version = "1.0", features = ["macros", "rt-multi-thread"] } +futures-util = "0.3" + +[[example]] +name = "evtest_tokio" +required-features = ["tokio"] diff --git a/README.md b/README.md index 5883335..0c8b052 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,11 @@ What does this library support? =============================== This library exposes raw evdev events, but uses the Rust `Iterator` trait to -do so, and will handle `SYN_DROPPED` events properly for the client. I try to +do so. When processing events via `fetch_events`, the library will handle +`SYN_DROPPED` events by injecting fake state updates in an attempt to ensure +callers see state transition messages consistent with actual device state. When +processing via `*_no_sync` this correction is not done, and `SYN_DROPPED` messages +will appear if the kernel ring buffer is overrun before messages are read. I try to match [libevdev](https://www.freedesktop.org/software/libevdev/doc/latest/) closely, where possible. diff --git a/examples/evtest.rs b/examples/evtest.rs index f0914e5..6c4ae31 100644 --- a/examples/evtest.rs +++ b/examples/evtest.rs @@ -20,9 +20,8 @@ fn main() { println!("{}", d); println!("Events:"); loop { - for ev in d.events_no_sync().unwrap() { + for ev in d.fetch_events_no_sync().unwrap() { println!("{:?}", ev); } - d.wait_ready().unwrap(); } } diff --git a/examples/evtest_nonblocking.rs b/examples/evtest_nonblocking.rs new file mode 100644 index 0000000..22e4c54 --- /dev/null +++ b/examples/evtest_nonblocking.rs @@ -0,0 +1,92 @@ +//! This example demonstrates how to use the evdev crate with a nonblocking file descriptor. +//! +//! Note that for this implementation the caller is responsible for ensuring the underlying +//! Device file descriptor is set to O_NONBLOCK. The caller must also create the epoll descriptor, +//! bind it, check for EAGAIN returns from fetch_events_*, call epoll_wait as appropriate, and +//! clean up the epoll file descriptor when finished. + +use nix::{ + fcntl::{FcntlArg, OFlag}, + sys::epoll, +}; +use std::io::prelude::*; +use std::os::unix::io::{AsRawFd, RawFd}; + +fn main() -> Result<(), Box> { + let mut args = std::env::args_os(); + let mut d = if args.len() > 1 { + evdev::Device::open(&args.nth(1).unwrap()).unwrap() + } else { + let mut devices = evdev::enumerate().collect::>(); + for (i, d) in devices.iter().enumerate() { + println!("{}: {}", i, d.name().unwrap_or("Unnamed device")); + } + print!("Select the device [0-{}]: ", devices.len()); + let _ = std::io::stdout().flush(); + let mut chosen = String::new(); + std::io::stdin().read_line(&mut chosen).unwrap(); + devices.swap_remove(chosen.trim().parse::().unwrap()) + }; + println!("{}", d); + + let raw_fd = d.as_raw_fd(); + // Set nonblocking + nix::fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?; + + // Create epoll handle and attach raw_fd + let epoll_fd = Epoll::new(epoll::epoll_create1( + epoll::EpollCreateFlags::EPOLL_CLOEXEC, + )?); + let mut event = epoll::EpollEvent::new(epoll::EpollFlags::EPOLLIN, 0); + epoll::epoll_ctl( + epoll_fd.as_raw_fd(), + epoll::EpollOp::EpollCtlAdd, + raw_fd, + Some(&mut event), + )?; + + // We don't care about these, but the kernel wants to fill them. + let mut events = [epoll::EpollEvent::empty(); 2]; + + println!("Events:"); + loop { + match d.fetch_events_no_sync() { + Ok(iterator) => { + for ev in iterator { + println!("{:?}", ev); + } + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // Wait forever for bytes available on raw_fd + epoll::epoll_wait(epoll_fd.as_raw_fd(), &mut events, -1)?; + } + Err(e) => { + eprintln!("{}", e); + break; + } + } + } + Ok(()) +} + +// The rest here is to ensure the epoll handle is cleaned up properly. +// You can also use the epoll crate, if you prefer. +struct Epoll(RawFd); + +impl Epoll { + pub(crate) fn new(fd: RawFd) -> Self { + Epoll(fd) + } +} + +impl AsRawFd for Epoll { + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +impl Drop for Epoll { + fn drop(&mut self) { + let _ = nix::unistd::close(self.0); + } +} diff --git a/examples/evtest_tokio.rs b/examples/evtest_tokio.rs new file mode 100644 index 0000000..75d08b9 --- /dev/null +++ b/examples/evtest_tokio.rs @@ -0,0 +1,29 @@ +use tokio_1 as tokio; + +use futures_util::TryStreamExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut args = std::env::args_os(); + let d = if args.len() > 1 { + evdev::Device::open(&args.nth(1).unwrap())? + } else { + let mut devices = evdev::enumerate().collect::>(); + for (i, d) in devices.iter().enumerate() { + println!("{}: {}", i, d.name().unwrap_or("Unnamed device")); + } + print!("Select the device [0-{}]: ", devices.len()); + let _ = std::io::Write::flush(&mut std::io::stdout()); + let mut chosen = String::new(); + std::io::stdin().read_line(&mut chosen)?; + devices.swap_remove(chosen.trim().parse::()?) + }; + println!("{}", d); + println!("Events:"); + let mut events = d.into_event_stream_no_sync()?; + while let Some(ev) = events.try_next().await? { + println!("{:?}", ev); + } + println!("EOF!"); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 1c0be94..3ef5ac9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,9 @@ //! It is recommended that you dedicate a thread to processing input events, or use epoll or an //! async runtime with the fd returned by `::as_raw_fd` to process events when //! they are ready. +//! +//! For demonstrations of how to use this library in blocking, nonblocking, and async (tokio) modes, +//! please reference the "examples" directory. #![cfg(any(unix, target_os = "android"))] #![allow(non_camel_case_types)] @@ -57,18 +60,19 @@ mod attribute_set; mod constants; -pub mod raw; +mod raw; mod scancodes; +#[cfg(feature = "tokio")] +mod tokio_stream; + use bitvec::prelude::*; +use std::collections::VecDeque; use std::fmt; -use std::fs::File; -use std::fs::OpenOptions; +use std::fs::{File, OpenOptions}; +use std::io; use std::mem; -use std::os::unix::{ - fs::OpenOptionsExt, - io::{AsRawFd, RawFd}, -}; +use std::os::unix::io::{AsRawFd, RawFd}; use std::path::Path; use std::time::{Duration, SystemTime}; use std::{ffi::CString, mem::MaybeUninit}; @@ -126,6 +130,7 @@ const fn bit_elts(bits: usize) -> usize { type KeyArray = [u8; bit_elts::(Key::COUNT)]; #[derive(Debug, Clone)] +/// A cached representation of device state at a certain time. pub struct DeviceState { /// The state corresponds to kernel state at this timestamp. timestamp: libc::timeval, @@ -139,24 +144,37 @@ pub struct DeviceState { } impl DeviceState { + /// Returns the time when this snapshot was taken. + pub fn timestamp(&self) -> SystemTime { + timeval_to_systime(&self.timestamp) + } + + /// Returns the set of keys pressed when the snapshot was taken. + /// + /// Returns `None` if keys are not supported by this device. pub fn key_vals(&self) -> Option> { self.key_vals .as_deref() .map(|v| AttributeSet::new(BitSlice::from_slice(v).unwrap())) } - pub fn timestamp(&self) -> SystemTime { - timeval_to_systime(&self.timestamp) - } - + /// Returns the set of absolute axis measurements when the snapshot was taken. + /// + /// Returns `None` if not supported by this device. pub fn abs_vals(&self) -> Option<&[libc::input_absinfo]> { self.abs_vals.as_deref().map(|v| &v[..]) } + /// Returns the set of switches triggered when the snapshot was taken. + /// + /// Returns `None` if switches are not supported by this device. pub fn switch_vals(&self) -> Option> { self.switch_vals.as_deref().map(AttributeSet::new) } + /// Returns the set of LEDs turned on when the snapshot was taken. + /// + /// Returns `None` if LEDs are not supported by this device. pub fn led_vals(&self) -> Option> { self.led_vals.as_deref().map(AttributeSet::new) } @@ -177,16 +195,12 @@ impl Default for DeviceState { } } -/// Publicly visible errors which can be returned from evdev -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("libc/system error: {0}")] - NixError(#[from] nix::Error), - #[error("standard i/o error: {0}")] - StdIoError(#[from] std::io::Error), -} - #[derive(Debug)] +/// A physical or virtual device supported by evdev. +/// +/// Each device corresponds to a path typically found in `/dev/input`, and supports access via +/// one or more "types". For example, an optical mouse has buttons that are represented by "keys", +/// and reflects changes in its position via "relative axis" reports. pub struct Device { file: File, ty: BitArr!(for EventType::COUNT, in u8), @@ -206,9 +220,8 @@ pub struct Device { // ff_stat: Option, // rep: Option, supported_snd: Option, - pending_events: Vec, - // pending_events[last_seen..] is the events that have occurred since the last sync. - last_seen: usize, + pending_events: VecDeque, + read_buf: Vec, state: DeviceState, } @@ -348,57 +361,149 @@ impl fmt::Display for Device { } } +const DEFAULT_EVENT_COUNT: usize = 32; + impl Device { + /// Returns a set of the event types supported by this device (Key, Switch, etc) + /// + /// If you're interested in the individual keys or switches supported, it's probably easier + /// to just call the appropriate `supported_*` function instead. pub fn supported_events(&self) -> AttributeSet<'_, EventType> { AttributeSet::new(&self.ty) } + /// Returns the device's name as read from the kernel. pub fn name(&self) -> Option<&str> { self.name.as_deref() } + /// Returns the device's physical location, either as set by the caller or as read from the kernel. pub fn physical_path(&self) -> Option<&str> { self.phys.as_deref() } + /// Returns the user-defined "unique name" of the device, if one has been set. pub fn unique_name(&self) -> Option<&str> { self.uniq.as_deref() } + /// Returns a struct containing bustype, vendor, product, and version identifiers pub fn input_id(&self) -> libc::input_id { self.id } + /// Returns the set of supported "properties" for the device (see `INPUT_PROP_*` in kernel headers) pub fn properties(&self) -> AttributeSet<'_, PropType> { AttributeSet::new(&self.props) } + /// Returns a tuple of the driver version containing major, minor, rev pub fn driver_version(&self) -> (u8, u8, u8) { self.driver_version } + /// Returns the set of supported keys reported by the device. + /// + /// For keyboards, this is the set of all possible keycodes the keyboard may emit. Controllers, + /// mice, and other peripherals may also report buttons as keys. + /// + /// # Examples + /// + /// ``` + /// # fn main() -> Result<(), Box> { + /// use evdev::{Device, Key}; + /// let device = Device::open("/dev/input/event0")?; + /// + /// // Does this device have an ENTER key? + /// let supported = device.supported_keys().map_or(false, |keys| keys.contains(Key::KEY_ENTER)); + /// # Ok(()) + /// # } + /// ``` pub fn supported_keys(&self) -> Option> { self.supported_keys .as_deref() .map(|v| AttributeSet::new(BitSlice::from_slice(v).unwrap())) } + /// Returns the set of supported "relative axes" reported by the device. + /// + /// Standard mice will generally report `REL_X` and `REL_Y` along with wheel if supported. + /// + /// # Examples + /// + /// ``` + /// # fn main() -> Result<(), Box> { + /// use evdev::{Device, RelativeAxisType}; + /// let device = Device::open("/dev/input/event0")?; + /// + /// // Does the device have a scroll wheel? + /// let supported = device + /// .supported_relative_axes() + /// .map_or(false, |axes| axes.contains(RelativeAxisType::REL_WHEEL)); + /// # Ok(()) + /// # } + /// ``` pub fn supported_relative_axes(&self) -> Option> { self.supported_relative.as_deref().map(AttributeSet::new) } + /// Returns the set of supported "absolute axes" reported by the device. + /// + /// These are most typically supported by joysticks and touchpads. + /// + /// # Examples + /// + /// ``` + /// # fn main() -> Result<(), Box> { + /// use evdev::{Device, AbsoluteAxisType}; + /// let device = Device::open("/dev/input/event0")?; + /// + /// // Does the device have an absolute X axis? + /// let supported = device + /// .supported_absolute_axes() + /// .map_or(false, |axes| axes.contains(AbsoluteAxisType::ABS_X)); + /// # Ok(()) + /// # } + /// ``` pub fn supported_absolute_axes(&self) -> Option> { self.supported_absolute.as_deref().map(AttributeSet::new) } + /// Returns the set of supported switches reported by the device. + /// + /// These are typically used for things like software switches on laptop lids (which the + /// system reacts to by suspending or locking), or virtual switches to indicate whether a + /// headphone jack is plugged in (used to disable external speakers). + /// + /// # Examples + /// + /// ``` + /// # fn main() -> Result<(), Box> { + /// use evdev::{Device, SwitchType}; + /// let device = Device::open("/dev/input/event0")?; + /// + /// // Does the device report a laptop lid switch? + /// let supported = device + /// .supported_switches() + /// .map_or(false, |axes| axes.contains(SwitchType::SW_LID)); + /// # Ok(()) + /// # } + /// ``` pub fn supported_switches(&self) -> Option> { self.supported_switch.as_deref().map(AttributeSet::new) } + /// Returns a set of supported LEDs on the device. + /// + /// Most commonly these are state indicator lights for things like Scroll Lock, but they + /// can also be found in cameras and other devices. pub fn supported_leds(&self) -> Option> { self.supported_led.as_deref().map(AttributeSet::new) } + /// Returns a set of supported "miscellaneous" capabilities. + /// + /// Aside from vendor-specific key scancodes, most of these are uncommon. pub fn misc_properties(&self) -> Option> { self.supported_misc.as_deref().map(AttributeSet::new) } @@ -407,33 +512,44 @@ impl Device { // self.rep // } + /// Returns the set of supported simple sounds supported by a device. + /// + /// You can use these to make really annoying beep sounds come from an internal self-test + /// speaker, for instance. pub fn supported_sounds(&self) -> Option> { self.supported_snd.as_deref().map(AttributeSet::new) } + /// Returns the *cached* state of the device. + /// + /// Pulling updates via `fetch_events` or manually invoking `sync_state` will refresh the cache. pub fn state(&self) -> &DeviceState { &self.state } #[inline(always)] - pub fn open(path: impl AsRef) -> Result { + /// Opens a device, given its system path. + /// + /// Paths are typically something like `/dev/input/event0`. + pub fn open(path: impl AsRef) -> io::Result { Self::_open(path.as_ref()) } - fn _open(path: &Path) -> Result { + fn _open(path: &Path) -> io::Result { let mut options = OpenOptions::new(); // Try to load read/write, then fall back to read-only. let file = options .read(true) .write(true) - .custom_flags(libc::O_NONBLOCK) .open(path) .or_else(|_| options.write(false).open(path))?; let ty = { let mut ty = BitArray::zeroed(); - unsafe { raw::eviocgbit_type(file.as_raw_fd(), ty.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_type(file.as_raw_fd(), ty.as_mut_raw_slice()).map_err(nix_err)? + }; ty }; @@ -446,12 +562,12 @@ impl Device { let id = unsafe { let mut id = MaybeUninit::uninit(); - raw::eviocgid(file.as_raw_fd(), id.as_mut_ptr())?; + raw::eviocgid(file.as_raw_fd(), id.as_mut_ptr()).map_err(nix_err)?; id.assume_init() }; let mut driver_version: i32 = 0; unsafe { - raw::eviocgversion(file.as_raw_fd(), &mut driver_version)?; + raw::eviocgversion(file.as_raw_fd(), &mut driver_version).map_err(nix_err)?; } let driver_version = ( ((driver_version >> 16) & 0xff) as u8, @@ -461,7 +577,9 @@ impl Device { let props = { let mut props = BitArray::zeroed(); - unsafe { raw::eviocgprop(file.as_raw_fd(), props.as_mut_raw_slice())? }; + unsafe { + raw::eviocgprop(file.as_raw_fd(), props.as_mut_raw_slice()).map_err(nix_err)? + }; props }; // FIXME: handle old kernel @@ -474,7 +592,7 @@ impl Device { let mut supported_keys = Box::new(KEY_ARR_INIT); let key_slice = &mut supported_keys[..]; - unsafe { raw::eviocgbit_key(file.as_raw_fd(), key_slice)? }; + unsafe { raw::eviocgbit_key(file.as_raw_fd(), key_slice).map_err(nix_err)? }; Some(supported_keys) } else { @@ -483,7 +601,10 @@ impl Device { let supported_relative = if ty[EventType::RELATIVE.0 as usize] { let mut rel = BitArray::zeroed(); - unsafe { raw::eviocgbit_relative(file.as_raw_fd(), rel.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_relative(file.as_raw_fd(), rel.as_mut_raw_slice()) + .map_err(nix_err)? + }; Some(rel) } else { None @@ -498,7 +619,10 @@ impl Device { [ABSINFO_ZERO; AbsoluteAxisType::COUNT]; state.abs_vals = Some(Box::new(ABS_VALS_INIT)); let mut abs = BitArray::zeroed(); - unsafe { raw::eviocgbit_absolute(file.as_raw_fd(), abs.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_absolute(file.as_raw_fd(), abs.as_mut_raw_slice()) + .map_err(nix_err)? + }; Some(abs) } else { None @@ -507,7 +631,10 @@ impl Device { let supported_switch = if ty[EventType::SWITCH.0 as usize] { state.switch_vals = Some(BitArray::zeroed()); let mut switch = BitArray::zeroed(); - unsafe { raw::eviocgbit_switch(file.as_raw_fd(), switch.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_switch(file.as_raw_fd(), switch.as_mut_raw_slice()) + .map_err(nix_err)? + }; Some(switch) } else { None @@ -516,7 +643,9 @@ impl Device { let supported_led = if ty[EventType::LED.0 as usize] { state.led_vals = Some(BitArray::zeroed()); let mut led = BitArray::zeroed(); - unsafe { raw::eviocgbit_led(file.as_raw_fd(), led.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_led(file.as_raw_fd(), led.as_mut_raw_slice()).map_err(nix_err)? + }; Some(led) } else { None @@ -524,7 +653,9 @@ impl Device { let supported_misc = if ty[EventType::MISC.0 as usize] { let mut misc = BitArray::zeroed(); - unsafe { raw::eviocgbit_misc(file.as_raw_fd(), misc.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_misc(file.as_raw_fd(), misc.as_mut_raw_slice()).map_err(nix_err)? + }; Some(misc) } else { None @@ -534,7 +665,9 @@ impl Device { let supported_snd = if ty[EventType::SOUND.0 as usize] { let mut snd = BitArray::zeroed(); - unsafe { raw::eviocgbit_sound(file.as_raw_fd(), snd.as_mut_raw_slice())? }; + unsafe { + raw::eviocgbit_sound(file.as_raw_fd(), snd.as_mut_raw_slice()).map_err(nix_err)? + }; Some(snd) } else { None @@ -556,8 +689,8 @@ impl Device { supported_led, supported_misc, supported_snd, - pending_events: Vec::with_capacity(64), - last_seen: 0, + pending_events: VecDeque::with_capacity(64), + read_buf: Vec::new(), state, }; @@ -569,10 +702,10 @@ impl Device { /// Synchronize the `Device` state with the kernel device state. /// /// If there is an error at any point, the state will not be synchronized completely. - pub fn sync_state(&mut self) -> Result<(), Error> { + pub fn sync_state(&mut self) -> io::Result<()> { let fd = self.as_raw_fd(); if let Some(key_vals) = &mut self.state.key_vals { - unsafe { raw::eviocgkey(fd, &mut key_vals[..])? }; + unsafe { raw::eviocgkey(fd, &mut key_vals[..]).map_err(nix_err)? }; } if let (Some(supported_abs), Some(abs_vals)) = @@ -583,18 +716,16 @@ impl Device { // // handling later removed. not sure what the intention of "handling that later" was // the abs data seems to be fine (tested ABS_MT_POSITION_X/Y) - unsafe { - raw::eviocgabs(fd, idx as u32, &mut abs_vals[idx])?; - } + unsafe { raw::eviocgabs(fd, idx as u32, &mut abs_vals[idx]).map_err(nix_err)? }; } } if let Some(switch_vals) = &mut self.state.switch_vals { - unsafe { raw::eviocgsw(fd, switch_vals.as_mut_raw_slice())? }; + unsafe { raw::eviocgsw(fd, switch_vals.as_mut_raw_slice()).map_err(nix_err)? }; } if let Some(led_vals) = &mut self.state.led_vals { - unsafe { raw::eviocgled(fd, led_vals.as_mut_raw_slice())? }; + unsafe { raw::eviocgled(fd, led_vals.as_mut_raw_slice()).map_err(nix_err)? }; } Ok(()) @@ -603,9 +734,9 @@ impl Device { /// Do SYN_DROPPED synchronization, and compensate for missing events by inserting events into /// the stream which, when applied to any state being kept outside of this `Device`, will /// synchronize it with the kernel state. - fn compensate_dropped(&mut self) -> Result<(), Error> { + fn compensate_dropped(&mut self) -> io::Result<()> { let mut drop_from = None; - for (idx, event) in self.pending_events[self.last_seen..].iter().enumerate() { + for (idx, event) in self.pending_events.iter().enumerate() { if event.type_ == SYN_DROPPED as u16 { drop_from = Some(idx); break; @@ -616,7 +747,7 @@ impl Device { if let Some(idx) = drop_from { // look for the nearest SYN_REPORT before the SYN_DROPPED, remove everything after it. let mut prev_report = 0; // (if there's no previous SYN_REPORT, then the entire vector is bogus) - for (idx, event) in self.pending_events[..idx].iter().enumerate().rev() { + for (idx, event) in self.pending_events.iter().take(idx).enumerate().rev() { if event.type_ == SYN_REPORT as u16 { prev_report = idx; break; @@ -643,7 +774,7 @@ impl Device { let old_vals = old_state.key_vals(); for key in supported_keys.iter() { if old_vals.map(|v| v.contains(key)) != Some(key_vals.contains(key)) { - self.pending_events.push(libc::input_event { + self.pending_events.push_back(libc::input_event { time, type_: EventType::KEY.0 as _, code: key.code() as u16, @@ -658,7 +789,7 @@ impl Device { { for idx in supported_abs.iter_ones() { if old_state.abs_vals.as_ref().map(|v| v[idx]) != Some(abs_vals[idx]) { - self.pending_events.push(libc::input_event { + self.pending_events.push_back(libc::input_event { time, type_: EventType::ABSOLUTE.0 as _, code: idx as u16, @@ -673,7 +804,7 @@ impl Device { { for idx in supported_switch.iter_ones() { if old_state.switch_vals.as_ref().map(|v| v[idx]) != Some(switch_vals[idx]) { - self.pending_events.push(libc::input_event { + self.pending_events.push_back(libc::input_event { time, type_: EventType::SWITCH.0 as _, code: idx as u16, @@ -686,7 +817,7 @@ impl Device { if let (Some(supported_led), Some(led_vals)) = (self.supported_led, &self.state.led_vals) { for idx in supported_led.iter_ones() { if old_state.led_vals.as_ref().map(|v| v[idx]) != Some(led_vals[idx]) { - self.pending_events.push(libc::input_event { + self.pending_events.push_back(libc::input_event { time, type_: EventType::LED.0 as _, code: idx as u16, @@ -696,7 +827,7 @@ impl Device { } } - self.pending_events.push(libc::input_event { + self.pending_events.push_back(libc::input_event { time, type_: EventType::SYNCHRONIZATION.0 as _, code: SYN_REPORT as u16, @@ -705,85 +836,67 @@ impl Device { Ok(()) } - fn fill_events(&mut self) -> Result<(), Error> { - let fd = self.as_raw_fd(); - let buf = &mut self.pending_events; - loop { - buf.reserve(20); - // TODO: use Vec::spare_capacity_mut or Vec::split_at_spare_mut when they stabilize - let spare_capacity = vec_spare_capacity_mut(buf); - let (_, uninit_buf, _) = - unsafe { spare_capacity.align_to_mut::>() }; - - // use libc::read instead of nix::unistd::read b/c we need to pass an uninitialized buf - let res = unsafe { libc::read(fd, uninit_buf.as_mut_ptr() as _, uninit_buf.len()) }; - match nix::errno::Errno::result(res) { - Ok(bytes_read) => unsafe { - let pre_len = buf.len(); - buf.set_len( - pre_len + (bytes_read as usize / mem::size_of::()), - ); - }, - Err(e) => { - if e == nix::Error::Sys(::nix::errno::Errno::EAGAIN) { - break; - } else { - return Err(e.into()); - } - } - } - } - Ok(()) - } - - /// Exposes the raw evdev events without doing synchronization on SYN_DROPPED. - pub fn events_no_sync(&mut self) -> Result { - self.fill_events()?; - Ok(RawEvents::new(self)) - } - - /// Exposes the raw evdev events, doing synchronization on SYN_DROPPED. + /// Read a maximum of `num` events into the internal buffer. If the underlying fd is not + /// O_NONBLOCK, this will block. /// - /// Will insert "fake" events - pub fn events(&mut self) -> Result { - self.fill_events()?; + /// Returns the number of events that were read, or an error. + pub fn fill_events(&mut self, num: usize) -> io::Result { + let fd = self.as_raw_fd(); + self.read_buf.clear(); + self.read_buf.reserve_exact(num); + + // TODO: use Vec::spare_capacity_mut or Vec::split_at_spare_mut when they stabilize + let spare_capacity = vec_spare_capacity_mut(&mut self.read_buf); + let (_, uninit_buf, _) = unsafe { spare_capacity.align_to_mut::>() }; + + // use libc::read instead of nix::unistd::read b/c we need to pass an uninitialized buf + let res = unsafe { libc::read(fd, uninit_buf.as_mut_ptr() as _, uninit_buf.len()) }; + let bytes_read = nix::errno::Errno::result(res).map_err(nix_err)?; + let num_read = bytes_read as usize / mem::size_of::(); + unsafe { + let len = self.read_buf.len(); + self.read_buf.set_len(len + num_read); + } + self.pending_events.extend(self.read_buf.drain(..)); + Ok(num_read) + } + + #[cfg(feature = "tokio")] + fn pop_event(&mut self) -> Option { + self.pending_events.pop_front().map(InputEvent) + } + + /// Fetches and returns events from the kernel ring buffer without doing synchronization on + /// SYN_DROPPED. + /// + /// By default this will block until events are available. Typically, users will want to call + /// this in a tight loop within a thread. + pub fn fetch_events_no_sync(&mut self) -> io::Result + '_> { + self.fill_events(DEFAULT_EVENT_COUNT)?; + Ok(self.pending_events.drain(..).map(InputEvent)) + } + + /// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED. + /// + /// By default this will block until events are available. Typically, users will want to call + /// this in a tight loop within a thread. + /// Will insert "fake" events. + pub fn fetch_events(&mut self) -> io::Result + '_> { + self.fill_events(DEFAULT_EVENT_COUNT)?; self.compensate_dropped()?; - Ok(RawEvents::new(self)) + Ok(self.pending_events.drain(..).map(InputEvent)) } - pub fn wait_ready(&self) -> nix::Result<()> { - use nix::poll; - let mut pfd = poll::PollFd::new(self.as_raw_fd(), poll::PollFlags::POLLIN); - poll::poll(std::slice::from_mut(&mut pfd), -1)?; - Ok(()) - } -} - -pub struct Events<'a>(&'a mut Device); - -pub struct RawEvents<'a>(&'a mut Device); - -impl<'a> RawEvents<'a> { - fn new(dev: &'a mut Device) -> RawEvents<'a> { - dev.pending_events.reverse(); - RawEvents(dev) - } -} - -impl<'a> Drop for RawEvents<'a> { - fn drop(&mut self) { - self.0.pending_events.reverse(); - self.0.last_seen = self.0.pending_events.len(); - } -} - -impl<'a> Iterator for RawEvents<'a> { - type Item = InputEvent; - - #[inline(always)] - fn next(&mut self) -> Option { - self.0.pending_events.pop().map(InputEvent) + #[cfg(feature = "tokio")] + /// Return a `futures::stream` asynchronous stream of `InputEvent` compatible with Tokio. + /// + /// The stream does NOT compensate for SYN_DROPPED events and will not update internal cached + /// state. + /// The Tokio runtime is expected to keep up with typical event rates. + /// This operation consumes the Device. + pub fn into_event_stream_no_sync(self) -> io::Result { + tokio_stream::EventStream::new(self) } } @@ -916,6 +1029,16 @@ fn timeval_to_systime(tv: &libc::timeval) -> SystemTime { } } +fn nix_err(err: nix::Error) -> io::Error { + match err { + nix::Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32), + nix::Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, err), + nix::Error::InvalidUtf8 => io::Error::new(io::ErrorKind::Other, err), + // TODO: io::ErrorKind::NotSupported once stable + nix::Error::UnsupportedOperation => io::Error::new(io::ErrorKind::Other, err), + } +} + /// A copy of the unstable Vec::spare_capacity_mut #[inline] fn vec_spare_capacity_mut(v: &mut Vec) -> &mut [mem::MaybeUninit] { diff --git a/src/tokio_stream.rs b/src/tokio_stream.rs new file mode 100644 index 0000000..318fc7c --- /dev/null +++ b/src/tokio_stream.rs @@ -0,0 +1,57 @@ +use tokio_1 as tokio; + +use crate::{nix_err, Device, InputEvent, DEFAULT_EVENT_COUNT}; +use futures_core::{ready, Stream}; +use std::io; +use std::os::unix::io::AsRawFd; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::unix::AsyncFd; + +pub struct EventStream { + device: AsyncFd, +} +impl Unpin for EventStream {} + +impl EventStream { + pub(crate) fn new(device: Device) -> io::Result { + use nix::fcntl; + fcntl::fcntl(device.as_raw_fd(), fcntl::F_SETFL(fcntl::OFlag::O_NONBLOCK)) + .map_err(nix_err)?; + let device = AsyncFd::new(device)?; + Ok(Self { device }) + } + + /// Returns a reference to the underlying device + pub fn device(&self) -> &Device { + self.device.get_ref() + } +} + +impl Stream for EventStream { + type Item = io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.get_mut(); + + if let Some(ev) = me.device.get_mut().pop_event() { + return Poll::Ready(Some(Ok(ev))); + } + + loop { + let mut guard = ready!(me.device.poll_read_ready_mut(cx))?; + + match guard.try_io(|device| device.get_mut().fill_events(DEFAULT_EVENT_COUNT)) { + Ok(res) => { + let ret = match res { + Ok(0) => None, + Ok(_) => Some(Ok(me.device.get_mut().pop_event().unwrap())), + Err(e) if e.raw_os_error() == Some(libc::ENODEV) => None, + Err(e) => Some(Err(e)), + }; + return Poll::Ready(ret); + } + Err(_would_block) => continue, + } + } + } +}