Skip to content

Commit

Permalink
Redesign the phy::Device trait to avoid Drop impls.
Browse files Browse the repository at this point in the history
phil-opp authored and whitequark committed Nov 3, 2017
1 parent d1d80ca commit 198fe23
Showing 10 changed files with 593 additions and 445 deletions.
9 changes: 6 additions & 3 deletions examples/tcpdump.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
extern crate smoltcp;

use std::env;
use smoltcp::phy::{Device, RawSocket};
use smoltcp::phy::{Device, RxToken, RawSocket};
use smoltcp::wire::{PrettyPrinter, EthernetFrame};

fn main() {
let ifname = env::args().nth(1).unwrap();
let mut socket = RawSocket::new(ifname.as_ref()).unwrap();
loop {
let buffer = socket.receive(/*timestamp=*/0).unwrap();
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer))
let (rx_token, _) = socket.receive().unwrap();
rx_token.consume(/*timestamp = */ 0, |buffer| {
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer));
Ok(())
}).unwrap();
}
}
2 changes: 1 addition & 1 deletion examples/utils.rs
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ pub fn add_middleware_options(opts: &mut Options, _free: &mut Vec<&str>) {
opts.optopt("", "shaping-interval", "Sets the interval for rate limiting (ms)", "RATE");
}

pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loopback: bool)
pub fn parse_middleware_options<D: for<'a> Device<'a>>(matches: &mut Matches, device: D, loopback: bool)
-> FaultInjector<EthernetTracer<PcapWriter<D, Rc<PcapSink>>>> {
let drop_chance = matches.opt_str("drop-chance").map(|s| u8::from_str(&s).unwrap())
.unwrap_or(0);
315 changes: 184 additions & 131 deletions src/iface/ethernet.rs

Large diffs are not rendered by default.

182 changes: 103 additions & 79 deletions src/phy/fault_injector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use core::cell::RefCell;

use {Error, Result};
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};

