Skip to content

Commit

Permalink
Compute soft deadline in poll() and use nonblocking sockets.
Browse files Browse the repository at this point in the history
Before this commit, anything that touched RawSocket or TapInterface
worked partly by accident and partly because of a horrible crutch
that resulted in massive latencies as well as inevitable packet loss
every time an ARP request had to be issued. Also, there was no way
to use poll() other than by continuously calling it in a busy loop.

After this commit, poll() indicates when the earliest timer expires,
and so the caller can sleep until that moment (or until packets
arrive).

Note that there is a subtle problem remaining: every time poll()
is called, every socket with a pending outbound packet whose
IP address doesn't correspond to a MAC address will send a new
ARP request, resulting in potentially a whole lot of such requests.
ARP rate limiting is a separate topic though.
whitequark committed Aug 29, 2017
1 parent 4663060 commit 39464a5
Showing 18 changed files with 312 additions and 195 deletions.
14 changes: 6 additions & 8 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,8 @@ mod utils;

use std::str::{self, FromStr};
use std::time::Instant;
use smoltcp::Error;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
use smoltcp::socket::{AsSocket, SocketSet};
@@ -25,6 +26,7 @@ fn main() {

let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
let address = IpAddress::from_str(&matches.free[0]).expect("invalid address format");
let port = u16::from_str(&matches.free[1]).expect("invalid port format");
@@ -86,12 +88,8 @@ fn main() {
}
}

let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e)
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at).expect("wait error");
}
}
10 changes: 6 additions & 4 deletions examples/loopback.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@ extern crate getopts;
mod utils;

use core::str;
use smoltcp::Error;
use smoltcp::phy::Loopback;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
@@ -161,11 +160,14 @@ fn main() {
}

match iface.poll(&mut socket_set, clock.elapsed()) {
Ok(()) | Err(Error::Exhausted) => (),
Ok(Some(poll_at)) => {
let delay = poll_at - clock.elapsed();
debug!("sleeping for {} ms", delay);
clock.advance(delay)
}
Ok(None) => clock.advance(1),
Err(e) => debug!("poll error: {}", e)
}

clock.advance(1);
}

if done {
39 changes: 23 additions & 16 deletions examples/ping.rs
Original file line number Diff line number Diff line change
@@ -8,8 +8,10 @@ extern crate byteorder;
mod utils;

use std::str::{self, FromStr};
use std::time::{Duration, Instant};
use smoltcp::Error;
use std::cmp;
use std::time::Instant;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpVersion, IpProtocol, IpAddress,
Ipv4Address, Ipv4Packet, Ipv4Repr,
Icmpv4Repr, Icmpv4Packet};
@@ -35,6 +37,7 @@ fn main() {

let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
let address = Ipv4Address::from_str(&matches.free[0]).expect("invalid address format");
let count = matches.opt_str("count").map(|s| usize::from_str(&s).unwrap()).unwrap_or(4);
@@ -61,7 +64,7 @@ fn main() {
let mut sockets = SocketSet::new(vec![]);
let raw_handle = sockets.add(raw_socket);

let mut send_next = Duration::default();
let mut send_at = 0;
let mut seq_no = 0;
let mut received = 0;
let mut echo_payload = [0xffu8; 40];
@@ -75,11 +78,8 @@ fn main() {
let timestamp_us = (timestamp.as_secs() * 1000000) +
(timestamp.subsec_nanos() / 1000) as u64;

if seq_no == count as u16 && waiting_queue.is_empty() {
break;
}

if socket.can_send() && seq_no < count as u16 && send_next <= timestamp {
if socket.can_send() && seq_no < count as u16 &&
send_at <= utils::millis_since(startup_time) {
NetworkEndian::write_u64(&mut echo_payload, timestamp_us);
let icmp_repr = Icmpv4Repr::EchoRequest {
ident: 1,
@@ -105,7 +105,7 @@ fn main() {

waiting_queue.insert(seq_no, timestamp);
seq_no += 1;
send_next += Duration::new(interval, 0);
send_at += interval * 1000;
}

if socket.can_recv() {
@@ -137,16 +137,23 @@ fn main() {
println!("From {} icmp_seq={} timeout", remote_addr, seq);
false
}
})
});

if seq_no == count as u16 && waiting_queue.is_empty() {
break
}
}

let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e),
let timestamp = utils::millis_since(startup_time);

let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
let mut resume_at = Some(send_at);
if let Some(poll_at) = poll_at {
resume_at = resume_at.map(|at| cmp::min(at, poll_at))
}

debug!("waiting until {:?} ms", resume_at);
phy_wait(fd, resume_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
}

println!("--- {} ping statistics ---", remote_addr);
14 changes: 6 additions & 8 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,8 @@ mod utils;
use std::str;
use std::fmt::Write;
use std::time::Instant;
use smoltcp::Error;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
use smoltcp::socket::{AsSocket, SocketSet};
@@ -25,6 +26,7 @@ fn main() {

let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);

let startup_time = Instant::now();
@@ -154,12 +156,8 @@ fn main() {
}
}

