Skip to content

Commit 198fe23

Browse files
phil-opp authored and whitequark committed Nov 3, 2017
Redesign the phy::Device trait to avoid Drop impls.
1 parent d1d80ca commit 198fe23

File tree

10 files changed

+593
-445
lines changed

10 files changed

+593
-445
lines changed
 

‎examples/tcpdump.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
extern crate smoltcp;
22

33
use std::env;
4-
use smoltcp::phy::{Device, RawSocket};
4+
use smoltcp::phy::{Device, RxToken, RawSocket};
55
use smoltcp::wire::{PrettyPrinter, EthernetFrame};
66

77
fn main() {
88
let ifname = env::args().nth(1).unwrap();
99
let mut socket = RawSocket::new(ifname.as_ref()).unwrap();
1010
loop {
11-
let buffer = socket.receive(/*timestamp=*/0).unwrap();
12-
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer))
11+
let (rx_token, _) = socket.receive().unwrap();
12+
rx_token.consume(/*timestamp = */ 0, |buffer| {
13+
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer));
14+
Ok(())
15+
}).unwrap();
1316
}
1417
}

‎examples/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub fn add_middleware_options(opts: &mut Options, _free: &mut Vec<&str>) {
9292
opts.optopt("", "shaping-interval", "Sets the interval for rate limiting (ms)", "RATE");
9393
}
9494

