Skip to content

Commit

Permalink
Implement receiving data from RPCs.
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark committed Aug 9, 2015
1 parent 02b1543 commit d4270cf
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 75 deletions.
21 changes: 11 additions & 10 deletions artiq/compiler/transforms/llvm_ir_generator.py
Expand Up @@ -162,7 +162,7 @@ def llbuiltin(self, name):
llty = ll.FunctionType(ll.VoidType(), [ll.IntType(32), ll.IntType(8).as_pointer()],
var_arg=True)
elif name == "recv_rpc":
llty = ll.FunctionType(ll.IntType(32), [ll.IntType(8).as_pointer().as_pointer()])
llty = ll.FunctionType(ll.IntType(32), [ll.IntType(8).as_pointer()])
else:
assert False

Expand Down Expand Up @@ -571,7 +571,7 @@ def _prepare_closure_call(self, insn):
llfun = self.llbuilder.extract_value(llclosure, 1)
return llfun, [llenv] + list(llargs)

# See session.c:send_rpc_value and session.c:recv_rpc_value.
# See session.c:{send,receive}_rpc_value and comm_generic.py:_{send,receive}_rpc_value.
def _rpc_tag(self, typ, error_handler):
if types.is_tuple(typ):
assert len(typ.elts) < 256
Expand Down Expand Up @@ -666,29 +666,30 @@ def ret_error_handler(typ):
llalloc = self.llbuilder.append_basic_block(name=llprehead.name + ".rpc.alloc")
lltail = self.llbuilder.append_basic_block(name=llprehead.name + ".rpc.tail")

llslot = self.llbuilder.alloca(ll.IntType(8).as_pointer())
self.llbuilder.store(ll.Constant(ll.IntType(8).as_pointer(), None), llslot)
llretty = self.llty_of_type(fun_type.ret)
llslot = self.llbuilder.alloca(llretty)
llslotgen = self.llbuilder.bitcast(llslot, ll.IntType(8).as_pointer())
self.llbuilder.branch(llhead)

