Skip to content

Commit fb05226

Browse files
committedSep 7, 2017
Refactor the "continuous" ring buffer interface.
This also makes TcpSocket::{send,recv}_slice slightly more efficient in case when the slice wraps around the corresponding buffer, halving the necessary amount of calls.
1 parent 9cb8131 commit fb05226

File tree

4 files changed

+391
-110
lines changed

4 files changed

+391
-110
lines changed
 

‎src/socket/raw.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
127127
/// **Note:** The IP header is parsed and reserialized, and may not match
128128
/// the header actually transmitted bit for bit.
129129
pub fn send(&mut self, size: usize) -> Result<&mut [u8]> {
130-
let packet_buf = self.tx_buffer.try_enqueue(|buf| buf.resize(size))?;
130+
let packet_buf = self.tx_buffer.enqueue_one_with(|buf| buf.resize(size))?;
131131
net_trace!("[{}]:{}:{}: buffer to send {} octets",
132132
self.debug_id, self.ip_version, self.ip_protocol,
133133
packet_buf.size);
@@ -149,7 +149,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
149149
/// **Note:** The IP header is parsed and reserialized, and may not match
150150
/// the header actually received bit for bit.
151151
pub fn recv(&mut self) -> Result<&[u8]> {
152-
let packet_buf = self.rx_buffer.dequeue()?;
152+
let packet_buf = self.rx_buffer.dequeue_one()?;
153153
net_trace!("[{}]:{}:{}: receive {} buffered octets",
154154
self.debug_id, self.ip_version, self.ip_protocol,
155155
packet_buf.size);
@@ -172,7 +172,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
172172

173173
let header_len = ip_repr.buffer_len();
174174
let total_len = header_len + payload.len();
175-
let packet_buf = self.rx_buffer.try_enqueue(|buf| buf.resize(total_len))?;
175+
let packet_buf = self.rx_buffer.enqueue_one_with(|buf| buf.resize(total_len))?;
176176
ip_repr.emit(&mut packet_buf.as_mut()[..header_len]);
177177
packet_buf.as_mut()[header_len..].copy_from_slice(payload);
178178
net_trace!("[{}]:{}:{}: receiving {} octets",
@@ -202,7 +202,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
202202
let debug_id = self.debug_id;
203203
let ip_protocol = self.ip_protocol;
204204
let ip_version = self.ip_version;
205-
self.tx_buffer.try_dequeue(|packet_buf| {
205+
self.tx_buffer.dequeue_one_with(|packet_buf| {
206206
match prepare(ip_protocol, packet_buf.as_mut()) {
207207
Ok((ip_repr, raw_packet)) => {
208208
net_trace!("[{}]:{}:{}: sending {} octets",

‎src/socket/tcp.rs

+32-16
Original file line numberDiff line numberDiff line change
@@ -475,14 +475,13 @@ impl<'a> TcpSocket<'a> {
475475
pub fn send(&mut self, size: usize) -> Result<&mut [u8]> {
476476
if !self.may_send() { return Err(Error::Illegal) }
477477

478-
#[cfg(any(test, feature = "verbose"))]
479-
let old_length = self.tx_buffer.len();
480-
let buffer = self.tx_buffer.enqueue_slice(size);
478+
let _old_length = self.tx_buffer.len();
479+
let buffer = self.tx_buffer.enqueue_many(size);
481480
if buffer.len() > 0 {
482481
#[cfg(any(test, feature = "verbose"))]
483482
net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
484483
self.debug_id, self.local_endpoint, self.remote_endpoint,
485-
buffer.len(), old_length + buffer.len());
484+
buffer.len(), _old_length + buffer.len());
486485
self.timer.reset();
487486
}
488487
Ok(buffer)
@@ -495,10 +494,18 @@ impl<'a> TcpSocket<'a> {
495494
///
496495
/// See also [send](#method.send).
497496
pub fn send_slice(&mut self, data: &[u8]) -> Result<usize> {
498-
let buffer = self.send(data.len())?;
499-
let data = &data[..buffer.len()];
500-
buffer.copy_from_slice(data);
501-
Ok(buffer.len())
497+
if !self.may_send() { return Err(Error::Illegal) }
498+
499+
let old_length = self.tx_buffer.len();
500+
let enqueued = self.tx_buffer.enqueue_slice(data);
501+
if enqueued != 0 {
502+
#[cfg(any(test, feature = "verbose"))]
503+
net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
504+
self.debug_id, self.local_endpoint, self.remote_endpoint,
505+
enqueued, old_length + enqueued);
506+
self.timer.reset();
507+
}
508+
Ok(enqueued)
502509
}
503510

504511
/// Dequeue a sequence of received octets, and return a pointer to it.
@@ -517,7 +524,7 @@ impl<'a> TcpSocket<'a> {
517524

518525
#[cfg(any(test, feature = "verbose"))]
519526
let old_length = self.rx_buffer.len();
520-
let buffer = self.rx_buffer.dequeue_slice(size);
527+
let buffer = self.rx_buffer.dequeue_many(size);
521528
self.remote_seq_no += buffer.len();
522529
if buffer.len() > 0 {
523530
#[cfg(any(test, feature = "verbose"))]
@@ -535,10 +542,19 @@ impl<'a> TcpSocket<'a> {
535542
///
536543
/// See also [recv](#method.recv).
537544
pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<usize> {
538-
let buffer = self.recv(data.len())?;
539-
let data = &mut data[..buffer.len()];
540-
data.copy_from_slice(buffer);
541-
Ok(buffer.len())
545+
// See recv() above.
546+
if !self.may_recv() { return Err(Error::Illegal) }
547+
548+
let old_length = self.rx_buffer.len();
549+
let dequeued = self.rx_buffer.dequeue_slice(data);
550+
self.remote_seq_no += dequeued;
551+
if dequeued > 0 {
552+
#[cfg(any(test, feature = "verbose"))]
553+
net_trace!("[{}]{}:{}: rx buffer: dequeueing {} octets (now {})",
554+
self.debug_id, self.local_endpoint, self.remote_endpoint,
555+
dequeued, old_length - dequeued);
556+
}
557+
Ok(dequeued)
542558
}
543559

544560
/// Peek at a sequence of received octets without removing them from
@@ -972,7 +988,7 @@ impl<'a> TcpSocket<'a> {
972988
net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
973989
self.debug_id, self.local_endpoint, self.remote_endpoint,
974990
ack_len, self.tx_buffer.len() - ack_len);
975-
let acked = self.tx_buffer.dequeue_slice(ack_len);
991+
let acked = self.tx_buffer.dequeue_many(ack_len);
976992
debug_assert!(acked.len() == ack_len);
977993
}
978994

@@ -987,7 +1003,7 @@ impl<'a> TcpSocket<'a> {
9871003
net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
9881004
self.debug_id, self.local_endpoint, self.remote_endpoint,
9891005
repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
990-
self.rx_buffer.enqueue_slice_all(repr.payload);
1006+
self.rx_buffer.enqueue_slice(repr.payload);
9911007
}
9921008

9931009
Ok(None)
@@ -1750,7 +1766,7 @@ mod test {
17501766
window_len: 58,
17511767
..RECV_TEMPL
17521768
}]);
1753-
assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
1769+
assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]);
17541770
}
17551771

17561772
#[test]

‎src/socket/udp.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
141141
if self.endpoint.port == 0 { return Err(Error::Unaddressable) }
142142
if !endpoint.is_specified() { return Err(Error::Unaddressable) }
143143

144-
let packet_buf = self.tx_buffer.try_enqueue(|buf| buf.resize(size))?;
144+
let packet_buf = self.tx_buffer.enqueue_one_with(|buf| buf.resize(size))?;
145145
packet_buf.endpoint = endpoint;
146146
net_trace!("[{}]{}:{}: buffer to send {} octets",
147147
self.debug_id, self.endpoint, packet_buf.endpoint, size);
@@ -161,7 +161,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
161161
///
162162
/// This function returns `Err(Error::Exhausted)` if the receive buffer is empty.
163163
pub fn recv(&mut self) -> Result<(&[u8], IpEndpoint)> {
164-
let packet_buf = self.rx_buffer.dequeue()?;
164+
let packet_buf = self.rx_buffer.dequeue_one()?;
165165
net_trace!("[{}]{}:{}: receive {} buffered octets",
166166
self.debug_id, self.endpoint,
167167
packet_buf.endpoint, packet_buf.size);
@@ -185,7 +185,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
185185
if !self.endpoint.addr.is_unspecified() &&
186186
self.endpoint.addr != ip_repr.dst_addr() { return Err(Error::Rejected) }
187187

188-
let packet_buf = self.rx_buffer.try_enqueue(|buf| buf.resize(repr.payload.len()))?;
188+
let packet_buf = self.rx_buffer.enqueue_one_with(|buf| buf.resize(repr.payload.len()))?;
189189
packet_buf.as_mut().copy_from_slice(repr.payload);
190190
packet_buf.endpoint = IpEndpoint { addr: ip_repr.src_addr(), port: repr.src_port };
191191
net_trace!("[{}]{}:{}: receiving {} octets",
@@ -198,7 +198,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
198198
where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
199199
let debug_id = self.debug_id;
200200
let endpoint = self.endpoint;
201-
self.tx_buffer.try_dequeue(|packet_buf| {
201+
self.tx_buffer.dequeue_one_with(|packet_buf| {
202202
net_trace!("[{}]{}:{}: sending {} octets",
203203
debug_id, endpoint,
204204
packet_buf.endpoint, packet_buf.size);

0 commit comments

Comments
 (0)
Please sign in to comment.