Skip to content

Commit

Permalink
Rust: implement idle kernels and session takeover.
Browse files Browse the repository at this point in the history
whitequark committed Oct 2, 2016
1 parent 8bced9d commit 30e997f
Showing 10 changed files with 309 additions and 94 deletions.
6 changes: 3 additions & 3 deletions artiq/runtime.rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion artiq/runtime.rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ path = "src/lib.rs"

[dependencies]
std_artiq = { path = "libstd_artiq" }
lwip = { path = "liblwip" }
lwip = { path = "liblwip", default-features = false }
fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] }
log = { version = "0.3", default-features = false }
log_buffer = { version = "1.0" }
4 changes: 4 additions & 0 deletions artiq/runtime.rs/liblwip/Cargo.toml
Original file line number Diff line number Diff line change
@@ -10,3 +10,7 @@ path = "lib.rs"
[dependencies]
lwip-sys = { path = "../liblwip-sys" }
std_artiq = { path = "../libstd_artiq" }

[features]
default = ["preemption"]
preemption = []
12 changes: 12 additions & 0 deletions artiq/runtime.rs/liblwip/lib.rs
Original file line number Diff line number Diff line change
@@ -175,6 +175,9 @@ pub struct Pbuf<'payload> {
phantom: PhantomData<&'payload [u8]>
}

#[cfg(not(feature = "preemption"))]
unsafe impl<'payload> Send for Pbuf<'payload> {}