self.llbuilder.position_at_end(llhead)
llphi = self.llbuilder.phi(llslotgen.type)
llphi.add_incoming(llslotgen, llprehead)
if llunwindblock:
llsize = self.llbuilder.invoke(self.llbuiltin("recv_rpc"), [llslot],
llsize = self.llbuilder.invoke(self.llbuiltin("recv_rpc"), [llphi],
llheadu, llunwindblock)
self.llbuilder.position_at_end(llheadu)
else:
llsize = self.llbuilder.call(self.llbuiltin("recv_rpc"), [llslot])
llsize = self.llbuilder.call(self.llbuiltin("recv_rpc"), [llphi])
lldone = self.llbuilder.icmp_unsigned('==', llsize, ll.Constant(llsize.type, 0))
self.llbuilder.cbranch(lldone, lltail, llalloc)

self.llbuilder.position_at_end(llalloc)
llalloca = self.llbuilder.alloca(ll.IntType(8), llsize)
self.llbuilder.store(llalloca, llslot)
llphi.add_incoming(llalloca, llalloc)
self.llbuilder.branch(llhead)

self.llbuilder.position_at_end(lltail)
llretty = self.llty_of_type(fun_type.ret, for_return=True)
llretptr = self.llbuilder.bitcast(llslot, llretty.as_pointer())
llret = self.llbuilder.load(llretptr)
llret = self.llbuilder.load(llslot)
if not builtins.is_allocated(fun_type.ret):
# We didn't allocate anything except the slot for the value itself.
# Don't waste stack space.
Expand Down
114 changes: 100 additions & 14 deletions artiq/coredevice/comm_generic.py
Expand Up @@ -8,7 +8,6 @@


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class _H2DMsgType(Enum):
Expand Down Expand Up @@ -51,6 +50,9 @@ class _D2HMsgType(Enum):
class UnsupportedDevice(Exception):
    """Signals that a core device is not supported.

    NOTE(review): the raise sites are outside this excerpt; confirm the
    exact conditions against the rest of the module.
    """

class RPCReturnValueError(ValueError):
    """Raised when an RPC's return value cannot be serialized as the type
    that the device-side tag stream asks for (see ``_send_rpc_value``)."""


class CommGeneric:
def __init__(self):
Expand Down Expand Up @@ -279,6 +281,7 @@ def run(self):

_rpc_sentinel = object()

# See session.c:{send,receive}_rpc_value and llvm_ir_generator.py:_rpc_tag.
def _receive_rpc_value(self, rpc_map):
tag = chr(self._read_int8())
if tag == "\x00":
Expand Down Expand Up @@ -306,11 +309,15 @@ def _receive_rpc_value(self, rpc_map):
length = self._read_int32()
return [self._receive_rpc_value(rpc_map) for _ in range(length)]
elif tag == "r":
lower = self._receive_rpc_value(rpc_map)
upper = self._receive_rpc_value(rpc_map)
start = self._receive_rpc_value(rpc_map)
stop = self._receive_rpc_value(rpc_map)
step = self._receive_rpc_value(rpc_map)
return range(lower, upper, step)
return range(start, stop, step)
elif tag == "o":
present = self._read_int8()
if present:
return self._receive_rpc_value(rpc_map)
elif tag == "O":
return rpc_map[self._read_int32()]
else:
raise IOError("Unknown RPC value tag: {}".format(repr(tag)))
Expand All @@ -323,16 +330,101 @@ def _receive_rpc_args(self, rpc_map):
return args
args.append(value)

def _skip_rpc_value(self, tags):
tag = tags.pop(0)
if tag == "t":
length = tags.pop(0)
for _ in range(length):
self._skip_rpc_value(tags)
elif tag == "l":
self._skip_rpc_value(tags)
elif tag == "r":
self._skip_rpc_value(tags)
else:
pass

def _send_rpc_value(self, tags, value, root, function):
def check(cond, expected):
if not cond:
raise RPCReturnValueError(
"type mismatch: cannot serialize {value} as {type}"
" ({function} has returned {root})".format(
value=repr(value), type=expected(),
function=function, root=root))

tag = chr(tags.pop(0))
if tag == "t":
length = tags.pop(0)
check(isinstance(value, tuple) and length == len(value),
lambda: "tuple of {}".format(length))
for elt in value:
self._send_rpc_value(tags, elt, root, function)
elif tag == "n":
check(value is None,
lambda: "None")
elif tag == "b":
check(isinstance(value, bool),
lambda: "bool")
self._write_int8(value)
elif tag == "i":
check(isinstance(value, int) and (-2**31 < value < 2**31-1),
lambda: "32-bit int")
self._write_int32(value)
elif tag == "I":
check(isinstance(value, int) and (-2**63 < value < 2**63-1),
lambda: "64-bit int")
self._write_int64(value)
elif tag == "f":
check(isinstance(value, float),
lambda: "float")
self._write_float64(value)
elif tag == "F":
check(isinstance(value, Fraction) and
(-2**63 < value.numerator < 2**63-1) and
(-2**63 < value.denominator < 2**63-1),
lambda: "64-bit Fraction")
self._write_int64(value.numerator)
self._write_int64(value.denominator)
elif tag == "s":
check(isinstance(value, str) and "\x00" not in value,
lambda: "str")
self._write_string(value)
elif tag == "l":
check(isinstance(value, list),
lambda: "list")
self._write_int32(len(value))
for elt in value:
tags_copy = bytearray(tags)
self._send_rpc_value(tags_copy, elt, root, function)
self._skip_rpc_value(tags)
elif tag == "r":
check(isinstance(value, range),
lambda: "range")
tags_copy = bytearray(tags)
self._send_rpc_value(tags_copy, value.start, root, function)
tags_copy = bytearray(tags)
self._send_rpc_value(tags_copy, value.stop, root, function)
tags_copy = bytearray(tags)
self._send_rpc_value(tags_copy, value.step, root, function)
tags = tags_copy
else:
raise IOError("Unknown RPC value tag: {}".format(repr(tag)))

def _serve_rpc(self, rpc_map):
service = self._read_int32()
args = self._receive_rpc_args(rpc_map)
return_tag = self._read_string()
logger.debug("rpc service: %d %r -> %s", service, args, return_tag)
return_tags = self._read_bytes()
logger.debug("rpc service: %d %r -> %s", service, args, return_tags)

try:
result = rpc_map[service](*args)
if not isinstance(result, int) or not (-2**31 < result < 2**31-1):
raise ValueError("An RPC must return an int(width=32)")
logger.debug("rpc service: %d %r == %r", service, args, result)

self._write_header(_H2DMsgType.RPC_REPLY)
self._write_bytes(return_tags)
self._send_rpc_value(bytearray(return_tags), result, result,
rpc_map[service])
self._write_flush()
except core_language.ARTIQException as exn:
logger.debug("rpc service: %d %r ! %r", service, args, exn)

Expand Down Expand Up @@ -364,12 +456,6 @@ def _serve_rpc(self, rpc_map):
self._write_string(function)

self._write_flush()
else:
logger.debug("rpc service: %d %r == %r", service, args, result)

self._write_header(_H2DMsgType.RPC_REPLY)
self._write_int32(result)
self._write_flush()

def _serve_exception(self):
name = self._read_string()
Expand Down
2 changes: 1 addition & 1 deletion soc/runtime/ksupport.c
Expand Up @@ -314,7 +314,7 @@ void send_rpc(int service, const char *tag, ...)
va_end(request.args);
}

int recv_rpc(void **slot) {
int recv_rpc(void *slot) {
struct msg_rpc_recv_request request;
struct msg_rpc_recv_reply *reply;

Expand Down
2 changes: 1 addition & 1 deletion soc/runtime/ksupport.h
Expand Up @@ -6,7 +6,7 @@ void now_save(long long int now);
int watchdog_set(int ms);
void watchdog_clear(int id);
void send_rpc(int service, const char *tag, ...);
int recv_rpc(void **slot);
int recv_rpc(void *slot);
void lognonl(const char *fmt, ...);
void log(const char *fmt, ...);

Expand Down
2 changes: 1 addition & 1 deletion soc/runtime/messages.h
Expand Up @@ -89,7 +89,7 @@ struct msg_rpc_send {

struct msg_rpc_recv_request {
int type;
void **slot;
void *slot;
};

struct msg_rpc_recv_reply {
Expand Down

0 comments on commit d4270cf

Please sign in to comment.