Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: smoltcp-rs/smoltcp
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2b824d6a9582
Choose a base ref
...
head repository: smoltcp-rs/smoltcp
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c3eee36b8fe5
Choose a head ref
  • 2 commits
  • 3 files changed
  • 1 contributor

Commits on Dec 26, 2016

  1. Copy the full SHA
    070a0b5 View commit details
  2. Copy the full SHA
    c3eee36 View commit details
Showing with 98 additions and 41 deletions.
  1. +2 −0 README.md
  2. +13 −8 examples/smoltcpserver.rs
  3. +83 −33 src/socket/tcp.rs
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -109,6 +109,8 @@ It responds to:
* UDP packets on port 6969 (`socat stdio udp4-connect:192.168.69.1:6969 <<<"abcdefg"`),
* TCP packets on port 6969 (`socat stdio tcp4-connect:192.168.69.1:6969 <<<"abcdefg"`),

The buffers are only 64 bytes long, for convenience of testing resource exhaustion conditions.

License
-------

21 changes: 13 additions & 8 deletions examples/smoltcpserver.rs
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ extern crate log;
extern crate env_logger;
extern crate smoltcp;

use std::str;
use std::env;
use std::time::Instant;
use log::{LogLevelFilter, LogRecord};
@@ -49,12 +50,12 @@ fn main() {

let endpoint = IpEndpoint::new(IpAddress::default(), 6969);

let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketBuffer::new(vec![0; 2048])]);
let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketBuffer::new(vec![0; 2048])]);
let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketBuffer::new(vec![0; 64])]);
let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketBuffer::new(vec![0; 64])]);
let udp_socket = UdpSocket::new(endpoint, udp_rx_buffer, udp_tx_buffer);

let tcp_rx_buffer = TcpSocketBuffer::new(vec![0; 8192]);
let tcp_tx_buffer = TcpSocketBuffer::new(vec![0; 8192]);
let tcp_rx_buffer = TcpSocketBuffer::new(vec![0; 64]);
let tcp_tx_buffer = TcpSocketBuffer::new(vec![0; 64]);
let mut tcp_socket = TcpSocket::new(tcp_rx_buffer, tcp_tx_buffer);
(tcp_socket.as_socket() : &mut TcpSocket).listen(endpoint);