impl<'payload> Pbuf<'payload> {
unsafe fn from_raw(raw: *mut lwip_sys::pbuf) -> Pbuf<'payload> {
Pbuf { raw: raw, phantom: PhantomData }
@@ -259,6 +262,9 @@ pub struct UdpSocket {
state: Box<RefCell<UdpSocketState>>
}

#[cfg(not(feature = "preemption"))]
unsafe impl Send for UdpSocket {}

impl UdpSocket {
pub fn new() -> Result<UdpSocket> {
extern fn recv(arg: *mut c_void, _pcb: *mut lwip_sys::udp_pcb,
@@ -347,6 +353,9 @@ pub struct TcpListener {
state: Box<RefCell<TcpListenerState>>
}

#[cfg(not(feature = "preemption"))]
unsafe impl Send for TcpListener {}

impl TcpListener {
pub fn bind(addr: SocketAddr) -> Result<TcpListener> {
extern fn accept(arg: *mut c_void, newpcb: *mut lwip_sys::tcp_pcb,
@@ -428,6 +437,9 @@ pub struct TcpStream {
state: Box<RefCell<TcpStreamState>>
}

#[cfg(not(feature = "preemption"))]
unsafe impl Send for TcpStream {}

impl TcpStream {
fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream {
extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb,
12 changes: 6 additions & 6 deletions artiq/runtime.rs/src/kernel.rs
Original file line number Diff line number Diff line change
@@ -15,14 +15,14 @@ pub unsafe fn start() {
stop();

extern {
static _binary_ksupport_elf_start: ();
static _binary_ksupport_elf_end: ();
static _binary_ksupport_elf_start: u8;
static _binary_ksupport_elf_end: u8;
}
let ksupport_start = &_binary_ksupport_elf_start as *const _ as usize;
let ksupport_end = &_binary_ksupport_elf_end as *const _ as usize;
ptr::copy_nonoverlapping(ksupport_start as *const u8,
let ksupport_start = &_binary_ksupport_elf_start as *const _;
let ksupport_end = &_binary_ksupport_elf_end as *const _;
ptr::copy_nonoverlapping(ksupport_start,
(KERNELCPU_EXEC_ADDRESS - KSUPPORT_HEADER_SIZE) as *mut u8,
ksupport_end - ksupport_start);
ksupport_end as usize - ksupport_start as usize);

csr::kernel_cpu::reset_write(0);
}
19 changes: 10 additions & 9 deletions artiq/runtime.rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![no_std]
#![feature(libc)]
#![feature(libc, borrow_state, const_fn)]

#[macro_use]
extern crate std_artiq as std;
@@ -8,16 +8,19 @@ extern crate libc;
extern crate log;
extern crate log_buffer;
extern crate byteorder;
extern crate fringe;
extern crate lwip;

use logger::BufferLogger;

mod board;
mod sched;
mod config;
mod clock;
mod rtio_crg;
mod mailbox;

mod urc;
mod sched;
mod logger;
mod cache;

@@ -36,23 +39,21 @@ include!(concat!(env!("OUT_DIR"), "/git_info.rs"));

#[no_mangle]
pub unsafe extern fn rust_main() {
static mut log_buffer: [u8; 4096] = [0; 4096];
BufferLogger::new(&mut log_buffer[..])
.register(move |logger| {
static mut LOG_BUFFER: [u8; 4096] = [0; 4096];
BufferLogger::new(&mut LOG_BUFFER[..])
.register(move || {
info!("booting ARTIQ runtime ({})", GIT_COMMIT);

clock::init();
rtio_crg::init();
network_init();

let mut scheduler = sched::Scheduler::new();
scheduler.spawn(8192, move |waiter| {
session::handler(waiter, logger)
});
scheduler.spawner().spawn(8192, session::handler);

loop {
scheduler.run();
lwip_service();
scheduler.run()
}
})
}
16 changes: 13 additions & 3 deletions artiq/runtime.rs/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::{mem, ptr};
use core::cell::RefCell;
use log::{self, Log, LogMetadata, LogRecord, LogLevelFilter};
use log_buffer::LogBuffer;
@@ -6,17 +7,18 @@ pub struct BufferLogger {
buffer: RefCell<LogBuffer<&'static mut [u8]>>
}

// We can never preempt from within the logger, so there can be no data races.
unsafe impl Sync for BufferLogger {}

static mut LOGGER: *const BufferLogger = ptr::null();

impl BufferLogger {
pub fn new(buffer: &'static mut [u8]) -> BufferLogger {
BufferLogger {
buffer: RefCell::new(LogBuffer::new(buffer))
}
}

pub fn register<F: FnOnce(&BufferLogger)>(&self, f: F) {
pub fn register<F: FnOnce()>(&self, f: F) {
// log::set_logger_raw captures a pointer to ourselves, so we must prevent
// ourselves from being moved or dropped after that function is called (and
// before log::shutdown_logger_raw is called).
@@ -25,9 +27,17 @@ impl BufferLogger {
max_log_level.set(LogLevelFilter::Trace);
self as *const Log
}).expect("global logger can only be initialized once");
LOGGER = self;
}
f(self);
f();
log::shutdown_logger_raw().unwrap();
unsafe {
LOGGER = ptr::null();
}
}

pub fn with_instance<R, F: FnOnce(&BufferLogger) -> R>(f: F) -> R {
f(unsafe { mem::transmute::<*const BufferLogger, &BufferLogger>(LOGGER) })
}

pub fn clear(&self) {
158 changes: 131 additions & 27 deletions artiq/runtime.rs/src/sched.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#![allow(dead_code)]

extern crate fringe;
extern crate lwip;

use std::cell::RefCell;
use std::cell::{RefCell, BorrowState};
use std::vec::Vec;
use std::rc::Rc;
use std::time::{Instant, Duration};
use std::io::{Read, Write, Result, Error, ErrorKind};
use self::fringe::OwnedStack;
use self::fringe::generator::{Generator, Yielder};
use fringe::OwnedStack;
use fringe::generator::{Generator, Yielder, State as GeneratorState};
use lwip;
use urc::Urc;

#[derive(Debug)]
struct WaitRequest {
@@ -25,38 +25,87 @@ enum WaitResult {

#[derive(Debug)]
struct Thread {
generator: Generator<WaitResult, WaitRequest, OwnedStack>,
generator: Generator<WaitResult, WaitRequest, OwnedStack>,
waiting_for: WaitRequest,
interrupted: bool
}

impl Thread {
unsafe fn new<F>(spawner: Spawner, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Waiter, Spawner) + Send {
let stack = OwnedStack::new(stack_size);
ThreadHandle::new(Thread {
generator: Generator::unsafe_new(stack, |yielder, _| {
f(Waiter(yielder), spawner)
}),
waiting_for: WaitRequest {
timeout: None,
event: None
},
interrupted: false
})
}

pub fn terminated(&self) -> bool {
// FIXME: https://github.com/nathan7/libfringe/pull/56
match self.generator.state() {
GeneratorState::Unavailable => true,
GeneratorState::Runnable => false
}
}

pub fn interrupt(&mut self) {
self.interrupted = true
}
}

#[derive(Debug, Clone)]
pub struct ThreadHandle(Urc<RefCell<Thread>>);

impl ThreadHandle {
fn new(thread: Thread) -> ThreadHandle {
ThreadHandle(Urc::new(RefCell::new(thread)))
}

pub fn terminated(&self) -> bool {
match self.0.borrow_state() {
BorrowState::Unused => self.0.borrow().terminated(),
_ => false // the running thread hasn't terminated
}
}

pub fn interrupt(&self) {
// FIXME: use try_borrow() instead once it's available
match self.0.borrow_state() {
BorrowState::Unused => self.0.borrow_mut().interrupt(),
_ => panic!("cannot interrupt the running thread")
}
}
}

#[derive(Debug)]
pub struct Scheduler {
threads: Vec<Thread>,
index: usize
threads: Vec<ThreadHandle>,
index: usize,
spawner: Spawner
}

impl Scheduler {
pub fn new() -> Scheduler {
Scheduler { threads: Vec::new(), index: 0 }
Scheduler {
threads: Vec::new(),
index: 0,
spawner: Spawner::new()
}
}

pub unsafe fn spawn<F: FnOnce(Waiter) + Send>(&mut self, stack_size: usize, f: F) {
let stack = OwnedStack::new(stack_size);
let thread = Thread {
generator: Generator::unsafe_new(stack, move |yielder, _| {
f(Waiter(yielder))
}),
waiting_for: WaitRequest {
timeout: None,
event: None
},
interrupted: false
};
self.threads.push(thread)
pub fn spawner(&self) -> &Spawner {
&self.spawner
}

pub fn run(&mut self) {
self.threads.append(&mut *self.spawner.queue.borrow_mut());

if self.threads.len() == 0 { return }

let now = Instant::now();
@@ -66,7 +115,7 @@ impl Scheduler {
self.index = (self.index + 1) % self.threads.len();

let result = {
let thread = &mut self.threads[self.index];
let thread = &mut *self.threads[self.index].0.borrow_mut();
match thread.waiting_for {
_ if thread.interrupted => {
thread.interrupted = false;
@@ -97,7 +146,8 @@ impl Scheduler {
},
Some(wait_request) => {
// The thread has suspended itself.
self.threads[self.index].waiting_for = wait_request
let thread = &mut *self.threads[self.index].0.borrow_mut();
thread.waiting_for = wait_request
}
}

@@ -106,8 +156,27 @@ impl Scheduler {
}
}

#[derive(Debug, Clone)]
pub struct Spawner {
queue: Urc<RefCell<Vec<ThreadHandle>>>
}

impl Spawner {
fn new() -> Spawner {
Spawner { queue: Urc::new(RefCell::new(Vec::new())) }
}

pub fn spawn<F>(&self, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Waiter, Spawner) + Send {
let handle = unsafe { Thread::new(self.clone(), stack_size, f) };
self.queue.borrow_mut().push(handle.clone());
handle
}
}

enum WaitEvent {
Completion(*const (Fn() -> bool + 'static)),
Termination(*const RefCell<Thread>),
UdpReadable(*const RefCell<lwip::UdpSocketState>),
TcpAcceptable(*const RefCell<lwip::TcpListenerState>),
TcpWriteable(*const RefCell<lwip::TcpStreamState>),
@@ -119,6 +188,8 @@ impl WaitEvent {
match *self {
WaitEvent::Completion(f) =>
unsafe { (*f)() },
WaitEvent::Termination(thread) =>
unsafe { (*thread).borrow().terminated() },
WaitEvent::UdpReadable(state) =>
unsafe { (*state).borrow().readable() },
WaitEvent::TcpAcceptable(state) =>
@@ -173,6 +244,13 @@ impl<'a> Waiter<'a> {
}
}

pub fn join(&self, thread: ThreadHandle) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::Termination(&*thread.0))
})
}

pub fn until<F: Fn() -> bool + 'static>(&self, f: F) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
@@ -211,7 +289,7 @@ impl<'a> Waiter<'a> {

// Wrappers around lwip

pub use self::lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr};
pub use lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr};

#[derive(Debug)]
pub struct UdpSocket<'a> {
@@ -227,6 +305,14 @@ impl<'a> UdpSocket<'a> {
})
}

pub fn into_lower(self) -> lwip::UdpSocket {
self.lower
}

pub fn from_lower(waiter: Waiter<'a>, inner: lwip::UdpSocket) -> UdpSocket {
UdpSocket { waiter: waiter, lower: inner }
}

pub fn bind(&self, addr: SocketAddr) -> Result<()> {
Ok(try!(self.lower.bind(addr)))
}
@@ -285,6 +371,14 @@ impl<'a> TcpListener<'a> {
})
}

pub fn into_lower(self) -> lwip::TcpListener {
self.lower
}

pub fn from_lower(waiter: Waiter<'a>, inner: lwip::TcpListener) -> TcpListener {
TcpListener { waiter: waiter, lower: inner }
}

pub fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
try!(self.waiter.tcp_acceptable(&self.lower));
let stream_lower = self.lower.try_accept().unwrap();
@@ -301,7 +395,9 @@ impl<'a> TcpListener<'a> {
}
}

pub use self::lwip::Shutdown;
pub use lwip::Shutdown;

pub struct TcpStreamInner(lwip::TcpStream, Option<(lwip::Pbuf<'static>, usize)>);

#[derive(Debug)]
pub struct TcpStream<'a> {
@@ -311,6 +407,14 @@ pub struct TcpStream<'a> {
}

impl<'a> TcpStream<'a> {
pub fn into_lower(self) -> TcpStreamInner {
TcpStreamInner(self.lower, self.buffer)
}

pub fn from_lower(waiter: Waiter<'a>, inner: TcpStreamInner) -> TcpStream {
TcpStream { waiter: waiter, lower: inner.0, buffer: inner.1 }
}

pub fn shutdown(&self, how: Shutdown) -> Result<()> {
Ok(try!(self.lower.shutdown(how)))
}
143 changes: 98 additions & 45 deletions artiq/runtime.rs/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::prelude::v1::*;
use std::mem;
use std::str;
use std::{mem, str};
use std::cell::RefCell;
use std::io::{self, Read};
use {config, rtio_crg, clock, mailbox, kernel};
use logger::BufferLogger;
use cache::Cache;
use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY};
use urc::Urc;
use sched::{ThreadHandle, Waiter, Spawner};
use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY};

use session_proto as host;
use kernel_proto as kern;
@@ -49,14 +51,16 @@ enum KernelState {

// Per-connection state
#[derive(Debug)]
struct Session {
struct Session<'a> {
congress: &'a mut Congress,
kernel_state: KernelState,
watchdog_set: clock::WatchdogSet
}

impl Session {
fn new() -> Session {
impl<'a> Session<'a> {
fn new(congress: &mut Congress) -> Session {
Session {
congress: congress,
kernel_state: KernelState::Absent,
watchdog_set: clock::WatchdogSet::new()
}
@@ -70,7 +74,7 @@ impl Session {
}
}

impl Drop for Session {
impl<'a> Drop for Session<'a> {
fn drop(&mut self) {
kernel::stop()
}
@@ -123,10 +127,9 @@ fn kern_acknowledge() -> io::Result<()> {
Ok(())
}

fn comm_handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream,
session: &mut Session) -> io::Result<()> {
fn process_host_message(waiter: Waiter,
stream: &mut TcpStream,
session: &mut Session) -> io::Result<()> {
match try!(host_read(stream)) {
host::Request::Ident =>
host_write(stream, host::Reply::Ident(::board::ident(&mut [0; 64]))),
@@ -135,13 +138,15 @@ fn comm_handle(logger: &BufferLogger,
host::Request::Log => {
// Logging the packet with the log is inadvisable
trace!("comm->host Log(...)");
logger.extract(move |log| {
host::Reply::Log(log).write_to(stream)
BufferLogger::with_instance(|logger| {
logger.extract(|log| {
host::Reply::Log(log).write_to(stream)
})
})
}

host::Request::LogClear => {
logger.clear();
BufferLogger::with_instance(|logger| logger.clear());
host_write(stream, host::Reply::Log(""))
}

@@ -221,9 +226,8 @@ fn comm_handle(logger: &BufferLogger,
}
}

fn kern_handle(waiter: Waiter,
congress: &mut Congress,
session: &mut Session) -> io::Result<()> {
fn process_kern_message(waiter: Waiter,
session: &mut Session) -> io::Result<()> {
kern::Message::wait_and_receive(waiter, |request| {
match (&request, session.kernel_state) {
(&kern::LoadReply { .. }, KernelState::Loaded) |
@@ -246,10 +250,10 @@ fn kern_handle(waiter: Waiter,
}

kern::NowInitRequest =>
kern_send(waiter, kern::NowInitReply(congress.now)),
kern_send(waiter, kern::NowInitReply(session.congress.now)),

kern::NowSave(now) => {
congress.now = now;
session.congress.now = now;
kern_acknowledge()
}

@@ -265,14 +269,14 @@ fn kern_handle(waiter: Waiter,
}

kern::CacheGetRequest { key } => {
let value = congress.cache.get(key);
let value = session.congress.cache.get(key);
kern_send(waiter, kern::CacheGetReply {
value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) }
})
}

kern::CachePutRequest { key, value } => {
let succeeded = congress.cache.put(key, value).is_ok();
let succeeded = session.congress.cache.put(key, value).is_ok();
kern_send(waiter, kern::CachePutReply { succeeded: succeeded })
}

@@ -281,20 +285,17 @@ fn kern_handle(waiter: Waiter,
})
}

fn handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream,
congress: &mut Congress) -> io::Result<()> {
try!(check_magic(stream));

let mut session = Session::new();
fn host_kernel_worker(waiter: Waiter,
stream: &mut TcpStream,
congress: &mut Congress) -> io::Result<()> {
let mut session = Session::new(congress);
loop {
if stream.readable() {
try!(comm_handle(logger, waiter, stream, &mut session))
try!(process_host_message(waiter, stream, &mut session));
}

if mailbox::receive() != 0 {
try!(kern_handle(waiter, congress, &mut session))
try!(process_kern_message(waiter, &mut session))
}

if session.kernel_state == KernelState::Running {
@@ -313,27 +314,79 @@ fn handle(logger: &BufferLogger,
}
}

pub fn handler(waiter: Waiter,
logger: &BufferLogger) {
let mut congress = Congress::new();
fn flash_kernel_worker(waiter: Waiter,
congress: &mut Congress) -> io::Result<()> {
let mut session = Session::new(congress);
loop {
try!(process_kern_message(waiter, &mut session))
}
}

fn respawn<F>(spawner: Spawner, waiter: Waiter,
handle: &mut Option<ThreadHandle>,
f: F) where F: 'static + FnOnce(Waiter, Spawner) + Send {
match handle.take() {
None => (),
Some(handle) => {
info!("terminating running kernel");
handle.interrupt();
waiter.join(handle).expect("cannot join interrupt thread")
}
}

*handle = Some(spawner.spawn(8192, f))
}

pub fn handler(waiter: Waiter, spawner: Spawner) {
let congress = Urc::new(RefCell::new(Congress::new()));

let addr = SocketAddr::new(IP_ANY, 1381);
let listener = TcpListener::bind(waiter, addr).unwrap();
let listener = TcpListener::bind(waiter, addr).expect("cannot bind socket");
info!("accepting network sessions in Rust");

let mut kernel_thread = None;
loop {
let (mut stream, addr) = listener.accept().unwrap();
info!("new connection from {:?}", addr);

match handle(logger, waiter, &mut stream, &mut congress) {
Ok(()) => (),
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
info!("connection closed");
} else {
error!("session aborted: {:?}", err);
}
if listener.acceptable() {
let (mut stream, addr) = listener.accept().expect("cannot accept client");
match check_magic(&mut stream) {
Ok(()) => (),
Err(_) => continue
}
info!("new connection from {:?}", addr);

let stream = stream.into_lower();
let congress = congress.clone();
respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| {
let mut stream = TcpStream::from_lower(waiter, stream);
let mut congress = congress.borrow_mut();
match host_kernel_worker(waiter, &mut stream, &mut congress) {
Ok(()) => (),
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
info!("connection closed");
} else {
error!("session aborted: {:?}", err);
}
}
}
})
}

if kernel_thread.is_none() {
info!("no connection, starting idle kernel");
let congress = congress.clone();
respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| {
let mut congress = congress.borrow_mut();
match flash_kernel_worker(waiter, &mut congress) {
Ok(()) =>
info!("idle kernel finished, standing by"),
Err(err) => {
error!("idle kernel aborted: {:?}", err);
}
}
})
}

waiter.relinquish()
}
}
31 changes: 31 additions & 0 deletions artiq/runtime.rs/src/urc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::fmt;

pub struct Urc<T: ?Sized>(Rc<T>);

impl<T> Urc<T> {
pub fn new(value: T) -> Urc<T> { Urc(Rc::new(value)) }
}

unsafe impl<T: ?Sized> Send for Urc<T> {}

unsafe impl<T: ?Sized> Sync for Urc<T> {}

impl<T: ?Sized> Deref for Urc<T> {
type Target = T;

fn deref(&self) -> &Self::Target { self.0.deref() }
}

impl<T: ?Sized> Clone for Urc<T> {
fn clone(&self) -> Urc<T> {
Urc(self.0.clone())
}
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for Urc<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

0 comments on commit 30e997f

Please sign in to comment.