Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: crystal-lang/crystal
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: fcce0c2ec1e6
Choose a base ref
...
head repository: crystal-lang/crystal
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c26fbc9714c7
Choose a head ref
  • 3 commits
  • 7 files changed
  • 1 contributor

Commits on Aug 30, 2017

  1. Add IO::Syscall

    IO::Syscall is an abstraction over non-blocking IO provided by syscalls. It
    takes care of checking for EAGAIN and managing the lists of waiting fibers.
    RX14 committed Aug 30, 2017
    Copy the full SHA
    f7ca917 View commit details
  2. Copy the full SHA
    135915d View commit details
  3. Seperate Socket and IO::FileDescriptor

    On some platforms - notably windows - socket descriptors are different from file
    descriptors so it makes no sense for them to be shared under a common hierarchy.
    RX14 committed Aug 30, 2017
    Copy the full SHA
    c26fbc9 View commit details
Showing with 343 additions and 184 deletions.
  1. +30 −4 src/concurrent/scheduler.cr
  2. +10 −26 src/event.cr
  3. +5 −1 src/fiber.cr
  4. +13 −144 src/io/file_descriptor.cr
  5. +151 −0 src/io/syscall.cr
  6. +131 −7 src/socket.cr
  7. +3 −2 src/time/span.cr
34 changes: 30 additions & 4 deletions src/concurrent/scheduler.cr
Original file line number Diff line number Diff line change
@@ -36,8 +36,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.write_timed_out = true
fd_io.resume_write
fd_io.resume_write(timed_out: true)
end
end
event
end

def self.create_fd_write_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Write)
sock_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_write(timed_out: true)
end
end
event
@@ -51,8 +64,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.read_timed_out = true
fd_io.resume_read
fd_io.resume_read(timed_out: true)
end
end
event
end

def self.create_fd_read_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Read)
sock_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_read(timed_out: true)
end
end
event
36 changes: 10 additions & 26 deletions src/event.cr
Original file line number Diff line number Diff line change
@@ -14,41 +14,25 @@ module Event
@freed = false
end

def add
LibEvent2.event_add(@event, nil)
end

def add(timeout)
def add(timeout : LibC::Timeval? = nil)
if timeout
t = to_timeval(timeout)
LibEvent2.event_add(@event, pointerof(t))
timeout_copy = timeout
LibEvent2.event_add(@event, pointerof(timeout_copy))
else
add
LibEvent2.event_add(@event, nil)
end
end

def add(timeout : Time::Span)
seconds, remainder_ticks = timeout.ticks.divmod(Time::Span::TicksPerSecond)
timeval = LibC::Timeval.new(tv_sec: seconds, tv_usec: remainder_ticks / Time::Span::TicksPerMicrosecond)
add(timeval)
end

def free
LibEvent2.event_free(@event) unless @freed
@freed = true
end

private def to_timeval(time : Int)
t = uninitialized LibC::Timeval
t.tv_sec = typeof(t.tv_sec).new(time)
t.tv_usec = typeof(t.tv_usec).new(0)
t
end

private def to_timeval(time : Float)
t = uninitialized LibC::Timeval

seconds = typeof(t.tv_sec).new(time)
useconds = typeof(t.tv_usec).new((time - seconds) * 1e6)

t.tv_sec = seconds
t.tv_usec = useconds
t
end
end

# :nodoc:
6 changes: 5 additions & 1 deletion src/fiber.cr
Original file line number Diff line number Diff line change
@@ -265,12 +265,16 @@ class Fiber
{% end %}
end

def sleep(time)
def sleep(time : Time::Span)
event = @resume_event ||= Scheduler.create_resume_event(self)
event.add(time)
Scheduler.reschedule
end

def sleep(time : Number)
sleep(time.seconds)
end

def yield
sleep(0)
end
157 changes: 13 additions & 144 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
require "./syscall"
require "c/fcntl"

# An `IO` over a file descriptor.
class IO::FileDescriptor
include Buffered
include IO::Buffered
include IO::Syscall

@read_timeout : Float64?
@write_timeout : Float64?
@read_event : Event::Event?
@write_event : Event::Event?

# :nodoc:
property read_timed_out : Bool
property write_timed_out : Bool

def initialize(@fd : Int32, blocking = false, edge_triggerable = false)
@edge_triggerable = !!edge_triggerable
def initialize(@fd : Int32, blocking = false)
@closed = false
@read_timed_out = false
@write_timed_out = false
@fd = fd

