Skip to content

Commit 7d64756

Browse files
lipanski and RX14
authored and committed Apr 6, 2018
Re-raise exceptions in parallel macro (#5726)
1 parent d536c9c commit 7d64756

File tree

2 files changed

+109
-26
lines changed

2 files changed

+109
-26
lines changed
 

Diff for: spec/std/concurrent_spec.cr

+50-23
Original file line number | Diff line number | Diff line change
@@ -8,38 +8,65 @@ private def method_named(expected_named)
88
Fiber.current.name.should eq(expected_named)
99
end
1010

11+
class SomeParallelJobException < Exception
12+
end
13+
14+
private def raising_job : String
15+
raise SomeParallelJobException.new("boom")
16+
"result"
17+
end
18+
1119
describe "concurrent" do
12-
it "does four things concurrently" do
13-
a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil)
14-
a.should eq(3)
15-
b.should eq(5)
16-
c.should eq(4)
17-
d.should be_nil
18-
end
20+
describe "parallel" do
21+
it "does four things concurrently" do
22+
a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil)
23+
a.should eq(3)
24+
b.should eq(5)
25+
c.should eq(4)
26+
d.should be_nil
27+
end
1928

20-
it "uses spawn macro" do
21-
chan = Channel(Int32).new
29+
it "re-raises errors from Fibers as ConcurrentExecutionException" do
30+
exception = expect_raises(ConcurrentExecutionException) do
31+
a, b = parallel(raising_job, raising_job)
32+
end
2233

23-
spawn method_with_named_args(chan)
24-
chan.receive.should eq(3)
34+
exception.cause.should be_a(SomeParallelJobException)
35+
end
2536

26-
spawn method_with_named_args(chan, y: 20)
27-
chan.receive.should eq(21)
37+
it "is strict about the return value type" do
38+
a, b = parallel(1 + 2, "hello")
2839

29-
spawn method_with_named_args(chan, x: 10, y: 20)
30-
chan.receive.should eq(30)
40+
typeof(a).should eq(Int32)
41+
typeof(b).should eq(String)
42+
end
3143
end
3244

33-
it "spawns named" do
34-
spawn(name: "sub") do
35-
Fiber.current.name.should eq("sub")
45+
describe "spawn" do
46+
it "uses spawn macro" do
47+
chan = Channel(Int32).new
48+
49+
spawn method_with_named_args(chan)
50+
chan.receive.should eq(3)
51+
52+
spawn method_with_named_args(chan, y: 20)
53+
chan.receive.should eq(21)
54+
55+
spawn method_with_named_args(chan, x: 10, y: 20)
56+
chan.receive.should eq(30)
3657
end
37-
Fiber.yield
38-
end
3958

40-
it "spawns named with macro" do
41-
spawn method_named("foo"), name: "foo"
42-
Fiber.yield
59+
it "spawns named" do
60+
spawn(name: "sub") do
61+
Fiber.current.name.should eq("sub")
62+
end
63+
Fiber.yield
64+
end
65+
66+
it "spawns named with macro" do
67+
spawn method_named("foo"), name: "foo"
68+
Fiber.yield
69+
end
4370
end
4471

4572
it "accepts method call with receiver" do

Diff for: src/concurrent.cr

+59-3
Original file line number | Diff line number | Diff line change
@@ -128,21 +128,77 @@ macro spawn(call, *, name = nil)
128128
{% end %}
129129
end
130130

131+
# Wraps around exceptions re-raised from concurrent calls.
132+
# The original exception can be accessed via `#cause`.
133+
class ConcurrentExecutionException < Exception
134+
end
135+
136+
# Runs the commands passed as arguments concurrently (in Fibers) and waits
137+
# for them to finish.
138+
#
139+
# ```
140+
# def say(word)
141+
# puts word
142+
# end
143+
#
144+
# # Will print out the three words concurrently
145+
# parallel(
146+
# say("concurrency"),
147+
# say("is"),
148+
# say("easy")
149+
# )
150+
# ```
151+
#
152+
# Can also be used to conveniently collect the return values of the
153+
# concurrent operations.
154+
#
155+
# ```
156+
# def concurrent_job(word)
157+
# word
158+
# end
159+
#
160+
# a, b, c =
161+
# parallel(
162+
# concurrent_job("concurrency"),
163+
# concurrent_job("is"),
164+
# concurrent_job("easy")
165+
# )
166+
#
167+
# puts a # => "concurrency"
168+
# puts b # => "is"
169+
# puts c # => "easy"
170+
# ```
171+
#
172+
# Due to the concurrent nature of this macro, it is highly recommended
173+
# to handle any exceptions within the concurrent calls. Unhandled
174+
# exceptions raised within the concurrent operations will be re-raised
175+
# inside the parent fiber as `ConcurrentExecutionException`, with the
176+
# `cause` attribute set to the original exception.
131177
macro parallel(*jobs)
132-
%channel = Channel(Nil).new
178+
%channel = Channel(Exception | Nil).new
133179

134180
{% for job, i in jobs %}
135181
%ret{i} = uninitialized typeof({{job}})
136182
spawn do
137183
begin
138184
%ret{i} = {{job}}
139-
ensure
185+
rescue e : Exception
186+
%channel.send e
187+
else
140188
%channel.send nil
141189
end
142190
end
143191
{% end %}
144192

145-
{{ jobs.size }}.times { %channel.receive }
193+
{{ jobs.size }}.times do
194+
%value = %channel.receive
195+
if %value.is_a?(Exception)
196+
raise ConcurrentExecutionException.new(
197+
"An unhandled error occured inside a `parallel` call",
198+
cause: %value
199+
)
200+
end
201+
end
146202

147203
{
148204
{% for job, i in jobs %}

0 commit comments

Comments
 (0)
Please sign in to comment.