// We use our own RNG to stay compatible with #![no_std].
// The use of the RNG below has a slight bias, but it doesn't matter.
@@ -26,7 +28,7 @@ struct Config {
interval: u64,
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
struct State {
rng_seed: u32,
refilled_at: u64,
@@ -86,13 +88,13 @@ impl State {
/// adverse network conditions (such as random packet loss or corruption), or software
/// or hardware limitations (such as a limited number or size of usable network buffers).
#[derive(Debug)]
pub struct FaultInjector<D: Device> {
inner: D,
state: State,
config: Config
pub struct FaultInjector<D: for<'a> Device<'a>> {
inner: D,
state: RefCell<State>,
config: Config,
}

impl<D: Device> FaultInjector<D> {
impl<D: for<'a> Device<'a>> FaultInjector<D> {
/// Create a fault injector device, using the given random number generator seed.
pub fn new(inner: D, seed: u32) -> FaultInjector<D> {
let state = State {
@@ -103,8 +105,8 @@ impl<D: Device> FaultInjector<D> {
};
FaultInjector {
inner: inner,
state: state,
config: Config::default()
state: RefCell::new(state),
config: Config::default(),
}
}

@@ -178,15 +180,16 @@ impl<D: Device> FaultInjector<D> {

/// Set the interval for packet rate limiting, in milliseconds.
pub fn set_bucket_interval(&mut self, interval: u64) {
self.state.refilled_at = 0;
self.state.borrow_mut().refilled_at = 0;
self.config.interval = interval
}
}

impl<D: Device> Device for FaultInjector<D>
where D::RxBuffer: AsMut<[u8]> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer>;
impl<'a, D> Device<'a> for FaultInjector<D>
where D: for<'b> Device<'b>,
{
type RxToken = RxToken<'a, <D as Device<'a>>::RxToken>;
type TxToken = TxToken<'a, <D as Device<'a>>::TxToken>;

fn capabilities(&self) -> DeviceCapabilities {
let mut caps = self.inner.capabilities();
@@ -196,88 +199,109 @@ impl<D: Device> Device for FaultInjector<D>
caps
}

fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let mut buffer = self.inner.receive(timestamp)?;
if self.state.maybe(self.config.drop_pct) {
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut inner, ref state, config } = self;
inner.receive().map(|(rx_token, tx_token)| {
let rx = RxToken {
state: &state,
config: config,
token: rx_token,
corrupt: [0; MTU],
};
let tx = TxToken {
state: &state,
config: config,
token: tx_token,
junk: [0; MTU],
};
(rx, tx)
})
}

fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut inner, ref state, config } = self;
inner.transmit().map(|token| TxToken {
state: &state,
config: config,
token: token,
junk: [0; MTU],
})
}
}

#[doc(hidden)]
pub struct RxToken<'a, Rx: phy::RxToken> {
state: &'a RefCell<State>,
config: Config,
token: Rx,
corrupt: [u8; MTU],
}

impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("rx: randomly dropping a packet");
return Err(Error::Exhausted)
}
if self.state.maybe(self.config.corrupt_pct) {
net_trace!("rx: randomly corrupting a packet");
self.state.corrupt(&mut buffer)
}
if self.config.max_size > 0 && buffer.as_ref().len() > self.config.max_size {
net_trace!("rx: dropping a packet that is too large");
return Err(Error::Exhausted)
}
if !self.state.maybe_receive(&self.config, timestamp) {
if !self.state.borrow_mut().maybe_receive(&self.config, timestamp) {
net_trace!("rx: dropping a packet because of rate limiting");
return Err(Error::Exhausted)
}
Ok(buffer)
}

fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer;
if self.state.maybe(self.config.drop_pct) {
net_trace!("tx: randomly dropping a packet");
buffer = None;
} else if self.config.max_size > 0 && length > self.config.max_size {
net_trace!("tx: dropping a packet that is too large");
buffer = None;
} else if !self.state.maybe_transmit(&self.config, timestamp) {
net_trace!("tx: dropping a packet because of rate limiting");
buffer = None;
} else {
buffer = Some(self.inner.transmit(timestamp, length)?);
}
Ok(TxBuffer {
buffer: buffer,
state: self.state.clone(),
config: self.config,
junk: [0; MTU],
length: length
let Self { token, config, state, mut corrupt } = self;
token.consume(timestamp, |buffer| {
if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
net_trace!("rx: dropping a packet that is too large");
return Err(Error::Exhausted)
}
if state.borrow_mut().maybe(config.corrupt_pct) {
net_trace!("rx: randomly corrupting a packet");
let mut corrupt = &mut corrupt[..buffer.len()];
corrupt.copy_from_slice(buffer);
state.borrow_mut().corrupt(&mut corrupt);
f(&mut corrupt)
} else {
f(buffer)
}
})
}
}

#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>> {
state: State,
pub struct TxToken<'a, Tx: phy::TxToken> {
state: &'a RefCell<State>,
config: Config,
buffer: Option<B>,
token: Tx,
junk: [u8; MTU],
length: usize
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> AsRef<[u8]> for TxBuffer<B> {
fn as_ref(&self) -> &[u8] {
match self.buffer {
Some(ref buf) => buf.as_ref(),
None => &self.junk[..self.length]
}
}
}
impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(mut self, timestamp: u64, len: usize, f: F)
-> Result<R>
{
let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("tx: randomly dropping a packet");
true
} else if self.config.max_size > 0 && len > self.config.max_size {
net_trace!("tx: dropping a packet that is too large");
true
} else if !self.state.borrow_mut().maybe_transmit(&self.config, timestamp) {
net_trace!("tx: dropping a packet because of rate limiting");
true
} else {
false
};

impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for TxBuffer<B> {
fn as_mut(&mut self) -> &mut [u8] {
match self.buffer {
Some(ref mut buf) => buf.as_mut(),
None => &mut self.junk[..self.length]
if drop {
return f(&mut self.junk);
}
}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> Drop for TxBuffer<B> {
fn drop(&mut self) {
match self.buffer {
Some(ref mut buf) => {
if self.state.maybe(self.config.corrupt_pct) {
net_trace!("tx: corrupting a packet");
self.state.corrupt(buf)
}
},
None => ()
}
let Self { token, state, config, .. } = self;
token.consume(timestamp, len, |mut buf| {
if state.borrow_mut().maybe(config.corrupt_pct) {
net_trace!("tx: corrupting a packet");
state.borrow_mut().corrupt(&mut buf)
}
f(buf)
})
}
}
72 changes: 37 additions & 35 deletions src/phy/loopback.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
use core::mem::swap;
use core::cell::RefCell;
#[cfg(feature = "std")]
use std::rc::Rc;
#[cfg(feature = "alloc")]
use alloc::rc::Rc;
#[cfg(feature = "std")]
use std::vec::Vec;
#[cfg(feature = "std")]
use std::collections::VecDeque;
#[cfg(feature = "alloc")]
use alloc::{Vec, VecDeque};

use {Error, Result};
use super::{Device, DeviceCapabilities};
use Result;
use phy::{self, Device, DeviceCapabilities};

/// A loopback device.
#[derive(Debug)]
pub struct Loopback(Rc<RefCell<VecDeque<Vec<u8>>>>);
pub struct Loopback {
queue: VecDeque<Vec<u8>>,
}

impl Loopback {
/// Creates a loopback device.
///
/// Every packet transmitted through this device will be received through it
/// in FIFO order.
pub fn new() -> Loopback {
Loopback(Rc::new(RefCell::new(VecDeque::new())))
Loopback {
queue: VecDeque::new(),
}
}
}

impl Device for Loopback {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for Loopback {
type RxToken = RxToken;
type TxToken = TxToken<'a>;

fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@@ -39,41 +37,45 @@ impl Device for Loopback {
}
}

fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
match self.0.borrow_mut().pop_front() {
Some(packet) => Ok(packet),
None => Err(Error::Exhausted)
}
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
self.queue.pop_front().map(move |buffer| {
let rx = RxToken { buffer: buffer };
let tx = TxToken { queue: &mut self.queue };
(rx, tx)
})
}

fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let mut buffer = Vec::new();
buffer.resize(length, 0);
Ok(TxBuffer {
queue: self.0.clone(),
buffer: buffer
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
queue: &mut self.queue,
})
}
}

#[doc(hidden)]
pub struct TxBuffer {
queue: Rc<RefCell<VecDeque<Vec<u8>>>>,
buffer: Vec<u8>
pub struct RxToken {
buffer: Vec<u8>,
}

impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
impl phy::RxToken for RxToken {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
f(&self.buffer)
}
}

impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
#[doc(hidden)]
pub struct TxToken<'a> {
queue: &'a mut VecDeque<Vec<u8>>,
}

