Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: smoltcp-rs/smoltcp
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 614323b9a9f1
Choose a base ref
...
head repository: smoltcp-rs/smoltcp
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: cd82aa62fe45
Choose a head ref
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on Aug 30, 2017

  1. test

    whitequark committed Aug 30, 2017

    Verified

    This commit was signed with the committer’s verified signature.
    lforst Luca Forstner
    Copy the full SHA
    3f69e40 View commit details
  2. Rework TCP retransmit logic to be much more robust.

    Before this commit, if the amount of data in the buffer caused it
    to be split among many outgoing packets, and retransmit timer
    was active, the socket would behave very erratically and flood
    the peer.
    whitequark committed Aug 30, 2017
    Copy the full SHA
    cd82aa6 View commit details
Showing with 94 additions and 44 deletions.
  1. +94 −44 src/socket/tcp.rs
138 changes: 94 additions & 44 deletions src/socket/tcp.rs
Original file line number Diff line number Diff line change
@@ -176,10 +176,6 @@ const RETRANSMIT_DELAY: u64 = 100;
const CLOSE_DELAY: u64 = 10_000;

impl Timer {
/// Returns `true` when this timer compares equal to `Timer::Idle`.
fn is_idle(&self) -> bool {
*self == Timer::Idle
}

fn should_retransmit(&self, timestamp: u64) -> Option<u64> {
match *self {
Timer::Retransmit { expires_at, delay }
@@ -212,7 +208,7 @@ impl Timer {
*self = Timer::Idle
}

fn set_for_data(&mut self, timestamp: u64) {
fn set_for_retransmit(&mut self, timestamp: u64) {
match *self {
Timer::Idle => {
*self = Timer::Retransmit {
@@ -273,7 +269,7 @@ pub struct TcpSocket<'a> {
remote_seq_no: TcpSeqNumber,
/// The last sequence number sent.
/// I.e. in an idle socket, local_seq_no+tx_buffer.len().
remote_next_seq: TcpSeqNumber,
remote_last_seq: TcpSeqNumber,
/// The last acknowledgement number sent.
/// I.e. in an idle socket, remote_seq_no+rx_buffer.len().
remote_last_ack: TcpSeqNumber,
@@ -307,7 +303,7 @@ impl<'a> TcpSocket<'a> {
remote_endpoint: IpEndpoint::default(),
local_seq_no: TcpSeqNumber::default(),
remote_seq_no: TcpSeqNumber::default(),
remote_next_seq: TcpSeqNumber::default(),
remote_last_seq: TcpSeqNumber::default(),
remote_last_ack: TcpSeqNumber::default(),
remote_win_len: 0,
remote_mss: DEFAULT_MSS,
@@ -353,7 +349,7 @@ impl<'a> TcpSocket<'a> {
self.remote_endpoint = IpEndpoint::default();
self.local_seq_no = TcpSeqNumber::default();
self.remote_seq_no = TcpSeqNumber::default();
self.remote_next_seq = TcpSeqNumber::default();
self.remote_last_seq = TcpSeqNumber::default();
self.remote_last_ack = TcpSeqNumber::default();
self.remote_win_len = 0;
self.remote_mss = DEFAULT_MSS;
@@ -421,7 +417,7 @@ impl<'a> TcpSocket<'a> {
self.local_endpoint = local_endpoint;
self.remote_endpoint = remote_endpoint;
self.local_seq_no = local_seq_no;
self.remote_next_seq = local_seq_no;
self.remote_last_seq = local_seq_no;
self.set_state(State::SynSent);
Ok(())
}
@@ -742,7 +738,7 @@ impl<'a> TcpSocket<'a> {
// [...] an empty acknowledgment segment containing the current send-sequence number
// and an acknowledgment indicating the next sequence number expected
// to be received.
reply_repr.seq_number = self.remote_next_seq;
reply_repr.seq_number = self.remote_last_seq;
reply_repr.ack_number = Some(self.remote_last_ack);
reply_repr.window_len = self.rx_buffer.window() as u16;

@@ -944,8 +940,8 @@ impl<'a> TcpSocket<'a> {
self.remote_endpoint = IpEndpoint::new(ip_repr.src_addr(), repr.src_port);
// FIXME: use something more secure here
self.local_seq_no = TcpSeqNumber(-repr.seq_number.0);
self.remote_next_seq = self.local_seq_no;
self.remote_seq_no = repr.seq_number + 1;
self.remote_last_seq = self.local_seq_no;
if let Some(max_seg_size) = repr.max_seg_size {
self.remote_mss = max_seg_size as usize
}
@@ -973,8 +969,8 @@ impl<'a> TcpSocket<'a> {
net_trace!("[{}]{}:{}: received SYN|ACK",
self.debug_id, self.local_endpoint, self.remote_endpoint);
self.local_endpoint = IpEndpoint::new(ip_repr.dst_addr(), repr.dst_port);
self.remote_next_seq = self.local_seq_no + 1;
self.remote_seq_no = repr.seq_number + 1;
self.remote_last_seq = self.local_seq_no + 1;
self.remote_last_ack = repr.seq_number;
if let Some(max_seg_size) = repr.max_seg_size {
self.remote_mss = max_seg_size as usize;
@@ -1087,24 +1083,34 @@ impl<'a> TcpSocket<'a> {
}
}

/// Reports whether there is sequence space that has not been sent yet.
///
/// The upper bound is `local_seq_no` advanced by the length of the transmit
/// buffer and by `control.len()` (presumably the sequence space taken up by
/// the control flag; confirm against `TcpControl`). The result is `true`
/// while `remote_last_seq` is still below that bound.
fn seq_to_transmit(&self, control: TcpControl) -> bool {
    // Operand order is kept as-is: the sequence-number arithmetic is overloaded.
    let tx_end = self.local_seq_no + self.tx_buffer.len() + control.len();
    self.remote_last_seq < tx_end
}

/// Reports whether there is received data that has not been acknowledged yet.
///
/// The result is `true` while `remote_last_ack` is still below `remote_seq_no`
/// advanced by the length of the receive buffer.
fn ack_to_transmit(&self) -> bool {
    // Operand order is kept as-is: the sequence-number arithmetic is overloaded.
    let ack_due = self.remote_seq_no + self.rx_buffer.len();
    self.remote_last_ack < ack_due
}

pub(crate) fn dispatch<F>(&mut self, timestamp: u64, limits: &DeviceLimits,
emit: F) -> Result<()>
where F: FnOnce((IpRepr, TcpRepr)) -> Result<()> {
if !self.remote_endpoint.is_specified() { return Err(Error::Exhausted) }

if let Some(retransmit_delta) = self.timer.should_retransmit(timestamp) {
// If a retransmit timer expired, we should resend data starting at the last ACK.
net_debug!("[{}]{}:{}: retransmitting at t+{}ms",
self.debug_id, self.local_endpoint, self.remote_endpoint,
retransmit_delta);
self.remote_next_seq = self.local_seq_no;
if !self.seq_to_transmit(TcpControl::None) {
if let Some(retransmit_delta) = self.timer.should_retransmit(timestamp) {
// If a retransmit timer expired, we should resend data starting at the last ACK.
net_debug!("[{}]{}:{}: retransmitting at t+{}ms",
self.debug_id, self.local_endpoint, self.remote_endpoint,
retransmit_delta);
self.remote_last_seq = self.local_seq_no;
}
}

let mut repr = TcpRepr {
src_port: self.local_endpoint.port,
dst_port: self.remote_endpoint.port,
control: TcpControl::None,
seq_number: self.remote_next_seq,
seq_number: self.remote_last_seq,
ack_number: Some(self.remote_seq_no + self.rx_buffer.len()),
window_len: self.rx_buffer.window() as u16,
max_seg_size: None,
@@ -1136,7 +1142,7 @@ impl<'a> TcpSocket<'a> {
State::Established | State::FinWait1 | State::CloseWait | State::LastAck => {
// Extract as much data as the remote side can receive in this packet
// from the transmit buffer.
let offset = self.remote_next_seq - self.local_seq_no;
let offset = self.remote_last_seq - self.local_seq_no;
let size = cmp::min(self.remote_win_len, self.remote_mss);
repr.payload = self.tx_buffer.peek(offset, size);
// If we've sent everything we had in the buffer, follow it with the PSH or FIN
@@ -1171,14 +1177,14 @@ impl<'a> TcpSocket<'a> {
}
}

if self.timer.should_retransmit(timestamp).is_some() {
if self.seq_to_transmit(repr.control) && repr.segment_len() > 0 {
// If we have data to transmit and it fits into partner's window, do it.
} else if self.ack_to_transmit() {
// If we have data to acknowledge, do it.
} else if self.timer.should_retransmit(timestamp).is_some() {
// If we have packets to retransmit, do it.
} else if repr.segment_len() > 0 && self.timer.is_idle() {
// If we have something new to transmit, do it.
} else if repr.control == TcpControl::Rst {
// If we need to abort the connection, do it.
} else if self.remote_seq_no + self.rx_buffer.len() != self.remote_last_ack {
// If we have something to acknowledge, do it.
} else {
return Err(Error::Exhausted)
}
@@ -1197,7 +1203,7 @@ impl<'a> TcpSocket<'a> {
if repr.payload.len() > 0 {
net_trace!("[{}]{}:{}: tx buffer: peeking at {} octets (from {})",
self.debug_id, self.local_endpoint, self.remote_endpoint,
repr.payload.len(), self.remote_next_seq - self.local_seq_no);
repr.payload.len(), self.remote_last_seq - self.local_seq_no);
} else {
net_debug!("[{}]{}:{}: sending {}",
self.debug_id, self.local_endpoint, self.remote_endpoint,
@@ -1245,14 +1251,13 @@ impl<'a> TcpSocket<'a> {
emit((ip_repr, repr))?;

// We've sent a packet successfully, so we can update the internal state now.
self.remote_next_seq = repr.seq_number + repr.segment_len();
self.remote_last_seq = repr.seq_number + repr.segment_len();
self.remote_last_ack = repr.ack_number.unwrap_or_default();

if self.remote_next_seq - self.local_seq_no >= self.tx_buffer.len() &&
repr.segment_len() > 0 {
// If we've transmitted all we could (and there was something to transmit),
// wind up the retransmit timer.
self.timer.set_for_data(timestamp);
if !self.seq_to_transmit(repr.control) && repr.segment_len() > 0 {
// If we've transmitted all the data we could (and there was something to transmit
// at all, data or flag, not just an ACK), wind up the retransmit timer.
self.timer.set_for_retransmit(timestamp);
}

if repr.control == TcpControl::Rst {
@@ -1324,11 +1329,11 @@ mod test {
fn test_timer_retransmit() {
let mut r = Timer::Idle;
assert_eq!(r.should_retransmit(1000), None);
r.set_for_data(1000);
r.set_for_retransmit(1000);
assert_eq!(r.should_retransmit(1000), None);
assert_eq!(r.should_retransmit(1050), None);
assert_eq!(r.should_retransmit(1101), Some(101));
r.set_for_data(1101);
r.set_for_retransmit(1101);
assert_eq!(r.should_retransmit(1101), None);
assert_eq!(r.should_retransmit(1150), None);
assert_eq!(r.should_retransmit(1200), None);
@@ -1442,7 +1447,7 @@ mod test {
assert_eq!(s1.remote_endpoint, s2.remote_endpoint, "remote_endpoint");
assert_eq!(s1.local_seq_no, s2.local_seq_no, "local_seq_no");
assert_eq!(s1.remote_seq_no, s2.remote_seq_no, "remote_seq_no");
assert_eq!(s1.remote_next_seq, s2.remote_next_seq, "remote_next_seq");
assert_eq!(s1.remote_last_seq, s2.remote_last_seq, "remote_last_seq");
assert_eq!(s1.remote_last_ack, s2.remote_last_ack, "remote_last_ack");
assert_eq!(s1.remote_win_len, s2.remote_win_len, "remote_win_len");
assert_eq!(s1.timer, s2.timer, "timer");
@@ -1599,7 +1604,7 @@ mod test {
s.remote_endpoint = REMOTE_END;
s.local_seq_no = LOCAL_SEQ;
s.remote_seq_no = REMOTE_SEQ + 1;
s.remote_next_seq = LOCAL_SEQ;
s.remote_last_seq = LOCAL_SEQ;
s.remote_win_len = 256;
s
}
@@ -1689,7 +1694,7 @@ mod test {
s.local_endpoint = IpEndpoint::new(IpAddress::v4(0, 0, 0, 0), LOCAL_PORT);
s.remote_endpoint = REMOTE_END;
s.local_seq_no = LOCAL_SEQ;
s.remote_next_seq = LOCAL_SEQ;
s.remote_last_seq = LOCAL_SEQ;
s
}

@@ -1841,7 +1846,7 @@ mod test {
let mut s = socket_syn_received();
s.state = State::Established;
s.local_seq_no = LOCAL_SEQ + 1;
s.remote_next_seq = LOCAL_SEQ + 1;
s.remote_last_seq = LOCAL_SEQ + 1;
s.remote_last_ack = REMOTE_SEQ + 1;
s
}
@@ -2146,7 +2151,7 @@ mod test {
let mut s = socket_fin_wait_1();
s.state = State::FinWait2;
s.local_seq_no = LOCAL_SEQ + 1 + 1;
s.remote_next_seq = LOCAL_SEQ + 1 + 1;
s.remote_last_seq = LOCAL_SEQ + 1 + 1;
s
}

@@ -2176,7 +2181,7 @@ mod test {
fn socket_closing() -> TcpSocket<'static> {
let mut s = socket_fin_wait_1();
s.state = State::Closing;
s.remote_next_seq = LOCAL_SEQ + 1 + 1;
s.remote_last_seq = LOCAL_SEQ + 1 + 1;
s.remote_seq_no = REMOTE_SEQ + 1 + 1;
s
}
@@ -2655,6 +2660,51 @@ mod test {
}));
}

// Checks that buffered data which has to be split across two segments is sent,
// and later retransmitted, as an orderly burst: each segment once, in order,
// with nothing emitted in between or after the burst.
#[test]
fn test_data_retransmit_bursts() {
let mut s = socket_established();
// A window of 6 admits only half of the 12 queued octets per segment.
s.remote_win_len = 6;
s.send_slice(b"abcdef012345").unwrap();

// Initial transmission, first half: starts at LOCAL_SEQ + 1, no control flag.
recv!(s, time 0, Ok(TcpRepr {
control: TcpControl::None,
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"abcdef"[..],
..RECV_TEMPL
}), exact);
// The window is put back to 6 before every following dispatch
// (presumably a send shrinks it; not visible here, confirm against dispatch).
s.remote_win_len = 6;
// Initial transmission, second half: continues 6 octets later and carries PSH.
recv!(s, time 0, Ok(TcpRepr {
control: TcpControl::Psh,
seq_number: LOCAL_SEQ + 1 + 6,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"012345"[..],
..RECV_TEMPL
}), exact);
s.remote_win_len = 6;
// Everything has been sent once, so nothing more is emitted at t=0 ...
recv!(s, time 0, Err(Error::Exhausted));

// ... nor at t=50.
recv!(s, time 50, Err(Error::Exhausted));

// At t=100 the burst starts over from the beginning of the buffer.
recv!(s, time 100, Ok(TcpRepr {
control: TcpControl::None,
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"abcdef"[..],
..RECV_TEMPL
}), exact);
s.remote_win_len = 6;
// The second half of the retransmitted burst follows at t=150.
recv!(s, time 150, Ok(TcpRepr {
control: TcpControl::Psh,
seq_number: LOCAL_SEQ + 1 + 6,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"012345"[..],
..RECV_TEMPL
}), exact);
s.remote_win_len = 6;
// With the burst complete, t=200 again yields nothing to send.
recv!(s, time 200, Err(Error::Exhausted));
}

#[test]
fn test_send_data_after_syn_ack_retransmit() {
let mut s = socket_syn_received();
@@ -2806,6 +2856,10 @@ mod test {
}));
}

// =========================================================================================//
// Tests for window management.
// =========================================================================================//

#[test]
fn test_maximum_segment_size() {
let mut s = socket_listen();
@@ -2839,10 +2893,6 @@ mod test {
}));
}

// =========================================================================================//
// Tests for window management.
// =========================================================================================//

#[test]
fn test_window_size_clamp() {
let mut s = socket_established();