Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: crystal-lang/crystal
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: fcce0c2ec1e6
Choose a base ref
...
head repository: crystal-lang/crystal
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c26fbc9714c7
Choose a head ref
  • 3 commits
  • 7 files changed
  • 1 contributor

Commits on Aug 30, 2017

  1. Add IO::Syscall

    IO::Syscall is an abstraction over non-blocking IO provided by syscalls. It
    takes care of checking for EAGAIN and managing the lists of waiting fibers.
    RX14 committed Aug 30, 2017
    Copy the full SHA
    f7ca917 View commit details
  2. Copy the full SHA
    135915d View commit details
  3. Seperate Socket and IO::FileDescriptor

    On some platforms - notably windows - socket descriptors are different from file
    descriptors so it makes no sense for them to be shared under a common hierarchy.
    RX14 committed Aug 30, 2017
    Copy the full SHA
    c26fbc9 View commit details
Showing with 343 additions and 184 deletions.
  1. +30 −4 src/concurrent/scheduler.cr
  2. +10 −26 src/event.cr
  3. +5 −1 src/fiber.cr
  4. +13 −144 src/io/file_descriptor.cr
  5. +151 −0 src/io/syscall.cr
  6. +131 −7 src/socket.cr
  7. +3 −2 src/time/span.cr
34 changes: 30 additions & 4 deletions src/concurrent/scheduler.cr
Original file line number Diff line number Diff line change
@@ -36,8 +36,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.write_timed_out = true
fd_io.resume_write
fd_io.resume_write(timed_out: true)
end
end
event
end

def self.create_fd_write_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Write)
sock_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_write(timed_out: true)
end
end
event
@@ -51,8 +64,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.read_timed_out = true
fd_io.resume_read
fd_io.resume_read(timed_out: true)
end
end
event
end

def self.create_fd_read_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Read)
sock_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_read(timed_out: true)
end
end
event
36 changes: 10 additions & 26 deletions src/event.cr
Original file line number Diff line number Diff line change
@@ -14,41 +14,25 @@ module Event
@freed = false
end

def add
LibEvent2.event_add(@event, nil)
end

def add(timeout)
def add(timeout : LibC::Timeval? = nil)
if timeout
t = to_timeval(timeout)
LibEvent2.event_add(@event, pointerof(t))
timeout_copy = timeout
LibEvent2.event_add(@event, pointerof(timeout_copy))
else
add
LibEvent2.event_add(@event, nil)
end
end

# Arms the event with a timeout expressed as a `Time::Span`.
#
# The span's ticks are split into whole seconds plus the leftover ticks,
# which are scaled to microseconds, and the resulting `LibC::Timeval` is
# passed on to the `add` overload that accepts a timeval.
def add(timeout : Time::Span)
  whole_seconds, leftover_ticks = timeout.ticks.divmod(Time::Span::TicksPerSecond)
  microseconds = leftover_ticks / Time::Span::TicksPerMicrosecond
  add(LibC::Timeval.new(tv_sec: whole_seconds, tv_usec: microseconds))
end

# Releases the underlying libevent event the first time it is called.
# Subsequent calls skip `LibEvent2.event_free` and only re-assert `@freed`.
def free
  unless @freed
    LibEvent2.event_free(@event)
  end
  @freed = true
end

# Builds a `LibC::Timeval` from an integer value: the value goes into
# `tv_sec` and `tv_usec` is set to zero. Each field is converted to the
# field's own declared type via `typeof`.
private def to_timeval(time : Int)
  timeval = uninitialized LibC::Timeval
  timeval.tv_usec = typeof(timeval.tv_usec).new(0)
  timeval.tv_sec = typeof(timeval.tv_sec).new(time)
  timeval
end

# Builds a `LibC::Timeval` from a floating-point value.
#
# `tv_sec` receives the value converted to the field's integer type, and
# `tv_usec` receives the difference between the original value and that
# converted part, multiplied by 1e6.
private def to_timeval(time : Float)
  timeval = uninitialized LibC::Timeval

  whole = typeof(timeval.tv_sec).new(time)
  timeval.tv_sec = whole
  timeval.tv_usec = typeof(timeval.tv_usec).new((time - whole) * 1e6)
  timeval
