Skip to content

Commit c73721a

Browse files
committed Sep 24, 2017
Completely redo the logic of TCP socket polling.
The previous implementation made no sense. It is obvious that poll_at() should use the same mechanisms to decide whether dispatch() should be called as dispatch() itself uses to decide whether to send anything. This fixes numerous busy looping issues that arise if the return value of poll() is used for waiting.
1 parent 1a81029 commit c73721a

File tree

1 file changed

+114
-53
lines changed

1 file changed

+114
-53
lines changed
 

Diff for: ‎src/socket/tcp.rs

+114 -53
Original file line number | Diff line number | Diff line change
@@ -996,6 +996,7 @@ impl<'a> TcpSocket<'a> {
996996
self.local_endpoint = IpEndpoint::new(ip_repr.dst_addr(), repr.dst_port);
997997
self.remote_seq_no = repr.seq_number + 1;
998998
self.remote_last_seq = self.local_seq_no + 1;
999+
self.remote_last_ack = Some(repr.seq_number);
9991000
if let Some(max_seg_size) = repr.max_seg_size {
10001001
self.remote_mss = max_seg_size as usize;
10011002
}
@@ -1063,6 +1064,7 @@ impl<'a> TcpSocket<'a> {
10631064
(State::LastAck, TcpControl::None) => {
10641065
// Clear the remote endpoint, or we'll send an RST there.
10651066
self.set_state(State::Closed);
1067+
self.local_endpoint = IpEndpoint::default();
10661068
self.remote_endpoint = IpEndpoint::default();
10671069
}
10681070

@@ -1154,18 +1156,35 @@ impl<'a> TcpSocket<'a> {
11541156
}
11551157
}
11561158

1157-
fn seq_to_transmit(&self, control: TcpControl) -> bool {
1158-
self.remote_last_seq < self.local_seq_no + self.tx_buffer.len() + control.len()
1159+
fn seq_to_transmit(&self) -> bool {
1160+
let control;
1161+
match self.state {
1162+
State::SynSent | State::SynReceived =>
1163+
control = TcpControl::Syn,
1164+
State::FinWait1 | State::LastAck =>
1165+
control = TcpControl::Fin,
1166+
_ => control = TcpControl::None
1167+
}
1168+
1169+
if self.remote_win_len > 0 {
1170+
self.remote_last_seq < self.local_seq_no + self.tx_buffer.len() + control.len()
1171+
} else {
1172+
false
1173+
}
11591174
}
11601175

11611176
fn ack_to_transmit(&self) -> bool {
11621177
if let Some(remote_last_ack) = self.remote_last_ack {
11631178
remote_last_ack < self.remote_seq_no + self.rx_buffer.len()
11641179
} else {
1165-
true
1180+
false
11661181
}
11671182
}
11681183

1184+
fn window_to_update(&self) -> bool {
1185+
self.rx_buffer.window() as u16 > self.remote_last_win
1186+
}
1187+
11691188
pub(crate) fn dispatch<F>(&mut self, timestamp: u64, limits: &DeviceLimits,
11701189
emit: F) -> Result<()>
11711190
where F: FnOnce((IpRepr, TcpRepr)) -> Result<()> {
@@ -1182,12 +1201,13 @@ impl<'a> TcpSocket<'a> {
11821201
self.remote_last_ts = Some(timestamp);
11831202
}
11841203

1204+
// Check if any state needs to be changed because of a timer.
11851205
if self.timed_out(timestamp) {
11861206
// If a timeout expires, we should abort the connection.
11871207
net_debug!("[{}]{}:{}: timeout exceeded",
11881208
self.debug_id, self.local_endpoint, self.remote_endpoint);
11891209
self.set_state(State::Closed);
1190-
} else if !self.seq_to_transmit(TcpControl::None) {
1210+
} else if !self.seq_to_transmit() {
11911211
if let Some(retransmit_delta) = self.timer.should_retransmit(timestamp) {
11921212
// If a retransmit timer expired, we should resend data starting at the last ACK.
11931213
net_debug!("[{}]{}:{}: retransmitting at t+{}ms",
@@ -1197,6 +1217,25 @@ impl<'a> TcpSocket<'a> {
11971217
}
11981218
}
11991219

1220+
// Decide whether we're sending a packet.
1221+
if self.seq_to_transmit() {
1222+
// If we have data to transmit and it fits into partner's window, do it.
1223+
} else if self.ack_to_transmit() {
1224+
// If we have data to acknowledge, do it.
1225+
} else if self.window_to_update() {
1226+
// If we have window length increase to advertise, do it.
1227+
} else if self.state == State::Closed {
1228+
// If we need to abort the connection, do it.
1229+
} else if self.timer.should_retransmit(timestamp).is_some() {
1230+
// If we have packets to retransmit, do it.
1231+
} else if self.timer.should_keep_alive(timestamp) {
1232+
// If we need to transmit a keep-alive packet, do it.
1233+
} else if self.timer.should_close(timestamp) {
1234+
// If we have spent enough time in the TIME-WAIT state, close the socket.
1235+
} else {
1236+
return Err(Error::Exhausted)
1237+
}
1238+
12001239
// Construct the lowered IP representation.
12011240
// We might need this to calculate the MSS, so do it early.
12021241
let mut ip_repr = IpRepr::Unspecified {
@@ -1279,22 +1318,6 @@ impl<'a> TcpSocket<'a> {
12791318
}
12801319
}
12811320

1282-
if self.seq_to_transmit(repr.control) && repr.segment_len() > 0 {
1283-
// If we have data to transmit and it fits into partner's window, do it.
1284-
} else if self.ack_to_transmit() && repr.ack_number.is_some() {
1285-
// If we have data to acknowledge, do it.
1286-
} else if repr.window_len > self.remote_last_win {
1287-
// If we have window length increase to advertise, do it.
1288-
} else if self.timer.should_retransmit(timestamp).is_some() {
1289-
// If we have packets to retransmit, do it.
1290-
} else if self.timer.should_keep_alive(timestamp) {
1291-
// If we need to transmit a keep-alive packet, do it.
1292-
} else if repr.control == TcpControl::Rst {
1293-
// If we need to abort the connection, do it.
1294-
} else {
1295-
return Err(Error::Exhausted)
1296-
}
1297-
12981321
if repr.payload.len() > 0 {
12991322
net_trace!("[{}]{}:{}: tx buffer: reading {} octets at offset {}",
13001323
self.debug_id, self.local_endpoint, self.remote_endpoint,
@@ -1315,6 +1338,10 @@ impl<'a> TcpSocket<'a> {
13151338
flags);
13161339
}
13171340

1341+
// There might be more than one reason to send a packet. E.g. the keep-alive timer
1342+
// has expired, and we also have data in transmit buffer. Since any packet that occupies
1343+
// sequence space will elicit an ACK, we only need to send an explicit packet if we
1344+
// couldn't fill the sequence space with anything.
13181345
let is_keep_alive;
13191346
if self.timer.should_keep_alive(timestamp) && repr.is_empty() {
13201347
net_trace!("[{}]{}:{}: sending a keep-alive",
@@ -1327,72 +1354,83 @@ impl<'a> TcpSocket<'a> {
13271354
}
13281355

13291356
if repr.control == TcpControl::Syn {
1330-
// See RFC 6691 for an explanation of this calculation.
1357+
// Fill the MSS option. See RFC 6691 for an explanation of this calculation.
13311358
let mut max_segment_size = limits.max_transmission_unit;
13321359
max_segment_size -= ip_repr.buffer_len();
13331360
max_segment_size -= repr.header_len();
13341361
repr.max_seg_size = Some(max_segment_size as u16);
13351362
}
13361363

1364+
// Actually send the packet. If this succeeds, it means the packet is in
1365+
// the device buffer, and its transmission is imminent. If not, we might have
1366+
// a number of problems, e.g. we need neighbor discovery.
1367+
//
1368+
// Bailing out if the packet isn't placed in the device buffer allows us
1369+
// to not waste time waiting for the retransmit timer on packets that we know
1370+
// for sure will not be successfully transmitted.
13371371
ip_repr.set_payload_len(repr.buffer_len());
13381372
emit((ip_repr, repr))?;
13391373

13401374
// We've sent something, whether useful data or a keep-alive packet, so rewind
13411375
// the keep-alive timer.
13421376
self.timer.rewind_keep_alive(timestamp, self.keep_alive);
13431377

1344-
// Leave the rest of the state intact if sending a keep-alive packet.
1378+
// Leave the rest of the state intact if sending a keep-alive packet, since those
1379+
// carry a fake segment.
13451380
if is_keep_alive { return Ok(()) }
13461381

13471382
// We've sent a packet successfully, so we can update the internal state now.
13481383
self.remote_last_seq = repr.seq_number + repr.segment_len();
13491384
self.remote_last_ack = repr.ack_number;
13501385
self.remote_last_win = repr.window_len;
13511386

1352-
if !self.seq_to_transmit(repr.control) && repr.segment_len() > 0 {
1387+
if !self.seq_to_transmit() && repr.segment_len() > 0 {
13531388
// If we've transmitted all data we could (and there was something at all,
13541389
// data or flag, to transmit, not just an ACK), wind up the retransmit timer.
13551390
self.timer.set_for_retransmit(timestamp);
13561391
}
13571392

1358-
if repr.control == TcpControl::Rst {
1359-
// When aborting a connection, forget about it after sending
1360-
// the RST packet once.
1393+
if self.state == State::Closed {
1394+
// When aborting a connection, forget about it after sending a single RST packet.
13611395
self.local_endpoint = IpEndpoint::default();
13621396
self.remote_endpoint = IpEndpoint::default();
13631397
}
13641398

13651399
Ok(())
13661400
}
13671401

1368-
fn has_data_or_fin_to_transmit(&self) -> bool {
1369-
match self.state {
1370-
State::FinWait1 | State::LastAck => true,
1371-
_ if !self.tx_buffer.is_empty() => true,
1372-
_ => false
1373-
}
1374-
}
1375-
13761402
pub(crate) fn poll_at(&self) -> Option<u64> {
1377-
let timeout_poll_at;
1378-
match (self.remote_last_ts, self.timeout) {
1379-
// If we're transmitting or retransmitting data, we need to poll at the moment
1380-
// when the timeout would expire.
1381-
(Some(remote_last_ts), Some(timeout)) if self.has_data_or_fin_to_transmit() =>
1382-
timeout_poll_at = Some(remote_last_ts + timeout),
1383-
// If we're transitioning from a long period of inactivity, and have a timeout set,
1384-
// request an invocation of dispatch(); that will update self.remote_last_ts.
1385-
(None, Some(_timeout)) =>
1386-
timeout_poll_at = Some(0),
1387-
// Otherwise we have no timeout.
1388-
(_, _) =>
1389-
timeout_poll_at = None
1390-
}
1403+
// The logic here mirrors the beginning of dispatch() closely.
1404+
if !self.remote_endpoint.is_specified() {
1405+
// No one to talk to, nothing to transmit.
1406+
None
1407+
} else if self.remote_last_ts.is_none() {
1408+
// Socket stopped being quiet recently, we need to acquire a timestamp.
1409+
Some(0)
1410+
} else if self.state == State::Closed {
1411+
// Socket was aborted, we have an RST packet to transmit.
1412+
Some(0)
1413+
} else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() {
1414+
// We have a data or flag packet to transmit.
1415+
Some(0)
1416+
} else {
1417+
let timeout_poll_at;
1418+
match (self.remote_last_ts, self.timeout) {
1419+
// If we're transmitting or retransmitting data, we need to poll at the moment
1420+
// when the timeout would expire.
1421+
(Some(remote_last_ts), Some(timeout)) =>
1422+
timeout_poll_at = Some(remote_last_ts + timeout),
1423+
// Otherwise we have no timeout.
1424+
(_, _) =>
1425+
timeout_poll_at = None
1426+
}
13911427

1392-
[self.timer.poll_at(), timeout_poll_at]
1393-
.iter()
1394-
.filter_map(|x| *x)
1395-
.min()
1428+
// We wait for the earliest of our timers to fire.
1429+
[self.timer.poll_at(), timeout_poll_at]
1430+
.iter()
1431+
.filter_map(|x| *x)
1432+
.min()
1433+
}
13961434
}
13971435
}
13981436

@@ -3114,6 +3152,13 @@ mod test {
31143152
// Tests for timeouts.
31153153
// =========================================================================================//
31163154

3155+
#[test]
3156+
fn test_listen_timeout() {
3157+
let mut s = socket_listen();
3158+
s.set_timeout(Some(100));
3159+
assert_eq!(s.poll_at(), None);
3160+
}
3161+
31173162
#[test]
31183163
fn test_connect_timeout() {
31193164
let mut s = socket();
@@ -3143,7 +3188,7 @@ mod test {
31433188
let mut s = socket_established();
31443189
s.set_timeout(Some(200));
31453190
recv!(s, time 250, Err(Error::Exhausted));
3146-
assert_eq!(s.poll_at(), None);
3191+
assert_eq!(s.poll_at(), Some(450));
31473192
s.send_slice(b"abcdef").unwrap();
31483193
assert_eq!(s.poll_at(), Some(0));
31493194
recv!(s, time 255, Ok(TcpRepr {
@@ -3247,6 +3292,22 @@ mod test {
32473292
assert_eq!(s.state, State::Closed);
32483293
}
32493294

3295+
#[test]
3296+
fn test_closed_timeout() {
3297+
let mut s = socket_established();
3298+
s.set_timeout(Some(200));
3299+
s.remote_last_ts = Some(100);
3300+
s.abort();
3301+
assert_eq!(s.poll_at(), Some(0));
3302+
recv!(s, time 100, Ok(TcpRepr {
3303+
control: TcpControl::Rst,
3304+
seq_number: LOCAL_SEQ + 1,
3305+
ack_number: Some(REMOTE_SEQ + 1),
3306+
..RECV_TEMPL
3307+
}));
3308+
assert_eq!(s.poll_at(), None);
3309+
}
3310+
32503311
// =========================================================================================//
32513312
// Tests for keep-alive.
32523313
// =========================================================================================//

0 commit comments

Comments
 (0)
Please sign in to comment.