Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 78359e2

Browse files
committed Sep 22, 2017
Implement reassembly of out-of-order TCP segments.
1 parent af51f2a commit 78359e2

File tree

3 files changed

+167 −107 lines changed

3 files changed

+167 −107 lines changed
 

Diff for: README.md

+1-1
Original file line number | Diff line number | Diff line change
@@ -57,12 +57,12 @@ The TCP protocol is supported over IPv4. Server and client sockets are supported
5757
* Header checksum is generated and validated.
5858
* Maximum segment size is negotiated.
5959
* Multiple packets are transmitted without waiting for an acknowledgement.
60+
* Reassembly of out-of-order segments is supported, with no more than 4 missing sequence ranges.
6061
* Lost packets are retransmitted with exponential backoff, starting at a fixed delay of 100 ms.
6162
* Sending keep-alive packets is supported, with a configurable interval.
6263
* Connection, retransmission and keep-alive timeouts are supported, with a configurable duration.
6364
* After arriving at the TIME-WAIT state, sockets close after a fixed delay of 10 s.
6465
* Urgent pointer is **not** supported; any urgent octets will be received alongside data octets.
65-
* Reassembly of out-of-order segments is **not** supported.
6666
* Silly window syndrome avoidance is **not** supported for either transmission or reception.
6767
* Congestion control is **not** implemented.
6868
* Delayed acknowledgements are **not** implemented.

Diff for: src/socket/tcp.rs