end
end

# :nodoc:
6 changes: 5 additions & 1 deletion src/fiber.cr
Original file line number Diff line number Diff line change
@@ -265,12 +265,16 @@ class Fiber
{% end %}
end

def sleep(time)
def sleep(time : Time::Span)
event = @resume_event ||= Scheduler.create_resume_event(self)
event.add(time)
Scheduler.reschedule
end

# Numeric convenience overload: converts *time* with `Number#seconds` and
# calls `sleep` again with the converted value.
def sleep(time : Number)
  span = time.seconds
  sleep(span)
end

def yield
# Implemented as a sleep of zero duration.
sleep(0)
end
157 changes: 13 additions & 144 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
require "./syscall"
require "c/fcntl"

# An `IO` over a file descriptor.
class IO::FileDescriptor
include Buffered
include IO::Buffered
include IO::Syscall

@read_timeout : Float64?
@write_timeout : Float64?
@read_event : Event::Event?
@write_event : Event::Event?

# :nodoc:
property read_timed_out : Bool
property write_timed_out : Bool

def initialize(@fd : Int32, blocking = false, edge_triggerable = false)
@edge_triggerable = !!edge_triggerable
def initialize(@fd : Int32, blocking = false)
@closed = false
@read_timed_out = false
@write_timed_out = false
@fd = fd

unless blocking
self.blocking = false
if @edge_triggerable
@read_event = Scheduler.create_fd_read_event(self, @edge_triggerable)
@write_event = Scheduler.create_fd_write_event(self, @edge_triggerable)
end
end
end

# Set the number of seconds to wait when reading before raising an `IO::Timeout`.
def read_timeout=(read_timeout : Number)
# The number is normalised with `to_f` before being stored.
@read_timeout = read_timeout.to_f
end

# ditto
def read_timeout=(read_timeout : Time::Span)
# Re-enters the `read_timeout=` setter with the span's `total_seconds`.
self.read_timeout = read_timeout.total_seconds
end

# Sets no timeout on read operations, so an `IO::Timeout` will never be raised.
def read_timeout=(read_timeout : Nil)
# Clears any previously stored read timeout.
@read_timeout = nil
end

# Set the number of seconds to wait when writing before raising an `IO::Timeout`.
def write_timeout=(write_timeout : Number)
# The number is normalised with `to_f` before being stored.
@write_timeout = write_timeout.to_f
end

# ditto
def write_timeout=(write_timeout : Time::Span)
# Re-enters the `write_timeout=` setter with the span's `total_seconds`.
self.write_timeout = write_timeout.total_seconds
end

# Sets no timeout on write operations, so an `IO::Timeout` will never be raised.
def write_timeout=(write_timeout : Nil)
# Clears any previously stored write timeout.
@write_timeout = nil
end

# Returns `true` when `O_NONBLOCK` is not set in the status flags reported
# by `fcntl(F_GETFL)` for this descriptor.
def blocking
  status_flags = fcntl(LibC::F_GETFL)
  (status_flags & LibC::O_NONBLOCK) == 0
end
@@ -93,20 +51,6 @@ class IO::FileDescriptor
self.class.fcntl @fd, cmd, arg
end

# :nodoc:
def resume_read
  # Take the fiber at the front of the readers queue, if there is one, and
  # resume it. Nothing happens when the queue is missing or empty.
  @readers.try(&.shift?).try(&.resume)
end

# :nodoc:
def resume_write
  # Take the fiber at the front of the writers queue, if there is one, and
  # resume it. Nothing happens when the queue is missing or empty.
  @writers.try(&.shift?).try(&.resume)
end

def stat
if LibC.fstat(@fd, out stat) != 0
raise Errno.new("Unable to get stat")
@@ -238,97 +182,29 @@ class IO::FileDescriptor
end

private def unbuffered_read(slice : Bytes)
count = slice.size
loop do
bytes_read = LibC.read(@fd, slice.pointer(count).as(Void*), count)
if bytes_read != -1
return bytes_read
end

if Errno.value == Errno::EAGAIN
wait_readable
else
raise Errno.new "Error reading file"
end
end
ensure
if (readers = @readers) && !readers.empty?
add_read_event
read_syscall_helper(slice, "Error reading file") do
# `to_i32` is acceptable because `Slice#size` is a Int32
LibC.read(@fd, slice, slice.size).to_i32
end
end

