Skip to content

Commit

Permalink
Rust: implement cache.
Browse files Browse the repository at this point in the history
whitequark committed Oct 1, 2016
1 parent d825393 commit 8bced9d
Showing 5 changed files with 100 additions and 17 deletions.
47 changes: 47 additions & 0 deletions artiq/runtime.rs/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::vec::Vec;
use std::string::String;
use std::btree_map::BTreeMap;

/// A single cached value together with its lending state.
#[derive(Debug)]
struct Entry {
    /// The words stored under the key.
    data: Vec<u32>,
    /// Set once a pointer to `data` has been handed out by `Cache::get`;
    /// nothing in this type ever clears it again.
    borrowed: bool
}

/// String-keyed store of `u32` buffers.
///
/// An entry that has been handed out through `get` is frozen: later `put`
/// calls for the same key are refused, so the buffer behind the returned
/// pointer is never replaced by `put`.
#[derive(Debug)]
pub struct Cache {
    entries: BTreeMap<String, Entry>
}

impl Cache {
    /// Creates a cache with no entries.
    pub fn new() -> Cache {
        Cache { entries: BTreeMap::new() }
    }

    /// Looks up `key` and returns a raw pointer to its data.
    ///
    /// A hit marks the entry as borrowed. A miss yields a pointer to an
    /// empty slice and leaves the cache untouched.
    pub fn get(&mut self, key: &str) -> *const [u32] {
        if let Some(entry) = self.entries.get_mut(key) {
            // From now on `put` must not replace this buffer.
            entry.borrowed = true;
            return &entry.data[..];
        }

        &[]
    }

    /// Stores a copy of `data` under `key`.
    ///
    /// Returns `Err(())` without modifying anything when the key exists and
    /// its entry has already been handed out by `get`; otherwise the entry is
    /// created or overwritten and `Ok(())` is returned.
    pub fn put(&mut self, key: &str, data: &[u32]) -> Result<(), ()> {
        if let Some(entry) = self.entries.get_mut(key) {
            if entry.borrowed {
                return Err(());
            }

            entry.data = Vec::from(data);
            return Ok(());
        }

        // First time this key is seen: start out not borrowed.
        let fresh = Entry {
            data: Vec::from(data),
            borrowed: false
        };
        self.entries.insert(String::from(key), fresh);
        Ok(())
    }
}
1 change: 1 addition & 0 deletions artiq/runtime.rs/src/kernel_proto.rs
Original file line number Diff line number Diff line change
@@ -249,6 +249,7 @@ mod c {

#[repr(u32)]
#[derive(Debug)]
#[allow(dead_code)]
pub enum Type {
LoadRequest,
LoadReply,
1 change: 1 addition & 0 deletions artiq/runtime.rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ mod rtio_crg;
mod mailbox;

mod logger;
mod cache;

mod kernel_proto;
mod session_proto;
2 changes: 2 additions & 0 deletions artiq/runtime.rs/src/sched.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

extern crate fringe;
extern crate lwip;

66 changes: 49 additions & 17 deletions artiq/runtime.rs/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::prelude::v1::*;
use std::mem;
use std::str;
use std::io::{self, Read};
use {config, rtio_crg, clock, mailbox, kernel};
use logger::BufferLogger;
use cache::Cache;
use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY};

use session_proto as host;
@@ -21,6 +23,22 @@ fn io_error(msg: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, msg)
}

// Persistent state: constructed in `handler` and passed by mutable
// reference into `handle`, unlike `Session`, which `handle` builds itself.
#[derive(Debug)]
struct Congress {
    // Value stored on `kern::NowSave` and sent back in `kern::NowInitReply`.
    now: u64,
    // Store behind the `kern::CacheGetRequest` / `kern::CachePutRequest` messages.
    cache: Cache
}

impl Congress {
    // Starts with `now` at zero and a freshly created cache.
    fn new() -> Congress {
        let cache = Cache::new();
        Congress { now: 0, cache: cache }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KernelState {
Absent,
@@ -29,23 +47,22 @@ enum KernelState {
RpcWait
}

// Per-connection state
#[derive(Debug)]
pub struct Session {
struct Session {
kernel_state: KernelState,
watchdog_set: clock::WatchdogSet,
now: u64
watchdog_set: clock::WatchdogSet
}

impl Session {
pub fn new() -> Session {
fn new() -> Session {
Session {
kernel_state: KernelState::Absent,
watchdog_set: clock::WatchdogSet::new(),
now: 0
watchdog_set: clock::WatchdogSet::new()
}
}

pub fn running(&self) -> bool {
fn running(&self) -> bool {
match self.kernel_state {
KernelState::Absent | KernelState::Loaded => false,
KernelState::Running | KernelState::RpcWait => true
@@ -106,9 +123,9 @@ fn kern_acknowledge() -> io::Result<()> {
Ok(())
}

fn comm_handle(waiter: Waiter,
fn comm_handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream,
logger: &BufferLogger,
session: &mut Session) -> io::Result<()> {
match try!(host_read(stream)) {
host::Request::Ident =>
@@ -205,7 +222,7 @@ fn comm_handle(waiter: Waiter,
}

fn kern_handle(waiter: Waiter,
stream: &mut TcpStream,
congress: &mut Congress,
session: &mut Session) -> io::Result<()> {
kern::Message::wait_and_receive(waiter, |request| {
match (&request, session.kernel_state) {
@@ -229,10 +246,10 @@ fn kern_handle(waiter: Waiter,
}

kern::NowInitRequest =>
kern_send(waiter, kern::NowInitReply(session.now)),
kern_send(waiter, kern::NowInitReply(congress.now)),

kern::NowSave(now) => {
session.now = now;
congress.now = now;
kern_acknowledge()
}

@@ -247,24 +264,37 @@ fn kern_handle(waiter: Waiter,
kern_acknowledge()
}

kern::CacheGetRequest { key } => {
let value = congress.cache.get(key);
kern_send(waiter, kern::CacheGetReply {
value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) }
})
}

kern::CachePutRequest { key, value } => {
let succeeded = congress.cache.put(key, value).is_ok();
kern_send(waiter, kern::CachePutReply { succeeded: succeeded })
}

request => unexpected!("unexpected request {:?} from kernel CPU", request)
}
})
}

fn handle(waiter: Waiter,
fn handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream,
logger: &BufferLogger) -> io::Result<()> {
congress: &mut Congress) -> io::Result<()> {
try!(check_magic(stream));

let mut session = Session::new();
loop {
if stream.readable() {
try!(comm_handle(waiter, stream, logger, &mut session))
try!(comm_handle(logger, waiter, stream, &mut session))
}

if mailbox::receive() != 0 {
try!(kern_handle(waiter, stream, &mut session))
try!(kern_handle(waiter, congress, &mut session))
}

if session.kernel_state == KernelState::Running {
@@ -285,6 +315,8 @@ fn handle(waiter: Waiter,

pub fn handler(waiter: Waiter,
logger: &BufferLogger) {
let mut congress = Congress::new();

let addr = SocketAddr::new(IP_ANY, 1381);
let listener = TcpListener::bind(waiter, addr).unwrap();
info!("accepting network sessions in Rust");
@@ -293,7 +325,7 @@ pub fn handler(waiter: Waiter,
let (mut stream, addr) = listener.accept().unwrap();
info!("new connection from {:?}", addr);

match handle(waiter, &mut stream, logger) {
match handle(logger, waiter, &mut stream, &mut congress) {
Ok(()) => (),
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {

0 comments on commit 8bced9d

Please sign in to comment.