let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e)
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at).expect("wait error");
}
}
7 changes: 7 additions & 0 deletions examples/utils.rs
Original file line number Diff line number Diff line change
@@ -129,3 +129,10 @@ pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loo
device.set_bucket_interval(shaping_interval);
device
}

pub fn millis_since(startup_time: Instant) -> u64 {
let duration = Instant::now().duration_since(startup_time);
let duration_ms = (duration.as_secs() * 1000) +
(duration.subsec_nanos() / 1000000) as u64;
duration_ms
}
190 changes: 109 additions & 81 deletions src/iface/ethernet.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/macros.rs
Original file line number Diff line number Diff line change
@@ -11,21 +11,21 @@ macro_rules! net_log_enabled {
}

macro_rules! net_trace {
($($arg:expr),*) => {
($($arg:expr),*) => {{
#[cfg(feature = "log")]
trace!($($arg),*);
#[cfg(not(feature = "log"))]
$( let _ = $arg );*; // suppress unused variable warnings
}
}}
}

macro_rules! net_debug {
($($arg:expr),*) => {
($($arg:expr),*) => {{
#[cfg(feature = "log")]
debug!($($arg),*);
#[cfg(not(feature = "log"))]
$( let _ = $arg );*; // suppress unused variable warnings
}
}}
}

macro_rules! enum_with_unknown {
3 changes: 3 additions & 0 deletions src/phy/mod.rs
Original file line number Diff line number Diff line change
@@ -119,6 +119,9 @@ mod raw_socket;
#[cfg(all(feature = "tap_interface", target_os = "linux"))]
mod tap_interface;

#[cfg(any(feature = "raw_socket", feature = "tap_interface"))]
pub use self::sys::wait;

pub use self::tracer::Tracer;
pub use self::fault_injector::FaultInjector;
pub use self::pcap_writer::{PcapLinkType, PcapMode, PcapSink, PcapWriter};
7 changes: 7 additions & 0 deletions src/phy/raw_socket.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};

use Result;
use super::{sys, DeviceLimits, Device};
@@ -13,6 +14,12 @@ pub struct RawSocket {
mtu: usize
}

impl AsRawFd for RawSocket {
fn as_raw_fd(&self) -> RawFd {
self.lower.borrow().as_raw_fd()
}
}

impl RawSocket {
/// Creates a raw socket, bound to the interface called `name`.
///
31 changes: 30 additions & 1 deletion src/phy/sys/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use libc;
use std::io;
use std::{mem, ptr, io};
use std::os::unix::io::RawFd;

#[cfg(target_os = "linux")]
#[path = "linux.rs"]
@@ -15,6 +16,34 @@ pub use self::raw_socket::RawSocketDesc;
#[cfg(all(feature = "tap_interface", target_os = "linux"))]
pub use self::tap_interface::TapInterfaceDesc;

/// Wait until given file descriptor becomes readable, but no longer than given timeout.
pub fn wait(fd: RawFd, millis: Option<u64>) -> io::Result<()> {
unsafe {
let mut readfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut readfds);
libc::FD_SET(fd, &mut readfds);

let mut writefds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut writefds);

let mut exceptfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut exceptfds);

