Skip to content

Commit 9cb8131

Browse files
committed Sep 7, 2017
Merge the TCP ring buffer and generic ring buffer.
This adds a few methods to RingBuffer that don't quite fit into its interface (the slice ones), but we can fix that later.
1 parent 550deb3 commit 9cb8131

File tree

2 files changed

+128
-203
lines changed

2 files changed

+128
-203
lines changed
 

src/socket/tcp.rs

+8-148
Original file line numberDiff line numberDiff line change
@@ -8,122 +8,9 @@ use {Error, Result};
88
use phy::DeviceLimits;
99
use wire::{IpProtocol, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
1010
use socket::{Socket, IpRepr};
11+
use storage::RingBuffer;
1112

12-
/// A TCP stream ring buffer.
13-
#[derive(Debug)]
14-
pub struct SocketBuffer<'a> {
15-
storage: Managed<'a, [u8]>,
16-
read_at: usize,
17-
length: usize
18-
}
19-
20-
impl<'a> SocketBuffer<'a> {
21-
/// Create a packet buffer with the given storage.
22-
pub fn new<T>(storage: T) -> SocketBuffer<'a>
23-
where T: Into<Managed<'a, [u8]>> {
24-
SocketBuffer {
25-
storage: storage.into(),
26-
read_at: 0,
27-
length: 0
28-
}
29-
}
30-
31-
fn clear(&mut self) {
32-
self.read_at = 0;
33-
self.length = 0;
34-
}
35-
36-
fn capacity(&self) -> usize {
37-
self.storage.len()
38-
}
39-
40-
fn len(&self) -> usize {
41-
self.length
42-
}
43-
44-
fn window(&self) -> usize {
45-
self.capacity() - self.len()
46-
}
47-
48-
fn empty(&self) -> bool {
49-
self.len() == 0
50-
}
51-
52-
fn full(&self) -> bool {
53-
self.window() == 0
54-
}
55-
56-
fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
57-
let write_at = (self.read_at + self.length) % self.storage.len();
58-
// We can't enqueue more than there is free space.
59-
let free = self.storage.len() - self.length;
60-
if size > free { size = free }
61-
// We can't contiguously enqueue past the beginning of the storage.
62-
let until_end = self.storage.len() - write_at;
63-
if size > until_end { size = until_end }
64-
65-
(write_at, size)
66-
}
67-
68-
fn enqueue(&mut self, size: usize) -> &mut [u8] {
69-
let (write_at, size) = self.clamp_writer(size);
70-
self.length += size;
71-
&mut self.storage[write_at..write_at + size]
72-
}
73-
74-
fn enqueue_slice(&mut self, data: &[u8]) {
75-
let data = {
76-
let mut dest = self.enqueue(data.len());
77-
let (data, rest) = data.split_at(dest.len());
78-
dest.copy_from_slice(data);
79-
rest
80-
};
81-
// Retry, in case we had a wraparound.
82-
let mut dest = self.enqueue(data.len());
83-
let (data, _) = data.split_at(dest.len());
84-
dest.copy_from_slice(data);
85-
}
86-
87-
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
88-
let read_at = (self.read_at + offset) % self.storage.len();
89-
// We can't read past the end of the queued data.
90-
if offset > self.length { return (read_at, 0) }
91-
// We can't dequeue more than was queued.
92-
let clamped_length = self.length - offset;
93-
if size > clamped_length { size = clamped_length }
94-
// We can't contiguously dequeue past the end of the storage.
95-
let until_end = self.storage.len() - read_at;
96-
if size > until_end { size = until_end }
97-
98-
(read_at, size)
99-
}
100-
101-
fn dequeue(&mut self, size: usize) -> &[u8] {
102-
let (read_at, size) = self.clamp_reader(0, size);
103-
self.read_at = (self.read_at + size) % self.storage.len();
104-
self.length -= size;
105-
&self.storage[read_at..read_at + size]
106-
}
107-
108-
fn peek(&self, offset: usize, size: usize) -> &[u8] {
109-
let (read_at, size) = self.clamp_reader(offset, size);
110-
&self.storage[read_at..read_at + size]
111-
}
112-
113-
fn advance(&mut self, size: usize) {
114-
if size > self.length {
115-
panic!("advancing {} octets into free space", size - self.length)
116-
}
117-
self.read_at = (self.read_at + size) % self.storage.len();
118-
self.length -= size;
119-
}
120-
}
121-
122-
impl<'a> Into<SocketBuffer<'a>> for Managed<'a, [u8]> {
123-
fn into(self) -> SocketBuffer<'a> {
124-
SocketBuffer::new(self)
125-
}
126-
}
13+
pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
12714

