Separate out sane blocking and async (nonblocking) implementations (#33)

* Default impl should block. Renaming it to avoid API confusion from previous users.

* Can't get rid of libc::read because we have MaybeUninit, but we can clean up the match

* Remove extra include

* Switch everything over to io::Error

* Add initial tokio impl+example

* Move evtest_tokio to normal example

* Add documentation and clarify

* Use a VecDeque (ring buffer) instead of repeatedly popping things off the front of a Vec

* Looks like we are not using thiserror anymore; removing

* Store read buf between calls

* Add nonblocking example with epoll

Co-authored-by: Jeff Hiner <jeff-hiner@users.noreply.github.com>
Co-authored-by: Noah <33094578+coolreader18@users.noreply.github.com>
This commit is contained in:
Jeff Hiner 2021-03-01 10:09:11 -07:00 committed by GitHub
parent 3f32c41fc0
commit cb2f5ef87a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 450 additions and 133 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "evdev"
version = "0.11.0-alpha.4"
version = "0.11.0-alpha.5"
authors = ["Corey Richardson <corey@octayn.net>"]
description = "evdev interface for Linux"
license = "Apache-2.0 OR MIT"
@ -8,8 +8,21 @@ repository = "https://github.com/cmr/evdev"
documentation = "https://docs.rs/evdev"
edition = "2018"
[features]
tokio = ["tokio_1", "futures-core"]
[dependencies]
libc = "0.2.22"
bitvec = "0.21"
nix = "0.19.0"
thiserror = "1.0.24"
tokio_1 = { package = "tokio", version = "1.0", features = ["net"], optional = true }
futures-core = { version = "0.3", optional = true }
[dev-dependencies]
tokio_1 = { package = "tokio", version = "1.0", features = ["macros", "rt-multi-thread"] }
futures-util = "0.3"
[[example]]
name = "evtest_tokio"
required-features = ["tokio"]

View file

@ -21,7 +21,11 @@ What does this library support?
===============================
This library exposes raw evdev events, but uses the Rust `Iterator` trait to
do so, and will handle `SYN_DROPPED` events properly for the client. I try to
do so. When processing events via `fetch_events`, the library will handle
`SYN_DROPPED` events by injecting fake state updates in an attempt to ensure
callers see state transition messages consistent with actual device state. When
processing via `*_no_sync` this correction is not done, and `SYN_DROPPED` messages
will appear if the kernel ring buffer is overrun before messages are read. I try to
match [libevdev](https://www.freedesktop.org/software/libevdev/doc/latest/)
closely, where possible.

View file

@ -20,9 +20,8 @@ fn main() {
println!("{}", d);
println!("Events:");
loop {
for ev in d.events_no_sync().unwrap() {
for ev in d.fetch_events_no_sync().unwrap() {
println!("{:?}", ev);
}
d.wait_ready().unwrap();
}
}

View file

@ -0,0 +1,92 @@
//! This example demonstrates how to use the evdev crate with a nonblocking file descriptor.
//!
//! Note that for this implementation the caller is responsible for ensuring the underlying
//! Device file descriptor is set to O_NONBLOCK. The caller must also create the epoll descriptor,
//! bind it, check for EAGAIN returns from fetch_events_*, call epoll_wait as appropriate, and
//! clean up the epoll file descriptor when finished.
use nix::{
fcntl::{FcntlArg, OFlag},
sys::epoll,
};
use std::io::prelude::*;
use std::os::unix::io::{AsRawFd, RawFd};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = std::env::args_os();
let mut d = if args.len() > 1 {
evdev::Device::open(&args.nth(1).unwrap()).unwrap()
} else {
let mut devices = evdev::enumerate().collect::<Vec<_>>();
for (i, d) in devices.iter().enumerate() {
println!("{}: {}", i, d.name().unwrap_or("Unnamed device"));
}
print!("Select the device [0-{}]: ", devices.len());
let _ = std::io::stdout().flush();
let mut chosen = String::new();
std::io::stdin().read_line(&mut chosen).unwrap();
devices.swap_remove(chosen.trim().parse::<usize>().unwrap())
};
println!("{}", d);
let raw_fd = d.as_raw_fd();
// Set nonblocking
nix::fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?;
// Create epoll handle and attach raw_fd
let epoll_fd = Epoll::new(epoll::epoll_create1(
epoll::EpollCreateFlags::EPOLL_CLOEXEC,
)?);
let mut event = epoll::EpollEvent::new(epoll::EpollFlags::EPOLLIN, 0);
epoll::epoll_ctl(
epoll_fd.as_raw_fd(),
epoll::EpollOp::EpollCtlAdd,
raw_fd,
Some(&mut event),
)?;
// We don't care about these, but the kernel wants to fill them.
let mut events = [epoll::EpollEvent::empty(); 2];
println!("Events:");
loop {
match d.fetch_events_no_sync() {
Ok(iterator) => {
for ev in iterator {
println!("{:?}", ev);
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Wait forever for bytes available on raw_fd
epoll::epoll_wait(epoll_fd.as_raw_fd(), &mut events, -1)?;
}
Err(e) => {
eprintln!("{}", e);
break;
}
}
}
Ok(())
}
// The rest here is to ensure the epoll handle is cleaned up properly.
// You can also use the epoll crate, if you prefer.
struct Epoll(RawFd);
impl Epoll {
pub(crate) fn new(fd: RawFd) -> Self {
Epoll(fd)
}
}
impl AsRawFd for Epoll {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
impl Drop for Epoll {
fn drop(&mut self) {
let _ = nix::unistd::close(self.0);
}
}

29
examples/evtest_tokio.rs Normal file
View file

@ -0,0 +1,29 @@
use tokio_1 as tokio;
use futures_util::TryStreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = std::env::args_os();
let d = if args.len() > 1 {
evdev::Device::open(&args.nth(1).unwrap())?
} else {
let mut devices = evdev::enumerate().collect::<Vec<_>>();
for (i, d) in devices.iter().enumerate() {
println!("{}: {}", i, d.name().unwrap_or("Unnamed device"));
}
print!("Select the device [0-{}]: ", devices.len());
let _ = std::io::Write::flush(&mut std::io::stdout());
let mut chosen = String::new();
std::io::stdin().read_line(&mut chosen)?;
devices.swap_remove(chosen.trim().parse::<usize>()?)
};
println!("{}", d);
println!("Events:");
let mut events = d.into_event_stream_no_sync()?;
while let Some(ev) = events.try_next().await? {
println!("{:?}", ev);
}
println!("EOF!");
Ok(())
}

View file

@ -48,6 +48,9 @@
//! It is recommended that you dedicate a thread to processing input events, or use epoll or an
//! async runtime with the fd returned by `<Device as AsRawFd>::as_raw_fd` to process events when
//! they are ready.
//!
//! For demonstrations of how to use this library in blocking, nonblocking, and async (tokio) modes,
//! please reference the "examples" directory.
#![cfg(any(unix, target_os = "android"))]
#![allow(non_camel_case_types)]
@ -57,18 +60,19 @@
mod attribute_set;
mod constants;
pub mod raw;
mod raw;
mod scancodes;
#[cfg(feature = "tokio")]
mod tokio_stream;
use bitvec::prelude::*;
use std::collections::VecDeque;
use std::fmt;
use std::fs::File;
use std::fs::OpenOptions;
use std::fs::{File, OpenOptions};
use std::io;
use std::mem;
use std::os::unix::{
fs::OpenOptionsExt,
io::{AsRawFd, RawFd},
};
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::{ffi::CString, mem::MaybeUninit};
@ -126,6 +130,7 @@ const fn bit_elts<T>(bits: usize) -> usize {
type KeyArray = [u8; bit_elts::<u8>(Key::COUNT)];
#[derive(Debug, Clone)]
/// A cached representation of device state at a certain time.
pub struct DeviceState {
/// The state corresponds to kernel state at this timestamp.
timestamp: libc::timeval,
@ -139,24 +144,37 @@ pub struct DeviceState {
}
impl DeviceState {
/// Returns the time when this snapshot was taken.
pub fn timestamp(&self) -> SystemTime {
timeval_to_systime(&self.timestamp)
}
/// Returns the set of keys pressed when the snapshot was taken.
///
/// Returns `None` if keys are not supported by this device.
pub fn key_vals(&self) -> Option<AttributeSet<'_, Key>> {
self.key_vals
.as_deref()
.map(|v| AttributeSet::new(BitSlice::from_slice(v).unwrap()))
}
pub fn timestamp(&self) -> SystemTime {
timeval_to_systime(&self.timestamp)
}
/// Returns the set of absolute axis measurements when the snapshot was taken.
///
/// Returns `None` if not supported by this device.
pub fn abs_vals(&self) -> Option<&[libc::input_absinfo]> {
self.abs_vals.as_deref().map(|v| &v[..])
}
/// Returns the set of switches triggered when the snapshot was taken.
///
/// Returns `None` if switches are not supported by this device.
pub fn switch_vals(&self) -> Option<AttributeSet<'_, SwitchType>> {
self.switch_vals.as_deref().map(AttributeSet::new)
}
/// Returns the set of LEDs turned on when the snapshot was taken.
///
/// Returns `None` if LEDs are not supported by this device.
pub fn led_vals(&self) -> Option<AttributeSet<'_, LedType>> {
self.led_vals.as_deref().map(AttributeSet::new)
}
@ -177,16 +195,12 @@ impl Default for DeviceState {
}
}
/// Publicly visible errors which can be returned from evdev
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("libc/system error: {0}")]
NixError(#[from] nix::Error),
#[error("standard i/o error: {0}")]
StdIoError(#[from] std::io::Error),
}
#[derive(Debug)]
/// A physical or virtual device supported by evdev.
///
/// Each device corresponds to a path typically found in `/dev/input`, and supports access via
/// one or more "types". For example, an optical mouse has buttons that are represented by "keys",
/// and reflects changes in its position via "relative axis" reports.
pub struct Device {
file: File,
ty: BitArr!(for EventType::COUNT, in u8),
@ -206,9 +220,8 @@ pub struct Device {
// ff_stat: Option<FFStatus>,
// rep: Option<Repeat>,
supported_snd: Option<BitArr!(for SoundType::COUNT, in u8)>,
pending_events: Vec<libc::input_event>,
// pending_events[last_seen..] is the events that have occurred since the last sync.
last_seen: usize,
pending_events: VecDeque<libc::input_event>,
read_buf: Vec<libc::input_event>,
state: DeviceState,
}
@ -348,57 +361,149 @@ impl fmt::Display for Device {
}
}
const DEFAULT_EVENT_COUNT: usize = 32;
impl Device {
/// Returns a set of the event types supported by this device (Key, Switch, etc)
///
/// If you're interested in the individual keys or switches supported, it's probably easier
/// to just call the appropriate `supported_*` function instead.
pub fn supported_events(&self) -> AttributeSet<'_, EventType> {
AttributeSet::new(&self.ty)
}
/// Returns the device's name as read from the kernel.
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
/// Returns the device's physical location, either as set by the caller or as read from the kernel.
pub fn physical_path(&self) -> Option<&str> {
self.phys.as_deref()
}
/// Returns the user-defined "unique name" of the device, if one has been set.
pub fn unique_name(&self) -> Option<&str> {
self.uniq.as_deref()
}
/// Returns a struct containing bustype, vendor, product, and version identifiers
pub fn input_id(&self) -> libc::input_id {
self.id
}
/// Returns the set of supported "properties" for the device (see `INPUT_PROP_*` in kernel headers)
pub fn properties(&self) -> AttributeSet<'_, PropType> {
AttributeSet::new(&self.props)
}
/// Returns a tuple of the driver version containing major, minor, rev
pub fn driver_version(&self) -> (u8, u8, u8) {
self.driver_version
}
/// Returns the set of supported keys reported by the device.
///
/// For keyboards, this is the set of all possible keycodes the keyboard may emit. Controllers,
/// mice, and other peripherals may also report buttons as keys.
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use evdev::{Device, Key};
/// let device = Device::open("/dev/input/event0")?;
///
/// // Does this device have an ENTER key?
/// let supported = device.supported_keys().map_or(false, |keys| keys.contains(Key::KEY_ENTER));
/// # Ok(())
/// # }
/// ```
pub fn supported_keys(&self) -> Option<AttributeSet<'_, Key>> {
self.supported_keys
.as_deref()
.map(|v| AttributeSet::new(BitSlice::from_slice(v).unwrap()))
}
/// Returns the set of supported "relative axes" reported by the device.
///
/// Standard mice will generally report `REL_X` and `REL_Y` along with wheel if supported.
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use evdev::{Device, RelativeAxisType};
/// let device = Device::open("/dev/input/event0")?;
///
/// // Does the device have a scroll wheel?
/// let supported = device
/// .supported_relative_axes()
/// .map_or(false, |axes| axes.contains(RelativeAxisType::REL_WHEEL));
/// # Ok(())
/// # }
/// ```
pub fn supported_relative_axes(&self) -> Option<AttributeSet<'_, RelativeAxisType>> {
self.supported_relative.as_deref().map(AttributeSet::new)
}
/// Returns the set of supported "absolute axes" reported by the device.
///
/// These are most typically supported by joysticks and touchpads.
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use evdev::{Device, AbsoluteAxisType};
/// let device = Device::open("/dev/input/event0")?;
///
/// // Does the device have an absolute X axis?
/// let supported = device
/// .supported_absolute_axes()
/// .map_or(false, |axes| axes.contains(AbsoluteAxisType::ABS_X));
/// # Ok(())
/// # }
/// ```
pub fn supported_absolute_axes(&self) -> Option<AttributeSet<'_, AbsoluteAxisType>> {
self.supported_absolute.as_deref().map(AttributeSet::new)
}
/// Returns the set of supported switches reported by the device.
///
/// These are typically used for things like software switches on laptop lids (which the
/// system reacts to by suspending or locking), or virtual switches to indicate whether a
/// headphone jack is plugged in (used to disable external speakers).
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use evdev::{Device, SwitchType};
/// let device = Device::open("/dev/input/event0")?;
///
/// // Does the device report a laptop lid switch?
/// let supported = device
/// .supported_switches()
/// .map_or(false, |axes| axes.contains(SwitchType::SW_LID));
/// # Ok(())
/// # }
/// ```
pub fn supported_switches(&self) -> Option<AttributeSet<'_, SwitchType>> {
self.supported_switch.as_deref().map(AttributeSet::new)
}
/// Returns a set of supported LEDs on the device.
///
/// Most commonly these are state indicator lights for things like Scroll Lock, but they
/// can also be found in cameras and other devices.
pub fn supported_leds(&self) -> Option<AttributeSet<'_, LedType>> {
self.supported_led.as_deref().map(AttributeSet::new)
}
/// Returns a set of supported "miscellaneous" capabilities.
///
/// Aside from vendor-specific key scancodes, most of these are uncommon.
pub fn misc_properties(&self) -> Option<AttributeSet<'_, MiscType>> {
self.supported_misc.as_deref().map(AttributeSet::new)
}
@ -407,33 +512,44 @@ impl Device {
// self.rep
// }
/// Returns the set of supported simple sounds supported by a device.
///
/// You can use these to make really annoying beep sounds come from an internal self-test
/// speaker, for instance.
pub fn supported_sounds(&self) -> Option<AttributeSet<'_, SoundType>> {
self.supported_snd.as_deref().map(AttributeSet::new)
}
/// Returns the *cached* state of the device.
///
/// Pulling updates via `fetch_events` or manually invoking `sync_state` will refresh the cache.
pub fn state(&self) -> &DeviceState {
&self.state
}
#[inline(always)]
pub fn open(path: impl AsRef<Path>) -> Result<Device, Error> {
/// Opens a device, given its system path.
///
/// Paths are typically something like `/dev/input/event0`.
pub fn open(path: impl AsRef<Path>) -> io::Result<Device> {
Self::_open(path.as_ref())
}
fn _open(path: &Path) -> Result<Device, Error> {
fn _open(path: &Path) -> io::Result<Device> {
let mut options = OpenOptions::new();
// Try to load read/write, then fall back to read-only.
let file = options
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.or_else(|_| options.write(false).open(path))?;
let ty = {
let mut ty = BitArray::zeroed();
unsafe { raw::eviocgbit_type(file.as_raw_fd(), ty.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_type(file.as_raw_fd(), ty.as_mut_raw_slice()).map_err(nix_err)?
};
ty
};
@ -446,12 +562,12 @@ impl Device {
let id = unsafe {
let mut id = MaybeUninit::uninit();
raw::eviocgid(file.as_raw_fd(), id.as_mut_ptr())?;
raw::eviocgid(file.as_raw_fd(), id.as_mut_ptr()).map_err(nix_err)?;
id.assume_init()
};
let mut driver_version: i32 = 0;
unsafe {
raw::eviocgversion(file.as_raw_fd(), &mut driver_version)?;
raw::eviocgversion(file.as_raw_fd(), &mut driver_version).map_err(nix_err)?;
}
let driver_version = (
((driver_version >> 16) & 0xff) as u8,
@ -461,7 +577,9 @@ impl Device {
let props = {
let mut props = BitArray::zeroed();
unsafe { raw::eviocgprop(file.as_raw_fd(), props.as_mut_raw_slice())? };
unsafe {
raw::eviocgprop(file.as_raw_fd(), props.as_mut_raw_slice()).map_err(nix_err)?
};
props
}; // FIXME: handle old kernel
@ -474,7 +592,7 @@ impl Device {
let mut supported_keys = Box::new(KEY_ARR_INIT);
let key_slice = &mut supported_keys[..];
unsafe { raw::eviocgbit_key(file.as_raw_fd(), key_slice)? };
unsafe { raw::eviocgbit_key(file.as_raw_fd(), key_slice).map_err(nix_err)? };
Some(supported_keys)
} else {
@ -483,7 +601,10 @@ impl Device {
let supported_relative = if ty[EventType::RELATIVE.0 as usize] {
let mut rel = BitArray::zeroed();
unsafe { raw::eviocgbit_relative(file.as_raw_fd(), rel.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_relative(file.as_raw_fd(), rel.as_mut_raw_slice())
.map_err(nix_err)?
};
Some(rel)
} else {
None
@ -498,7 +619,10 @@ impl Device {
[ABSINFO_ZERO; AbsoluteAxisType::COUNT];
state.abs_vals = Some(Box::new(ABS_VALS_INIT));
let mut abs = BitArray::zeroed();
unsafe { raw::eviocgbit_absolute(file.as_raw_fd(), abs.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_absolute(file.as_raw_fd(), abs.as_mut_raw_slice())
.map_err(nix_err)?
};
Some(abs)
} else {
None
@ -507,7 +631,10 @@ impl Device {
let supported_switch = if ty[EventType::SWITCH.0 as usize] {
state.switch_vals = Some(BitArray::zeroed());
let mut switch = BitArray::zeroed();
unsafe { raw::eviocgbit_switch(file.as_raw_fd(), switch.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_switch(file.as_raw_fd(), switch.as_mut_raw_slice())
.map_err(nix_err)?
};
Some(switch)
} else {
None
@ -516,7 +643,9 @@ impl Device {
let supported_led = if ty[EventType::LED.0 as usize] {
state.led_vals = Some(BitArray::zeroed());
let mut led = BitArray::zeroed();
unsafe { raw::eviocgbit_led(file.as_raw_fd(), led.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_led(file.as_raw_fd(), led.as_mut_raw_slice()).map_err(nix_err)?
};
Some(led)
} else {
None
@ -524,7 +653,9 @@ impl Device {
let supported_misc = if ty[EventType::MISC.0 as usize] {
let mut misc = BitArray::zeroed();
unsafe { raw::eviocgbit_misc(file.as_raw_fd(), misc.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_misc(file.as_raw_fd(), misc.as_mut_raw_slice()).map_err(nix_err)?
};
Some(misc)
} else {
None
@ -534,7 +665,9 @@ impl Device {
let supported_snd = if ty[EventType::SOUND.0 as usize] {
let mut snd = BitArray::zeroed();
unsafe { raw::eviocgbit_sound(file.as_raw_fd(), snd.as_mut_raw_slice())? };
unsafe {
raw::eviocgbit_sound(file.as_raw_fd(), snd.as_mut_raw_slice()).map_err(nix_err)?
};
Some(snd)
} else {
None
@ -556,8 +689,8 @@ impl Device {
supported_led,
supported_misc,
supported_snd,
pending_events: Vec::with_capacity(64),
last_seen: 0,
pending_events: VecDeque::with_capacity(64),
read_buf: Vec::new(),
state,
};
@ -569,10 +702,10 @@ impl Device {
/// Synchronize the `Device` state with the kernel device state.
///
/// If there is an error at any point, the state will not be synchronized completely.
pub fn sync_state(&mut self) -> Result<(), Error> {
pub fn sync_state(&mut self) -> io::Result<()> {
let fd = self.as_raw_fd();
if let Some(key_vals) = &mut self.state.key_vals {
unsafe { raw::eviocgkey(fd, &mut key_vals[..])? };
unsafe { raw::eviocgkey(fd, &mut key_vals[..]).map_err(nix_err)? };
}
if let (Some(supported_abs), Some(abs_vals)) =
@ -583,18 +716,16 @@ impl Device {
//
// handling later removed. not sure what the intention of "handling that later" was
// the abs data seems to be fine (tested ABS_MT_POSITION_X/Y)
unsafe {
raw::eviocgabs(fd, idx as u32, &mut abs_vals[idx])?;
}
unsafe { raw::eviocgabs(fd, idx as u32, &mut abs_vals[idx]).map_err(nix_err)? };
}
}
if let Some(switch_vals) = &mut self.state.switch_vals {
unsafe { raw::eviocgsw(fd, switch_vals.as_mut_raw_slice())? };
unsafe { raw::eviocgsw(fd, switch_vals.as_mut_raw_slice()).map_err(nix_err)? };
}
if let Some(led_vals) = &mut self.state.led_vals {
unsafe { raw::eviocgled(fd, led_vals.as_mut_raw_slice())? };
unsafe { raw::eviocgled(fd, led_vals.as_mut_raw_slice()).map_err(nix_err)? };
}
Ok(())
@ -603,9 +734,9 @@ impl Device {
/// Do SYN_DROPPED synchronization, and compensate for missing events by inserting events into
/// the stream which, when applied to any state being kept outside of this `Device`, will
/// synchronize it with the kernel state.
fn compensate_dropped(&mut self) -> Result<(), Error> {
fn compensate_dropped(&mut self) -> io::Result<()> {
let mut drop_from = None;
for (idx, event) in self.pending_events[self.last_seen..].iter().enumerate() {
for (idx, event) in self.pending_events.iter().enumerate() {
if event.type_ == SYN_DROPPED as u16 {
drop_from = Some(idx);
break;
@ -616,7 +747,7 @@ impl Device {
if let Some(idx) = drop_from {
// look for the nearest SYN_REPORT before the SYN_DROPPED, remove everything after it.
let mut prev_report = 0; // (if there's no previous SYN_REPORT, then the entire vector is bogus)
for (idx, event) in self.pending_events[..idx].iter().enumerate().rev() {
for (idx, event) in self.pending_events.iter().take(idx).enumerate().rev() {
if event.type_ == SYN_REPORT as u16 {
prev_report = idx;
break;
@ -643,7 +774,7 @@ impl Device {
let old_vals = old_state.key_vals();
for key in supported_keys.iter() {
if old_vals.map(|v| v.contains(key)) != Some(key_vals.contains(key)) {
self.pending_events.push(libc::input_event {
self.pending_events.push_back(libc::input_event {
time,
type_: EventType::KEY.0 as _,
code: key.code() as u16,
@ -658,7 +789,7 @@ impl Device {
{
for idx in supported_abs.iter_ones() {
if old_state.abs_vals.as_ref().map(|v| v[idx]) != Some(abs_vals[idx]) {
self.pending_events.push(libc::input_event {
self.pending_events.push_back(libc::input_event {
time,
type_: EventType::ABSOLUTE.0 as _,
code: idx as u16,
@ -673,7 +804,7 @@ impl Device {
{
for idx in supported_switch.iter_ones() {
if old_state.switch_vals.as_ref().map(|v| v[idx]) != Some(switch_vals[idx]) {
self.pending_events.push(libc::input_event {
self.pending_events.push_back(libc::input_event {
time,
type_: EventType::SWITCH.0 as _,
code: idx as u16,
@ -686,7 +817,7 @@ impl Device {
if let (Some(supported_led), Some(led_vals)) = (self.supported_led, &self.state.led_vals) {
for idx in supported_led.iter_ones() {
if old_state.led_vals.as_ref().map(|v| v[idx]) != Some(led_vals[idx]) {
self.pending_events.push(libc::input_event {
self.pending_events.push_back(libc::input_event {
time,
type_: EventType::LED.0 as _,
code: idx as u16,
@ -696,7 +827,7 @@ impl Device {
}
}
self.pending_events.push(libc::input_event {
self.pending_events.push_back(libc::input_event {
time,
type_: EventType::SYNCHRONIZATION.0 as _,
code: SYN_REPORT as u16,
@ -705,85 +836,67 @@ impl Device {
Ok(())
}
fn fill_events(&mut self) -> Result<(), Error> {
let fd = self.as_raw_fd();
let buf = &mut self.pending_events;
loop {
buf.reserve(20);
// TODO: use Vec::spare_capacity_mut or Vec::split_at_spare_mut when they stabilize
let spare_capacity = vec_spare_capacity_mut(buf);
let (_, uninit_buf, _) =
unsafe { spare_capacity.align_to_mut::<mem::MaybeUninit<u8>>() };
// use libc::read instead of nix::unistd::read b/c we need to pass an uninitialized buf
let res = unsafe { libc::read(fd, uninit_buf.as_mut_ptr() as _, uninit_buf.len()) };
match nix::errno::Errno::result(res) {
Ok(bytes_read) => unsafe {
let pre_len = buf.len();
buf.set_len(
pre_len + (bytes_read as usize / mem::size_of::<libc::input_event>()),
);
},
Err(e) => {
if e == nix::Error::Sys(::nix::errno::Errno::EAGAIN) {
break;
} else {
return Err(e.into());
}
}
}
}
Ok(())
}
/// Exposes the raw evdev events without doing synchronization on SYN_DROPPED.
pub fn events_no_sync(&mut self) -> Result<RawEvents, Error> {
self.fill_events()?;
Ok(RawEvents::new(self))
}
/// Exposes the raw evdev events, doing synchronization on SYN_DROPPED.
/// Read a maximum of `num` events into the internal buffer. If the underlying fd is not
/// O_NONBLOCK, this will block.
///
/// Will insert "fake" events
pub fn events(&mut self) -> Result<RawEvents, Error> {
self.fill_events()?;
/// Returns the number of events that were read, or an error.
pub fn fill_events(&mut self, num: usize) -> io::Result<usize> {
let fd = self.as_raw_fd();
self.read_buf.clear();
self.read_buf.reserve_exact(num);
// TODO: use Vec::spare_capacity_mut or Vec::split_at_spare_mut when they stabilize
let spare_capacity = vec_spare_capacity_mut(&mut self.read_buf);
let (_, uninit_buf, _) = unsafe { spare_capacity.align_to_mut::<mem::MaybeUninit<u8>>() };
// use libc::read instead of nix::unistd::read b/c we need to pass an uninitialized buf
let res = unsafe { libc::read(fd, uninit_buf.as_mut_ptr() as _, uninit_buf.len()) };
let bytes_read = nix::errno::Errno::result(res).map_err(nix_err)?;
let num_read = bytes_read as usize / mem::size_of::<libc::input_event>();
unsafe {
let len = self.read_buf.len();
self.read_buf.set_len(len + num_read);
}
self.pending_events.extend(self.read_buf.drain(..));
Ok(num_read)
}
#[cfg(feature = "tokio")]
fn pop_event(&mut self) -> Option<InputEvent> {
self.pending_events.pop_front().map(InputEvent)
}
/// Fetches and returns events from the kernel ring buffer without doing synchronization on
/// SYN_DROPPED.
///
/// By default this will block until events are available. Typically, users will want to call
/// this in a tight loop within a thread.
pub fn fetch_events_no_sync(&mut self) -> io::Result<impl Iterator<Item = InputEvent> + '_> {
self.fill_events(DEFAULT_EVENT_COUNT)?;
Ok(self.pending_events.drain(..).map(InputEvent))
}
/// Fetches and returns events from the kernel ring buffer, doing synchronization on SYN_DROPPED.
///
/// By default this will block until events are available. Typically, users will want to call
/// this in a tight loop within a thread.
/// Will insert "fake" events.
pub fn fetch_events(&mut self) -> io::Result<impl Iterator<Item = InputEvent> + '_> {
self.fill_events(DEFAULT_EVENT_COUNT)?;
self.compensate_dropped()?;
Ok(RawEvents::new(self))
Ok(self.pending_events.drain(..).map(InputEvent))
}
pub fn wait_ready(&self) -> nix::Result<()> {
use nix::poll;
let mut pfd = poll::PollFd::new(self.as_raw_fd(), poll::PollFlags::POLLIN);
poll::poll(std::slice::from_mut(&mut pfd), -1)?;
Ok(())
}
}
pub struct Events<'a>(&'a mut Device);
pub struct RawEvents<'a>(&'a mut Device);
impl<'a> RawEvents<'a> {
fn new(dev: &'a mut Device) -> RawEvents<'a> {
dev.pending_events.reverse();
RawEvents(dev)
}
}
impl<'a> Drop for RawEvents<'a> {
fn drop(&mut self) {
self.0.pending_events.reverse();
self.0.last_seen = self.0.pending_events.len();
}
}
impl<'a> Iterator for RawEvents<'a> {
type Item = InputEvent;
#[inline(always)]
fn next(&mut self) -> Option<InputEvent> {
self.0.pending_events.pop().map(InputEvent)
#[cfg(feature = "tokio")]
/// Return a `futures::stream` asynchronous stream of `InputEvent` compatible with Tokio.
///
/// The stream does NOT compensate for SYN_DROPPED events and will not update internal cached
/// state.
/// The Tokio runtime is expected to keep up with typical event rates.
/// This operation consumes the Device.
pub fn into_event_stream_no_sync(self) -> io::Result<tokio_stream::EventStream> {
tokio_stream::EventStream::new(self)
}
}
@ -916,6 +1029,16 @@ fn timeval_to_systime(tv: &libc::timeval) -> SystemTime {
}
}
fn nix_err(err: nix::Error) -> io::Error {
match err {
nix::Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
nix::Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, err),
nix::Error::InvalidUtf8 => io::Error::new(io::ErrorKind::Other, err),
// TODO: io::ErrorKind::NotSupported once stable
nix::Error::UnsupportedOperation => io::Error::new(io::ErrorKind::Other, err),
}
}
/// A copy of the unstable Vec::spare_capacity_mut
#[inline]
fn vec_spare_capacity_mut<T>(v: &mut Vec<T>) -> &mut [mem::MaybeUninit<T>] {

57
src/tokio_stream.rs Normal file
View file

@ -0,0 +1,57 @@
use tokio_1 as tokio;
use crate::{nix_err, Device, InputEvent, DEFAULT_EVENT_COUNT};
use futures_core::{ready, Stream};
use std::io;
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::unix::AsyncFd;
pub struct EventStream {
device: AsyncFd<Device>,
}
impl Unpin for EventStream {}
impl EventStream {
pub(crate) fn new(device: Device) -> io::Result<Self> {
use nix::fcntl;
fcntl::fcntl(device.as_raw_fd(), fcntl::F_SETFL(fcntl::OFlag::O_NONBLOCK))
.map_err(nix_err)?;
let device = AsyncFd::new(device)?;
Ok(Self { device })
}
/// Returns a reference to the underlying device
pub fn device(&self) -> &Device {
self.device.get_ref()
}
}
impl Stream for EventStream {
type Item = io::Result<InputEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.get_mut();
if let Some(ev) = me.device.get_mut().pop_event() {
return Poll::Ready(Some(Ok(ev)));
}
loop {
let mut guard = ready!(me.device.poll_read_ready_mut(cx))?;
match guard.try_io(|device| device.get_mut().fill_events(DEFAULT_EVENT_COUNT)) {
Ok(res) => {
let ret = match res {
Ok(0) => None,
Ok(_) => Some(Ok(me.device.get_mut().pop_event().unwrap())),
Err(e) if e.raw_os_error() == Some(libc::ENODEV) => None,
Err(e) => Some(Err(e)),
};
return Poll::Ready(ret);
}
Err(_would_block) => continue,
}
}
}
}