unless blocking
self.blocking = false
if @edge_triggerable
@read_event = Scheduler.create_fd_read_event(self, @edge_triggerable)
@write_event = Scheduler.create_fd_write_event(self, @edge_triggerable)
end
end
end

# Set the number of seconds to wait when reading before raising an `IO::Timeout`.
def read_timeout=(read_timeout : Number)
@read_timeout = read_timeout.to_f
end

# ditto
def read_timeout=(read_timeout : Time::Span)
self.read_timeout = read_timeout.total_seconds
end

# Sets no timeout on read operations, so an `IO::Timeout` will never be raised.
def read_timeout=(read_timeout : Nil)
@read_timeout = nil
end

# Set the number of seconds to wait when writing before raising an `IO::Timeout`.
def write_timeout=(write_timeout : Number)
@write_timeout = write_timeout.to_f
end

# ditto
def write_timeout=(write_timeout : Time::Span)
self.write_timeout = write_timeout.total_seconds
end

# Sets no timeout on write operations, so an `IO::Timeout` will never be raised.
def write_timeout=(write_timeout : Nil)
@write_timeout = nil
end

def blocking
fcntl(LibC::F_GETFL) & LibC::O_NONBLOCK == 0
end
@@ -93,20 +51,6 @@ class IO::FileDescriptor
self.class.fcntl @fd, cmd, arg
end

# :nodoc:
def resume_read
if reader = @readers.try &.shift?
reader.resume
end
end

# :nodoc:
def resume_write
if writer = @writers.try &.shift?
writer.resume
end
end

def stat
if LibC.fstat(@fd, out stat) != 0
raise Errno.new("Unable to get stat")
@@ -238,97 +182,29 @@ class IO::FileDescriptor
end

private def unbuffered_read(slice : Bytes)
count = slice.size
loop do
bytes_read = LibC.read(@fd, slice.pointer(count).as(Void*), count)
if bytes_read != -1
return bytes_read
end

if Errno.value == Errno::EAGAIN
wait_readable
else
raise Errno.new "Error reading file"
end
end
ensure
if (readers = @readers) && !readers.empty?
add_read_event
read_syscall_helper(slice, "Error reading file") do
# `to_i32` is acceptable because `Slice#size` is a Int32
LibC.read(@fd, slice, slice.size).to_i32
end
end

private def unbuffered_write(slice : Bytes)
count = slice.size
total = count
loop do
bytes_written = LibC.write(@fd, slice.pointer(count).as(Void*), count)
if bytes_written != -1
count -= bytes_written
return total if count == 0
slice += bytes_written
else
if Errno.value == Errno::EAGAIN
wait_writable
next
elsif Errno.value == Errno::EBADF
write_syscall_helper(slice, "Error writing file") do |slice|
LibC.write(@fd, slice, slice.size).tap do |return_code|
if return_code == -1 && Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise Errno.new "Error writing file"
end
end
end
ensure
if (writers = @writers) && !writers.empty?
add_write_event
end
end

private def wait_readable
wait_readable { |err| raise err }
end

private def wait_readable
readers = (@readers ||= Deque(Fiber).new)
readers << Fiber.current
add_read_event
Scheduler.reschedule

if @read_timed_out
@read_timed_out = false
yield Timeout.new("Read timed out")
end

nil
end

private def add_read_event
return if @edge_triggerable
private def add_read_event(timeout = @read_timeout)
event = @read_event ||= Scheduler.create_fd_read_event(self)
event.add @read_timeout
nil
end

private def wait_writable(timeout = @write_timeout)
wait_writable(timeout: timeout) { |err| raise err }
end

# msg/timeout are overridden in nonblock_connect
private def wait_writable(msg = "Write timed out", timeout = @write_timeout)
writers = (@writers ||= Deque(Fiber).new)
writers << Fiber.current
add_write_event timeout
Scheduler.reschedule

if @write_timed_out
@write_timed_out = false
yield Timeout.new(msg)
end

event.add timeout
nil
end

private def add_write_event(timeout = @write_timeout)
return if @edge_triggerable
event = @write_event ||= Scheduler.create_fd_write_event(self)
event.add timeout
nil
@@ -358,15 +234,8 @@ class IO::FileDescriptor
@read_event = nil
@write_event.try &.free
@write_event = nil
if readers = @readers
Scheduler.enqueue readers
readers.clear
end

if writers = @writers
Scheduler.enqueue writers
writers.clear
end
reschedule_waiting

raise err if err
end
Loading