let mut timeout = libc::timeval { tv_sec: 0, tv_usec: 0 };
let timeout_ptr =
if let Some(millis) = millis {
timeout.tv_usec = (millis * 1_000) as libc::suseconds_t;
&mut timeout as *mut _
} else {
ptr::null_mut()
};

let res = libc::select(fd + 1, &mut readfds, &mut writefds, &mut exceptfds, timeout_ptr);
if res == -1 { return Err(io::Error::last_os_error()) }
Ok(())
}
}

#[repr(C)]
#[derive(Debug)]
struct ifreq {
11 changes: 9 additions & 2 deletions src/phy/sys/raw_socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use libc;
use std::{mem, io};
use std::os::unix::io::{RawFd, AsRawFd};
use libc;
use super::*;

#[derive(Debug)]
@@ -8,10 +9,16 @@ pub struct RawSocketDesc {
ifreq: ifreq
}

impl AsRawFd for RawSocketDesc {
fn as_raw_fd(&self) -> RawFd {
self.lower
}
}

impl RawSocketDesc {
pub fn new(name: &str) -> io::Result<RawSocketDesc> {
let lower = unsafe {
let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW,
let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW | libc::SOCK_NONBLOCK,
imp::ETH_P_ALL.to_be() as i32);
if lower == -1 { return Err(io::Error::last_os_error()) }
lower
34 changes: 8 additions & 26 deletions src/phy/sys/tap_interface.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use std::mem;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use libc;
use super::*;

#[cfg(target_os = "linux")]
#[derive(Debug)]
pub struct TapInterfaceDesc {
lower: libc::c_int,
ifreq: ifreq
}

impl AsRawFd for TapInterfaceDesc {
fn as_raw_fd(&self) -> RawFd {
self.lower
}
}

impl TapInterfaceDesc {
pub fn new(name: &str) -> io::Result<TapInterfaceDesc> {
let lower = unsafe {
let lower = libc::open("/dev/net/tun\0".as_ptr() as *const libc::c_char,
libc::O_RDWR);
libc::O_RDWR | libc::O_NONBLOCK);
if lower == -1 { return Err(io::Error::last_os_error()) }
lower
};
@@ -44,28 +49,7 @@ impl TapInterfaceDesc {
mtu
}

fn wait(&mut self, ms: u32) -> io::Result<bool> {
unsafe {
let mut readfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut readfds);
libc::FD_SET(self.lower, &mut readfds);
let mut writefds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut writefds);
let mut exceptfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut exceptfds);
let mut timeout = libc::timeval { tv_sec: 0, tv_usec: (ms * 1_000) as libc::suseconds_t };
let res = libc::select(self.lower + 1, &mut readfds, &mut writefds, &mut exceptfds,
&mut timeout);
if res == -1 { return Err(io::Error::last_os_error()) }
Ok(res == 0)
}
}

pub fn recv(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
// FIXME: here we don't wait forever, in case we need to send several packets in a row
// ideally this would be implemented by going full nonblocking
if self.wait(100)? { return Err(io::ErrorKind::TimedOut)? }

unsafe {
let len = libc::read(self.lower, buffer.as_mut_ptr() as *mut libc::c_void,
buffer.len());
@@ -75,8 +59,6 @@ impl TapInterfaceDesc {
}

pub fn send(&mut self, buffer: &[u8]) -> io::Result<usize> {
self.wait(100)?;

unsafe {
let len = libc::write(self.lower, buffer.as_ptr() as *const libc::c_void,
buffer.len());
11 changes: 9 additions & 2 deletions src/phy/tap_interface.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};

use {Error, Result};
use super::{sys, DeviceLimits, Device};
@@ -13,6 +14,12 @@ pub struct TapInterface {
mtu: usize
}

impl AsRawFd for TapInterface {
fn as_raw_fd(&self) -> RawFd {
self.lower.borrow().as_raw_fd()
}
}

impl TapInterface {
/// Attaches to a TAP interface called `name`, or creates it if it does not exist.
///
@@ -49,10 +56,10 @@ impl Device for TapInterface {
buffer.resize(size, 0);
Ok(buffer)
}
Err(ref err) if err.kind() == io::ErrorKind::TimedOut => {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
Err(Error::Exhausted)
}
Err(err) => panic!(err)
Err(err) => panic!("{}", err)
}
}

4 changes: 4 additions & 0 deletions src/socket/mod.rs
Original file line number Diff line number Diff line change
@@ -78,6 +78,10 @@ impl<'a, 'b> Socket<'a, 'b> {
pub fn set_debug_id(&mut self, id: usize) {
dispatch_socket!(self, |socket [mut]| socket.set_debug_id(id))
}

pub(crate) fn poll_at(&self) -> Option<u64> {
dispatch_socket!(self, |socket []| socket.poll_at())
}
}

/// A conversion trait for network sockets.
51 changes: 30 additions & 21 deletions src/socket/raw.rs
Original file line number Diff line number Diff line change
@@ -199,24 +199,33 @@ impl<'a, 'b> RawSocket<'a, 'b> {
}
}

let mut packet_buf = self.tx_buffer.dequeue()?;
match prepare(self.ip_protocol, packet_buf.as_mut()) {
Ok((ip_repr, raw_packet)) => {
net_trace!("[{}]:{}:{}: sending {} octets",
self.debug_id, self.ip_version, self.ip_protocol,
ip_repr.buffer_len() + raw_packet.len());
emit((ip_repr, raw_packet))
}
Err(error) => {
net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
self.debug_id, self.ip_version, self.ip_protocol,
error);
// This case is a bit special because in every other socket, no matter what data
// is put into the socket, it can be sent, but it's possible to put data into
// a raw socket that may not be, and we're generic over the result type, so
// we can't possibly return Ok(()) here.
Err(Error::Rejected)
let debug_id = self.debug_id;
let ip_protocol = self.ip_protocol;
let ip_version = self.ip_version;
self.tx_buffer.try_dequeue(|packet_buf| {
match prepare(ip_protocol, packet_buf.as_mut()) {
Ok((ip_repr, raw_packet)) => {
net_trace!("[{}]:{}:{}: sending {} octets",
debug_id, ip_version, ip_protocol,
ip_repr.buffer_len() + raw_packet.len());
emit((ip_repr, raw_packet))
}
Err(error) => {
net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
debug_id, ip_version, ip_protocol,
error);
// Return Ok(()) so the packet is dequeued.
Ok(())
}
}
})
}

pub(crate) fn poll_at(&self) -> Option<u64> {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
}
}
@@ -285,13 +294,13 @@ mod test {
assert_eq!(ip_payload, &PACKET_PAYLOAD);
Err(Error::Unaddressable)
}), Err(Error::Unaddressable));
/*assert!(!socket.can_send());*/
assert!(!socket.can_send());

