Skip to content

Commit 269cedd

Browse files
author
whitequark
committed Sep 6, 2016
Rust: integrate lwip with the I/O scheduler.
1 parent b7b6e7b commit 269cedd

File tree

3 files changed

+192
-57
lines changed

3 files changed

+192
-57
lines changed
 

Diff for: artiq/runtime.rs/liblwip/lib.rs

+114 -35
Original file line number | Diff line number | Diff line change
@@ -59,11 +59,13 @@ fn result_from<T, F>(err: lwip_sys::err, f: F) -> Result<T>
5959
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
6060
pub enum IpAddr {
6161
Ip4([u8; 4]),
62-
Ip6([u16; 8])
62+
Ip6([u16; 8]),
63+
IpAny
6364
}
6465

6566
pub const IP4_ANY: IpAddr = IpAddr::Ip4([0, 0, 0, 0]);
6667
pub const IP6_ANY: IpAddr = IpAddr::Ip6([0, 0, 0, 0, 0, 0, 0, 0]);
68+
pub const IP_ANY: IpAddr = IpAddr::IpAny;
6769

6870
impl IpAddr {
6971
fn into_raw(self) -> lwip_sys::ip_addr {
@@ -84,6 +86,11 @@ impl IpAddr {
8486
(segments[4] as u32) << 16 | (segments[5] as u32),
8587
(segments[6] as u32) << 16 | (segments[7] as u32)],
8688
type_: lwip_sys::IPADDR_TYPE_V6
89+
},
90+
IpAddr::IpAny =>
91+
lwip_sys::ip_addr {
92+
data: [0; 4],
93+
type_: lwip_sys::IPADDR_TYPE_ANY
8794
}
8895
}
8996
}
@@ -100,7 +107,8 @@ impl IpAddr {
100107
(data[1] >> 16) as u16, data[1] as u16,
101108
(data[2] >> 16) as u16, data[2] as u16,
102109
(data[3] >> 16) as u16, data[3] as u16]),
103-
_ => panic!("unknown IP address type")
110+
lwip_sys::ip_addr { type_: lwip_sys::IPADDR_TYPE_ANY, .. } =>
111+
IpAddr::IpAny
104112
}
105113
}
106114
}
@@ -186,10 +194,21 @@ impl<'a> Drop for Pbuf<'a> {
186194
}
187195
}
188196