impl Drop for TxBuffer {
fn drop(&mut self) {
impl<'a> phy::TxToken for TxToken<'a> {
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let mut buffer = Vec::new();
swap(&mut buffer, &mut self.buffer);
self.queue.borrow_mut().push_back(buffer)
buffer.resize(len, 0);
let result = f(&mut buffer);
self.queue.push_back(buffer);
result
}
}
160 changes: 82 additions & 78 deletions src/phy/mod.rs
Original file line number Diff line number Diff line change
@@ -19,87 +19,66 @@
//!
/*!
```rust
use std::slice;
use smoltcp::{Error, Result};
use smoltcp::phy::{DeviceCapabilities, Device};
use smoltcp::Result;
use smoltcp::phy::{self, DeviceCapabilities, Device};
const TX_BUFFERS: [*mut u8; 2] = [0x10000000 as *mut u8, 0x10001000 as *mut u8];
const RX_BUFFERS: [*mut u8; 2] = [0x10002000 as *mut u8, 0x10003000 as *mut u8];
fn rx_full() -> bool {
/* platform-specific code to check if an incoming packet has arrived */
false
}
fn rx_setup(_buf: *mut u8, _length: &mut usize) {
/* platform-specific code to receive a packet into a buffer */
struct StmPhy {
rx_buffer: [u8; 1536],
tx_buffer: [u8; 1536],
}
fn tx_empty() -> bool {
/* platform-specific code to check if an outgoing packet can be sent */
false
impl<'a> StmPhy {
fn new() -> StmPhy {
StmPhy {
rx_buffer: [0; 1536],
tx_buffer: [0; 1536],
}
}
}
fn tx_setup(_buf: *const u8, _length: usize) {
/* platform-specific code to send a buffer with a packet */
}
impl<'a> phy::Device<'a> for StmPhy {
type RxToken = StmPhyRxToken<'a>;
type TxToken = StmPhyTxToken<'a>;
# #[allow(dead_code)]
pub struct EthernetDevice {
tx_next: usize,
rx_next: usize
}
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
Some((StmPhyRxToken(&mut self.rx_buffer[..]),
StmPhyTxToken(&mut self.tx_buffer[..])))
}
impl Device for EthernetDevice {
type RxBuffer = &'static [u8];
type TxBuffer = EthernetTxBuffer;
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(StmPhyTxToken(&mut self.tx_buffer[..]))
}
fn capabilities(&self) -> DeviceCapabilities {
let mut caps = DeviceCapabilities::default();
caps.max_transmission_unit = 1536;
caps.max_burst_size = Some(2);
caps.max_burst_size = Some(1);
caps
}
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
if rx_full() {
let index = self.rx_next;
self.rx_next = (self.rx_next + 1) % RX_BUFFERS.len();
let mut length = 0;
rx_setup(RX_BUFFERS[self.rx_next], &mut length);
Ok(unsafe {
slice::from_raw_parts(RX_BUFFERS[index], length)
})
} else {
Err(Error::Exhausted)
}
}
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
if tx_empty() {
let index = self.tx_next;
self.tx_next = (self.tx_next + 1) % TX_BUFFERS.len();
Ok(EthernetTxBuffer(unsafe {
slice::from_raw_parts_mut(TX_BUFFERS[index], length)
}))
} else {
Err(Error::Exhausted)
}
}
}
pub struct EthernetTxBuffer(&'static mut [u8]);
struct StmPhyRxToken<'a>(&'a [u8]);
impl AsRef<[u8]> for EthernetTxBuffer {
fn as_ref(&self) -> &[u8] { self.0 }
impl<'a> phy::RxToken for StmPhyRxToken<'a> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
// TODO: receive packet into buffer
let result = f(self.0);
println!("rx called");
result
}
}
impl AsMut<[u8]> for EthernetTxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.0 }
}
struct StmPhyTxToken<'a>(&'a mut [u8]);
impl Drop for EthernetTxBuffer {
fn drop(&mut self) { tx_setup(self.0.as_ptr(), self.0.len()) }
impl<'a> phy::TxToken for StmPhyTxToken<'a> {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
-> Result<R>
{
let result = f(&mut self.0[..len]);
println!("tx called {}", len);
// TODO: send packet out
result
}
}
```
*/
@@ -229,27 +208,52 @@ pub struct DeviceCapabilities {

/// An interface for sending and receiving raw network frames.
///
/// It is expected that a `Device` implementation would allocate memory for both sending
/// and receiving packets from memory pools; hence, the stack borrows the buffer for a packet
/// that it is about to receive, as well for a packet that it is about to send, from the device.
pub trait Device {
type RxBuffer: AsRef<[u8]>;
type TxBuffer: AsRef<[u8]> + AsMut<[u8]>;
/// The interface is based on _tokens_, which are types that allow to receive/transmit a
/// single packet. The `receive` and `transmit` functions only construct such tokens, the
/// real sending/receiving operation are performed when the tokens are consumed.
pub trait Device<'a> {
type RxToken: RxToken + 'a;
type TxToken: TxToken + 'a;

/// Construct a token pair consisting of one receive token and one transmit token.
///
/// The additional transmit token makes it possible to generate a reply packet based
/// on the contents of the received packet. For example, this makes it possible to
/// handle arbitrarily large ICMP echo ("ping") requests, where the all received bytes
/// need to be sent back, without heap allocation.
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)>;

/// Construct a transmit token.
fn transmit(&'a mut self) -> Option<Self::TxToken>;

/// Get a description of device capabilities.
fn capabilities(&self) -> DeviceCapabilities;
}

/// Receive a frame.
/// A token to receive a single network packet.
pub trait RxToken {
/// Consumes the token to receive a single network packet.
///
/// It is expected that a `receive` implementation, once a packet is written to memory
/// through DMA, would gain ownership of the underlying buffer, provide it for parsing,
/// and return it to the network device once it is dropped.
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer>;
/// This method receives a packet and then calls the given closure `f` with the raw
/// packet bytes as argument.
///
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>;
}

/// Transmit a frame.
/// A token to transmit a single network packet.
pub trait TxToken {
/// Consumes the token to send a single network packet.
///
/// This method constructs a transmit buffer of size `len` and calls the passed
/// closure `f` with a mutable reference to that buffer. The closure should construct
/// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure
/// returns, the transmit buffer is sent out.
///
/// It is expected that a `transmit` implementation would gain ownership of a buffer with
/// the requested length, provide it for emission, and schedule it to be read from
/// memory by the network device once it is dropped.
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer>;
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>;
}
97 changes: 60 additions & 37 deletions src/phy/pcap_writer.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use std::io::Write;
use byteorder::{ByteOrder, NativeEndian};

use Result;
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};

enum_with_unknown! {
/// Captured packet header type.
@@ -114,67 +114,90 @@ impl<T: AsMut<Write>> PcapSink for RefCell<T> {
/// [libpcap]: https://wiki.wireshark.org/Development/LibpcapFileFormat
/// [sink]: trait.PcapSink.html
#[derive(Debug)]
pub struct PcapWriter<D: Device, S: PcapSink + Clone> {
pub struct PcapWriter<D, S>
where D: for<'a> Device<'a>,
S: PcapSink + Clone,
{
lower: D,
sink: S,
mode: PcapMode
mode: PcapMode,
}

impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
impl<D: for<'a> Device<'a>, S: PcapSink + Clone> PcapWriter<D, S> {
/// Creates a packet capture writer.
pub fn new(lower: D, sink: S, mode: PcapMode, link_type: PcapLinkType) -> PcapWriter<D, S> {
sink.global_header(link_type);
PcapWriter { lower, sink, mode }
}
}

impl<D: Device, S: PcapSink + Clone> Device for PcapWriter<D, S> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer, S>;
impl<'a, D, S> Device<'a> for PcapWriter<D, S>
where D: for<'b> Device<'b>,
S: PcapSink + Clone + 'a,
{
type RxToken = RxToken<<D as Device<'a>>::RxToken, S>;
type TxToken = TxToken<<D as Device<'a>>::TxToken, S>;

fn capabilities(&self) -> DeviceCapabilities { self.lower.capabilities() }

fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let buffer = self.lower.receive(timestamp)?;
match self.mode {
PcapMode::Both | PcapMode::RxOnly =>
self.sink.packet(timestamp, buffer.as_ref()),
PcapMode::TxOnly => ()
}
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut lower, ref sink, mode, .. } = self;
lower.receive().map(|(rx_token, tx_token)| {
let rx = RxToken { token: rx_token, sink: sink.clone(), mode: mode };
let tx = TxToken { token: tx_token, sink: sink.clone(), mode: mode };
(rx, tx)
})
}

fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer = self.lower.transmit(timestamp, length)?;
Ok(TxBuffer { buffer, timestamp, sink: self.sink.clone(), mode: self.mode })
fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut lower, ref sink, mode } = self;
lower.transmit().map(|token| {
TxToken { token, sink: sink.clone(), mode: mode }
})
}
}

#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink> {
buffer: B,
timestamp: u64,
sink: S,
mode: PcapMode
pub struct RxToken<Rx: phy::RxToken, S: PcapSink> {
token: Rx,
sink: S,
mode: PcapMode,
}

impl<B, S> AsRef<[u8]> for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
impl<Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<Rx, S> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
let Self { token, sink, mode } = self;
token.consume(timestamp, |buffer| {
match mode {
PcapMode::Both | PcapMode::RxOnly =>
sink.packet(timestamp, buffer.as_ref()),
PcapMode::TxOnly => ()
}
f(buffer)
})
}
}

impl<B, S> AsMut<[u8]> for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
#[doc(hidden)]
pub struct TxToken<Tx: phy::TxToken, S: PcapSink> {
token: Tx,
sink: S,
mode: PcapMode
}

impl<B, S> Drop for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn drop(&mut self) {
match self.mode {
PcapMode::Both | PcapMode::TxOnly =>
self.sink.packet(self.timestamp, self.as_ref()),
PcapMode::RxOnly => ()
}
impl<Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<Tx, S> {
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let Self { token, sink, mode } = self;
token.consume(timestamp, len, |buffer| {
let result = f(buffer);
match mode {
PcapMode::Both | PcapMode::TxOnly =>
sink.packet(timestamp, &buffer),
PcapMode::RxOnly => ()
};
result
})
}
}
56 changes: 32 additions & 24 deletions src/phy/raw_socket.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};

use Result;
use super::{sys, DeviceCapabilities, Device};
use phy::{self, sys, DeviceCapabilities, Device};

/// A socket that captures or transmits the complete frame.
#[derive(Debug)]
@@ -36,9 +35,9 @@ impl RawSocket {
}
}

impl Device for RawSocket {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for RawSocket {
type RxToken = RxToken;
type TxToken = TxToken;

fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@@ -47,39 +46,48 @@ impl Device for RawSocket {
}
}

fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; self.mtu];
let size = lower.recv(&mut buffer[..]).unwrap();
buffer.resize(size, 0);
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
let tx = TxToken { lower: self.lower.clone() };
Some((rx, tx))
}

fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
Ok(TxBuffer {
lower: self.lower.clone(),
buffer: vec![0; length]
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
lower: self.lower.clone(),
})
}
}

#[doc(hidden)]
pub struct TxBuffer {
pub struct RxToken {
lower: Rc<RefCell<sys::RawSocketDesc>>,
buffer: Vec<u8>
mtu: usize,
}

impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
impl phy::RxToken for RxToken {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; self.mtu];
let size = lower.recv(&mut buffer[..]).unwrap();
buffer.resize(size, 0);
f(&mut buffer)
}
}

impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
#[doc(hidden)]
pub struct TxToken {
lower: Rc<RefCell<sys::RawSocketDesc>>,
}

impl Drop for TxBuffer {
fn drop(&mut self) {
impl phy::TxToken for TxToken {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
-> Result<R>
{
let mut lower = self.lower.borrow_mut();
lower.send(&mut self.buffer[..]).unwrap();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&mut buffer[..]).unwrap();
result
}
}
66 changes: 38 additions & 28 deletions src/phy/tap_interface.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};

use {Error, Result};
use super::{sys, DeviceCapabilities, Device};
use phy::{self, sys, DeviceCapabilities, Device};

/// A virtual Ethernet interface.
#[derive(Debug)]
@@ -37,9 +36,9 @@ impl TapInterface {
}
}

impl Device for TapInterface {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for TapInterface {
type RxToken = RxToken;
type TxToken = TxToken;

fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@@ -48,46 +47,57 @@ impl Device for TapInterface {
}
}

fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
let tx = TxToken { lower: self.lower.clone(), };
Some((rx, tx))
}

fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
lower: self.lower.clone(),
})
}
}

#[doc(hidden)]
pub struct RxToken {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
mtu: usize,
}

impl phy::RxToken for RxToken {
fn consume<R, F>(self, _timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>
{
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; self.mtu];
match lower.recv(&mut buffer[..]) {
Ok(size) => {
buffer.resize(size, 0);
Ok(buffer)
f(&buffer)
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
Err(Error::Exhausted)
}
Err(err) => panic!("{}", err)
}
}

fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
Ok(TxBuffer {
lower: self.lower.clone(),
buffer: vec![0; length]
})
}
}

#[doc(hidden)]
pub struct TxBuffer {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
buffer: Vec<u8>
}

impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}

impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
pub struct TxToken {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
}

impl Drop for TxBuffer {
fn drop(&mut self) {
impl phy::TxToken for TxToken {
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let mut lower = self.lower.borrow_mut();
lower.send(&mut self.buffer[..]).unwrap();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&mut buffer[..]).unwrap();
result
}
}
79 changes: 50 additions & 29 deletions src/phy/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use Result;
use wire::pretty_print::{PrettyPrint, PrettyPrinter};
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};

/// A tracer device.
///
/// A tracer is a device that pretty prints all packets traversing it
/// using the provided writer function, and then passes them to another
/// device.
pub struct Tracer<D: Device, P: PrettyPrint> {
inner: D,
writer: fn(u64, PrettyPrinter<P>)
pub struct Tracer<D: for<'a> Device<'a>, P: PrettyPrint> {
inner: D,
writer: fn(u64, PrettyPrinter<P>),
}

impl<D: Device, P: PrettyPrint> Tracer<D, P> {
impl<D: for<'a> Device<'a>, P: PrettyPrint> Tracer<D, P> {
/// Create a tracer device.
pub fn new(inner: D, writer: fn(timestamp: u64, printer: PrettyPrinter<P>)) -> Tracer<D, P> {
Tracer {
inner: inner,
writer: writer
}
Tracer { inner, writer }
}

/// Return the underlying device, consuming the tracer.
@@ -27,41 +24,65 @@ impl<D: Device, P: PrettyPrint> Tracer<D, P> {
}
}

impl<D: Device, P: PrettyPrint> Device for Tracer<D, P> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer, P>;
impl<'a, D, P> Device<'a> for Tracer<D, P>
where D: for<'b> Device<'b>,
P: PrettyPrint + 'a,
{
type RxToken = RxToken<<D as Device<'a>>::RxToken, P>;
type TxToken = TxToken<<D as Device<'a>>::TxToken, P>;

fn capabilities(&self) -> DeviceCapabilities { self.inner.capabilities() }

fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let buffer = self.inner.receive(timestamp)?;
(self.writer)(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut inner, writer, .. } = self;
inner.receive().map(|(rx_token, tx_token)| {
let rx = RxToken { token: rx_token, writer: writer };
let tx = TxToken { token: tx_token, writer: writer };
(rx, tx)
})
}

fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer = self.inner.transmit(timestamp, length)?;
Ok(TxBuffer { buffer, timestamp, writer: self.writer })
fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut inner, writer } = self;
inner.transmit().map(|tx_token| {
TxToken { token: tx_token, writer: writer }
})
}
}

#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> {
buffer: B,
timestamp: u64,
pub struct RxToken<Rx: phy::RxToken, P: PrettyPrint> {
token: Rx,
writer: fn(u64, PrettyPrinter<P>)
}

impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsRef<[u8]> for TxBuffer<B, P> {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
impl<Rx: phy::RxToken, P: PrettyPrint> phy::RxToken for RxToken<Rx, P> {
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>
{
let Self { token, writer } = self;
token.consume(timestamp, |buffer| {
writer(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
f(buffer)
})
}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsMut<[u8]> for TxBuffer<B, P> {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
#[doc(hidden)]
pub struct TxToken<Tx: phy::TxToken, P: PrettyPrint> {
token: Tx,
writer: fn(u64, PrettyPrinter<P>)
}

impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> Drop for TxBuffer<B, P> {
fn drop(&mut self) {
(self.writer)(self.timestamp, PrettyPrinter::<P>::new("-> ", &self.buffer));
impl<Tx: phy::TxToken, P: PrettyPrint> phy::TxToken for TxToken<Tx, P> {
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let Self { token, writer } = self;
token.consume(timestamp, len, |buffer| {
let result = f(buffer);
writer(timestamp, PrettyPrinter::<P>::new("-> ", &buffer));
result
})
}
}

0 comments on commit 198fe23

Please sign in to comment.