95-
pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loopback: bool)
95+
pub fn parse_middleware_options<D: for<'a> Device<'a>>(matches: &mut Matches, device: D, loopback: bool)
9696
-> FaultInjector<EthernetTracer<PcapWriter<D, Rc<PcapSink>>>> {
9797
let drop_chance = matches.opt_str("drop-chance").map(|s| u8::from_str(&s).unwrap())
9898
.unwrap_or(0);

‎src/iface/ethernet.rs

+184-131
Large diffs are not rendered by default.

‎src/phy/fault_injector.rs

+103-79
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use core::cell::RefCell;
2+
13
use {Error, Result};
2-
use super::{DeviceCapabilities, Device};
4+
use phy::{self, DeviceCapabilities, Device};
35

46
// We use our own RNG to stay compatible with #![no_std].
57
// The use of the RNG below has a slight bias, but it doesn't matter.
@@ -26,7 +28,7 @@ struct Config {
2628
interval: u64,
2729
}
2830

29-
#[derive(Debug, Clone, Copy)]
31+
#[derive(Debug, Clone)]
3032
struct State {
3133
rng_seed: u32,
3234
refilled_at: u64,
@@ -86,13 +88,13 @@ impl State {
8688
/// adverse network conditions (such as random packet loss or corruption), or software
8789
/// or hardware limitations (such as a limited number or size of usable network buffers).
8890
#[derive(Debug)]
89-
pub struct FaultInjector<D: Device> {
90-
inner: D,
91-
state: State,
92-
config: Config
91+
pub struct FaultInjector<D: for<'a> Device<'a>> {
92+
inner: D,
93+
state: RefCell<State>,
94+
config: Config,
9395
}
9496

95-
impl<D: Device> FaultInjector<D> {
97+
impl<D: for<'a> Device<'a>> FaultInjector<D> {
9698
/// Create a fault injector device, using the given random number generator seed.
9799
pub fn new(inner: D, seed: u32) -> FaultInjector<D> {
98100
let state = State {
@@ -103,8 +105,8 @@ impl<D: Device> FaultInjector<D> {
103105
};
104106
FaultInjector {
105107
inner: inner,
106-
state: state,
107-
config: Config::default()
108+
state: RefCell::new(state),
109+
config: Config::default(),
108110
}
109111
}
110112

@@ -178,15 +180,16 @@ impl<D: Device> FaultInjector<D> {
178180

179181
/// Set the interval for packet rate limiting, in milliseconds.
180182
pub fn set_bucket_interval(&mut self, interval: u64) {
181-
self.state.refilled_at = 0;
183+
self.state.borrow_mut().refilled_at = 0;
182184
self.config.interval = interval
183185
}
184186
}
185187

186-
impl<D: Device> Device for FaultInjector<D>
187-
where D::RxBuffer: AsMut<[u8]> {
188-
type RxBuffer = D::RxBuffer;
189-
type TxBuffer = TxBuffer<D::TxBuffer>;
188+
impl<'a, D> Device<'a> for FaultInjector<D>
189+
where D: for<'b> Device<'b>,
190+
{
191+
type RxToken = RxToken<'a, <D as Device<'a>>::RxToken>;
192+
type TxToken = TxToken<'a, <D as Device<'a>>::TxToken>;
190193

191194
fn capabilities(&self) -> DeviceCapabilities {
192195
let mut caps = self.inner.capabilities();
@@ -196,88 +199,109 @@ impl<D: Device> Device for FaultInjector<D>
196199
caps
197200
}
198201

199-
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
200-
let mut buffer = self.inner.receive(timestamp)?;
201-
if self.state.maybe(self.config.drop_pct) {
202+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
203+
let &mut Self { ref mut inner, ref state, config } = self;
204+
inner.receive().map(|(rx_token, tx_token)| {
205+
let rx = RxToken {
206+
state: &state,
207+
config: config,
208+
token: rx_token,
209+
corrupt: [0; MTU],
210+
};
211+
let tx = TxToken {
212+
state: &state,
213+
config: config,
214+
token: tx_token,
215+
junk: [0; MTU],
216+
};
217+
(rx, tx)
218+
})
219+
}
220+
221+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
222+
let &mut Self { ref mut inner, ref state, config } = self;
223+
inner.transmit().map(|token| TxToken {
224+
state: &state,
225+
config: config,
226+
token: token,
227+
junk: [0; MTU],
228+
})
229+
}
230+
}
231+
232+
#[doc(hidden)]
233+
pub struct RxToken<'a, Rx: phy::RxToken> {
234+
state: &'a RefCell<State>,
235+
config: Config,
236+
token: Rx,
237+
corrupt: [u8; MTU],
238+
}
239+
240+
impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
241+
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
242+
if self.state.borrow_mut().maybe(self.config.drop_pct) {
202243
net_trace!("rx: randomly dropping a packet");
203244
return Err(Error::Exhausted)
204245
}
205-
if self.state.maybe(self.config.corrupt_pct) {
206-
net_trace!("rx: randomly corrupting a packet");
207-
self.state.corrupt(&mut buffer)
208-
}
209-
if self.config.max_size > 0 && buffer.as_ref().len() > self.config.max_size {
210-
net_trace!("rx: dropping a packet that is too large");
211-
return Err(Error::Exhausted)
212-
}
213-
if !self.state.maybe_receive(&self.config, timestamp) {
246+
if !self.state.borrow_mut().maybe_receive(&self.config, timestamp) {
214247
net_trace!("rx: dropping a packet because of rate limiting");
215248
return Err(Error::Exhausted)
216249
}
217-
Ok(buffer)
218-
}
219-
220-
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
221-
let buffer;
222-
if self.state.maybe(self.config.drop_pct) {
223-
net_trace!("tx: randomly dropping a packet");
224-
buffer = None;
225-
} else if self.config.max_size > 0 && length > self.config.max_size {
226-
net_trace!("tx: dropping a packet that is too large");
227-
buffer = None;
228-
} else if !self.state.maybe_transmit(&self.config, timestamp) {
229-
net_trace!("tx: dropping a packet because of rate limiting");
230-
buffer = None;
231-
} else {
232-
buffer = Some(self.inner.transmit(timestamp, length)?);
233-
}
234-
Ok(TxBuffer {
235-
buffer: buffer,
236-
state: self.state.clone(),
237-
config: self.config,
238-
junk: [0; MTU],
239-
length: length
250+
let Self { token, config, state, mut corrupt } = self;
251+
token.consume(timestamp, |buffer| {
252+
if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
253+
net_trace!("rx: dropping a packet that is too large");
254+
return Err(Error::Exhausted)
255+
}
256+
if state.borrow_mut().maybe(config.corrupt_pct) {
257+
net_trace!("rx: randomly corrupting a packet");
258+
let mut corrupt = &mut corrupt[..buffer.len()];
259+
corrupt.copy_from_slice(buffer);
260+
state.borrow_mut().corrupt(&mut corrupt);
261+
f(&mut corrupt)
262+
} else {
263+
f(buffer)
264+
}
240265
})
241266
}
242267
}
243268

244269
#[doc(hidden)]
245-
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>> {
246-
state: State,
270+
pub struct TxToken<'a, Tx: phy::TxToken> {
271+
state: &'a RefCell<State>,
247272
config: Config,
248-
buffer: Option<B>,
273+
token: Tx,
249274
junk: [u8; MTU],
250-
length: usize
251275
}
252276

253-
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsRef<[u8]> for TxBuffer<B> {
254-
fn as_ref(&self) -> &[u8] {
255-
match self.buffer {
256-
Some(ref buf) => buf.as_ref(),
257-
None => &self.junk[..self.length]
258-
}
259-
}
260-
}
277+
impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
278+
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(mut self, timestamp: u64, len: usize, f: F)
279+
-> Result<R>
280+
{
281+
let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
282+
net_trace!("tx: randomly dropping a packet");
283+
true
284+
} else if self.config.max_size > 0 && len > self.config.max_size {
285+
net_trace!("tx: dropping a packet that is too large");
286+
true
287+
} else if !self.state.borrow_mut().maybe_transmit(&self.config, timestamp) {
288+
net_trace!("tx: dropping a packet because of rate limiting");
289+
true
290+
} else {
291+
false
292+
};
261293

262-
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for TxBuffer<B> {
263-
fn as_mut(&mut self) -> &mut [u8] {
264-
match self.buffer {
265-
Some(ref mut buf) => buf.as_mut(),
266-
None => &mut self.junk[..self.length]
294+
if drop {
295+
return f(&mut self.junk);
267296
}
268-
}
269-
}
270297

271-
impl<B: AsRef<[u8]> + AsMut<[u8]>> Drop for TxBuffer<B> {
272-
fn drop(&mut self) {
273-
match self.buffer {
274-
Some(ref mut buf) => {
275-
if self.state.maybe(self.config.corrupt_pct) {
276-
net_trace!("tx: corrupting a packet");
277-
self.state.corrupt(buf)
278-
}
279-
},
280-
None => ()
281-
}
298+
let Self { token, state, config, .. } = self;
299+
token.consume(timestamp, len, |mut buf| {
300+
if state.borrow_mut().maybe(config.corrupt_pct) {
301+
net_trace!("tx: corrupting a packet");
302+
state.borrow_mut().corrupt(&mut buf)
303+
}
304+
f(buf)
305+
})
282306
}
283307
}

‎src/phy/loopback.rs

+37-35
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
1-
use core::mem::swap;
2-
use core::cell::RefCell;
3-
#[cfg(feature = "std")]
4-
use std::rc::Rc;
5-
#[cfg(feature = "alloc")]
6-
use alloc::rc::Rc;
71
#[cfg(feature = "std")]
82
use std::vec::Vec;
93
#[cfg(feature = "std")]
104
use std::collections::VecDeque;
115
#[cfg(feature = "alloc")]
126
use alloc::{Vec, VecDeque};
137

14-
use {Error, Result};
15-
use super::{Device, DeviceCapabilities};
8+
use Result;
9+
use phy::{self, Device, DeviceCapabilities};
1610

1711
/// A loopback device.
1812
#[derive(Debug)]
19-
pub struct Loopback(Rc<RefCell<VecDeque<Vec<u8>>>>);
13+
pub struct Loopback {
14+
queue: VecDeque<Vec<u8>>,
15+
}
2016

2117
impl Loopback {
2218
/// Creates a loopback device.
2319
///
2420
/// Every packet transmitted through this device will be received through it
2521
/// in FIFO order.
2622
pub fn new() -> Loopback {
27-
Loopback(Rc::new(RefCell::new(VecDeque::new())))
23+
Loopback {
24+
queue: VecDeque::new(),
25+
}
2826
}
2927
}
3028

31-
impl Device for Loopback {
32-
type RxBuffer = Vec<u8>;
33-
type TxBuffer = TxBuffer;
29+
impl<'a> Device<'a> for Loopback {
30+
type RxToken = RxToken;
31+
type TxToken = TxToken<'a>;
3432

3533
fn capabilities(&self) -> DeviceCapabilities {
3634
DeviceCapabilities {
@@ -39,41 +37,45 @@ impl Device for Loopback {
3937
}
4038
}
4139

42-
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
43-
match self.0.borrow_mut().pop_front() {
44-
Some(packet) => Ok(packet),
45-
None => Err(Error::Exhausted)
46-
}
40+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
41+
self.queue.pop_front().map(move |buffer| {
42+
let rx = RxToken { buffer: buffer };
43+
let tx = TxToken { queue: &mut self.queue };
44+
(rx, tx)
45+
})
4746
}
4847

49-
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
50-
let mut buffer = Vec::new();
51-
buffer.resize(length, 0);
52-
Ok(TxBuffer {
53-
queue: self.0.clone(),
54-
buffer: buffer
48+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
49+
Some(TxToken {
50+
queue: &mut self.queue,
5551
})
5652
}
5753
}
5854

5955
#[doc(hidden)]
60-
pub struct TxBuffer {
61-
queue: Rc<RefCell<VecDeque<Vec<u8>>>>,
62-
buffer: Vec<u8>
56+
pub struct RxToken {
57+
buffer: Vec<u8>,
6358
}
6459

65-
impl AsRef<[u8]> for TxBuffer {
66-
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
60+
impl phy::RxToken for RxToken {
61+
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
62+
f(&self.buffer)
63+
}
6764
}
6865

69-
impl AsMut<[u8]> for TxBuffer {
70-
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
66+
#[doc(hidden)]
67+
pub struct TxToken<'a> {
68+
queue: &'a mut VecDeque<Vec<u8>>,
7169
}
7270

73-
impl Drop for TxBuffer {
74-
fn drop(&mut self) {
71+
impl<'a> phy::TxToken for TxToken<'a> {
72+
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
73+
where F: FnOnce(&mut [u8]) -> Result<R>
74+
{
7575
let mut buffer = Vec::new();
76-
swap(&mut buffer, &mut self.buffer);
77-
self.queue.borrow_mut().push_back(buffer)
76+
buffer.resize(len, 0);
77+
let result = f(&mut buffer);
78+
self.queue.push_back(buffer);
79+
result
7880
}
7981
}

‎src/phy/mod.rs

+82-78
Original file line numberDiff line numberDiff line change
@@ -19,87 +19,66 @@
1919
//!
2020
/*!
2121
```rust
22-
use std::slice;
23-
use smoltcp::{Error, Result};
24-
use smoltcp::phy::{DeviceCapabilities, Device};
22+
use smoltcp::Result;
23+
use smoltcp::phy::{self, DeviceCapabilities, Device};
2524
26-
const TX_BUFFERS: [*mut u8; 2] = [0x10000000 as *mut u8, 0x10001000 as *mut u8];
27-
const RX_BUFFERS: [*mut u8; 2] = [0x10002000 as *mut u8, 0x10003000 as *mut u8];
28-
29-
fn rx_full() -> bool {
30-
/* platform-specific code to check if an incoming packet has arrived */
31-
false
32-
}
33-
34-
fn rx_setup(_buf: *mut u8, _length: &mut usize) {
35-
/* platform-specific code to receive a packet into a buffer */
25+
struct StmPhy {
26+
rx_buffer: [u8; 1536],
27+
tx_buffer: [u8; 1536],
3628
}
3729
38-
fn tx_empty() -> bool {
39-
/* platform-specific code to check if an outgoing packet can be sent */
40-
false
30+
impl<'a> StmPhy {
31+
fn new() -> StmPhy {
32+
StmPhy {
33+
rx_buffer: [0; 1536],
34+
tx_buffer: [0; 1536],
35+
}
36+
}
4137
}
4238
43-
fn tx_setup(_buf: *const u8, _length: usize) {
44-
/* platform-specific code to send a buffer with a packet */
45-
}
39+
impl<'a> phy::Device<'a> for StmPhy {
40+
type RxToken = StmPhyRxToken<'a>;
41+
type TxToken = StmPhyTxToken<'a>;
4642
47-
# #[allow(dead_code)]
48-
pub struct EthernetDevice {
49-
tx_next: usize,
50-
rx_next: usize
51-
}
43+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
44+
Some((StmPhyRxToken(&mut self.rx_buffer[..]),
45+
StmPhyTxToken(&mut self.tx_buffer[..])))
46+
}
5247
53-
impl Device for EthernetDevice {
54-
type RxBuffer = &'static [u8];
55-
type TxBuffer = EthernetTxBuffer;
48+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
49+
Some(StmPhyTxToken(&mut self.tx_buffer[..]))
50+
}
5651
5752
fn capabilities(&self) -> DeviceCapabilities {
5853
let mut caps = DeviceCapabilities::default();
5954
caps.max_transmission_unit = 1536;
60-
caps.max_burst_size = Some(2);
55+
caps.max_burst_size = Some(1);
6156
caps
6257
}
63-
64-
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
65-
if rx_full() {
66-
let index = self.rx_next;
67-
self.rx_next = (self.rx_next + 1) % RX_BUFFERS.len();
68-
let mut length = 0;
69-
rx_setup(RX_BUFFERS[self.rx_next], &mut length);
70-
Ok(unsafe {
71-
slice::from_raw_parts(RX_BUFFERS[index], length)
72-
})
73-
} else {
74-
Err(Error::Exhausted)
75-
}
76-
}
77-
78-
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
79-
if tx_empty() {
80-
let index = self.tx_next;
81-
self.tx_next = (self.tx_next + 1) % TX_BUFFERS.len();
82-
Ok(EthernetTxBuffer(unsafe {
83-
slice::from_raw_parts_mut(TX_BUFFERS[index], length)
84-
}))
85-
} else {
86-
Err(Error::Exhausted)
87-
}
88-
}
8958
}
9059
91-
pub struct EthernetTxBuffer(&'static mut [u8]);
60+
struct StmPhyRxToken<'a>(&'a [u8]);
9261
93-
impl AsRef<[u8]> for EthernetTxBuffer {
94-
fn as_ref(&self) -> &[u8] { self.0 }
62+
impl<'a> phy::RxToken for StmPhyRxToken<'a> {
63+
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
64+
// TODO: receive packet into buffer
65+
let result = f(self.0);
66+
println!("rx called");
67+
result
68+
}
9569
}
9670
97-
impl AsMut<[u8]> for EthernetTxBuffer {
98-
fn as_mut(&mut self) -> &mut [u8] { self.0 }
99-
}
71+
struct StmPhyTxToken<'a>(&'a mut [u8]);
10072
101-
impl Drop for EthernetTxBuffer {
102-
fn drop(&mut self) { tx_setup(self.0.as_ptr(), self.0.len()) }
73+
impl<'a> phy::TxToken for StmPhyTxToken<'a> {
74+
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
75+
-> Result<R>
76+
{
77+
let result = f(&mut self.0[..len]);
78+
println!("tx called {}", len);
79+
// TODO: send packet out
80+
result
81+
}
10382
}
10483
```
10584
*/
@@ -229,27 +208,52 @@ pub struct DeviceCapabilities {
229208

230209
/// An interface for sending and receiving raw network frames.
231210
///
232-
/// It is expected that a `Device` implementation would allocate memory for both sending
233-
/// and receiving packets from memory pools; hence, the stack borrows the buffer for a packet
234-
/// that it is about to receive, as well for a packet that it is about to send, from the device.
235-
pub trait Device {
236-
type RxBuffer: AsRef<[u8]>;
237-
type TxBuffer: AsRef<[u8]> + AsMut<[u8]>;
211+
/// The interface is based on _tokens_, which are types that allow receiving or transmitting a
212+
/// single packet. The `receive` and `transmit` functions only construct such tokens; the
213+
/// real sending/receiving operations are performed when the tokens are consumed.
214+
pub trait Device<'a> {
215+
type RxToken: RxToken + 'a;
216+
type TxToken: TxToken + 'a;
217+
218+
/// Construct a token pair consisting of one receive token and one transmit token.
219+
///
220+
/// The additional transmit token makes it possible to generate a reply packet based
221+
/// on the contents of the received packet. For example, this makes it possible to
222+
/// handle arbitrarily large ICMP echo ("ping") requests, where all the received bytes
223+
/// need to be sent back, without heap allocation.
224+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)>;
225+
226+
/// Construct a transmit token.
227+
fn transmit(&'a mut self) -> Option<Self::TxToken>;
238228

239229
/// Get a description of device capabilities.
240230
fn capabilities(&self) -> DeviceCapabilities;
231+
}
241232

242-
/// Receive a frame.
233+
/// A token to receive a single network packet.
234+
pub trait RxToken {
235+
/// Consumes the token to receive a single network packet.
243236
///
244-
/// It is expected that a `receive` implementation, once a packet is written to memory
245-
/// through DMA, would gain ownership of the underlying buffer, provide it for parsing,
246-
/// and return it to the network device once it is dropped.
247-
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer>;
237+
/// This method receives a packet and then calls the given closure `f` with the raw
238+
/// packet bytes as argument.
239+
///
240+
/// The timestamp must be a number of milliseconds, monotonically increasing since an
241+
/// arbitrary moment in time, such as system startup.
242+
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
243+
where F: FnOnce(&[u8]) -> Result<R>;
244+
}
248245

249-
/// Transmit a frame.
246+
/// A token to transmit a single network packet.
247+
pub trait TxToken {
248+
/// Consumes the token to send a single network packet.
249+
///
250+
/// This method constructs a transmit buffer of size `len` and calls the passed
251+
/// closure `f` with a mutable reference to that buffer. The closure should construct
252+
/// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure
253+
/// returns, the transmit buffer is sent out.
250254
///
251-
/// It is expected that a `transmit` implementation would gain ownership of a buffer with
252-
/// the requested length, provide it for emission, and schedule it to be read from
253-
/// memory by the network device once it is dropped.
254-
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer>;
255+
/// The timestamp must be a number of milliseconds, monotonically increasing since an
256+
/// arbitrary moment in time, such as system startup.
257+
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
258+
where F: FnOnce(&mut [u8]) -> Result<R>;
255259
}

‎src/phy/pcap_writer.rs

+60-37
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::io::Write;
55
use byteorder::{ByteOrder, NativeEndian};
66

77
use Result;
8-
use super::{DeviceCapabilities, Device};
8+
use phy::{self, DeviceCapabilities, Device};
99

1010
enum_with_unknown! {
1111
/// Captured packet header type.
@@ -114,67 +114,90 @@ impl<T: AsMut<Write>> PcapSink for RefCell<T> {
114114
/// [libpcap]: https://wiki.wireshark.org/Development/LibpcapFileFormat
115115
/// [sink]: trait.PcapSink.html
116116
#[derive(Debug)]
117-
pub struct PcapWriter<D: Device, S: PcapSink + Clone> {
117+
pub struct PcapWriter<D, S>
118+
where D: for<'a> Device<'a>,
119+
S: PcapSink + Clone,
120+
{
118121
lower: D,
119122
sink: S,
120-
mode: PcapMode
123+
mode: PcapMode,
121124
}
122125

123-
impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
126+
impl<D: for<'a> Device<'a>, S: PcapSink + Clone> PcapWriter<D, S> {
124127
/// Creates a packet capture writer.
125128
pub fn new(lower: D, sink: S, mode: PcapMode, link_type: PcapLinkType) -> PcapWriter<D, S> {
126129
sink.global_header(link_type);
127130
PcapWriter { lower, sink, mode }
128131
}
129132
}
130133

131-
impl<D: Device, S: PcapSink + Clone> Device for PcapWriter<D, S> {
132-
type RxBuffer = D::RxBuffer;
133-
type TxBuffer = TxBuffer<D::TxBuffer, S>;
134+
impl<'a, D, S> Device<'a> for PcapWriter<D, S>
135+
where D: for<'b> Device<'b>,
136+
S: PcapSink + Clone + 'a,
137+
{
138+
type RxToken = RxToken<<D as Device<'a>>::RxToken, S>;
139+
type TxToken = TxToken<<D as Device<'a>>::TxToken, S>;
134140

135141
fn capabilities(&self) -> DeviceCapabilities { self.lower.capabilities() }
136142

137-
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
138-
let buffer = self.lower.receive(timestamp)?;
139-
match self.mode {
140-
PcapMode::Both | PcapMode::RxOnly =>
141-
self.sink.packet(timestamp, buffer.as_ref()),
142-
PcapMode::TxOnly => ()
143-
}
144-
Ok(buffer)
143+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
144+
let &mut Self { ref mut lower, ref sink, mode, .. } = self;
145+
lower.receive().map(|(rx_token, tx_token)| {
146+
let rx = RxToken { token: rx_token, sink: sink.clone(), mode: mode };
147+
let tx = TxToken { token: tx_token, sink: sink.clone(), mode: mode };
148+
(rx, tx)
149+
})
145150
}
146151

147-
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
148-
let buffer = self.lower.transmit(timestamp, length)?;
149-
Ok(TxBuffer { buffer, timestamp, sink: self.sink.clone(), mode: self.mode })
152+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
153+
let &mut Self { ref mut lower, ref sink, mode } = self;
154+
lower.transmit().map(|token| {
155+
TxToken { token, sink: sink.clone(), mode: mode }
156+
})
150157
}
151158
}
152159

153160
#[doc(hidden)]
154-
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink> {
155-
buffer: B,
156-
timestamp: u64,
157-
sink: S,
158-
mode: PcapMode
161+
pub struct RxToken<Rx: phy::RxToken, S: PcapSink> {
162+
token: Rx,
163+
sink: S,
164+
mode: PcapMode,
159165
}
160166

161-
impl<B, S> AsRef<[u8]> for TxBuffer<B, S>
162-
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
163-
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
167+
impl<Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<Rx, S> {
168+
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
169+
let Self { token, sink, mode } = self;
170+
token.consume(timestamp, |buffer| {
171+
match mode {
172+
PcapMode::Both | PcapMode::RxOnly =>
173+
sink.packet(timestamp, buffer.as_ref()),
174+
PcapMode::TxOnly => ()
175+
}
176+
f(buffer)
177+
})
178+
}
164179
}
165180

166-
impl<B, S> AsMut<[u8]> for TxBuffer<B, S>
167-
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
168-
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
181+
#[doc(hidden)]
182+
pub struct TxToken<Tx: phy::TxToken, S: PcapSink> {
183+
token: Tx,
184+
sink: S,
185+
mode: PcapMode
169186
}
170187

171-
impl<B, S> Drop for TxBuffer<B, S>
172-
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
173-
fn drop(&mut self) {
174-
match self.mode {
175-
PcapMode::Both | PcapMode::TxOnly =>
176-
self.sink.packet(self.timestamp, self.as_ref()),
177-
PcapMode::RxOnly => ()
178-
}
188+
impl<Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<Tx, S> {
189+
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
190+
where F: FnOnce(&mut [u8]) -> Result<R>
191+
{
192+
let Self { token, sink, mode } = self;
193+
token.consume(timestamp, len, |buffer| {
194+
let result = f(buffer);
195+
match mode {
196+
PcapMode::Both | PcapMode::TxOnly =>
197+
sink.packet(timestamp, &buffer),
198+
PcapMode::RxOnly => ()
199+
};
200+
result
201+
})
179202
}
180203
}

‎src/phy/raw_socket.rs

+32-24
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::cell::RefCell;
2-
use std::vec::Vec;
32
use std::rc::Rc;
43
use std::io;
54
use std::os::unix::io::{RawFd, AsRawFd};
65

76
use Result;
8-
use super::{sys, DeviceCapabilities, Device};
7+
use phy::{self, sys, DeviceCapabilities, Device};
98

109
/// A socket that captures or transmits the complete frame.
1110
#[derive(Debug)]
@@ -36,9 +35,9 @@ impl RawSocket {
3635
}
3736
}
3837

39-
impl Device for RawSocket {
40-
type RxBuffer = Vec<u8>;
41-
type TxBuffer = TxBuffer;
38+
impl<'a> Device<'a> for RawSocket {
39+
type RxToken = RxToken;
40+
type TxToken = TxToken;
4241

4342
fn capabilities(&self) -> DeviceCapabilities {
4443
DeviceCapabilities {
@@ -47,39 +46,48 @@ impl Device for RawSocket {
4746
}
4847
}
4948

50-
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
51-
let mut lower = self.lower.borrow_mut();
52-
let mut buffer = vec![0; self.mtu];
53-
let size = lower.recv(&mut buffer[..]).unwrap();
54-
buffer.resize(size, 0);
55-
Ok(buffer)
49+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
50+
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
51+
let tx = TxToken { lower: self.lower.clone() };
52+
Some((rx, tx))
5653
}
5754

58-
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
59-
Ok(TxBuffer {
60-
lower: self.lower.clone(),
61-
buffer: vec![0; length]
55+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
56+
Some(TxToken {
57+
lower: self.lower.clone(),
6258
})
6359
}
6460
}
6561

6662
#[doc(hidden)]
67-
pub struct TxBuffer {
63+
pub struct RxToken {
6864
lower: Rc<RefCell<sys::RawSocketDesc>>,
69-
buffer: Vec<u8>
65+
mtu: usize,
7066
}
7167

72-
impl AsRef<[u8]> for TxBuffer {
73-
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
68+
impl phy::RxToken for RxToken {
69+
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
70+
let mut lower = self.lower.borrow_mut();
71+
let mut buffer = vec![0; self.mtu];
72+
let size = lower.recv(&mut buffer[..]).unwrap();
73+
buffer.resize(size, 0);
74+
f(&mut buffer)
75+
}
7476
}
7577

76-
impl AsMut<[u8]> for TxBuffer {
77-
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
78+
#[doc(hidden)]
79+
pub struct TxToken {
80+
lower: Rc<RefCell<sys::RawSocketDesc>>,
7881
}
7982

80-
impl Drop for TxBuffer {
81-
fn drop(&mut self) {
83+
impl phy::TxToken for TxToken {
84+
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
85+
-> Result<R>
86+
{
8287
let mut lower = self.lower.borrow_mut();
83-
lower.send(&mut self.buffer[..]).unwrap();
88+
let mut buffer = vec![0; len];
89+
let result = f(&mut buffer);
90+
lower.send(&mut buffer[..]).unwrap();
91+
result
8492
}
8593
}

‎src/phy/tap_interface.rs

+38-28
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::cell::RefCell;
2-
use std::vec::Vec;
32
use std::rc::Rc;
43
use std::io;
54
use std::os::unix::io::{RawFd, AsRawFd};
65

76
use {Error, Result};
8-
use super::{sys, DeviceCapabilities, Device};
7+
use phy::{self, sys, DeviceCapabilities, Device};
98

109
/// A virtual Ethernet interface.
1110
#[derive(Debug)]
@@ -37,9 +36,9 @@ impl TapInterface {
3736
}
3837
}
3938

40-
impl Device for TapInterface {
41-
type RxBuffer = Vec<u8>;
42-
type TxBuffer = TxBuffer;
39+
impl<'a> Device<'a> for TapInterface {
40+
type RxToken = RxToken;
41+
type TxToken = TxToken;
4342

4443
fn capabilities(&self) -> DeviceCapabilities {
4544
DeviceCapabilities {
@@ -48,46 +47,57 @@ impl Device for TapInterface {
4847
}
4948
}
5049

51-
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
50+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
51+
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
52+
let tx = TxToken { lower: self.lower.clone(), };
53+
Some((rx, tx))
54+
}
55+
56+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
57+
Some(TxToken {
58+
lower: self.lower.clone(),
59+
})
60+
}
61+
}
62+
63+
#[doc(hidden)]
64+
pub struct RxToken {
65+
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
66+
mtu: usize,
67+
}
68+
69+
impl phy::RxToken for RxToken {
70+
fn consume<R, F>(self, _timestamp: u64, f: F) -> Result<R>
71+
where F: FnOnce(&[u8]) -> Result<R>
72+
{
5273
let mut lower = self.lower.borrow_mut();
5374
let mut buffer = vec![0; self.mtu];
5475
match lower.recv(&mut buffer[..]) {
5576
Ok(size) => {
5677
buffer.resize(size, 0);
57-
Ok(buffer)
78+
f(&buffer)
5879
}
5980
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
6081
Err(Error::Exhausted)
6182
}
6283
Err(err) => panic!("{}", err)
6384
}
6485
}
65-
66-
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
67-
Ok(TxBuffer {
68-
lower: self.lower.clone(),
69-
buffer: vec![0; length]
70-
})
71-
}
7286
}
7387

7488
#[doc(hidden)]
75-
pub struct TxBuffer {
76-
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
77-
buffer: Vec<u8>
78-
}
79-
80-
impl AsRef<[u8]> for TxBuffer {
81-
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
82-
}
83-
84-
impl AsMut<[u8]> for TxBuffer {
85-
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
89+
pub struct TxToken {
90+
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
8691
}
8792

88-
impl Drop for TxBuffer {
89-
fn drop(&mut self) {
93+
impl phy::TxToken for TxToken {
94+
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
95+
where F: FnOnce(&mut [u8]) -> Result<R>
96+
{
9097
let mut lower = self.lower.borrow_mut();
91-
lower.send(&mut self.buffer[..]).unwrap();
98+
let mut buffer = vec![0; len];
99+
let result = f(&mut buffer);
100+
lower.send(&mut buffer[..]).unwrap();
101+
result
92102
}
93103
}

‎src/phy/tracer.rs

+50-29
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
11
use Result;
22
use wire::pretty_print::{PrettyPrint, PrettyPrinter};
3-
use super::{DeviceCapabilities, Device};
3+
use phy::{self, DeviceCapabilities, Device};
44

55
/// A tracer device.
66
///
77
/// A tracer is a device that pretty prints all packets traversing it
88
/// using the provided writer function, and then passes them to another
99
/// device.
10-
pub struct Tracer<D: Device, P: PrettyPrint> {
11-
inner: D,
12-
writer: fn(u64, PrettyPrinter<P>)
10+
pub struct Tracer<D: for<'a> Device<'a>, P: PrettyPrint> {
11+
inner: D,
12+
writer: fn(u64, PrettyPrinter<P>),
1313
}
1414

15-
impl<D: Device, P: PrettyPrint> Tracer<D, P> {
15+
impl<D: for<'a> Device<'a>, P: PrettyPrint> Tracer<D, P> {
1616
/// Create a tracer device.
1717
pub fn new(inner: D, writer: fn(timestamp: u64, printer: PrettyPrinter<P>)) -> Tracer<D, P> {
18-
Tracer {
19-
inner: inner,
20-
writer: writer
21-
}
18+
Tracer { inner, writer }
2219
}
2320

2421
/// Return the underlying device, consuming the tracer.
@@ -27,41 +24,65 @@ impl<D: Device, P: PrettyPrint> Tracer<D, P> {
2724
}
2825
}
2926

30-
impl<D: Device, P: PrettyPrint> Device for Tracer<D, P> {
31-
type RxBuffer = D::RxBuffer;
32-
type TxBuffer = TxBuffer<D::TxBuffer, P>;
27+
impl<'a, D, P> Device<'a> for Tracer<D, P>
28+
where D: for<'b> Device<'b>,
29+
P: PrettyPrint + 'a,
30+
{
31+
type RxToken = RxToken<<D as Device<'a>>::RxToken, P>;
32+
type TxToken = TxToken<<D as Device<'a>>::TxToken, P>;
3333

3434
fn capabilities(&self) -> DeviceCapabilities { self.inner.capabilities() }
3535

36-
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
37-
let buffer = self.inner.receive(timestamp)?;
38-
(self.writer)(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
39-
Ok(buffer)
36+
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
37+
let &mut Self { ref mut inner, writer, .. } = self;
38+
inner.receive().map(|(rx_token, tx_token)| {
39+
let rx = RxToken { token: rx_token, writer: writer };
40+
let tx = TxToken { token: tx_token, writer: writer };
41+
(rx, tx)
42+
})
4043
}
4144

42-
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
43-
let buffer = self.inner.transmit(timestamp, length)?;
44-
Ok(TxBuffer { buffer, timestamp, writer: self.writer })
45+
fn transmit(&'a mut self) -> Option<Self::TxToken> {
46+
let &mut Self { ref mut inner, writer } = self;
47+
inner.transmit().map(|tx_token| {
48+
TxToken { token: tx_token, writer: writer }
49+
})
4550
}
4651
}
4752

4853
#[doc(hidden)]
49-
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> {
50-
buffer: B,
51-
timestamp: u64,
54+
pub struct RxToken<Rx: phy::RxToken, P: PrettyPrint> {
55+
token: Rx,
5256
writer: fn(u64, PrettyPrinter<P>)
5357
}
5458

55-
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsRef<[u8]> for TxBuffer<B, P> {
56-
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
59+
impl<Rx: phy::RxToken, P: PrettyPrint> phy::RxToken for RxToken<Rx, P> {
60+
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
61+
where F: FnOnce(&[u8]) -> Result<R>
62+
{
63+
let Self { token, writer } = self;
64+
token.consume(timestamp, |buffer| {
65+
writer(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
66+
f(buffer)
67+
})
68+
}
5769
}
5870

59-
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsMut<[u8]> for TxBuffer<B, P> {
60-
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
71+
#[doc(hidden)]
72+
pub struct TxToken<Tx: phy::TxToken, P: PrettyPrint> {
73+
token: Tx,
74+
writer: fn(u64, PrettyPrinter<P>)
6175
}
6276

63-
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> Drop for TxBuffer<B, P> {
64-
fn drop(&mut self) {
65-
(self.writer)(self.timestamp, PrettyPrinter::<P>::new("-> ", &self.buffer));
77+
impl<Tx: phy::TxToken, P: PrettyPrint> phy::TxToken for TxToken<Tx, P> {
78+
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
79+
where F: FnOnce(&mut [u8]) -> Result<R>
80+
{
81+
let Self { token, writer } = self;
82+
token.consume(timestamp, len, |buffer| {
83+
let result = f(buffer);
84+
writer(timestamp, PrettyPrinter::<P>::new("-> ", &buffer));
85+
result
86+
})
6687
}
6788
}

0 commit comments

Comments
 (0)
Please sign in to comment.