Merge the TCP ring buffer and generic ring buffer.
This adds a few methods to RingBuffer that don't quite fit into
its interface (the slice ones), but we can fix that later.
whitequark committed Sep 7, 2017
1 parent 550deb3 commit 9cb8131
Showing 2 changed files with 128 additions and 203 deletions.
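
For orientation, a minimal sketch of the merged interface this commit produces. This is crate-internal code (the slice methods are `pub(crate)`); the buffer size and contents are hypothetical.

```rust
use storage::RingBuffer;
use Result;

fn demo() -> Result<()> {
    // Discrete interface: single elements; empty/full are hard errors.
    let mut ring = RingBuffer::new(vec![0u8; 4]);
    *ring.enqueue()? = 1;             // Err(Error::Exhausted) when full
    assert_eq!(*ring.dequeue()?, 1);  // Err(Error::Exhausted) when empty

    // Continuous interface: slices; empty/full just yield short or empty slices.
    ring.enqueue_slice_all(b"abc");            // copies, splitting on wraparound
    assert_eq!(ring.dequeue_slice(2), b"ab");  // consumes at most 2 octets
    assert_eq!(ring.dequeue_slice(9), b"c");   // clamped to what is queued
    Ok(())
}
```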
src/socket/tcp.rs: 156 changes (8 additions, 148 deletions)
@@ -8,122 +8,9 @@ use {Error, Result};
 use phy::DeviceLimits;
 use wire::{IpProtocol, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
 use socket::{Socket, IpRepr};
+use storage::RingBuffer;
 
-/// A TCP stream ring buffer.
-#[derive(Debug)]
-pub struct SocketBuffer<'a> {
-    storage: Managed<'a, [u8]>,
-    read_at: usize,
-    length: usize
-}
-
-impl<'a> SocketBuffer<'a> {
-    /// Create a packet buffer with the given storage.
-    pub fn new<T>(storage: T) -> SocketBuffer<'a>
-            where T: Into<Managed<'a, [u8]>> {
-        SocketBuffer {
-            storage: storage.into(),
-            read_at: 0,
-            length: 0
-        }
-    }
-
-    fn clear(&mut self) {
-        self.read_at = 0;
-        self.length = 0;
-    }
-
-    fn capacity(&self) -> usize {
-        self.storage.len()
-    }
-
-    fn len(&self) -> usize {
-        self.length
-    }
-
-    fn window(&self) -> usize {
-        self.capacity() - self.len()
-    }
-
-    fn empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    fn full(&self) -> bool {
-        self.window() == 0
-    }
-
-    fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
-        let write_at = (self.read_at + self.length) % self.storage.len();
-        // We can't enqueue more than there is free space.
-        let free = self.storage.len() - self.length;
-        if size > free { size = free }
-        // We can't contiguously enqueue past the beginning of the storage.
-        let until_end = self.storage.len() - write_at;
-        if size > until_end { size = until_end }
-
-        (write_at, size)
-    }
-
-    fn enqueue(&mut self, size: usize) -> &mut [u8] {
-        let (write_at, size) = self.clamp_writer(size);
-        self.length += size;
-        &mut self.storage[write_at..write_at + size]
-    }
-
-    fn enqueue_slice(&mut self, data: &[u8]) {
-        let data = {
-            let mut dest = self.enqueue(data.len());
-            let (data, rest) = data.split_at(dest.len());
-            dest.copy_from_slice(data);
-            rest
-        };
-        // Retry, in case we had a wraparound.
-        let mut dest = self.enqueue(data.len());
-        let (data, _) = data.split_at(dest.len());
-        dest.copy_from_slice(data);
-    }
-
-    fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
-        let read_at = (self.read_at + offset) % self.storage.len();
-        // We can't read past the end of the queued data.
-        if offset > self.length { return (read_at, 0) }
-        // We can't dequeue more than was queued.
-        let clamped_length = self.length - offset;
-        if size > clamped_length { size = clamped_length }
-        // We can't contiguously dequeue past the end of the storage.
-        let until_end = self.storage.len() - read_at;
-        if size > until_end { size = until_end }
-
-        (read_at, size)
-    }
-
-    fn dequeue(&mut self, size: usize) -> &[u8] {
-        let (read_at, size) = self.clamp_reader(0, size);
-        self.read_at = (self.read_at + size) % self.storage.len();
-        self.length -= size;
-        &self.storage[read_at..read_at + size]
-    }
-
-    fn peek(&self, offset: usize, size: usize) -> &[u8] {
-        let (read_at, size) = self.clamp_reader(offset, size);
-        &self.storage[read_at..read_at + size]
-    }
-
-    fn advance(&mut self, size: usize) {
-        if size > self.length {
-            panic!("advancing {} octets into free space", size - self.length)
-        }
-        self.read_at = (self.read_at + size) % self.storage.len();
-        self.length -= size;
-    }
-}
-
-impl<'a> Into<SocketBuffer<'a>> for Managed<'a, [u8]> {
-    fn into(self) -> SocketBuffer<'a> {
-        SocketBuffer::new(self)
-    }
-}
+pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
 
 /// The state of a TCP socket, according to [RFC 793][rfc793].
 /// [rfc793]: https://tools.ietf.org/html/rfc793
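
With the alias in place, a TCP socket buffer is constructed like any other byte ring buffer; a one-line sketch, assuming vec-backed storage as in the tests:

```rust
let tx_buffer = SocketBuffer::new(vec![0; 4096]); // i.e. RingBuffer::<u8>::new
```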
@@ -590,7 +477,7 @@ impl<'a> TcpSocket<'a> {
 
         #[cfg(any(test, feature = "verbose"))]
         let old_length = self.tx_buffer.len();
-        let buffer = self.tx_buffer.enqueue(size);
+        let buffer = self.tx_buffer.enqueue_slice(size);
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
             net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
@@ -630,7 +517,7 @@ impl<'a> TcpSocket<'a> {
 
         #[cfg(any(test, feature = "verbose"))]
         let old_length = self.rx_buffer.len();
-        let buffer = self.rx_buffer.dequeue(size);
+        let buffer = self.rx_buffer.dequeue_slice(size);
        self.remote_seq_no += buffer.len();
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
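
One detail worth keeping in mind at these call sites: `enqueue_slice(size)` and `dequeue_slice(size)` return a contiguous slice that can be shorter than `size` when the window wraps past the end of storage, which is why the code above checks `buffer.len()` instead of trusting `size`. A sketch with hypothetical sizes:

```rust
let mut buf = RingBuffer::new(vec![0u8; 8]);
buf.enqueue_slice(6).copy_from_slice(b"foobar"); // contiguous, no wraparound yet
buf.dequeue_slice(3);                            // read_at = 3, 3 octets queued
let tail = buf.enqueue_slice(4);                 // only 2 octets until end of storage
assert_eq!(tail.len(), 2);                       // the caller retries for the rest
```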
@@ -1085,7 +972,8 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        ack_len, self.tx_buffer.len() - ack_len);
-            self.tx_buffer.advance(ack_len);
+            let acked = self.tx_buffer.dequeue_slice(ack_len);
+            debug_assert!(acked.len() == ack_len);
         }
 
         // We've processed everything in the incoming segment, so advance the local
@@ -1099,7 +987,7 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
-            self.rx_buffer.enqueue_slice(repr.payload);
+            self.rx_buffer.enqueue_slice_all(repr.payload);
         }
 
         Ok(None)
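
Unlike `enqueue_slice`, `enqueue_slice_all` copies in at most two parts so it can continue past a wraparound, but it still silently drops whatever exceeds the free space; this mirrors `test_buffer_wraparound`, removed below. A sketch with hypothetical contents:

```rust
let mut buf = RingBuffer::new(vec![0u8; 8]);
buf.enqueue_slice_all(b"foobar");           // foobar..
assert_eq!(buf.dequeue_slice(3), b"foo");   // ...bar..
buf.enqueue_slice_all(b"bazhoge");          // zhobarba: "ba", then "zho"; "ge" is dropped
assert_eq!(buf.dequeue_slice(8), b"barba"); // clamped at the end of storage
assert_eq!(buf.dequeue_slice(8), b"zho");
```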
@@ -1317,34 +1205,6 @@ mod test {
     use wire::{IpAddress, Ipv4Address};
     use super::*;
 
-    #[test]
-    fn test_buffer() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue(6).copy_from_slice(b"foobar");   // foobar..
-        assert_eq!(buffer.dequeue(3), b"foo");          // ...bar..
-        buffer.enqueue(6).copy_from_slice(b"ba");       // ...barba
-        buffer.enqueue(4).copy_from_slice(b"zho");      // zhobarba
-        assert_eq!(buffer.dequeue(6), b"barba");        // zho.....
-        assert_eq!(buffer.dequeue(8), b"zho");          // ........
-        buffer.enqueue(8).copy_from_slice(b"gefug");    // ...gefug
-    }
-
-    #[test]
-    fn test_buffer_wraparound() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue_slice(&b"foobar"[..]);           // foobar..
-        assert_eq!(buffer.dequeue(3), b"foo");          // ...bar..
-        buffer.enqueue_slice(&b"bazhoge"[..]);          // zhobarba
-    }
-
-    #[test]
-    fn test_buffer_peek() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue_slice(&b"foobar"[..]);           // foobar..
-        assert_eq!(buffer.peek(0, 8), &b"foobar"[..]);
-        assert_eq!(buffer.peek(3, 8), &b"bar"[..]);
-    }
-
     #[test]
     fn test_timer_retransmit() {
         let mut r = Timer::Idle;
@@ -1890,7 +1750,7 @@ mod test {
             window_len: 58,
             ..RECV_TEMPL
         }]);
-        assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
+        assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
     }
 
     #[test]
src/storage/ring_buffer.rs: 175 changes (120 additions, 55 deletions)
@@ -1,70 +1,81 @@
-use managed::Managed;
+use managed::{Managed, ManagedSlice};
 
 use {Error, Result};
 use super::Resettable;
 
 /// A ring buffer.
 #[derive(Debug)]
 pub struct RingBuffer<'a, T: 'a> {
-    storage: Managed<'a, [T]>,
+    storage: ManagedSlice<'a, T>,
     read_at: usize,
-    length:  usize,
+    length: usize,
 }
 
 impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Create a ring buffer with the given storage.
-    ///
-    /// During creation, every element in `storage` is reset.
     pub fn new<S>(storage: S) -> RingBuffer<'a, T>
-        where S: Into<Managed<'a, [T]>>, T: Resettable,
+        where S: Into<ManagedSlice<'a, T>>,
     {
-        let mut storage = storage.into();
-        for elem in storage.iter_mut() {
-            elem.reset();
-        }
-
         RingBuffer {
-            storage: storage,
+            storage: storage.into(),
             read_at: 0,
             length: 0,
         }
     }
 
-    fn mask(&self, index: usize) -> usize {
-        index % self.storage.len()
+    /// Clear the ring buffer.
+    pub fn clear(&mut self) {
+        self.read_at = 0;
+        self.length = 0;
     }
 
+    /// Clear the ring buffer, and reset every element.
+    pub fn reset(&mut self)
+        where T: Resettable {
+        self.clear();
+        for elem in self.storage.iter_mut() {
+            elem.reset();
+        }
+    }
+
-    fn incr(&self, index: usize) -> usize {
-        self.mask(index + 1)
+    /// Return the current number of elements in the ring buffer.
+    pub fn len(&self) -> usize {
+        self.length
     }
 
+    /// Return the maximum number of elements in the ring buffer.
+    pub fn capacity(&self) -> usize {
+        self.storage.len()
+    }
+
+    /// Return the number of elements that can be added to the ring buffer.
+    pub fn window(&self) -> usize {
+        self.capacity() - self.len()
+    }
+
     /// Query whether the buffer is empty.
     pub fn empty(&self) -> bool {
-        self.length == 0
+        self.len() == 0
     }
 
     /// Query whether the buffer is full.
     pub fn full(&self) -> bool {
-        self.length == self.storage.len()
-    }
-
-    /// Enqueue an element into the buffer, and return a pointer to it, or return
-    /// `Err(Error::Exhausted)` if the buffer is full.
-    pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
-        if self.full() { return Err(Error::Exhausted) }
-
-        let index = self.mask(self.read_at + self.length);
-        self.length += 1;
-        Ok(&mut self.storage[index])
+        self.window() == 0
     }
+}
 
-    /// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
-    /// return `Err(Error::Exhausted)` if the buffer is full.
+// This is the "discrete" ring buffer interface: it operates with single elements,
+// and boundary conditions (empty/full) are errors.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+    /// Call `f` with a single buffer element, and enqueue the element if `f`
+    /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
     pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
         where F: FnOnce(&'b mut T) -> Result<R> {
         if self.full() { return Err(Error::Exhausted) }
 
-        let index = self.mask(self.read_at + self.length);
+        let index = (self.read_at + self.length) % self.capacity();
         match f(&mut self.storage[index]) {
             Ok(result) => {
                 self.length += 1;
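
The closure-based form commits the element only if `f` succeeds, so a caller can attempt to parse into a slot and back out on failure. A minimal crate-internal sketch (hypothetical element type and values, using the crate's `Result`):

```rust
fn demo() -> Result<()> {
    let mut ring = RingBuffer::new(vec![0u8; 4]);
    ring.try_enqueue(|slot| {
        *slot = 42;
        Ok(())  // returning Err(...) here would leave the buffer unchanged
    })?;
    assert_eq!(ring.try_dequeue(|slot| Ok(*slot))?, 42);
    Ok(())
}
```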
@@ -74,15 +85,10 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         }
     }
 
-    /// Dequeue an element from the buffer, and return a mutable reference to it, or return
-    /// `Err(Error::Exhausted)` if the buffer is empty.
-    pub fn dequeue(&mut self) -> Result<&mut T> {
-        if self.empty() { return Err(Error::Exhausted) }
-
-        let read_at = self.read_at;
-        self.length -= 1;
-        self.read_at = self.incr(self.read_at);
-        Ok(&mut self.storage[read_at])
+    /// Enqueue a single element into the buffer, and return a pointer to it,
+    /// or return `Err(Error::Exhausted)` if the buffer is full.
+    pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
+        self.try_enqueue(Ok)
     }
 
     /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
@@ -91,7 +97,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         where F: FnOnce(&'b mut T) -> Result<R> {
         if self.empty() { return Err(Error::Exhausted) }
 
-        let next_at = self.incr(self.read_at);
+        let next_at = (self.read_at + 1) % self.capacity();
         match f(&mut self.storage[self.read_at]) {
             Ok(result) => {
                 self.length -= 1;
@@ -101,32 +107,91 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
             Err(error) => Err(error)
         }
     }
+
+    /// Dequeue an element from the buffer, and return a mutable reference to it, or return
+    /// `Err(Error::Exhausted)` if the buffer is empty.
+    pub fn dequeue(&mut self) -> Result<&mut T> {
+        self.try_dequeue(Ok)
+    }
 }
 
-#[cfg(test)]
-mod test {
-    use super::*;
+// This is the "continuous" ring buffer interface: it operates with element slices,
+// and boundary conditions (empty/full) simply result in empty slices.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+    fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
+        let write_at = (self.read_at + self.length) % self.capacity();
+        // We can't enqueue more than there is free space.
+        let free = self.capacity() - self.length;
+        if size > free { size = free }
+        // We can't contiguously enqueue past the beginning of the storage.
+        let until_end = self.capacity() - write_at;
+        if size > until_end { size = until_end }
+
+        (write_at, size)
+    }
 
-    impl Resettable for usize {
-        fn reset(&mut self) {
-            *self = 0;
-        }
+    pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
+        let (write_at, size) = self.clamp_writer(size);
+        self.length += size;
+        &mut self.storage[write_at..write_at + size]
     }
 
-    const SIZE: usize = 5;
+    pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
+        where T: Copy {
+        let data = {
+            let mut dest = self.enqueue_slice(data.len());
+            let (data, rest) = data.split_at(dest.len());
+            dest.copy_from_slice(data);
+            rest
+        };
+        // Retry, in case we had a wraparound.
+        let mut dest = self.enqueue_slice(data.len());
+        let (data, _) = data.split_at(dest.len());
+        dest.copy_from_slice(data);
+    }
 
-    fn buffer() -> RingBuffer<'static, usize> {
-        let mut storage = vec![];
-        for i in 0..SIZE {
-            storage.push(i + 10);
-        }
+    fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
+        let read_at = (self.read_at + offset) % self.capacity();
+        // We can't read past the end of the queued data.
+        if offset > self.length { return (read_at, 0) }
+        // We can't dequeue more than was queued.
+        let clamped_length = self.length - offset;
+        if size > clamped_length { size = clamped_length }
+        // We can't contiguously dequeue past the end of the storage.
+        let until_end = self.capacity() - read_at;
+        if size > until_end { size = until_end }
+
+        (read_at, size)
+    }
+
+    pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
+        let (read_at, size) = self.clamp_reader(0, size);
+        self.read_at = (self.read_at + size) % self.capacity();
+        self.length -= size;
+        &self.storage[read_at..read_at + size]
+    }
+
-        RingBuffer::new(storage)
+    pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
+        let (read_at, size) = self.clamp_reader(offset, size);
+        &self.storage[read_at..read_at + size]
     }
+}
+
+impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
+    fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
+        RingBuffer::new(slice)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    const SIZE: usize = 5;
 
     #[test]
     pub fn test_buffer() {
-        let mut buf = buffer();
+        let mut buf = RingBuffer::new(vec![0; SIZE]);
         assert!(buf.empty());
         assert!(!buf.full());
         assert_eq!(buf.dequeue(), Err(Error::Exhausted));
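
`peek` reads at an offset from the head without consuming anything, with both `clamp_reader` limits applied; a sketch mirroring the removed `test_buffer_peek`:

```rust
let mut buf = RingBuffer::new(vec![0u8; 8]);
buf.enqueue_slice_all(b"foobar");
assert_eq!(buf.peek(0, 8), b"foobar"); // clamped to the 6 queued octets
assert_eq!(buf.peek(3, 8), b"bar");    // offset 3 from the current head
assert_eq!(buf.len(), 6);              // nothing was consumed
```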
@@ -152,7 +217,7 @@ mod test {
 
     #[test]
     pub fn test_buffer_try() {
-        let mut buf = buffer();
+        let mut buf = RingBuffer::new(vec![0; SIZE]);
         assert!(buf.empty());
         assert!(!buf.full());
         assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
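
Since `ManagedSlice` can also borrow storage, the loosened `new` bound and the new `From` impl let a ring buffer live entirely on the stack, with no allocation and no `T: Resettable` requirement; a sketch with hypothetical storage:

```rust
let mut storage = [0u8; 16];
let mut ring = RingBuffer::new(&mut storage[..]); // borrowed, heap-free storage
ring.enqueue_slice_all(b"no heap needed");
assert_eq!(ring.dequeue_slice(7), b"no heap");
```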

1 comment on commit 9cb8131

@whitequark (Contributor, Author) commented:

cc @batonius on this and next two commits
