@@ -7,7 +7,7 @@ use {Error, Result};
7
7
use phy:: DeviceLimits ;
8
8
use wire:: { IpProtocol , IpAddress , IpEndpoint , TcpSeqNumber , TcpRepr , TcpControl } ;
9
9
use socket:: { Socket , IpRepr } ;
10
- use storage:: RingBuffer ;
10
+ use storage:: { Assembler , RingBuffer } ;
11
11
12
12
pub type SocketBuffer < ' a > = RingBuffer < ' a , u8 > ;
13
13
@@ -171,6 +171,7 @@ pub struct TcpSocket<'a> {
171
171
debug_id : usize ,
172
172
state : State ,
173
173
timer : Timer ,
174
+ assembler : Assembler ,
174
175
rx_buffer : SocketBuffer < ' a > ,
175
176
tx_buffer : SocketBuffer < ' a > ,
176
177
/// Interval after which, if no inbound packets are received, the connection is aborted.
@@ -219,7 +220,7 @@ impl<'a> TcpSocket<'a> {
219
220
/// Create a socket using the given buffers.
220
221
pub fn new < T > ( rx_buffer : T , tx_buffer : T ) -> Socket < ' a , ' static >
221
222
where T : Into < SocketBuffer < ' a > > {
222
- let rx_buffer = rx_buffer. into ( ) ;
223
+ let ( rx_buffer, tx_buffer ) = ( rx_buffer. into ( ) , tx_buffer . into ( ) ) ;
223
224
if rx_buffer. capacity ( ) > <u16 >:: max_value ( ) as usize {
224
225
panic ! ( "buffers larger than {} require window scaling, which is not implemented" ,
225
226
<u16 >:: max_value( ) )
@@ -229,8 +230,9 @@ impl<'a> TcpSocket<'a> {
229
230
debug_id : 0 ,
230
231
state : State :: Closed ,
231
232
timer : Timer :: default ( ) ,
232
- tx_buffer : tx_buffer. into ( ) ,
233
- rx_buffer : rx_buffer. into ( ) ,
233
+ assembler : Assembler :: new ( rx_buffer. capacity ( ) ) ,
234
+ tx_buffer : tx_buffer,
235
+ rx_buffer : rx_buffer,
234
236
timeout : None ,
235
237
keep_alive : None ,
236
238
listen_address : IpAddress :: default ( ) ,
@@ -332,6 +334,9 @@ impl<'a> TcpSocket<'a> {
332
334
fn reset ( & mut self ) {
333
335
self . state = State :: Closed ;
334
336
self . timer = Timer :: default ( ) ;
337
+ self . assembler = Assembler :: new ( self . rx_buffer . capacity ( ) ) ;
338
+ self . tx_buffer . clear ( ) ;
339
+ self . rx_buffer . clear ( ) ;
335
340
self . keep_alive = None ;
336
341
self . timeout = None ;
337
342
self . listen_address = IpAddress :: default ( ) ;
@@ -345,8 +350,6 @@ impl<'a> TcpSocket<'a> {
345
350
self . remote_win_len = 0 ;
346
351
self . remote_mss = DEFAULT_MSS ;
347
352
self . remote_last_ts = None ;
348
- self . tx_buffer . clear ( ) ;
349
- self . rx_buffer . clear ( ) ;
350
353
}
351
354
352
355
/// Start listening on the given endpoint.
@@ -860,13 +863,14 @@ impl<'a> TcpSocket<'a> {
860
863
}
861
864
}
862
865
866
+ let payload_offset;
863
867
match self . state {
864
868
// In LISTEN and SYN-SENT states, we have not yet synchronized with the remote end.
865
- State :: Listen => ( ) ,
866
- State :: SynSent => ( ) ,
869
+ State :: Listen | State :: SynSent =>
870
+ payload_offset = 0 ,
867
871
// In all other states, segments must occupy a valid portion of the receive window.
868
872
_ => {
869
- let mut send_challenge_ack = false ;
873
+ let mut segment_in_window = true ;
870
874
871
875
let window_start = self . remote_seq_no + self . rx_buffer . len ( ) ;
872
876
let window_end = self . remote_seq_no + self . rx_buffer . capacity ( ) ;
@@ -877,34 +881,22 @@ impl<'a> TcpSocket<'a> {
877
881
net_debug ! ( "[{}]{}:{}: non-zero-length segment with zero receive window, \
878
882
will only send an ACK",
879
883
self . debug_id, self . local_endpoint, self . remote_endpoint) ;
880
- send_challenge_ack = true ;
884
+ segment_in_window = false ;
881
885
}
882
886
883
- if !( ( window_start <= segment_start && segment_start <= window_end) ||
887
+ if !( ( window_start <= segment_start && segment_start <= window_end) &&
884
888
( window_start <= segment_end && segment_end <= window_end) ) {
885
889
net_debug ! ( "[{}]{}:{}: segment not in receive window \
886
890
({}..{} not intersecting {}..{}), will send challenge ACK",
887
891
self . debug_id, self . local_endpoint, self . remote_endpoint,
888
892
segment_start, segment_end, window_start, window_end) ;
889
- send_challenge_ack = true ;
890
- }
891
-
892
- // For now, do not actually try to reassemble out-of-order segments.
893
- if segment_start != window_start {
894
- net_debug ! ( "[{}]{}:{}: out-of-order SEQ ({} not equal to {}), \
895
- will send challenge ACK",
896
- self . debug_id, self . local_endpoint, self . remote_endpoint,
897
- segment_start, window_start) ;
898
- // Some segments between what we have last received and this segment
899
- // went missing. Send a duplicate ACK; RFC 793 does not specify the behavior
900
- // required when receiving a duplicate ACK, but in practice (see RFC 1122
901
- // section 4.2.2.21) most congestion control algorithms implement what's called
902
- // a "fast retransmit", where a threshold amount of duplicate ACKs triggers
903
- // retransmission.
904
- send_challenge_ack = true ;
893
+ segment_in_window = false ;
905
894
}
906
895
907
- if send_challenge_ack {
896
+ if segment_in_window {
897
+ // We've checked that segment_start >= window_start above.
898
+ payload_offset = ( segment_start - window_start) as usize ;
899
+ } else {
908
900
// If we're in the TIME-WAIT state, restart the TIME-WAIT timeout, since
909
901
// the remote end may not have realized we've closed the connection.
910
902
if self . state == State :: TimeWait {
@@ -1087,28 +1079,70 @@ impl<'a> TcpSocket<'a> {
1087
1079
1088
1080
if ack_len > 0 {
1089
1081
// Dequeue acknowledged octets.
1082
+ debug_assert ! ( self . tx_buffer. len( ) >= ack_len) ;
1090
1083
net_trace ! ( "[{}]{}:{}: tx buffer: dequeueing {} octets (now {})" ,
1091
1084
self . debug_id, self . local_endpoint, self . remote_endpoint,
1092
1085
ack_len, self . tx_buffer. len( ) - ack_len) ;
1093
- let acked = self . tx_buffer . dequeue_many ( ack_len) ;
1094
- debug_assert ! ( acked. len( ) == ack_len) ;
1086
+ self . tx_buffer . dequeue_many ( ack_len) ;
1095
1087
}
1096
1088
1097
- // We've processed everything in the incoming segment, so advance the local
1098
- // sequence number past it.
1099
1089
if let Some ( ack_number) = repr. ack_number {
1090
+ // We've processed everything in the incoming segment, so advance the local
1091
+ // sequence number past it.
1100
1092
self . local_seq_no = ack_number;
1101
1093
}
1102
1094
1103
- if repr. payload . len ( ) > 0 {
1104
- // Enqueue payload octets, which are guaranteed to be in order.
1095
+ let payload_len = repr. payload . len ( ) ;
1096
+ if payload_len == 0 { return Ok ( None ) }
1097
+
1098
+ println ! ( "before1 {}" , self . assembler) ;
1099
+
1100
+ // Try adding payload octets to the assembler.
1101
+ match self . assembler . add ( payload_offset, payload_len) {
1102
+ Ok ( ( ) ) => {
1103
+ debug_assert ! ( self . assembler. total_size( ) == self . rx_buffer. capacity( ) ) ;
1104
+ println ! ( "after1 {}" , self . assembler) ;
1105
+
1106
+ // Place payload octets into the buffer.
1107
+ net_trace ! ( "[{}]{}:{}: rx buffer: writing {} octets at offset {}" ,
1108
+ self . debug_id, self . local_endpoint, self . remote_endpoint,
1109
+ payload_len, payload_offset) ;
1110
+ self . rx_buffer . get_unallocated ( payload_offset, payload_len)
1111
+ . copy_from_slice ( repr. payload ) ;
1112
+ }
1113
+ Err ( ( ) ) => {
1114
+ net_debug ! ( "[{}]{}:{}: assembler: too many holes to add {} octets at offset {}" ,
1115
+ self . debug_id, self . local_endpoint, self . remote_endpoint,
1116
+ payload_len, payload_offset) ;
1117
+ return Err ( Error :: Dropped )
1118
+ }
1119
+ }
1120
+
1121
+ println ! ( "before2 {}" , self . assembler) ;
1122
+ if let Some ( contig_len) = self . assembler . remove_front ( ) {
1123
+ println ! ( "after2 {}" , self . assembler) ;
1124
+ debug_assert ! ( self . assembler. total_size( ) == self . rx_buffer. capacity( ) ) ;
1125
+ // Enqueue the contiguous data octets in front of the buffer.
1105
1126
net_trace ! ( "[{}]{}:{}: rx buffer: enqueueing {} octets (now {})" ,
1106
1127
self . debug_id, self . local_endpoint, self . remote_endpoint,
1107
- repr . payload . len ( ) , self . rx_buffer. len( ) + repr . payload . len ( ) ) ;
1108
- self . rx_buffer . enqueue_slice ( repr . payload ) ;
1128
+ contig_len , self . rx_buffer. len( ) + contig_len ) ;
1129
+ self . rx_buffer . enqueue_many ( contig_len ) ;
1109
1130
}
1110
1131
1111
- Ok ( None )
1132
+ if self . assembler . is_empty ( ) {
1133
+ Ok ( None )
1134
+ } else {
1135
+ // If the assembler isn't empty, some segments at the start of our window got lost.
1136
+ // Send a reply acknowledging the data we already have; RFC 793 does not specify
1137
+ // the behavior triggered by such a reply, but RFC 1122 section 4.2.2.21 states that
1138
+ // most congestion control algorithms implement what's called a "fast retransmit",
1139
+ // where a threshold amount of duplicate ACKs triggers retransmission without
1140
+ // the need to wait for a timeout to expire.
1141
+ net_trace ! ( "[{}]{}:{}: assembler: {}" ,
1142
+ self . debug_id, self . local_endpoint, self . remote_endpoint,
1143
+ self . assembler) ;
1144
+ Ok ( Some ( self . ack_reply ( ip_repr, & repr) ) )
1145
+ }
1112
1146
}
1113
1147
1114
1148
fn timed_out ( & self , timestamp : u64 ) -> bool {
@@ -1262,7 +1296,7 @@ impl<'a> TcpSocket<'a> {
1262
1296
}
1263
1297
1264
1298
if repr. payload . len ( ) > 0 {
1265
- net_trace ! ( "[{}]{}:{}: tx buffer: peeking at {} octets (from {}) " ,
1299
+ net_trace ! ( "[{}]{}:{}: tx buffer: reading {} octets at offset {} " ,
1266
1300
self . debug_id, self . local_endpoint, self . remote_endpoint,
1267
1301
repr. payload. len( ) , self . remote_last_seq - self . local_seq_no) ;
1268
1302
} else {
@@ -2685,34 +2719,6 @@ mod test {
2685
2719
} ) ) ) ;
2686
2720
}
2687
2721
2688
- #[ test]
2689
- fn test_missing_segment ( ) {
2690
- let mut s = socket_established ( ) ;
2691
- send ! ( s, TcpRepr {
2692
- seq_number: REMOTE_SEQ + 1 ,
2693
- ack_number: Some ( LOCAL_SEQ + 1 ) ,
2694
- payload: & b"abcdef" [ ..] ,
2695
- ..SEND_TEMPL
2696
- } ) ;
2697
- recv ! ( s, [ TcpRepr {
2698
- seq_number: LOCAL_SEQ + 1 ,
2699
- ack_number: Some ( REMOTE_SEQ + 1 + 6 ) ,
2700
- window_len: 58 ,
2701
- ..RECV_TEMPL
2702
- } ] ) ;
2703
- send ! ( s, TcpRepr {
2704
- seq_number: REMOTE_SEQ + 1 + 6 + 6 ,
2705
- ack_number: Some ( LOCAL_SEQ + 1 ) ,
2706
- payload: & b"mnopqr" [ ..] ,
2707
- ..SEND_TEMPL
2708
- } , Ok ( Some ( TcpRepr {
2709
- seq_number: LOCAL_SEQ + 1 ,
2710
- ack_number: Some ( REMOTE_SEQ + 1 + 6 ) ,
2711
- window_len: 58 ,
2712
- ..RECV_TEMPL
2713
- } ) ) ) ;
2714
- }
2715
-
2716
2722
#[ test]
2717
2723
fn test_data_retransmit ( ) {
2718
2724
let mut s = socket_established ( ) ;
@@ -3013,6 +3019,7 @@ mod test {
3013
3019
fn test_zero_window_ack ( ) {
3014
3020
let mut s = socket_established ( ) ;
3015
3021
s. rx_buffer = SocketBuffer :: new ( vec ! [ 0 ; 6 ] ) ;
3022
+ s. assembler = Assembler :: new ( s. rx_buffer . capacity ( ) ) ;
3016
3023
send ! ( s, TcpRepr {
3017
3024
seq_number: REMOTE_SEQ + 1 ,
3018
3025
ack_number: Some ( LOCAL_SEQ + 1 ) ,
@@ -3042,6 +3049,7 @@ mod test {
3042
3049
fn test_zero_window_ack_on_window_growth ( ) {
3043
3050
let mut s = socket_established ( ) ;
3044
3051
s. rx_buffer = SocketBuffer :: new ( vec ! [ 0 ; 6 ] ) ;
3052
+ s. assembler = Assembler :: new ( s. rx_buffer . capacity ( ) ) ;
3045
3053
send ! ( s, TcpRepr {
3046
3054
seq_number: REMOTE_SEQ + 1 ,
3047
3055
ack_number: Some ( LOCAL_SEQ + 1 ) ,
@@ -3096,7 +3104,7 @@ mod test {
3096
3104
}
3097
3105
3098
3106
// =========================================================================================//
3099
- // Tests for timeouts
3107
+ // Tests for timeouts.
3100
3108
// =========================================================================================//
3101
3109
3102
3110
#[ test]
@@ -3193,7 +3201,7 @@ mod test {
3193
3201
}
3194
3202
3195
3203
// =========================================================================================//
3196
- // Tests for keep-alive
3204
+ // Tests for keep-alive.
3197
3205
// =========================================================================================//
3198
3206
3199
3207
#[ test]
@@ -3258,13 +3266,47 @@ mod test {
3258
3266
}
3259
3267
3260
3268
// =========================================================================================//
3261
- // Tests for packet filtering
3269
+ // Tests for reassembly.
3270
+ // =========================================================================================//
3271
+
3272
+ #[ test]
3273
+ fn test_out_of_order ( ) {
3274
+ let mut s = socket_established ( ) ;
3275
+ send ! ( s, TcpRepr {
3276
+ seq_number: REMOTE_SEQ + 1 + 3 ,
3277
+ ack_number: Some ( LOCAL_SEQ + 1 ) ,
3278
+ payload: & b"def" [ ..] ,
3279
+ ..SEND_TEMPL
3280
+ } , Ok ( Some ( TcpRepr {
3281
+ seq_number: LOCAL_SEQ + 1 ,
3282
+ ack_number: Some ( REMOTE_SEQ + 1 ) ,
3283
+ ..RECV_TEMPL
3284
+ } ) ) ) ;
3285
+ assert_eq ! ( s. recv( 10 ) , Ok ( & b"" [ ..] ) ) ;
3286
+ send ! ( s, TcpRepr {
3287
+ seq_number: REMOTE_SEQ + 1 ,
3288
+ ack_number: Some ( LOCAL_SEQ + 1 ) ,
3289
+ payload: & b"abcdef" [ ..] ,
3290
+ ..SEND_TEMPL
3291
+ } ) ;
3292
+ recv ! ( s, [ TcpRepr {
3293
+ seq_number: LOCAL_SEQ + 1 ,
3294
+ ack_number: Some ( REMOTE_SEQ + 1 + 6 ) ,
3295
+ window_len: 58 ,
3296
+ ..RECV_TEMPL
3297
+ } ] ) ;
3298
+ assert_eq ! ( s. recv( 10 ) , Ok ( & b"abcdef" [ ..] ) ) ;
3299
+ }
3300
+
3301
+ // =========================================================================================//
3302
+ // Tests for packet filtering.
3262
3303
// =========================================================================================//
3263
3304
3264
3305
#[ test]
3265
3306
fn test_doesnt_accept_wrong_port ( ) {
3266
3307
let mut s = socket_established ( ) ;
3267
3308
s. rx_buffer = SocketBuffer :: new ( vec ! [ 0 ; 6 ] ) ;
3309
+ s. assembler = Assembler :: new ( s. rx_buffer . capacity ( ) ) ;
3268
3310
3269
3311
let tcp_repr = TcpRepr {
3270
3312
seq_number : REMOTE_SEQ + 1 ,
0 commit comments