private def unbuffered_write(slice : Bytes)
count = slice.size
total = count
loop do
bytes_written = LibC.write(@fd, slice.pointer(count).as(Void*), count)
if bytes_written != -1
count -= bytes_written
return total if count == 0
slice += bytes_written
else
if Errno.value == Errno::EAGAIN
wait_writable
next
elsif Errno.value == Errno::EBADF
write_syscall_helper(slice, "Error writing file") do |slice|
LibC.write(@fd, slice, slice.size).tap do |return_code|
if return_code == -1 && Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise Errno.new "Error writing file"
end
end
end
ensure
if (writers = @writers) && !writers.empty?
add_write_event
end
end

private def wait_readable
wait_readable { |err| raise err }
end

private def wait_readable
readers = (@readers ||= Deque(Fiber).new)
readers << Fiber.current
add_read_event
Scheduler.reschedule

if @read_timed_out
@read_timed_out = false
yield Timeout.new("Read timed out")
end

nil
end

private def add_read_event
return if @edge_triggerable
private def add_read_event(timeout = @read_timeout)
event = @read_event ||= Scheduler.create_fd_read_event(self)
event.add @read_timeout
nil
end

private def wait_writable(timeout = @write_timeout)
wait_writable(timeout: timeout) { |err| raise err }
end

# msg/timeout are overridden in nonblock_connect
private def wait_writable(msg = "Write timed out", timeout = @write_timeout)
writers = (@writers ||= Deque(Fiber).new)
writers << Fiber.current
add_write_event timeout
Scheduler.reschedule

if @write_timed_out
@write_timed_out = false
yield Timeout.new(msg)
end

event.add timeout
nil
end

private def add_write_event(timeout = @write_timeout)
return if @edge_triggerable
event = @write_event ||= Scheduler.create_fd_write_event(self)
event.add timeout
nil
@@ -358,15 +234,8 @@ class IO::FileDescriptor
@read_event = nil
@write_event.try &.free
@write_event = nil
if readers = @readers
Scheduler.enqueue readers
readers.clear
end

if writers = @writers
Scheduler.enqueue writers
writers.clear
end
reschedule_waiting

raise err if err
end
151 changes: 151 additions & 0 deletions src/io/syscall.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Mixin for IO types backed by non-blocking syscalls.
#
# It provides the read/write timeout accessors, helpers that retry a syscall
# when it fails with `EAGAIN`, and the queues of fibers waiting for the
# descriptor to become readable or writable.
#
# Including types must implement `add_read_event` and `add_write_event`.
module IO::Syscall
# Set by `resume_read`/`resume_write`; checked and reset by the yielding
# `wait_readable`/`wait_writable` overloads.
@read_timed_out = false
@write_timed_out = false

# Default values for the `timeout` argument of the wait/add-event methods.
@read_timeout : Time::Span?
@write_timeout : Time::Span?

# Fibers parked in `wait_readable`/`wait_writable`; created on first use.
@readers : Deque(Fiber)?
@writers : Deque(Fiber)?

# Returns the time to wait when reading before raising an `IO::Timeout`.
def read_timeout : Time::Span?
@read_timeout
end

# Sets the time to wait when reading before raising an `IO::Timeout`.
def read_timeout=(timeout : Time::Span?) : ::Time::Span?
@read_timeout = timeout
end

# Sets the number of seconds to wait when reading before raising an `IO::Timeout`.
#
# The number is converted with `Number#seconds` before being stored; the
# original number is returned.
def read_timeout=(read_timeout : Number) : Number
self.read_timeout = read_timeout.seconds
read_timeout
end

# Returns the time to wait when writing before raising an `IO::Timeout`.
def write_timeout : Time::Span?
@write_timeout
end

# Sets the time to wait when writing before raising an `IO::Timeout`.
def write_timeout=(timeout : Time::Span?) : ::Time::Span?
@write_timeout = timeout
end

# Sets the number of seconds to wait when writing before raising an `IO::Timeout`.
#
# The number is converted with `Number#seconds` before being stored; the
# original number is returned.
def write_timeout=(write_timeout : Number) : Number
self.write_timeout = write_timeout.seconds
write_timeout
end

