Navigation Menu

Skip to content

Commit

Permalink
Add new stream, new fiber, and new continuable examples
Browse files Browse the repository at this point in the history
  • Loading branch information
creationix committed Aug 31, 2012
1 parent 2808f7f commit b52c6fc
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
115 changes: 115 additions & 0 deletions examples/stream/modules/file.lua
@@ -0,0 +1,115 @@
-- This is a partial port of the built-in fs module to the continuable format

local native = require('uv_native')
local iStream = require('stream').iStream

-- Module table; it is what this file returns to `require`.
local fs = {}

-- Default permission bits (octal 644) that fs.open falls back to when the
-- caller does not supply a mode.
fs.umask = tonumber("644", 8)

-- Stand-in callback used when a continuable is invoked without one.
local function noop() end

--- Build a continuable that opens a file.
-- Arguments are validated eagerly, before the continuable is returned.
-- @param path  string, handed to native.fsOpen
-- @param flag  string, handed to native.fsOpen
-- @param mode  optional number; falls back to fs.umask when omitted
-- @return function(callback) that starts the open; the callback is optional
--   and is passed straight through to native.fsOpen
function fs.open(path, flag, mode)
  -- Level 2 makes argument errors point at the caller's line, not this file.
  if type(path) ~= "string" then
    error("open(path, flag, [mode]): path must be a string", 2)
  end
  if type(flag) ~= "string" then
    error("open(path, flag, [mode]): flag must be a string", 2)
  end
  if not mode then mode = fs.umask end
  if type(mode) ~= "number" then
    error("open(path, flag, [mode]): mode must be a number", 2)
  end
  return function (callback)
    return native.fsOpen(path, flag, mode, callback or noop)
  end
end

--- Build a continuable that reads from an open file descriptor.
-- Arguments are validated eagerly, before the continuable is returned.
-- @param fd      number, handed to native.fsRead
-- @param offset  number, handed to native.fsRead
-- @param length  number, handed to native.fsRead
-- @return function(callback) that starts the read; the callback is optional
--   and is passed straight through to native.fsRead
function fs.read(fd, offset, length)
  -- Level 2 makes argument errors point at the caller's line, not this file.
  if type(fd) ~= "number" then
    error("read(fd, offset, length): fd must be a number", 2)
  end
  if type(offset) ~= "number" then
    error("read(fd, offset, length): offset must be a number", 2)
  end
  if type(length) ~= "number" then
    error("read(fd, offset, length): length must be a number", 2)
  end
  return function (callback)
    return native.fsRead(fd, offset, length, callback or noop)
  end
end

--- Build a continuable that writes a string to an open file descriptor.
-- Arguments are validated eagerly, before the continuable is returned.
-- @param fd      number, handed to native.fsWrite
-- @param offset  number, handed to native.fsWrite
-- @param chunk   string, the data handed to native.fsWrite
-- @return function(callback) that starts the write; the callback is optional
--   and is passed straight through to native.fsWrite
function fs.write(fd, offset, chunk)
  -- Level 2 makes argument errors point at the caller's line, not this file.
  if type(fd) ~= "number" then
    error("write(fd, offset, chunk): fd must be a number", 2)
  end
  if type(offset) ~= "number" then
    error("write(fd, offset, chunk): offset must be a number", 2)
  end
  if type(chunk) ~= "string" then
    error("write(fd, offset, chunk): chunk must be a string", 2)
  end
  return function (callback)
    return native.fsWrite(fd, offset, chunk, callback or noop)
  end
end

--- Build a continuable that closes a file descriptor.
-- The argument is validated eagerly, before the continuable is returned.
-- @param fd  number, handed to native.fsClose
-- @return function(callback) that starts the close; the callback is optional
--   and is passed straight through to native.fsClose
function fs.close(fd)
  -- Level 2 makes argument errors point at the caller's line, not this file.
  if type(fd) ~= "number" then
    error("close(fd): fd must be a number", 2)
  end
  return function (callback)
    return native.fsClose(fd, callback or noop)
  end
end

-- Readable file stream derived from iStream; each read() hands back the next
-- chunk of the file and advances an internal offset.
local ReadStream = iStream:extend()
fs.ReadStream = ReadStream

--- Set up the stream state.
-- @param fd         file descriptor later passed to fs.read
-- @param chunkSize  optional number of bytes requested per read; defaults
--   to 10. NOTE(review): presumably forwarded by ReadStream:new(fd, size);
--   confirm against the object system's `new`.
function ReadStream:initialize(fd, chunkSize)
  self.fd = fd
  self.offset = 0
  self.chunkSize = chunkSize or 10