assert_eq!(socket.dispatch(|(ip_repr, ip_payload)| {
assert_eq!(ip_repr, HEADER_REPR);
assert_eq!(ip_payload, &PACKET_PAYLOAD);
Ok(())
}), /*Ok(())*/ Err(Error::Exhausted));
}), Ok(()));
assert!(socket.can_send());
}

@@ -304,14 +313,14 @@ mod test {

assert_eq!(socket.send_slice(&wrong_version[..]), Ok(()));
assert_eq!(socket.dispatch(|_| unreachable!()),
Err(Error::Rejected));
Ok(()));

let mut wrong_protocol = PACKET_BYTES.clone();
Ipv4Packet::new(&mut wrong_protocol).set_protocol(IpProtocol::Tcp);

assert_eq!(socket.send_slice(&wrong_protocol[..]), Ok(()));
assert_eq!(socket.dispatch(|_| unreachable!()),
Err(Error::Rejected));
Ok(()));
}

#[test]
22 changes: 20 additions & 2 deletions src/socket/tcp.rs
Original file line number Diff line number Diff line change
@@ -200,6 +200,14 @@ impl Timer {
}
}

fn poll_at(&self) -> Option<u64> {
match *self {
Timer::Idle => None,
Timer::Retransmit { expires_at, .. } => Some(expires_at),
Timer::Close { expires_at, .. } => Some(expires_at),
}
}

fn reset(&mut self) {
*self = Timer::Idle
}
@@ -1256,6 +1264,16 @@ impl<'a> TcpSocket<'a> {

Ok(())
}