# Runs the given block (the actual read syscall) until it returns something
# other than -1, and returns that value.
#
# When the block returns -1 and errno is `EAGAIN`, the current fiber waits
# via `wait_readable` (which raises on timeout) and the block is retried.
# Any other errno raises `Errno.new(errno_msg)`.
#
# *slice* is accepted but not used by this helper itself.
def read_syscall_helper(slice : Bytes, errno_msg : String) : Int32
loop do
bytes_read = yield
if bytes_read != -1
return bytes_read
end

if Errno.value == Errno::EAGAIN
wait_readable
else
raise Errno.new(errno_msg)
end
end
ensure
# Runs on every exit path: if other fibers are still queued as readers,
# register the read event again.
if (readers = @readers) && !readers.empty?
add_read_event
end
end

# Runs the given block (the actual write syscall) with the part of *slice*
# that is still unwritten until the whole slice has been consumed.
#
# Each return value other than -1 advances the slice by that many bytes.
# When the block returns -1 and errno is `EAGAIN`, the current fiber waits
# via `wait_writable` (which raises on timeout) and the block is retried.
# Any other errno raises `Errno.new(errno_msg)`.
def write_syscall_helper(slice : Bytes, errno_msg : String) : Nil
loop do
bytes_written = yield slice
if bytes_written != -1
slice += bytes_written
return if slice.size == 0
else
if Errno.value == Errno::EAGAIN
wait_writable
else
raise Errno.new(errno_msg)
end
end
end
ensure
# Runs on every exit path: if other fibers are still queued as writers,
# register the write event again.
if (writers = @writers) && !writers.empty?
add_write_event
end
end

# :nodoc:
def resume_read(timed_out = false)
# Record whether this wake-up is a timeout, then resume the fiber at the
# front of the readers queue, if any.
# NOTE(review): the flag is assigned even when no fiber is queued — confirm
# a stale value cannot be observed by a fiber woken through
# `reschedule_waiting`.
@read_timed_out = timed_out

if reader = @readers.try &.shift?
reader.resume
end
end

# :nodoc:
def resume_write(timed_out = false)
# Record whether this wake-up is a timeout, then resume the fiber at the
# front of the writers queue, if any.
# NOTE(review): the flag is assigned even when no fiber is queued — confirm
# a stale value cannot be observed by a fiber woken through
# `reschedule_waiting`.
@write_timed_out = timed_out

if writer = @writers.try &.shift?
writer.resume
end
end

# Waits for readability and raises the error yielded by the block-taking
# overload (i.e. the read timeout error).
private def wait_readable(timeout = @read_timeout)
wait_readable(timeout: timeout) { |err| raise err }
end

# Queues the current fiber as a reader, registers the read event with
# *timeout*, and hands control back to the scheduler. After the fiber is
# resumed, yields a `Timeout` error ("Read timed out") if the timed-out flag
# was set, resetting the flag first. Returns `nil`.
private def wait_readable(timeout = @read_timeout)
readers = (@readers ||= Deque(Fiber).new)
readers << Fiber.current
add_read_event(timeout)
Scheduler.reschedule

if @read_timed_out
@read_timed_out = false
yield Timeout.new("Read timed out")
end

nil
end

# Must be implemented by the including type; called with the timeout to use
# whenever a fiber starts (or continues) waiting for readability.
private abstract def add_read_event(timeout = @read_timeout)

# Waits for writability and raises the error yielded by the block-taking
# overload (i.e. the write timeout error).
private def wait_writable(timeout = @write_timeout)
wait_writable(timeout: timeout) { |err| raise err }
end

# Queues the current fiber as a writer, registers the write event with
# *timeout*, and hands control back to the scheduler. After the fiber is
# resumed, yields a `Timeout` error ("Write timed out") if the timed-out
# flag was set, resetting the flag first. Returns `nil`.
private def wait_writable(timeout = @write_timeout)
writers = (@writers ||= Deque(Fiber).new)
writers << Fiber.current
add_write_event(timeout)
Scheduler.reschedule

if @write_timed_out
@write_timed_out = false
yield Timeout.new("Write timed out")
end