end

--- Build a continuable that reads the next chunk.
-- The callback receives (err) on failure, (nil, chunk) when data was read,
-- and no arguments once a read comes back empty.
-- @return function(callback)
function ReadStream:read()
  return function (callback)
    fs.read(self.fd, self.offset, self.chunkSize)(function (err, chunk)
      if err then
        return callback(err)
      end
      -- Treat a missing chunk like an empty one so `#` is never applied to nil.
      local bytesRead = chunk and #chunk or 0
      if bytesRead == 0 then
        return callback()
      end
      self.offset = self.offset + bytesRead
      callback(nil, chunk)
    end)
  end
end

-- Writable file stream derived from iStream; each write() appends at the
-- stream's running offset.
local WriteStream = iStream:extend()
fs.WriteStream = WriteStream

--- Remember the descriptor and start writing at offset zero.
-- @param fd  file descriptor later passed to fs.write
function WriteStream:initialize(fd)
  self.offset = 0
  self.fd = fd
end

--- Build a continuable that writes one chunk.
-- A falsy chunk writes nothing and invokes the callback with no arguments.
-- Otherwise the callback receives (err) on failure, or no arguments after
-- the offset has been advanced by the reported byte count.
-- @param chunk  data to write, or nil
-- @return function(callback)
function WriteStream:write(chunk)
  local stream = self
  return function (callback)
    if chunk then
      local function onWritten(err, bytesWritten)
        if err then
          return callback(err)
        end
        stream.offset = stream.offset + bytesWritten
        callback()
      end
      fs.write(stream.fd, stream.offset, chunk)(onWritten)
    else
      return callback()
    end
  end
end

return fs
25 changes: 25 additions & 0 deletions examples/stream/modules/stream.lua
@@ -0,0 +1,25 @@
-- This is a re-implementation of core.iStream, but using the new stream API and continuables

local Emitter = require('core').Emitter

local core = {}

-- Base stream type, derived from Emitter and exported on the module table.
local iStream = Emitter:extend()
core.iStream = iStream

--- Build a continuable that copies this stream into `dest`.
-- Alternates self:read() and dest:write(chunk) until a read produces a falsy
-- chunk; that final falsy chunk is still passed to dest:write, whose
-- completion then goes to the callback. Any error from either side is handed
-- to the callback immediately.
-- @param dest  object exposing write(chunk) that returns a continuable
-- @return function(callback)
function iStream:pipe(dest)
  local source = self
  return function (callback)
    local pump

    -- Forward whatever was read, then decide where control goes next.
    local function afterRead(err, chunk)
      if err then return callback(err) end
      local nextStep
      if chunk then
        nextStep = pump
      else
        nextStep = callback
      end
      return dest:write(chunk)(nextStep)
    end

    -- Request the next chunk unless the previous write failed.
    pump = function (err)
      if err then return callback(err) end
      source:read()(afterRead)
    end

    pump()
  end
end

return core
66 changes: 66 additions & 0 deletions examples/stream/modules/wait.lua
@@ -0,0 +1,66 @@

local coroutine = require('coroutine')
local debug = require 'debug'
local fiber = {}