pub(crate) fn poll_at(&self) -> Option<u64> {
self.timer.poll_at().or_else(|| {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
})
}
}

impl<'a> fmt::Write for TcpSocket<'a> {
@@ -2835,14 +2853,14 @@ mod test {

limits.max_burst_size = None;
s.send_slice(b"abcdef").unwrap();
s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
assert_eq!(tcp_repr.window_len, 32767);
Ok(())
}).unwrap();

limits.max_burst_size = Some(4);
s.send_slice(b"abcdef").unwrap();
s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
assert_eq!(tcp_repr.window_len, 5920);
Ok(())
}).unwrap();
47 changes: 29 additions & 18 deletions src/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -196,23 +196,34 @@ impl<'a, 'b> UdpSocket<'a, 'b> {

pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
let packet_buf = self.tx_buffer.dequeue()?;
net_trace!("[{}]{}:{}: sending {} octets",
self.debug_id, self.endpoint,
packet_buf.endpoint, packet_buf.size);
let debug_id = self.debug_id;
let endpoint = self.endpoint;
self.tx_buffer.try_dequeue(|packet_buf| {
net_trace!("[{}]{}:{}: sending {} octets",
debug_id, endpoint,
packet_buf.endpoint, packet_buf.size);

let repr = UdpRepr {
src_port: endpoint.port,
dst_port: packet_buf.endpoint.port,
payload: &packet_buf.as_ref()[..]
};
let ip_repr = IpRepr::Unspecified {
src_addr: endpoint.addr,
dst_addr: packet_buf.endpoint.addr,
protocol: IpProtocol::Udp,
payload_len: repr.buffer_len()
};
emit((ip_repr, repr))
})
}

let repr = UdpRepr {
src_port: self.endpoint.port,
dst_port: packet_buf.endpoint.port,
payload: &packet_buf.as_ref()[..]
};
let ip_repr = IpRepr::Unspecified {
src_addr: self.endpoint.addr,
dst_addr: packet_buf.endpoint.addr,
protocol: IpProtocol::Udp,
payload_len: repr.buffer_len()
};
emit((ip_repr, repr))
pub(crate) fn poll_at(&self) -> Option<u64> {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
}
}

@@ -310,13 +321,13 @@ mod test {
assert_eq!(udp_repr, LOCAL_UDP_REPR);
Err(Error::Unaddressable)
}), Err(Error::Unaddressable));
/*assert!(!socket.can_send());*/
assert!(!socket.can_send());

assert_eq!(socket.dispatch(|(ip_repr, udp_repr)| {
assert_eq!(ip_repr, LOCAL_IP_REPR);
assert_eq!(udp_repr, LOCAL_UDP_REPR);
Ok(())
}), /*Ok(())*/ Err(Error::Exhausted));
}), Ok(()));
assert!(socket.can_send());
}

4 changes: 2 additions & 2 deletions src/storage/ring_buffer.rs
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
/// return `Err(Error::Exhausted)` if the buffer is full.
pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
where F: Fn(&'b mut T) -> Result<R> {
where F: FnOnce(&'b mut T) -> Result<R> {
if self.full() { return Err(Error::Exhausted) }

let index = self.mask(self.read_at + self.length);
@@ -88,7 +88,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
/// return `Err(Error::Exhausted)` if the buffer is empty.
pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
where F: Fn(&'b mut T) -> Result<R> {
where F: FnOnce(&'b mut T) -> Result<R> {
if self.empty() { return Err(Error::Exhausted) }

let next_at = self.incr(self.read_at);

1 comment on commit 39464a5

@whitequark
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@batonius By the way, after this commit:

$ ./target/debug/examples/ping tap0 192.168.69.100
40 bytes from 192.168.69.100: icmp_seq=0, time=2.186ms
40 bytes from 192.168.69.100: icmp_seq=1, time=0.650ms
40 bytes from 192.168.69.100: icmp_seq=2, time=0.663ms
40 bytes from 192.168.69.100: icmp_seq=3, time=0.940ms
--- 192.168.69.100 ping statistics ---
4 packets transmitted, 4 received, 0% packet loss

Sorry, something went wrong.

Please sign in to comment.