nil
end

# Must be implemented by the including type; called with the timeout to use
# whenever a fiber starts (or continues) waiting for writability.
private abstract def add_write_event(timeout = @write_timeout)

# Passes every queued reader and writer fiber to `Scheduler.enqueue` and
# empties both queues.
private def reschedule_waiting
if readers = @readers
Scheduler.enqueue readers
readers.clear
end

if writers = @writers
Scheduler.enqueue writers
writers.clear
end
end
end
138 changes: 131 additions & 7 deletions src/socket.cr
Original file line number Diff line number Diff line change
@@ -5,7 +5,10 @@ require "c/netinet/tcp"
require "c/sys/socket"
require "c/sys/un"

class Socket < IO::FileDescriptor
class Socket
include IO::Buffered
include IO::Syscall

class Error < Exception
end

@@ -34,6 +37,13 @@ class Socket < IO::FileDescriptor
# :nodoc:
SOMAXCONN = 128

getter fd : Int32

@read_event : Event::Event?
@write_event : Event::Event?

@closed = false

getter family : Family
getter type : Type
getter protocol : Protocol
@@ -60,14 +70,21 @@ class Socket < IO::FileDescriptor
fd = LibC.socket(family, type, protocol)
raise Errno.new("failed to create socket:") if fd == -1
init_close_on_exec(fd)
super(fd, blocking)
@fd = fd

self.sync = true
unless blocking
self.blocking = false
end
end

protected def initialize(fd : Int32, @family, @type, @protocol = Protocol::IP)
init_close_on_exec(fd)
super fd, blocking: false
protected def initialize(@fd : Int32, @family, @type, @protocol = Protocol::IP, blocking = false)
init_close_on_exec(@fd)

self.sync = true
unless blocking
self.blocking = false
end
end

# Force opened sockets to be closed on `exec(2)`. Only for platforms that don't
@@ -103,6 +120,7 @@ class Socket < IO::FileDescriptor
# Tries to connect to a remote address. Yields an `IO::Timeout` or an
# `Errno` error if the connection failed.
def connect(addr, timeout = nil)
timeout = timeout.seconds unless timeout.is_a? Time::Span | Nil
loop do
if LibC.connect(fd, addr, addr.size) == 0
return
@@ -111,8 +129,8 @@ class Socket < IO::FileDescriptor
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
wait_writable(msg: "connect timed out", timeout: timeout) do |error|
return yield error
wait_writable(timeout: timeout) do |error|
return yield IO::Timeout.new("connect timed out")
end
else
return yield Errno.new("connect")
@@ -456,6 +474,112 @@ class Socket < IO::FileDescriptor
ptr = pointerof(addr).as(Void*)
LibC.inet_pton(LibC::AF_INET, string, ptr) > 0 || LibC.inet_pton(LibC::AF_INET6, string, ptr) > 0
end

# Returns `true` when `O_NONBLOCK` is not set in the status flags reported
# by `fcntl(F_GETFL)` for this socket.
def blocking
  status_flags = fcntl(LibC::F_GETFL)
  (status_flags & LibC::O_NONBLOCK) == 0
end

# Switches the socket between blocking and non-blocking mode.
#
# Reads the current status flags, clears `O_NONBLOCK` when *value* is truthy
# or sets it otherwise, and writes the result back with `F_SETFL`.
def blocking=(value)
  current = fcntl(LibC::F_GETFL)
  updated = value ? (current & ~LibC::O_NONBLOCK) : (current | LibC::O_NONBLOCK)
  fcntl(LibC::F_SETFL, updated)
end

# Returns `true` if `FD_CLOEXEC` is set in the descriptor flags reported by
# `fcntl(F_GETFD)`.
def close_on_exec?
  cloexec = LibC::FD_CLOEXEC
  (fcntl(LibC::F_GETFD) & cloexec) == cloexec
end

# Sets the descriptor flags via `F_SETFD` to `FD_CLOEXEC` when *arg* is
# `true`, or to 0 when it is `false`. Returns *arg*.
def close_on_exec=(arg : Bool)
  descriptor_flags = arg ? LibC::FD_CLOEXEC : 0
  fcntl(LibC::F_SETFD, descriptor_flags)
  arg
end