12815
/// The state of a TCP socket, according to [RFC 793][rfc793].
12916
/// [rfc793]: https://tools.ietf.org/html/rfc793
@@ -590,7 +477,7 @@ impl<'a> TcpSocket<'a> {
590477

591478
#[cfg(any(test, feature = "verbose"))]
592479
let old_length = self.tx_buffer.len();
593-
let buffer = self.tx_buffer.enqueue(size);
480+
let buffer = self.tx_buffer.enqueue_slice(size);
594481
if buffer.len() > 0 {
595482
#[cfg(any(test, feature = "verbose"))]
596483
net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
@@ -630,7 +517,7 @@ impl<'a> TcpSocket<'a> {
630517

631518
#[cfg(any(test, feature = "verbose"))]
632519
let old_length = self.rx_buffer.len();
633-
let buffer = self.rx_buffer.dequeue(size);
520+
let buffer = self.rx_buffer.dequeue_slice(size);
634521
self.remote_seq_no += buffer.len();
635522
if buffer.len() > 0 {
636523
#[cfg(any(test, feature = "verbose"))]
@@ -1085,7 +972,8 @@ impl<'a> TcpSocket<'a> {
1085972
net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
1086973
self.debug_id, self.local_endpoint, self.remote_endpoint,
1087974
ack_len, self.tx_buffer.len() - ack_len);
1088-
self.tx_buffer.advance(ack_len);
975+
let acked = self.tx_buffer.dequeue_slice(ack_len);
976+
debug_assert!(acked.len() == ack_len);
1089977
}
1090978

1091979
// We've processed everything in the incoming segment, so advance the local
@@ -1099,7 +987,7 @@ impl<'a> TcpSocket<'a> {
1099987
net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
1100988
self.debug_id, self.local_endpoint, self.remote_endpoint,
1101989
repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
1102-
self.rx_buffer.enqueue_slice(repr.payload);
990+
self.rx_buffer.enqueue_slice_all(repr.payload);
1103991
}
1104992

1105993
Ok(None)
@@ -1317,34 +1205,6 @@ mod test {
13171205
use wire::{IpAddress, Ipv4Address};
13181206
use super::*;
13191207

1320-
#[test]
1321-
fn test_buffer() {
1322-
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
1323-
buffer.enqueue(6).copy_from_slice(b"foobar"); // foobar..
1324-
assert_eq!(buffer.dequeue(3), b"foo"); // ...bar..
1325-
buffer.enqueue(6).copy_from_slice(b"ba"); // ...barba
1326-
buffer.enqueue(4).copy_from_slice(b"zho"); // zhobarba
1327-
assert_eq!(buffer.dequeue(6), b"barba"); // zho.....
1328-
assert_eq!(buffer.dequeue(8), b"zho"); // ........
1329-
buffer.enqueue(8).copy_from_slice(b"gefug"); // ...gefug
1330-
}
1331-
1332-
#[test]
1333-
fn test_buffer_wraparound() {
1334-
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
1335-
buffer.enqueue_slice(&b"foobar"[..]); // foobar..
1336-
assert_eq!(buffer.dequeue(3), b"foo"); // ...bar..
1337-
buffer.enqueue_slice(&b"bazhoge"[..]); // zhobarba
1338-
}
1339-
1340-
#[test]
1341-
fn test_buffer_peek() {
1342-
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
1343-
buffer.enqueue_slice(&b"foobar"[..]); // foobar..
1344-
assert_eq!(buffer.peek(0, 8), &b"foobar"[..]);
1345-
assert_eq!(buffer.peek(3, 8), &b"bar"[..]);
1346-
}
1347-
13481208
#[test]
13491209
fn test_timer_retransmit() {
13501210
let mut r = Timer::Idle;
@@ -1890,7 +1750,7 @@ mod test {
18901750
window_len: 58,
18911751
..RECV_TEMPL
18921752
}]);
1893-
assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
1753+
assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
18941754
}
18951755

18961756
#[test]

src/storage/ring_buffer.rs

+120-55
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,81 @@
1-
use managed::Managed;
1+
use managed::{Managed, ManagedSlice};
22

33
use {Error, Result};
44
use super::Resettable;
55

66
/// A ring buffer.
77
#[derive(Debug)]
88
pub struct RingBuffer<'a, T: 'a> {
9-
storage: Managed<'a, [T]>,
9+
storage: ManagedSlice<'a, T>,
1010
read_at: usize,
11-
length: usize,
11+
length: usize,
1212
}
1313

1414
impl<'a, T: 'a> RingBuffer<'a, T> {
1515
/// Create a ring buffer with the given storage.
1616
///
1717
/// During creation, every element in `storage` is reset.
1818
pub fn new<S>(storage: S) -> RingBuffer<'a, T>
19-
where S: Into<Managed<'a, [T]>>, T: Resettable,
19+
where S: Into<ManagedSlice<'a, T>>,
2020
{
21-
let mut storage = storage.into();
22-
for elem in storage.iter_mut() {
23-
elem.reset();
24-
}
25-
2621
RingBuffer {
27-
storage: storage,
22+
storage: storage.into(),
2823
read_at: 0,
2924
length: 0,
3025
}
3126
}
3227

33-
fn mask(&self, index: usize) -> usize {
34-
index % self.storage.len()
28+
/// Clear the ring buffer.
29+
pub fn clear(&mut self) {
30+
self.read_at = 0;
31+
self.length = 0;
32+
}
33+
34+
/// Clear the ring buffer, and reset every element.
35+
pub fn reset(&mut self)
36+
where T: Resettable {
37+
self.clear();
38+
for elem in self.storage.iter_mut() {
39+
elem.reset();
40+
}
3541
}
3642

37-
fn incr(&self, index: usize) -> usize {
38-
self.mask(index + 1)
43+
/// Return the current number of elements in the ring buffer.
44+
pub fn len(&self) -> usize {
45+
self.length
46+
}
47+
48+
/// Return the maximum number of elements in the ring buffer.
49+
pub fn capacity(&self) -> usize {
50+
self.storage.len()
51+
}
52+
53+
/// Return the number of elements that can be added to the ring buffer.
54+
pub fn window(&self) -> usize {
55+
self.capacity() - self.len()
3956
}
4057

4158
/// Query whether the buffer is empty.
4259
pub fn empty(&self) -> bool {
43-
self.length == 0
60+
self.len() == 0
4461
}
4562

4663
/// Query whether the buffer is full.
4764
pub fn full(&self) -> bool {
48-
self.length == self.storage.len()
49-
}
50-
51-
/// Enqueue an element into the buffer, and return a pointer to it, or return
52-
/// `Err(Error::Exhausted)` if the buffer is full.
53-
pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
54-
if self.full() { return Err(Error::Exhausted) }
55-
56-
let index = self.mask(self.read_at + self.length);
57-
self.length += 1;
58-
Ok(&mut self.storage[index])
65+
self.window() == 0
5966
}
67+
}
6068

61-
/// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
62-
/// return `Err(Error::Exhausted)` if the buffer is full.
69+
// This is the "discrete" ring buffer interface: it operates with single elements,
70+
// and boundary conditions (empty/full) are errors.
71+
impl<'a, T: 'a> RingBuffer<'a, T> {
72+
/// Call `f` with a single buffer element, and enqueue the element if `f`
73+
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
6374
pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
6475
where F: FnOnce(&'b mut T) -> Result<R> {
6576
if self.full() { return Err(Error::Exhausted) }
6677

67-
let index = self.mask(self.read_at + self.length);
78+
let index = (self.read_at + self.length) % self.capacity();
6879
match f(&mut self.storage[index]) {
6980
Ok(result) => {
7081
self.length += 1;
@@ -74,15 +85,10 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
7485
}
7586
}
7687

77-
/// Dequeue an element from the buffer, and return a mutable reference to it, or return
78-
/// `Err(Error::Exhausted)` if the buffer is empty.
79-
pub fn dequeue(&mut self) -> Result<&mut T> {
80-
if self.empty() { return Err(Error::Exhausted) }
81-
82-
let read_at = self.read_at;
83-
self.length -= 1;
84-
self.read_at = self.incr(self.read_at);
85-
Ok(&mut self.storage[read_at])
88+
/// Enqueue a single element into the buffer, and return a pointer to it,
89+
/// or return `Err(Error::Exhausted)` if the buffer is full.
90+
pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
91+
self.try_enqueue(Ok)
8692
}
8793

8894
/// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
@@ -91,7 +97,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
9197
where F: FnOnce(&'b mut T) -> Result<R> {
9298
if self.empty() { return Err(Error::Exhausted) }
9399

94-
let next_at = self.incr(self.read_at);
100+
let next_at = (self.read_at + 1) % self.capacity();
95101
match f(&mut self.storage[self.read_at]) {
96102
Ok(result) => {
97103
self.length -= 1;
@@ -101,32 +107,91 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
101107
Err(error) => Err(error)
102108
}
103109
}
110+
111+
/// Dequeue an element from the buffer, and return a mutable reference to it, or return
112+
/// `Err(Error::Exhausted)` if the buffer is empty.
113+
pub fn dequeue(&mut self) -> Result<&mut T> {
114+
self.try_dequeue(Ok)
115+
}
104116
}
105117

106-
#[cfg(test)]
107-
mod test {
108-
use super::*;
118+
// This is the "continuous" ring buffer interface: it operates with element slices,
119+
// and boundary conditions (empty/full) simply result in empty slices.
120+
impl<'a, T: 'a> RingBuffer<'a, T> {
121+
fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
122+
let write_at = (self.read_at + self.length) % self.capacity();
123+
// We can't enqueue more than there is free space.
124+
let free = self.capacity() - self.length;
125+
if size > free { size = free }
126+
// We can't contiguously enqueue past the beginning of the storage.
127+
let until_end = self.capacity() - write_at;
128+
if size > until_end { size = until_end }
129+
130+
(write_at, size)
131+
}
109132

110-
impl Resettable for usize {
111-
fn reset(&mut self) {
112-
*self = 0;
113-
}
133+
pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
134+
let (write_at, size) = self.clamp_writer(size);
135+
self.length += size;
136+
&mut self.storage[write_at..write_at + size]
114137
}
115138

116-
const SIZE: usize = 5;
139+
pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
140+
where T: Copy {
141+
let data = {
142+
let mut dest = self.enqueue_slice(data.len());
143+
let (data, rest) = data.split_at(dest.len());
144+
dest.copy_from_slice(data);
145+
rest
146+
};
147+
// Retry, in case we had a wraparound.
148+
let mut dest = self.enqueue_slice(data.len());
149+
let (data, _) = data.split_at(dest.len());
150+
dest.copy_from_slice(data);
151+
}
117152

118-
fn buffer() -> RingBuffer<'static, usize> {
119-
let mut storage = vec![];
120-
for i in 0..SIZE {
121-
storage.push(i + 10);
122-
}
153+
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
154+
let read_at = (self.read_at + offset) % self.capacity();
155+
// We can't read past the end of the queued data.
156+
if offset > self.length { return (read_at, 0) }
157+
// We can't dequeue more than was queued.
158+
let clamped_length = self.length - offset;
159+
if size > clamped_length { size = clamped_length }
160+
// We can't contiguously dequeue past the end of the storage.
161+
let until_end = self.capacity() - read_at;
162+
if size > until_end { size = until_end }
163+
164+
(read_at, size)
165+
}
166+
167+
pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
168+
let (read_at, size) = self.clamp_reader(0, size);
169+
self.read_at = (self.read_at + size) % self.capacity();
170+
self.length -= size;
171+
&self.storage[read_at..read_at + size]
172+
}
123173

124-
RingBuffer::new(storage)
174+
pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
175+
let (read_at, size) = self.clamp_reader(offset, size);
176+
&self.storage[read_at..read_at + size]
125177
}
178+
}
179+
180+
impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
181+
fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
182+
RingBuffer::new(slice)
183+
}
184+
}
185+
186+
#[cfg(test)]
187+
mod test {
188+
use super::*;
189+
190+
const SIZE: usize = 5;
126191

127192
#[test]
128193
pub fn test_buffer() {
129-
let mut buf = buffer();
194+
let mut buf = RingBuffer::new(vec![0; SIZE]);
130195
assert!(buf.empty());
131196
assert!(!buf.full());
132197
assert_eq!(buf.dequeue(), Err(Error::Exhausted));
@@ -152,7 +217,7 @@ mod test {
152217

153218
#[test]
154219
pub fn test_buffer_try() {
155-
let mut buf = buffer();
220+
let mut buf = RingBuffer::new(vec![0; SIZE]);
156221
assert!(buf.empty());
157222
assert!(!buf.full());
158223
assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,

0 commit comments

Comments (0)