197+
#[derive(Debug)]
198+
pub struct UdpSocketState {
199+
recv_buffer: LinkedList<(SocketAddr, Pbuf<'static>)>
200+
}
201+
202+
impl UdpSocketState {
203+
pub fn readable(&self) -> bool {
204+
!self.recv_buffer.is_empty()
205+
}
206+
}
207+
189208
#[derive(Debug)]
190209
pub struct UdpSocket {
191-
raw: *mut lwip_sys::udp_pcb,
192-
buffer: Box<LinkedList<(SocketAddr, Pbuf<'static>)>>
210+
raw: *mut lwip_sys::udp_pcb,
211+
state: Box<UdpSocketState>
193212
}
194213

195214
impl UdpSocket {
@@ -198,23 +217,27 @@ impl UdpSocket {
198217
pbuf: *mut lwip_sys::pbuf,
199218
addr: *mut lwip_sys::ip_addr, port: u16) {
200219
unsafe {
201-
let buffer = arg as *mut LinkedList<(SocketAddr, Pbuf)>;
220+
let state = arg as *mut UdpSocketState;
202221
let socket_addr = SocketAddr { ip: IpAddr::from_raw(addr), port: port };
203-
(*buffer).push_back((socket_addr, Pbuf::from_raw(pbuf)));
222+
(*state).recv_buffer.push_back((socket_addr, Pbuf::from_raw(pbuf)));
204223
}
205224
}
206225

207226
unsafe {
208227
let raw = lwip_sys::udp_new();
209228
if raw.is_null() { return Err(Error::OutOfMemory) }
210229

211-
let mut buffer = Box::new(LinkedList::new());
212-
let arg = &mut *buffer as *mut LinkedList<(SocketAddr, Pbuf)> as *mut _;
230+
let mut state = Box::new(UdpSocketState { recv_buffer: LinkedList::new() });
231+
let arg = &mut *state as *mut UdpSocketState as *mut _;
213232
lwip_sys::udp_recv(raw, recv, arg);
214-
Ok(UdpSocket { raw: raw, buffer: buffer })
233+
Ok(UdpSocket { raw: raw, state: state })
215234
}
216235
}
217236

237+
pub fn state(&self) -> *const UdpSocketState {
238+
&*self.state
239+
}
240+
218241
pub fn bind(&mut self, addr: SocketAddr) -> Result<()> {
219242
result_from(unsafe {
220243
lwip_sys::udp_bind(self.raw, &mut addr.ip.into_raw(), addr.port)
@@ -247,7 +270,11 @@ impl UdpSocket {
247270
}
248271

249272
pub fn try_recv(&mut self) -> Option<(SocketAddr, Pbuf<'static>)> {
250-
self.buffer.pop_front()
273+
self.state.recv_buffer.pop_front()
274+
}
275+
276+
pub fn close(self) {
277+
// just drop
251278
}
252279
}
253280

@@ -257,10 +284,21 @@ impl Drop for UdpSocket {
257284
}
258285
}
259286

287+
#[derive(Debug)]
288+
pub struct TcpListenerState {
289+
backlog: LinkedList<TcpStream>
290+
}
291+
292+
impl TcpListenerState {
293+
pub fn acceptable(&self) -> bool {
294+
!self.backlog.is_empty()
295+
}
296+
}
297+
260298
#[derive(Debug)]
261299
pub struct TcpListener {
262-
raw: *mut lwip_sys::tcp_pcb,
263-
backlog: Box<LinkedList<TcpStream>>
300+
raw: *mut lwip_sys::tcp_pcb,
301+
state: Box<TcpListenerState>
264302
}
265303

266304
impl TcpListener {
@@ -269,19 +307,15 @@ impl TcpListener {
269307
err: lwip_sys::err) -> lwip_sys::err {
270308
if err != lwip_sys::ERR_OK { return err }
271309
unsafe {
272-
let backlog = arg as *mut LinkedList<TcpStream>;
273-
(*backlog).push_back(TcpStream::from_raw(newpcb));
310+
let state = arg as *mut TcpListenerState;
311+
(*state).backlog.push_back(TcpStream::from_raw(newpcb));
274312
}
275313
lwip_sys::ERR_OK
276314
}
277315

278316
unsafe {
279317
let raw = lwip_sys::tcp_new();
280318
if raw.is_null() { return Err(Error::OutOfMemory) }
281-
282-
let mut backlog = Box::new(LinkedList::new());
283-
let arg = &mut *backlog as *mut LinkedList<TcpStream> as *mut _;
284-
lwip_sys::tcp_arg(raw, arg);
285319
try!(result_from(lwip_sys::tcp_bind(raw, &mut addr.ip.into_raw(), addr.port),
286320
|| ()));
287321

@@ -290,13 +324,23 @@ impl TcpListener {
290324
lwip_sys::tcp_abort(raw);
291325
return Err(Error::OutOfMemory)
292326
}
327+
328+
let mut state = Box::new(TcpListenerState {
329+
backlog: LinkedList::new()
330+
});
331+
let arg = &mut *state as *mut TcpListenerState as *mut _;
332+
lwip_sys::tcp_arg(raw, arg);
293333
lwip_sys::tcp_accept(raw2, accept);
294-
Ok(TcpListener { raw: raw2, backlog: backlog })
334+
Ok(TcpListener { raw: raw2, state: state })
295335
}
296336
}
297337

338+
pub fn state(&self) -> *const TcpListenerState {
339+
&*self.state
340+
}
341+
298342
pub fn try_accept(&mut self) -> Option<TcpStream> {
299-
self.backlog.pop_front()
343+
self.state.backlog.pop_front()
300344
}
301345

302346
pub fn close(self) {
@@ -320,61 +364,96 @@ pub enum Shutdown {
320364
Both,
321365
}
322366

367+
#[derive(Debug)]
368+
pub struct TcpStreamState {
369+
recv_buffer: LinkedList<Result<Pbuf<'static>>>,
370+
send_avail: usize
371+
}
372+
373+
impl TcpStreamState {
374+
pub fn readable(&self) -> bool {
375+
!self.recv_buffer.is_empty()
376+
}
377+
378+
pub fn writeable(&self) -> bool {
379+
!(self.send_avail == 0)
380+
}
381+
}
382+
323383
#[derive(Debug)]
324384
pub struct TcpStream {
325-
raw: *mut lwip_sys::tcp_pcb,
326-
buffer: Box<LinkedList<Result<Pbuf<'static>>>>
385+
raw: *mut lwip_sys::tcp_pcb,
386+
state: Box<TcpStreamState>
327387
}
328388

329389
impl TcpStream {
330390
fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream {
331-
extern fn recv(arg: *mut c_void, _tcb: *mut lwip_sys::tcp_pcb,
391+
extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb,
332392
pbuf: *mut lwip_sys::pbuf, err: lwip_sys::err) -> lwip_sys::err {
333393
if err != lwip_sys::ERR_OK { return err }
334394
unsafe {
335-
let buffer = arg as *mut LinkedList<Result<Pbuf<'static>>>;
395+
let state = arg as *mut TcpStreamState;
336396
if pbuf.is_null() {
337-
(*buffer).push_back(Err(Error::ConnectionClosed))
397+
(*state).recv_buffer.push_back(Err(Error::ConnectionClosed))
338398
} else {
339-
(*buffer).push_back(Ok(Pbuf::from_raw(pbuf)))
399+
(*state).recv_buffer.push_back(Ok(Pbuf::from_raw(pbuf)))
340400
}
341401
}
342402
lwip_sys::ERR_OK
343403
}
344404

405+
extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb,
406+
_len: u16) -> lwip_sys::err {
407+
unsafe {
408+
let state = arg as *mut TcpStreamState;
409+
(*state).send_avail = lwip_sys::tcp_sndbuf_(raw) as usize;
410+
}
411+
lwip_sys::ERR_OK
412+
}
413+
345414
extern fn err(arg: *mut c_void, err: lwip_sys::err) {
346415
unsafe {
347-
let buffer = arg as *mut LinkedList<Result<Pbuf<'static>>>;
348-
(*buffer).push_back(result_from(err, || unreachable!()))
416+
let state = arg as *mut TcpStreamState;
417+
(*state).recv_buffer.push_back(result_from(err, || unreachable!()))
349418
}
350419
}
351420

352421
unsafe {
353-
let mut buffer = Box::new(LinkedList::new());
354-
let arg = &mut *buffer as *mut LinkedList<Result<Pbuf<'static>>> as *mut _;
422+
let mut state = Box::new(TcpStreamState {
423+
recv_buffer: LinkedList::new(),
424+
send_avail: lwip_sys::tcp_sndbuf_(raw) as usize
425+
});
426+
let arg = &mut *state as *mut TcpStreamState as *mut _;
355427
lwip_sys::tcp_arg(raw, arg);
356428
lwip_sys::tcp_recv(raw, recv);
429+
lwip_sys::tcp_sent(raw, sent);
357430
lwip_sys::tcp_err(raw, err);
358-
TcpStream { raw: raw, buffer: buffer }
431+
TcpStream { raw: raw, state: state }
359432
}
360433
}
361434

435+
pub fn state(&self) -> *const TcpStreamState {
436+
&*self.state
437+
}
438+
362439
pub fn write(&mut self, data: &[u8]) -> Result<usize> {
363440
let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize;
364441
let len = if data.len() < sndbuf { data.len() } else { sndbuf };
365-
result_from(unsafe {
442+
let result = result_from(unsafe {
366443
lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16,
367444
lwip_sys::TCP_WRITE_FLAG_COPY)
368-
}, || len)
445+
}, || len);
446+
self.state.send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize;
447+
result
369448
}
370449

371450
pub fn try_read(&mut self) -> Result<Option<Pbuf<'static>>> {
372-
match self.buffer.front() {
451+
match self.state.recv_buffer.front() {
373452
None => return Ok(None),
374453
Some(&Err(err)) => return Err(err),
375454
Some(_) => ()
376455
}
377-
match self.buffer.pop_front() {
456+
match self.state.recv_buffer.pop_front() {
378457
Some(Ok(pbuf)) => return Ok(Some(pbuf)),
379458
_ => unreachable!()
380459
}

Diff for: artiq/runtime.rs/src/io.rs

+64 -4
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
extern crate fringe;
2+
extern crate lwip;
23

34
use std::vec::Vec;
45
use std::time::{Instant, Duration};
@@ -102,15 +103,38 @@ impl Scheduler {
102103
}
103104

104105
#[derive(Debug)]
105-
enum WaitEvent {}
106+
enum WaitEvent {
107+
UdpReadable(*const lwip::UdpSocketState),
108+
TcpAcceptable(*const lwip::TcpListenerState),
109+
TcpWriteable(*const lwip::TcpStreamState),
110+
TcpReadable(*const lwip::TcpStreamState),
111+
}
106112

107113
impl WaitEvent {
108114
fn completed(&self) -> bool {
109-
match *self {}
115+
match *self {
116+
WaitEvent::UdpReadable(state) =>
117+
unsafe { (*state).readable() },
118+
WaitEvent::TcpAcceptable(state) =>
119+
unsafe { (*state).acceptable() },
120+
WaitEvent::TcpWriteable(state) =>
121+
unsafe { (*state).writeable() },
122+
WaitEvent::TcpReadable(state) =>
123+
unsafe { (*state).readable() },
124+
}
110125
}
111126
}
112127

113-
pub type Result<T> = ::std::result::Result<T, ()>;
128+
unsafe impl Send for WaitEvent {}
129+
130+
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
131+
pub enum Error {
132+
Lwip(lwip::Error),
133+
TimedOut,
134+
Interrupted
135+
}
136+
137+
pub type Result<T> = ::std::result::Result<T, Error>;
114138

115139
#[derive(Debug)]
116140
pub struct Waiter<'a>(&'a mut Yielder<WaitResult, WaitRequest, OwnedStack>);
@@ -124,8 +148,44 @@ impl<'a> Waiter<'a> {
124148

125149
match self.0.suspend(request) {
126150
WaitResult::TimedOut => Ok(()),
127-
WaitResult::Interrupted => Err(()),
151+
WaitResult::Interrupted => Err(Error::Interrupted),
128152
_ => unreachable!()
129153
}
130154
}
155+
156+
fn suspend(&mut self, request: WaitRequest) -> Result<()> {
157+
match self.0.suspend(request) {
158+
WaitResult::Completed => Ok(()),
159+
WaitResult::TimedOut => Err(Error::TimedOut),
160+
WaitResult::Interrupted => Err(Error::Interrupted)
161+
}
162+
}
163+
164+
pub fn udp_readable(&mut self, socket: &lwip::UdpSocket) -> Result<()> {
165+
self.suspend(WaitRequest {
166+
timeout: None,
167+
event: Some(WaitEvent::UdpReadable(socket.state()))
168+
})
169+
}
170+
171+
pub fn tcp_acceptable(&mut self, socket: &lwip::TcpListener) -> Result<()> {
172+
self.suspend(WaitRequest {
173+
timeout: None,
174+
event: Some(WaitEvent::TcpAcceptable(socket.state()))
175+
})
176+
}
177+
178+
pub fn tcp_writeable(&mut self, socket: &lwip::TcpStream) -> Result<()> {
179+
self.suspend(WaitRequest {
180+
timeout: None,
181+
event: Some(WaitEvent::TcpWriteable(socket.state()))
182+
})
183+
}
184+
185+
pub fn tcp_readable(&mut self, socket: &lwip::TcpStream) -> Result<()> {
186+
self.suspend(WaitRequest {
187+
timeout: None,
188+
event: Some(WaitEvent::TcpReadable(socket.state()))
189+
})
190+
}
131191
}

Diff for: artiq/runtime.rs/src/lib.rs

+14 -18
Original file line number | Diff line number | Diff line change
@@ -14,17 +14,21 @@ extern {
1414
fn lwip_service();
1515
}
1616

17-
fn test1(mut waiter: io::Waiter) {
17+
fn timer(mut waiter: io::Waiter) {
1818
loop {
19-
println!("A");
20-
waiter.sleep(std::time::Duration::from_millis(1000));
19+
println!("tick");
20+
waiter.sleep(std::time::Duration::from_millis(1000)).unwrap();
2121
}
2222
}
2323

24-
fn test2(mut waiter: io::Waiter) {
24+
fn echo(mut waiter: io::Waiter) {
25+
let mut socket = lwip::UdpSocket::new().unwrap();
26+
socket.bind(lwip::SocketAddr::new(lwip::IP_ANY, 1234)).unwrap();
2527
loop {
26-
println!("B");
27-
waiter.sleep(std::time::Duration::from_millis(500));
28+
waiter.udp_readable(&socket).unwrap();
29+
let (addr, pbuf) = socket.try_recv().unwrap();
30+
println!("{:?}", core::str::from_utf8(pbuf.as_slice()));
31+
socket.send_to(addr, pbuf).unwrap();
2832
}
2933
}
3034

@@ -33,19 +37,11 @@ pub unsafe extern fn rust_main() {
3337
println!("Accepting network sessions in Rust.");
3438
network_init();
3539

36-
let addr = lwip::SocketAddr::new(lwip::IP4_ANY, 1234);
37-
let mut listener = lwip::TcpListener::bind(addr).unwrap();
38-
let mut stream = None;
40+
let mut scheduler = io::Scheduler::new();
41+
scheduler.spawn(4096, timer);
42+
scheduler.spawn(4096, echo);
3943
loop {
4044
lwip_service();
41-
if let Some(new_stream) = listener.try_accept() {
42-
stream = Some(new_stream)
43-
}
44-
if let Some(ref mut stream) = stream {
45-
if let Some(pbuf) = stream.try_read().expect("read") {
46-
println!("{:?}", pbuf.as_slice());
47-
stream.write(pbuf.as_slice()).expect("write");
48-
}
49-
}
45+
scheduler.run()
5046
}
5147
}

0 commit comments

Comments
 (0)
Please sign in to comment.