# Calls `LibC.fcntl` on *fd* with *cmd* and *arg* and returns its result.
#
# Raises `Errno` ("fcntl() failed") when the call returns -1.
def self.fcntl(fd, cmd, arg = 0)
  result = LibC.fcntl(fd, cmd, arg)
  if result == -1
    raise Errno.new("fcntl() failed")
  end
  result
end

# Instance-level shortcut: forwards to the class-level `fcntl` using this
# socket's own descriptor.
def fcntl(cmd, arg = 0)
  self.class.fcntl(@fd, cmd, arg)
end

# Closes the socket if it has not been closed yet. Any exception raised by
# `close` is rescued and discarded.
def finalize
  unless closed?
    begin
      close
    rescue
      nil
    end
  end
end

# Returns the `@closed` flag.
def closed?
@closed
end

# Returns `true` when `LibC.isatty` returns 1 for this socket's descriptor.
def tty?
  isatty_result = LibC.isatty(fd)
  isatty_result == 1
end

# Reads into *slice* with `recv`, using `read_syscall_helper` to drive the
# call; errors are reported with the message "Error reading socket".
private def unbuffered_read(slice : Bytes)
  read_syscall_helper(slice, "Error reading socket") do
    received = LibC.recv(@fd, slice, slice.size, 0)
    # `to_i32` is acceptable because `Slice#size` is a Int32
    received.to_i32
  end
end

# Writes *slice* with `send`, using `write_syscall_helper` to drive the
# call; errors are reported with the message "Error writing to socket".
private def unbuffered_write(slice : Bytes)
  write_syscall_helper(slice, "Error writing to socket") do |remaining|
    # The helper yields the slice to send on this attempt.
    LibC.send(@fd, remaining, remaining.size, 0)
  end
end

# Adds the socket's read event with *timeout*, creating the event through
# `Scheduler.create_fd_read_event` the first time it is needed.
# Always returns `nil`.
private def add_read_event(timeout = @read_timeout)
  read_event = (@read_event ||= Scheduler.create_fd_read_event(self))
  read_event.add(timeout)
  nil
end

# Adds the socket's write event with *timeout*, creating the event through
# `Scheduler.create_fd_write_event` the first time it is needed.
# Always returns `nil`.
private def add_write_event(timeout = @write_timeout)
  write_event = (@write_event ||= Scheduler.create_fd_write_event(self))
  write_event.add(timeout)
  nil
end

# Rewinding is not supported here: always raises `IO::Error`.
private def unbuffered_rewind
raise IO::Error.new("Can't rewind")
end

# Closes the descriptor and tears down the socket's event state.
#
# Does nothing if the socket is already marked closed. A failing `close`
# with errno `EINTR` or `EINPROGRESS` is ignored; any other failure is
# remembered and raised as `Errno` only after the cleanup below has run.
private def unbuffered_close
return if @closed

err = nil
if LibC.close(@fd) != 0
case Errno.value
when Errno::EINTR, Errno::EINPROGRESS
# ignore
else
# Build the error right away; it is raised at the end of the method.
err = Errno.new("Error closing socket")
end
end

# Mark closed before cleanup, whether or not `close` reported an error.
@closed = true

# Free both events (if they were ever created) and drop the references.
@read_event.try &.free
@read_event = nil
@write_event.try &.free
@write_event = nil

# Wake fibers that were still waiting on this socket.
reschedule_waiting

raise err if err
end

# Intentionally empty: this implementation performs no work on flush.
private def unbuffered_flush
# Nothing
end
end

require "./socket/*"
5 changes: 3 additions & 2 deletions src/time/span.cr
Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ struct Time::Span

include Comparable(self)

TicksPerMillisecond = 10_000_i64
TicksPerMicrosecond = 10_i64
TicksPerMillisecond = TicksPerMicrosecond * 1000
TicksPerSecond = TicksPerMillisecond * 1000
TicksPerMinute = TicksPerSecond * 60
TicksPerHour = TicksPerMinute * 60
@@ -52,7 +53,7 @@ struct Time::Span
MinValue = new Int64::MIN
Zero = new 0

# 1 tick is a tenth of a millisecond
# 1 tick is a tenth of a microsecond
@ticks : Int64

getter ticks