+111-69
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ use {Error, Result};
77
use phy::DeviceLimits;
88
use wire::{IpProtocol, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
99
use socket::{Socket, IpRepr};
10-
use storage::RingBuffer;
10+
use storage::{Assembler, RingBuffer};
1111

1212
pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
1313

@@ -171,6 +171,7 @@ pub struct TcpSocket<'a> {
171171
debug_id: usize,
172172
state: State,
173173
timer: Timer,
174+
assembler: Assembler,
174175
rx_buffer: SocketBuffer<'a>,
175176
tx_buffer: SocketBuffer<'a>,
176177
/// Interval after which, if no inbound packets are received, the connection is aborted.
@@ -219,7 +220,7 @@ impl<'a> TcpSocket<'a> {
219220
/// Create a socket using the given buffers.
220221
pub fn new<T>(rx_buffer: T, tx_buffer: T) -> Socket<'a, 'static>
221222
where T: Into<SocketBuffer<'a>> {
222-
let rx_buffer = rx_buffer.into();
223+
let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into());
223224
if rx_buffer.capacity() > <u16>::max_value() as usize {
224225
panic!("buffers larger than {} require window scaling, which is not implemented",
225226
<u16>::max_value())
@@ -229,8 +230,9 @@ impl<'a> TcpSocket<'a> {
229230
debug_id: 0,
230231
state: State::Closed,
231232
timer: Timer::default(),
232-
tx_buffer: tx_buffer.into(),
233-
rx_buffer: rx_buffer.into(),
233+
assembler: Assembler::new(rx_buffer.capacity()),
234+
tx_buffer: tx_buffer,
235+
rx_buffer: rx_buffer,
234236
timeout: None,
235237
keep_alive: None,
236238
listen_address: IpAddress::default(),
@@ -332,6 +334,9 @@ impl<'a> TcpSocket<'a> {
332334
fn reset(&mut self) {
333335
self.state = State::Closed;
334336
self.timer = Timer::default();
337+
self.assembler = Assembler::new(self.rx_buffer.capacity());
338+
self.tx_buffer.clear();
339+
self.rx_buffer.clear();
335340
self.keep_alive = None;
336341
self.timeout = None;
337342
self.listen_address = IpAddress::default();
@@ -345,8 +350,6 @@ impl<'a> TcpSocket<'a> {
345350
self.remote_win_len = 0;
346351
self.remote_mss = DEFAULT_MSS;
347352
self.remote_last_ts = None;
348-
self.tx_buffer.clear();
349-
self.rx_buffer.clear();
350353
}
351354

352355
/// Start listening on the given endpoint.
@@ -860,13 +863,14 @@ impl<'a> TcpSocket<'a> {
860863
}
861864
}
862865

866+
let payload_offset;
863867
match self.state {
864868
// In LISTEN and SYN-SENT states, we have not yet synchronized with the remote end.
865-
State::Listen => (),
866-
State::SynSent => (),
869+
State::Listen | State::SynSent =>
870+
payload_offset = 0,
867871
// In all other states, segments must occupy a valid portion of the receive window.
868872
_ => {
869-
let mut send_challenge_ack = false;
873+
let mut segment_in_window = true;
870874

871875
let window_start = self.remote_seq_no + self.rx_buffer.len();
872876
let window_end = self.remote_seq_no + self.rx_buffer.capacity();
@@ -877,34 +881,22 @@ impl<'a> TcpSocket<'a> {
877881
net_debug!("[{}]{}:{}: non-zero-length segment with zero receive window, \
878882
will only send an ACK",
879883
self.debug_id, self.local_endpoint, self.remote_endpoint);
880-
send_challenge_ack = true;
884+
segment_in_window = false;
881885
}
882886

883-
if !((window_start <= segment_start && segment_start <= window_end) ||
887+
if !((window_start <= segment_start && segment_start <= window_end) &&
884888
(window_start <= segment_end && segment_end <= window_end)) {
885889
net_debug!("[{}]{}:{}: segment not in receive window \
886890
({}..{} not intersecting {}..{}), will send challenge ACK",
887891
self.debug_id, self.local_endpoint, self.remote_endpoint,
888892
segment_start, segment_end, window_start, window_end);
889-
send_challenge_ack = true;
890-
}
891-
892-
// For now, do not actually try to reassemble out-of-order segments.
893-
if segment_start != window_start {
894-
net_debug!("[{}]{}:{}: out-of-order SEQ ({} not equal to {}), \
895-
will send challenge ACK",
896-
self.debug_id, self.local_endpoint, self.remote_endpoint,
897-
segment_start, window_start);
898-
// Some segments between what we have last received and this segment
899-
// went missing. Send a duplicate ACK; RFC 793 does not specify the behavior
900-
// required when receiving a duplicate ACK, but in practice (see RFC 1122
901-
// section 4.2.2.21) most congestion control algorithms implement what's called
902-
// a "fast retransmit", where a threshold amount of duplicate ACKs triggers
903-
// retransmission.
904-
send_challenge_ack = true;
893+
segment_in_window = false;
905894
}
906895

907-
if send_challenge_ack {
896+
if segment_in_window {
897+
// We've checked that segment_start >= window_start above.
898+
payload_offset = (segment_start - window_start) as usize;
899+
} else {
908900
// If we're in the TIME-WAIT state, restart the TIME-WAIT timeout, since
909901
// the remote end may not have realized we've closed the connection.
910902
if self.state == State::TimeWait {
@@ -1087,28 +1079,70 @@ impl<'a> TcpSocket<'a> {
10871079

10881080
if ack_len > 0 {
10891081
// Dequeue acknowledged octets.
1082+
debug_assert!(self.tx_buffer.len() >= ack_len);
10901083
net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
10911084
self.debug_id, self.local_endpoint, self.remote_endpoint,
10921085
ack_len, self.tx_buffer.len() - ack_len);
1093-
let acked = self.tx_buffer.dequeue_many(ack_len);
1094-
debug_assert!(acked.len() == ack_len);
1086+
self.tx_buffer.dequeue_many(ack_len);
10951087
}
10961088

1097-
// We've processed everything in the incoming segment, so advance the local
1098-
// sequence number past it.
10991089
if let Some(ack_number) = repr.ack_number {
1090+
// We've processed everything in the incoming segment, so advance the local
1091+
// sequence number past it.
11001092
self.local_seq_no = ack_number;
11011093
}
11021094

1103-
if repr.payload.len() > 0 {
1104-
// Enqueue payload octets, which are guaranteed to be in order.
1095+
let payload_len = repr.payload.len();
1096+
if payload_len == 0 { return Ok(None) }
1097+
1098+
println!("before1 {}", self.assembler);
1099+
1100+
// Try adding payload octets to the assembler.
1101+
match self.assembler.add(payload_offset, payload_len) {
1102+
Ok(()) => {
1103+
debug_assert!(self.assembler.total_size() == self.rx_buffer.capacity());
1104+
println!("after1 {}", self.assembler);
1105+
1106+
// Place payload octets into the buffer.
1107+
net_trace!("[{}]{}:{}: rx buffer: writing {} octets at offset {}",
1108+
self.debug_id, self.local_endpoint, self.remote_endpoint,
1109+
payload_len, payload_offset);
1110+
self.rx_buffer.get_unallocated(payload_offset, payload_len)
1111+
.copy_from_slice(repr.payload);
1112+
}
1113+
Err(()) => {
1114+
net_debug!("[{}]{}:{}: assembler: too many holes to add {} octets at offset {}",
1115+
self.debug_id, self.local_endpoint, self.remote_endpoint,
1116+
payload_len, payload_offset);
1117+
return Err(Error::Dropped)
1118+
}
1119+
}
1120+
1121+
println!("before2 {}", self.assembler);
1122+
if let Some(contig_len) = self.assembler.remove_front() {
1123+
println!("after2 {}", self.assembler);
1124+
debug_assert!(self.assembler.total_size() == self.rx_buffer.capacity());
1125+
// Enqueue the contiguous data octets in front of the buffer.
11051126
net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
11061127
self.debug_id, self.local_endpoint, self.remote_endpoint,
1107-
repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
1108-
self.rx_buffer.enqueue_slice(repr.payload);
1128+
contig_len, self.rx_buffer.len() + contig_len);
1129+
self.rx_buffer.enqueue_many(contig_len);
11091130
}
11101131

1111-
Ok(None)
1132+
if self.assembler.is_empty() {
1133+
Ok(None)
1134+
} else {
1135+
// If the assembler isn't empty, some segments at the start of our window got lost.
1136+
// Send a reply acknowledging the data we already have; RFC 793 does not specify
1137+
// the behavior triggerd by such a reply, but RFC 1122 section 4.2.2.21 states that
1138+
// most congestion control algorithms implement what's called a "fast retransmit",
1139+
// where a threshold amount of duplicate ACKs triggers retransmission without
1140+
// the need to wait for a timeout to expire.
1141+
net_trace!("[{}]{}:{}: assembler: {}",
1142+
self.debug_id, self.local_endpoint, self.remote_endpoint,
1143+
self.assembler);
1144+
Ok(Some(self.ack_reply(ip_repr, &repr)))
1145+
}
11121146
}
11131147

11141148
fn timed_out(&self, timestamp: u64) -> bool {
@@ -1262,7 +1296,7 @@ impl<'a> TcpSocket<'a> {
12621296
}
12631297

12641298
if repr.payload.len() > 0 {
1265-
net_trace!("[{}]{}:{}: tx buffer: peeking at {} octets (from {})",
1299+
net_trace!("[{}]{}:{}: tx buffer: reading {} octets at offset {}",
12661300
self.debug_id, self.local_endpoint, self.remote_endpoint,
12671301
repr.payload.len(), self.remote_last_seq - self.local_seq_no);
12681302
} else {
@@ -2685,34 +2719,6 @@ mod test {
26852719
})));
26862720
}
26872721

2688-
#[test]
2689-
fn test_missing_segment() {
2690-
let mut s = socket_established();
2691-
send!(s, TcpRepr {
2692-
seq_number: REMOTE_SEQ + 1,
2693-
ack_number: Some(LOCAL_SEQ + 1),
2694-
payload: &b"abcdef"[..],
2695-
..SEND_TEMPL
2696-
});
2697-
recv!(s, [TcpRepr {
2698-
seq_number: LOCAL_SEQ + 1,
2699-
ack_number: Some(REMOTE_SEQ + 1 + 6),
2700-
window_len: 58,
2701-
..RECV_TEMPL
2702-
}]);
2703-
send!(s, TcpRepr {
2704-
seq_number: REMOTE_SEQ + 1 + 6 + 6,
2705-
ack_number: Some(LOCAL_SEQ + 1),
2706-
payload: &b"mnopqr"[..],
2707-
..SEND_TEMPL
2708-
}, Ok(Some(TcpRepr {
2709-
seq_number: LOCAL_SEQ + 1,
2710-
ack_number: Some(REMOTE_SEQ + 1 + 6),
2711-
window_len: 58,
2712-
..RECV_TEMPL
2713-
})));
2714-
}
2715-
27162722
#[test]
27172723
fn test_data_retransmit() {
27182724
let mut s = socket_established();
@@ -3013,6 +3019,7 @@ mod test {
30133019
fn test_zero_window_ack() {
30143020
let mut s = socket_established();
30153021
s.rx_buffer = SocketBuffer::new(vec![0; 6]);
3022+
s.assembler = Assembler::new(s.rx_buffer.capacity());
30163023
send!(s, TcpRepr {
30173024
seq_number: REMOTE_SEQ + 1,
30183025
ack_number: Some(LOCAL_SEQ + 1),
@@ -3042,6 +3049,7 @@ mod test {
30423049
fn test_zero_window_ack_on_window_growth() {
30433050
let mut s = socket_established();
30443051
s.rx_buffer = SocketBuffer::new(vec![0; 6]);
3052+
s.assembler = Assembler::new(s.rx_buffer.capacity());
30453053
send!(s, TcpRepr {
30463054
seq_number: REMOTE_SEQ + 1,
30473055
ack_number: Some(LOCAL_SEQ + 1),
@@ -3096,7 +3104,7 @@ mod test {
30963104
}
30973105

30983106
// =========================================================================================//
3099-
// Tests for timeouts
3107+
// Tests for timeouts.
31003108
// =========================================================================================//
31013109

31023110
#[test]
@@ -3193,7 +3201,7 @@ mod test {
31933201
}
31943202

31953203
// =========================================================================================//
3196-
// Tests for keep-alive
3204+
// Tests for keep-alive.
31973205
// =========================================================================================//
31983206

31993207
#[test]
@@ -3258,13 +3266,47 @@ mod test {
32583266
}
32593267

32603268
// =========================================================================================//
3261-
// Tests for packet filtering
3269+
// Tests for reassembly.
3270+
// =========================================================================================//
3271+
3272+
#[test]
3273+
fn test_out_of_order() {
3274+
let mut s = socket_established();
3275+
send!(s, TcpRepr {
3276+
seq_number: REMOTE_SEQ + 1 + 3,
3277+
ack_number: Some(LOCAL_SEQ + 1),
3278+
payload: &b"def"[..],
3279+
..SEND_TEMPL
3280+
}, Ok(Some(TcpRepr {
3281+
seq_number: LOCAL_SEQ + 1,
3282+
ack_number: Some(REMOTE_SEQ + 1),
3283+
..RECV_TEMPL
3284+
})));
3285+
assert_eq!(s.recv(10), Ok(&b""[..]));
3286+
send!(s, TcpRepr {
3287+
seq_number: REMOTE_SEQ + 1,
3288+
ack_number: Some(LOCAL_SEQ + 1),
3289+
payload: &b"abcdef"[..],
3290+
..SEND_TEMPL
3291+
});
3292+
recv!(s, [TcpRepr {
3293+
seq_number: LOCAL_SEQ + 1,
3294+
ack_number: Some(REMOTE_SEQ + 1 + 6),
3295+
window_len: 58,
3296+
..RECV_TEMPL
3297+
}]);
3298+
assert_eq!(s.recv(10), Ok(&b"abcdef"[..]));
3299+
}
3300+
3301+
// =========================================================================================//
3302+
// Tests for packet filtering.
32623303
// =========================================================================================//
32633304

32643305
#[test]
32653306
fn test_doesnt_accept_wrong_port() {
32663307
let mut s = socket_established();
32673308
s.rx_buffer = SocketBuffer::new(vec![0; 6]);
3309+
s.assembler = Assembler::new(s.rx_buffer.capacity());
32683310

32693311
let tcp_repr = TcpRepr {
32703312
seq_number: REMOTE_SEQ + 1,

Diff for: src/storage/assembler.rs

+55-37
Original file line number | Diff line number | Diff line change
@@ -3,14 +3,15 @@ use core::fmt;
33
/// A contiguous chunk of absent data, followed by a contiguous chunk of present data.
44
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55
struct Contig {
6-
hole_size: u32,
7-
data_size: u32
6+
hole_size: usize,
7+
data_size: usize
88
}
99

1010
impl fmt::Display for Contig {
1111
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1212
if self.has_hole() { write!(f, "({})", self.hole_size)?; }
13-
if self.has_data() { write!(f, " {}", self.data_size)?; }
13+
if self.has_hole() && self.has_data() { write!(f, " ")?; }
14+
if self.has_data() { write!(f, "{}", self.data_size)?; }
1415
Ok(())
1516
}
1617
}
@@ -20,11 +21,11 @@ impl Contig {
2021
Contig { hole_size: 0, data_size: 0 }
2122
}
2223

23-
fn hole(size: u32) -> Contig {
24+
fn hole(size: usize) -> Contig {
2425
Contig { hole_size: size, data_size: 0 }
2526
}
2627

27-
fn hole_and_data(hole_size: u32, data_size: u32) -> Contig {
28+
fn hole_and_data(hole_size: usize, data_size: usize) -> Contig {
2829
Contig { hole_size, data_size }
2930
}
3031

@@ -36,23 +37,23 @@ impl Contig {
3637
self.data_size != 0
3738
}
3839

39-
fn total_size(&self) -> u32 {
40+
fn total_size(&self) -> usize {
4041
self.hole_size + self.data_size
4142
}
4243

4344
fn is_empty(&self) -> bool {
4445
self.total_size() == 0
4546
}
4647

47-
fn expand_data_by(&mut self, size: u32) {
48+
fn expand_data_by(&mut self, size: usize) {
4849
self.data_size += size;
4950
}
5051

51-
fn shrink_hole_by(&mut self, size: u32) {
52+
fn shrink_hole_by(&mut self, size: usize) {
5253
self.hole_size -= size;
5354
}
5455

55-
fn shrink_hole_to(&mut self, size: u32) {
56+
fn shrink_hole_to(&mut self, size: usize) {
5657
assert!(self.hole_size >= size);
5758

5859
let total_size = self.total_size();
@@ -86,13 +87,13 @@ impl fmt::Display for Assembler {
8687

8788
impl Assembler {
8889
/// Create a new buffer assembler for buffers of the given size.
89-
pub fn new(size: u32) -> Assembler {
90+
pub fn new(size: usize) -> Assembler {
9091
let mut contigs = [Contig::empty(); CONTIG_COUNT];
9192
contigs[0] = Contig::hole(size);
9293
Assembler { contigs }
9394
}
9495

95-
pub(crate) fn total_size(&self) -> u32 {
96+
pub(crate) fn total_size(&self) -> usize {
9697
self.contigs
9798
.iter()
9899
.map(|contig| contig.total_size())
@@ -107,13 +108,20 @@ impl Assembler {
107108
self.contigs[self.contigs.len() - 1]
108109
}
109110

110-
/// Remove a contig at the given index, and return a pointer to the the first empty contig.
111+
/// Return whether the assembler contains no data.
112+
pub fn is_empty(&self) -> bool {
113+
!self.front().has_data()
114+
}
115+
116+
/// Remove a contig at the given index, and return a pointer to the first contig
117+
/// without data.
111118
fn remove_contig_at(&mut self, at: usize) -> &mut Contig {
112119
debug_assert!(!self.contigs[at].is_empty());
113120

114121
for i in at..self.contigs.len() - 1 {
115122
self.contigs[i] = self.contigs[i + 1];
116-
if self.contigs[i].is_empty() {
123+
if !self.contigs[i].has_data() {
124+
self.contigs[i + 1] = Contig::empty();
117125
return &mut self.contigs[i]
118126
}
119127
}
@@ -139,7 +147,7 @@ impl Assembler {
139147

140148
/// Add a new contiguous range to the assembler, and return `Ok(())`,
141149
/// or return `Err(())` if too many discontiguities are already recorded.
142-
pub fn add(&mut self, mut offset: u32, mut size: u32) -> Result<(), ()> {
150+
pub fn add(&mut self, mut offset: usize, mut size: usize) -> Result<(), ()> {
143151
let mut index = 0;
144152
while index != self.contigs.len() && size != 0 {
145153
let contig = self.contigs[index];
@@ -172,8 +180,8 @@ impl Assembler {
172180
// The range being added covers a part of the hole but not of the data
173181
// in this contig, add a new contig containing the range.
174182
self.contigs[index].shrink_hole_by(offset + size);
175-
let empty = self.add_contig_at(index)?;
176-
*empty = Contig::hole_and_data(offset, size);
183+
let inserted = self.add_contig_at(index)?;
184+
*inserted = Contig::hole_and_data(offset, size);
177185
index += 2;
178186
} else {
179187
unreachable!()
@@ -192,30 +200,20 @@ impl Assembler {
192200
Ok(())
193201
}
194202

195-
/// Return `Ok(size)` with the size of a contiguous range in the front of the assembler,
196-
/// or return `Err(())` if there is no such range.
197-
pub fn front_len(&self) -> u32 {
198-
let front = self.front();
199-
if front.has_hole() {
200-
0
201-
} else {
202-
debug_assert!(front.data_size > 0);
203-
front.data_size
204-
}
205-
}
206-
207-
/// Remove a contiguous range from the front of the assembler and `Ok(data_size)`,
208-
/// or return `Err(())` if there is no such range.
209-
pub fn front_remove(&mut self) -> u32 {
203+
/// Remove a contiguous range from the front of the assembler and `Some(data_size)`,
204+
/// or return `None` if there is no such range.
205+
pub fn remove_front(&mut self) -> Option<usize> {
210206
let front = self.front();
211207
if front.has_hole() {
212-
0
208+
None
213209
} else {
214-
let empty = self.remove_contig_at(0);
215-
*empty = Contig::hole(front.data_size);
210+
println!("{}", self);{
211+
let last_hole = self.remove_contig_at(0);
212+
last_hole.hole_size += front.data_size;
213+
}println!("{}", self);
216214

217215
debug_assert!(front.data_size > 0);
218-
front.data_size
216+
Some(front.data_size)
219217
}
220218
}
221219
}
@@ -225,8 +223,8 @@ mod test {
225223
use std::vec::Vec;
226224
use super::*;
227225

228-
impl From<Vec<(u32, u32)>> for Assembler {
229-
fn from(vec: Vec<(u32, u32)>) -> Assembler {
226+
impl From<Vec<(usize, usize)>> for Assembler {
227+
fn from(vec: Vec<(usize, usize)>) -> Assembler {
230228
let mut contigs = [Contig::empty(); CONTIG_COUNT];
231229
for (i, &(hole_size, data_size)) in vec.iter().enumerate() {
232230
contigs[i] = Contig { hole_size, data_size };
@@ -331,4 +329,24 @@ mod test {
331329
assert_eq!(assr.add(2, 12), Ok(()));
332330
assert_eq!(assr, contigs![(2, 12), (2, 0)]);
333331
}
332+
333+
#[test]
334+
fn test_empty_remove_front() {
335+
let mut assr = contigs![(12, 0)];
336+
assert_eq!(assr.remove_front(), None);
337+
}
338+
339+
#[test]
340+
fn test_trailing_hole_remove_front() {
341+
let mut assr = contigs![(0, 4), (8, 0)];
342+
assert_eq!(assr.remove_front(), Some(4));
343+
assert_eq!(assr, contigs![(12, 0)]);
344+
}
345+
346+
#[test]
347+
fn test_trailing_data_remove_front() {
348+
let mut assr = contigs![(0, 4), (4, 4)];
349+
assert_eq!(assr.remove_front(), Some(4));
350+
assert_eq!(assr, contigs![(4, 4), (4, 0)]);
351+
}
334352
}

0 commit comments

Comments (0)
Please sign in to comment.