Skip to content

Commit

Permalink
Merge pull request #237 from racker/add-callback-state-saver
Browse files Browse the repository at this point in the history
Add callback state saver
  • Loading branch information
philips committed Jun 7, 2012
2 parents 52b7010 + 512ecb6 commit 8b9f4f0
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 41 deletions.
31 changes: 7 additions & 24 deletions src/luv_fs.c
Expand Up @@ -177,11 +177,8 @@ void luv_after_fs(uv_fs_t* req) {
lua_State *L = ref->L;
int argc;

lua_rawgeti(L, LUA_REGISTRYINDEX, ref->rstr);
luaL_unref(L, LUA_REGISTRYINDEX, ref->rstr);

lua_rawgeti(L, LUA_REGISTRYINDEX, ref->rcb);
luaL_unref(L, LUA_REGISTRYINDEX, ref->rcb);
luv_io_ctx_callback_rawgeti(L, &ref->cbs);
luv_io_ctx_unref(L, &ref->cbs);

argc = luv_process_fs_result(L, req);

Expand All @@ -195,29 +192,14 @@ static luv_fs_ref_t* luv_fs_ref_alloc(lua_State* L) {
luv_fs_ref_t* ref = malloc(sizeof(luv_fs_ref_t));
ref->L = L;
ref->fs_req.data = ref;
ref->rcb = LUA_NOREF;
ref->rstr = LUA_NOREF;
return ref;
}

/* Anchor the value at stack `index` in the registry as this request's
 * callback. Anything that is not a function is ignored, leaving ref->rcb
 * untouched. */
static void luv_fs_ref_callback(luv_fs_ref_t* ref, int index) {
  lua_State* state = ref->L;

  if (!lua_isfunction(state, index)) {
    return;
  }

  /* luaL_ref pops the value it references, so hand it a copy */
  lua_pushvalue(state, index);
  ref->rcb = luaL_ref(state, LUA_REGISTRYINDEX);
}

/* Anchor the value at stack `index` in the registry so the buffer handed to
 * the fs request stays reachable. Values rejected by lua_isstring are
 * ignored, leaving ref->rstr untouched. */
static void luv_fs_ref_string(luv_fs_ref_t* ref, int index) {
  lua_State* state = ref->L;

  if (!lua_isstring(state, index)) {
    return;
  }

  /* luaL_ref pops the value it references, so hand it a copy */
  lua_pushvalue(state, index);
  ref->rstr = luaL_ref(state, LUA_REGISTRYINDEX);
}

/* Utility for storing the callback in the fs_req token.
 *
 * Allocates a luv_fs_ref_t for L, records the value at stack `index` as the
 * request callback (only when it is a function), and returns a pointer to
 * the uv_fs_t embedded in that ref.
 *
 * NOTE(review): this listing is a diff view with the +/- markers stripped;
 * the luv_fs_ref_callback() call below is presumably the removed line that
 * the two luv_io_ctx_* calls replace -- confirm against the real file.
 */
uv_fs_t* luv_fs_store_callback(lua_State* L, int index) {
luv_fs_ref_t* ref = luv_fs_ref_alloc(L);
luv_fs_ref_callback(ref, index);
luv_io_ctx_init(&ref->cbs);
luv_io_ctx_callback_add(L, &ref->cbs, index);
return &ref->fs_req;
}

Expand Down Expand Up @@ -281,8 +263,9 @@ int luv_fs_write(lua_State* L) {
uv_fs_t* req;
void* chunk = (void*)luaL_checklstring(L, 3, &length);
luv_fs_ref_t* ref = luv_fs_ref_alloc(L);
luv_fs_ref_string(ref, 3);
luv_fs_ref_callback(ref, 4);
luv_io_ctx_init(&ref->cbs);
luv_io_ctx_add(L, &ref->cbs, 3);
luv_io_ctx_callback_add(L, &ref->cbs, 4);
req = &ref->fs_req;
FS_CALL(write, 4, NULL, file, chunk, length, offset);
}
Expand Down
3 changes: 1 addition & 2 deletions src/luv_fs.h
Expand Up @@ -62,9 +62,8 @@ int luv_fs_fchown(lua_State* L);

/* Per-request bookkeeping for an fs call.
 *
 * NOTE(review): this listing is a diff view with the +/- markers stripped;
 * rcb/rstr look like the removed fields that cbs supersedes -- confirm
 * against the real header.
 */
typedef struct {
lua_State* L; /* Lua state the request was created from */
int rcb; /* callback ref */
int rstr; /* string ref */
uv_fs_t fs_req; /* libuv request; fs_req.data is set to point back at this struct */
luv_io_ctx_t cbs; /* registry refs for the callback and any retained values */
void* buf;
} luv_fs_ref_t;

Expand Down
38 changes: 24 additions & 14 deletions src/luv_stream.c
Expand Up @@ -71,13 +71,15 @@ void luv_after_connect(uv_connect_t* req, int status) {


void luv_after_shutdown(uv_shutdown_t* req, int status) {
luv_io_ctx_t *cbs = req->data;

/* load the lua state and the userdata */
lua_State *L = luv_handle_get_lua(req->handle->data);
lua_pop(L, 1); /* We don't need the userdata */
/* load the request callback */
lua_rawgeti(L, LUA_REGISTRYINDEX, (uintptr_t)req->data);
luaL_unref(L, LUA_REGISTRYINDEX, (uintptr_t)req->data);

/* load the request callback */
luv_io_ctx_callback_rawgeti(L, cbs);
luv_io_ctx_unref(L, cbs);

if (lua_isfunction(L, -1)) {
if (status == -1) {
Expand All @@ -91,17 +93,20 @@ void luv_after_shutdown(uv_shutdown_t* req, int status) {
}

luv_handle_unref(L, req->handle->data);
free(req->data);
free(req);
}

void luv_after_write(uv_write_t* req, int status) {
luv_io_ctx_t *cbs = req->data;

/* load the lua state and the userdata */
lua_State *L = luv_handle_get_lua(req->handle->data);
lua_pop(L, 1); /* We don't need the userdata */
/* load the callback */
lua_rawgeti(L, LUA_REGISTRYINDEX, (uintptr_t)req->data);
luaL_unref(L, LUA_REGISTRYINDEX, (uintptr_t)req->data);

/* load the request callback */
luv_io_ctx_callback_rawgeti(L, cbs);
luv_io_ctx_unref(L, cbs);

if (lua_isfunction(L, -1)) {
if (status == -1) {
Expand All @@ -115,17 +120,21 @@ void luv_after_write(uv_write_t* req, int status) {
}

luv_handle_unref(L, req->handle->data);
free(req->data);
free(req);
}

int luv_shutdown(lua_State* L) {
luv_io_ctx_t *cbs;
uv_stream_t* handle = (uv_stream_t*)luv_checkudata(L, 1, "stream");

uv_shutdown_t* req = (uv_shutdown_t*)malloc(sizeof(uv_shutdown_t));
cbs = malloc(sizeof(luv_io_ctx_t));
luv_io_ctx_init(cbs);

/* Store a reference to the callback */
lua_pushvalue(L, 2);
req->data = (void*)(uintptr_t)luaL_ref(L, LUA_REGISTRYINDEX);
luv_io_ctx_callback_add(L, cbs, 2);
req->data = (void*)cbs;

luv_handle_ref(L, handle->data, 1);

Expand Down Expand Up @@ -194,19 +203,20 @@ int luv_write(lua_State* L) {
uv_buf_t buf;
uv_stream_t* handle = (uv_stream_t*)luv_checkudata(L, 1, "stream");
size_t len;
luv_io_ctx_t *cbs;
const char* chunk = luaL_checklstring(L, 2, &len);

uv_write_t* req = (uv_write_t*)malloc(sizeof(uv_write_t));
cbs = malloc(sizeof(luv_io_ctx_t));
luv_io_ctx_init(cbs);

/* Store a reference to the callback */
lua_pushvalue(L, 3);
req->data = (void*)(long)luaL_ref(L, LUA_REGISTRYINDEX);
luv_io_ctx_add(L, cbs, 2);
luv_io_ctx_callback_add(L, cbs, 3);

luv_handle_ref(L, handle->data, 1);
req->data = (void*)cbs;

/* Store the chunk
* TODO: this is probably unsafe, should investigate
*/
luv_handle_ref(L, handle->data, 1);
buf = uv_buf_init((char*)chunk, len);

uv_write(req, handle, &buf, 1, luv_after_write);
Expand Down
52 changes: 51 additions & 1 deletion src/utils.c
Expand Up @@ -95,6 +95,57 @@ uv_handle_t* luv_checkudata(lua_State* L, int index, const char* type) {
return ((luv_handle_t*)lua_touserdata(L, index))->handle;
}

/* Put a context into its empty state: neither a callback nor a data table
 * is referenced. */
void luv_io_ctx_init(luv_io_ctx_t *cbs)
{
  cbs->rdata = LUA_NOREF;
  cbs->rcb = LUA_NOREF;
}

/* Keep the value at stack `index` reachable for as long as the context
 * holds its references.
 *
 * Values are appended to a per-context table that is anchored in the
 * registry through cbs->rdata; the table is created on first use. The Lua
 * stack is left balanced.
 *
 * `index` may be absolute or relative to the top of the stack.
 */
void luv_io_ctx_add(lua_State* L, luv_io_ctx_t *cbs, int index)
{
  int n;

  /* Resolve a top-relative index up front: the table and key pushed below
   * would otherwise shift what it refers to. Pseudo-indices are left alone. */
  if (index < 0 && index > LUA_REGISTRYINDEX) {
    index = lua_gettop(L) + index + 1;
  }

  /* Create the data table if it doesn't exist */
  if (cbs->rdata == LUA_NOREF) {
    lua_newtable(L);
    cbs->rdata = luaL_ref(L, LUA_REGISTRYINDEX);
  }

  /* grab state table */
  lua_rawgeti(L, LUA_REGISTRYINDEX, cbs->rdata);

  /* find next slot in state table */
  n = luaL_getn(L, -1);
  lua_pushinteger(L, n + 1);

  /* push the state variable to be reffed */
  lua_pushvalue(L, index);

  /* table[n + 1] = value; pops key and value, then drop the table */
  lua_settable(L, -3);
  lua_pop(L, 1);
}

/* Record the value at stack `index` as the context's callback by anchoring
 * it in the registry. Anything that is not a function is ignored, leaving
 * cbs->rcb untouched. */
void luv_io_ctx_callback_add(lua_State *L, luv_io_ctx_t *cbs, int index)
{
  if (!lua_isfunction(L, index)) {
    return;
  }

  /* luaL_ref pops the value it references, so hand it a copy */
  lua_pushvalue(L, index);
  cbs->rcb = luaL_ref(L, LUA_REGISTRYINDEX);
}

/* Push the registry entry stored under the context's callback ref onto the
 * Lua stack. The stream completion handlers test the pushed value with
 * lua_isfunction before calling it. */
void luv_io_ctx_callback_rawgeti(lua_State *L, luv_io_ctx_t *cbs)
{
  const int callback_ref = cbs->rcb;

  lua_rawgeti(L, LUA_REGISTRYINDEX, callback_ref);
}

/* Release both registry references held by the context (data table and
 * callback) and return the context to its empty state. */
void luv_io_ctx_unref(lua_State* L, luv_io_ctx_t *cbs)
{
  const int data_ref = cbs->rdata;
  const int callback_ref = cbs->rcb;

  /* Reset first; the saved copies are what get released below */
  luv_io_ctx_init(cbs);

  luaL_unref(L, LUA_REGISTRYINDEX, data_ref);
  luaL_unref(L, LUA_REGISTRYINDEX, callback_ref);
}

const char* luv_handle_type_to_string(uv_handle_type type) {
switch (type) {
case UV_TCP: return "TCP";
Expand Down Expand Up @@ -129,7 +180,6 @@ uv_loop_t* luv_get_loop(lua_State *L) {
}



/* Initialize a new lhandle and push the new userdata on the stack. */
luv_handle_t* luv_handle_create(lua_State* L, size_t size, const char* type) {

Expand Down
14 changes: 14 additions & 0 deletions src/utils.h
Expand Up @@ -68,6 +68,20 @@ typedef struct {
*/
luv_handle_t* luv_handle_create(lua_State* L, size_t size, const char* type);

/* Callback refs ensure that the callback function and any required data
 * survive until they are needed and don't get gc'd. Used by the stream and
 * fs bindings.
 *
 * Both fields hold registry references and are LUA_NOREF when unset.
 */
typedef struct {
int rcb; /* registry ref to the callback function */
int rdata; /* registry ref to a table of retained values, created on first luv_io_ctx_add */
} luv_io_ctx_t;

/* Set both refs to LUA_NOREF. */
void luv_io_ctx_init(luv_io_ctx_t *cbs);
/* Append the value at stack `index` to the context's retained-data table,
 * creating that table on first use. */
void luv_io_ctx_add(lua_State* L, luv_io_ctx_t *cbs, int index);
/* Store the value at stack `index` as the callback if it is a function;
 * otherwise do nothing. */
void luv_io_ctx_callback_add(lua_State* L, luv_io_ctx_t *cbs, int index);
/* Push the registry entry for the stored callback ref onto the Lua stack. */
void luv_io_ctx_callback_rawgeti(lua_State *L, luv_io_ctx_t *cbs);
/* Release both registry refs and reset the context to its initial state. */
void luv_io_ctx_unref(lua_State* L, luv_io_ctx_t *cbs);

/* Convenience wrappers */
uv_udp_t* luv_create_udp(lua_State* L);
uv_fs_event_t* luv_create_fs_watcher(lua_State* L);
Expand Down

0 comments on commit 8b9f4f0

Please sign in to comment.