@@ -74,7 +75,8 @@ fn main() {
let socket: &mut UdpSocket = iface.sockets()[0].as_socket();
let client = match socket.recv() {
Ok((endpoint, data)) => {
debug!("udp recv data: {:?} from {}", data, endpoint);
debug!("udp recv data: {} from {}",
str::from_utf8(data.as_ref()).unwrap(), endpoint);
Some(endpoint)
}
Err(Error::Exhausted) => {
@@ -87,7 +89,8 @@ fn main() {
};
if let Some(endpoint) = client {
let data = b"yo dawg";
debug!("udp send data: {:?}", data);
debug!("udp send data: {}",
str::from_utf8(data.as_ref()).unwrap());
socket.send_slice(endpoint, data).unwrap()
}
}
@@ -97,15 +100,17 @@ fn main() {
let data = {
let mut data = socket.recv(128).to_owned();
if data.len() > 0 {
debug!("tcp recv data: {:?}", &data[..]);
debug!("tcp recv data: {}",
str::from_utf8(data.as_ref()).unwrap());
data = data.split(|&b| b == b'\n').next().unwrap().to_owned();
data.reverse();
data.extend(b"\n");
}
data
};
if data.len() > 0 {
debug!("tcp send data: {:?}", &data[..]);
debug!("tcp send data: {}",
str::from_utf8(data.as_ref()).unwrap());
socket.send_slice(&data[..]);
}
}
116 changes: 83 additions & 33 deletions src/socket/tcp.rs
Original file line number Diff line number Diff line change
@@ -68,8 +68,8 @@ impl<'a> SocketBuffer<'a> {
dest.copy_from_slice(data);
}

fn clamp_reader(&self, mut size: usize) -> (usize, usize) {
let read_at = self.read_at;
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
let read_at = (self.read_at + offset) % self.storage.len();
// We can't dequeue more than was queued.
if size > self.length { size = self.length }
// We can't contiguously dequeue past the end of the storage.
@@ -79,23 +79,24 @@ impl<'a> SocketBuffer<'a> {
(read_at, size)
}

fn peek(&self, size: usize) -> &[u8] {
let (read_at, size) = self.clamp_reader(size);
#[allow(dead_code)] // only used in tests
fn dequeue(&mut self, size: usize) -> &[u8] {
let (read_at, size) = self.clamp_reader(0, size);
self.read_at = (self.read_at + size) % self.storage.len();
self.length -= size;
&self.storage[read_at..read_at + size]
}

fn advance(&mut self, size: usize) {
let (read_at, size) = self.clamp_reader(size);
self.read_at = (read_at + size) % self.storage.len();
self.length -= size;
fn peek(&self, offset: usize, size: usize) -> &[u8] {
if offset > self.length { panic!("peeking {} octets past free space", offset) }
let (read_at, size) = self.clamp_reader(offset, size);
&self.storage[read_at..read_at + size]
}

#[allow(dead_code)] // only used in tests
fn dequeue(&mut self, size: usize) -> &[u8] {
let (read_at, size) = self.clamp_reader(size);
fn advance(&mut self, size: usize) {
if size > self.length { panic!("advancing {} octets into free space", size) }
self.read_at = (self.read_at + size) % self.storage.len();
self.length -= size;
&self.storage[read_at..read_at + size]
}
}

@@ -162,13 +163,33 @@ impl Retransmit {
/// A Transmission Control Protocol data stream.
#[derive(Debug)]
pub struct TcpSocket<'a> {
/// State of the socket.
state: State,
/// Address passed to `listen()`. `listen_address` is set when `listen()` is called and
/// used every time the socket is reset back to the `LISTEN` state.
listen_address: IpAddress,
/// Current local endpoint. This is used for both filtering the incoming packets and
/// setting the source address. When listening or initiating connection on/from
/// an unspecified address, this field is updated with the chosen source address before
/// any packets are sent.
local_endpoint: IpEndpoint,
/// Current remote endpoint. This is used for both filtering the incoming packets and
/// setting the destination address.
remote_endpoint: IpEndpoint,
/// The sequence number corresponding to the beginning of the transmit buffer.
/// I.e. an ACK(local_seq_no+n) packet removes n bytes from the transmit buffer.
local_seq_no: i32,
/// The sequence number corresponding to the beginning of the receive buffer.
/// I.e. userspace reading n bytes adds n to remote_seq_no.
remote_seq_no: i32,
/// The last sequence number sent.
/// I.e. in an idle socket, local_seq_no+tx_buffer.len().
remote_last_seq: i32,
/// The last acknowledgement number sent.
/// I.e. in an idle socket, remote_seq_no+rx_buffer.len().
remote_last_ack: i32,
/// The speculative remote window size.
/// I.e. the actual remote window size minus the count of in-flight octets.
remote_win_len: usize,
retransmit: Retransmit,
rx_buffer: SocketBuffer<'a>,
@@ -192,8 +213,9 @@ impl<'a> TcpSocket<'a> {
remote_endpoint: IpEndpoint::default(),
local_seq_no: 0,
remote_seq_no: 0,
remote_win_len: 0,
remote_last_seq: 0,
remote_last_ack: 0,
remote_win_len: 0,
retransmit: Retransmit::new(),
tx_buffer: tx_buffer.into(),
rx_buffer: rx_buffer.into()
@@ -252,7 +274,7 @@ impl<'a> TcpSocket<'a> {
pub fn send(&mut self, size: usize) -> &mut [u8] {
let buffer = self.tx_buffer.enqueue(size);
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: buffer to send {} octets",
net_trace!("tcp:{}:{}: tx buffer: enqueueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
buffer
@@ -266,6 +288,7 @@ impl<'a> TcpSocket<'a> {
/// See also [send](#method.send).
pub fn send_slice(&mut self, data: &[u8]) -> usize {
let buffer = self.send(data.len());
let data = &data[..buffer.len()];
buffer.copy_from_slice(data);
buffer.len()
}
@@ -279,7 +302,7 @@ impl<'a> TcpSocket<'a> {
let buffer = self.rx_buffer.dequeue(size);
self.remote_seq_no += buffer.len() as i32;
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: receive {} buffered octets",
net_trace!("tcp:{}:{}: rx buffer: dequeueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
buffer
@@ -293,7 +316,8 @@ impl<'a> TcpSocket<'a> {
/// See also [recv](#method.recv).
pub fn recv_slice(&mut self, data: &mut [u8]) -> usize {
let buffer = self.recv(data.len());
data[..buffer.len()].copy_from_slice(buffer);
let data = &mut data[..buffer.len()];
data.copy_from_slice(buffer);
buffer.len()
}

@@ -429,6 +453,7 @@ impl<'a> TcpSocket<'a> {

// ACK packets in the SYN_RECEIVED state change it to ESTABLISHED.
(State::SynReceived, TcpRepr { control: TcpControl::None, .. }) => {
self.remote_last_seq = self.local_seq_no + 1;
self.set_state(State::Established);
self.retransmit.reset()
}
@@ -447,12 +472,8 @@ impl<'a> TcpSocket<'a> {
if let Some(ack_number) = repr.ack_number {
let control_len =
if old_state == State::SynReceived { 1 } else { 0 };
if control_len > 0 {
net_trace!("tcp:{}:{}: ACK for a control flag",
self.local_endpoint, self.remote_endpoint);
}
if ack_number - self.local_seq_no - control_len > 0 {
net_trace!("tcp:{}:{}: ACK for {} octets",
net_trace!("tcp:{}:{}: tx buffer: dequeueing {} octets",
self.local_endpoint, self.remote_endpoint,
ack_number - self.local_seq_no - control_len);
}
@@ -462,7 +483,7 @@ impl<'a> TcpSocket<'a> {

// Enqueue payload octets, which are guaranteed to be in order, unless we already did.
if repr.payload.len() > 0 {
net_trace!("tcp:{}:{}: receiving {} octets",
net_trace!("tcp:{}:{}: rx buffer: enqueueing {} octets",
self.local_endpoint, self.remote_endpoint, repr.payload.len());
self.rx_buffer.enqueue_slice(repr.payload)
}
@@ -502,24 +523,36 @@ impl<'a> TcpSocket<'a> {
repr.control = TcpControl::Syn;
net_trace!("tcp:{}:{}: sending SYN|ACK",
self.local_endpoint, self.remote_endpoint);
self.remote_last_ack = self.remote_seq_no;
}

State::Established => {
if self.tx_buffer.len() > 0 && self.remote_win_len > 0 {
if !self.retransmit.check() { return Err(Error::Exhausted) }
// See if we should send data to the remote end because:
// 1. the retransmit timer has expired, or...
let mut may_send = self.retransmit.check();
// 2. we've got new data in the transmit buffer.
let remote_next_seq = self.local_seq_no + self.tx_buffer.len() as i32;
if self.remote_last_seq != remote_next_seq {
may_send = true;
}

if self.tx_buffer.len() > 0 && self.remote_win_len > 0 && may_send {
// We can send something, so let's do that.
let offset = self.remote_last_seq - self.local_seq_no;
let mut size = self.remote_win_len;
// Clamp to MSS. Currently we only support the default MSS value.
if size > 536 { size = 536 }
// Extract data from the buffer. This may return less than what we want,
// in case it's not possible to extract a contiguous slice.
let data = self.tx_buffer.peek(size);

net_trace!("tcp:{}:{}: sending {} octets",
let data = self.tx_buffer.peek(offset as usize, size);
// Send the extracted data.
net_trace!("tcp:{}:{}: tx buffer: peeking at {} octets",
self.local_endpoint, self.remote_endpoint, data.len());
repr.payload = data;
// Speculatively shrink the remote window. This will get updated the next
// time we receive a packet.
self.remote_win_len -= data.len();
// Advance the in-flight sequence number.
self.remote_last_seq += data.len() as i32;
} else if self.remote_last_ack != ack_number {
// We don't have anything to send, or can't because the remote end does not
// have any space to accept it, but we haven't yet acknowledged everything
@@ -604,7 +637,7 @@ mod test {
src_port: LOCAL_PORT, dst_port: REMOTE_PORT,
control: TcpControl::None,
seq_number: 0, ack_number: Some(0),
window_len: 128, payload: &[]
window_len: 64, payload: &[]
};

fn send(socket: &mut TcpSocket, repr: &TcpRepr) -> Result<(), Error> {
@@ -684,8 +717,8 @@ mod test {
fn socket() -> TcpSocket<'static> {
init_logger();

let rx_buffer = SocketBuffer::new(vec![0; 128]);
let tx_buffer = SocketBuffer::new(vec![0; 128]);
let rx_buffer = SocketBuffer::new(vec![0; 64]);
let tx_buffer = SocketBuffer::new(vec![0; 64]);
match TcpSocket::new(rx_buffer, tx_buffer) {
Socket::Tcp(socket) => socket,
_ => unreachable!()
@@ -824,12 +857,14 @@ mod test {
s.remote_endpoint = REMOTE_END;
s.local_seq_no = LOCAL_SEQ + 1;
s.remote_seq_no = REMOTE_SEQ + 1;
s.remote_last_seq = LOCAL_SEQ + 1;
s.remote_last_ack = REMOTE_SEQ + 1;
s.remote_win_len = 128;
s
}

#[test]
fn test_established_receive() {
fn test_established_recv() {
let mut s = socket_established();
send!(s, [TcpRepr {
seq_number: REMOTE_SEQ + 1,
@@ -840,7 +875,7 @@ mod test {
recv!(s, [TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 6),
window_len: 122,
window_len: 58,
..RECV_TEMPL
}]);
assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
@@ -849,6 +884,7 @@ mod test {
#[test]
fn test_established_send() {
let mut s = socket_established();
// First roundtrip after establishing.
s.tx_buffer.enqueue_slice(b"abcdef");
recv!(s, [TcpRepr {
seq_number: LOCAL_SEQ + 1,
@@ -863,6 +899,20 @@ mod test {
..SEND_TEMPL
}]);
assert_eq!(s.tx_buffer.len(), 0);
// Second roundtrip.
s.tx_buffer.enqueue_slice(b"foobar");
recv!(s, [TcpRepr {
seq_number: LOCAL_SEQ + 1 + 6,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"foobar"[..],
..RECV_TEMPL
}]);
send!(s, [TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1 + 6 + 6),
..SEND_TEMPL
}]);
assert_eq!(s.tx_buffer.len(), 0);
}

#[test]