--- Wrap `block` in a continuable that runs it inside a coroutine.
-- `block` is called as block(wait, await):
--   * wait(fn) invokes the continuable `fn` and returns every value its
--     callback received. If the callback has not fired by the time `fn`
--     returns, the coroutine is suspended until it does.
--   * await(fn) does the same but treats the first value as an error: a
--     truthy one is raised with error(), otherwise the remaining values are
--     returned.
-- When the coroutine finishes, `callback` (if given) receives
-- (nil, <block's return values>). When it raises, `callback` receives the
-- error with a traceback attached; without a callback the error is rethrown.
-- @param block  function(wait, await)
-- @return function(callback)
function fiber.new(block)
  -- Lua 5.1/LuaJIT expose unpack as a global, 5.2+ as table.unpack.
  local unpack = table.unpack or unpack

  return function (callback)
    local paused
    local co = coroutine.create(block)

    -- Attach the coroutine's traceback to the error. Table errors keep their
    -- identity and get the traceback stored in `message`.
    local function formatError(err)
      local stack = debug.traceback(co, tostring(err))
      if type(err) == "table" then
        err.message = stack
        return err
      end
      return stack
    end

    -- Inspect the result of coroutine.resume: report failures, report
    -- completion, or just note that the coroutine is parked in wait().
    local function check(success, ...)
      if not success then
        if callback then
          return callback(formatError(...))
        else
          error(formatError(...))
        end
      end
      if not paused then
        return callback and callback(nil, ...)
      end
      paused = false
    end

    local function wait(fn)
      if type(fn) ~= "function" then
        error("can only wait on functions")
      end
      -- sync: nil while fn is still running, true if the callback fired
      -- before fn returned, false once we have decided to yield.
      local sync, ret, count
      fn(function (...)
        if sync == nil then
          sync = true
          -- Keep the exact argument count: the results usually contain nil
          -- (e.g. a nil error slot), so `#ret` cannot be trusted.
          count = select("#", ...)
          ret = {...}
          return
        end
        check(coroutine.resume(co, ...))
      end)
      if sync then
        return unpack(ret, 1, count)
      end
      sync = false
      paused = true
      return coroutine.yield()
    end

    local function await(fn)
      -- Varargs are forwarded directly so nil results are preserved.
      local function finish(err, ...)
        if err then
          error(err)
        end
        return ...
      end
      return finish(wait(fn))
    end

    check(coroutine.resume(co, wait, await))
  end
end

return fiber
52 changes: 52 additions & 0 deletions examples/stream/test.lua
@@ -0,0 +1,52 @@
local fs = require('file')
local fiber = require('wait')

local input = __filename
local output = __filename .. ".out2"

-- Create a normal continuable using callbacks.
-- Copies `input` to `output` chunk by chunk. The callback receives (err) on
-- failure or (nil, "done") on success. Every descriptor that was opened is
-- closed before the callback fires, whichever way the copy ends.
-- NOTE(review): assumes fs.close reports failure as the callback's first
-- argument, like the other fs continuables used here.
local normal = function (callback)
  fs.open(input, "r")(function (err, readFd)
    if err then return callback(err) end
    fs.open(output, "w")(function (err, writeFd)
      if err then
        -- The input descriptor is already open; release it before bailing.
        return fs.close(readFd)(function ()
          callback(err)
        end)
      end
      local readable = fs.ReadStream:new(readFd)
      local writable = fs.WriteStream:new(writeFd)
      local consume, onRead

      -- Close both descriptors, then report the first error seen (if any).
      local function finish(copyErr)
        fs.close(readFd)(function (readCloseErr)
          fs.close(writeFd)(function (writeCloseErr)
            local failure = copyErr or readCloseErr or writeCloseErr
            if failure then return callback(failure) end
            callback(nil, "done")
          end)
        end)
      end

      consume = function (err)
        if err then return finish(err) end
        readable:read()(onRead)
      end
      onRead = function (err, chunk)
        if err then return finish(err) end
        -- A nil chunk marks the end of input: pass it to the writer once
        -- more, then wrap up instead of reading again.
        return writable:write(chunk)(chunk and consume or finish)
      end
      consume()
    end)
  end)
end

-- Create another that runs in a coroutine.
-- Performs the same copy as the callback version, written sequentially.
-- `await` raises on error; `wait` hands the error back, which is used here
-- wherever a descriptor must be closed before the error is raised.
-- NOTE(review): assumes fs.close reports failure as the callback's first
-- argument, like the other fs continuables used here.
local fibered = fiber.new(function (wait, await)
  local readFd = await(fs.open(input, "r"))
  local openErr, writeFd = wait(fs.open(output, "w"))
  if openErr then
    -- The input descriptor is already open; release it before failing.
    wait(fs.close(readFd))
    error(openErr)
  end
  local readable = fs.ReadStream:new(readFd)
  local writable = fs.WriteStream:new(writeFd)

  local err, chunk
  repeat
    err, chunk = wait(readable:read())
    if not err then
      -- The final nil chunk is still passed to the writer, as before.
      err = wait(writable:write(chunk))
    end
  until err or not chunk

  -- Always close both descriptors, then surface the first error seen.
  local readCloseErr = wait(fs.close(readFd))
  local writeCloseErr = wait(fs.close(writeFd))
  err = err or readCloseErr or writeCloseErr
  if err then
    error(err)
  end
  return "done2"
end)

-- Build a completion callback that pretty-prints the outcome under `name`.
local function report(name)
  return function (err, message)
    p{name=name, err=err, message=message}
  end
end

-- Run the normal one
normal(report("normal"))

-- Also run the fibered one in parallel
fibered(report("fibered"))

0 comments on commit b52c6fc

Please sign in to comment.