Skip to content

Commit 8fba0a6

Browse files
committed Jul 10, 2018
introduce std.event.Group for making parallel async calls
1 parent 0ce6934 commit 8fba0a6

File tree

4 files changed

+189
-8
lines changed

4 files changed

+189
-8
lines changed
 

‎CMakeLists.txt‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ set(ZIG_STD_FILES
459459
"empty.zig"
460460
"event.zig"
461461
"event/channel.zig"
462+
"event/group.zig"
462463
"event/lock.zig"
463464
"event/locked.zig"
464465
"event/loop.zig"

‎src-self-hosted/module.zig‎

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ pub const Module = struct {
8585

8686
exported_symbol_names: event.Locked(Decl.Table),
8787

88+
/// Before code generation starts, must wait on this group to make sure
89+
/// the build is complete.
90+
build_group: event.Group(BuildError!void),
91+
92+
const BuildErrorsList = std.SegmentedList(BuildErrorDesc, 1);
93+
94+
pub const BuildErrorDesc = struct {
95+
code: BuildError,
96+
text: []const u8,
97+
};
98+
8899
// TODO handle some of these earlier and report them in a way other than error codes
89100
pub const BuildError = error{
90101
OutOfMemory,
@@ -237,6 +248,7 @@ pub const Module = struct {
237248
.emit_file_type = Emit.Binary,
238249
.link_out_file = null,
239250
.exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)),
251+
.build_group = event.Group(BuildError!void).init(loop),
240252
});
241253
}
242254

@@ -310,6 +322,9 @@ pub const Module = struct {
310322
const decls = try Scope.Decls.create(self.a(), null);
311323
errdefer decls.destroy();
312324

325+
var decl_group = event.Group(BuildError!void).init(self.loop);
326+
errdefer decl_group.cancelAll();
327+
313328
var it = tree.root_node.decls.iterator(0);
314329
while (it.next()) |decl_ptr| {
315330
const decl = decl_ptr.*;
@@ -342,25 +357,30 @@ pub const Module = struct {
342357
});
343358
errdefer self.a().destroy(fn_decl);
344359

345-
// TODO make this parallel
346-
try await try async self.addTopLevelDecl(tree, &fn_decl.base);
360+
try decl_group.call(addTopLevelDecl, self, tree, &fn_decl.base);
347361
},
348362
ast.Node.Id.TestDecl => @panic("TODO"),
349363
else => unreachable,
350364
}
351365
}
366+
try await (async decl_group.wait() catch unreachable);
367+
try await (async self.build_group.wait() catch unreachable);
352368
}
353369

354370
async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void {
355371
const is_export = decl.isExported(tree);
356372

357-
{
358-
const exported_symbol_names = await try async self.exported_symbol_names.acquire();
359-
defer exported_symbol_names.release();
373+
if (is_export) {
374+
try self.build_group.call(verifyUniqueSymbol, self, decl);
375+
}
376+
}
360377

361-
if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
362-
@panic("TODO report compile error");
363-
}
378+
async fn verifyUniqueSymbol(self: *Module, decl: *Decl) !void {
379+
const exported_symbol_names = await (async self.exported_symbol_names.acquire() catch unreachable);
380+
defer exported_symbol_names.release();
381+
382+
if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
383+
@panic("TODO report compile error");
364384
}
365385
}
366386

‎std/event.zig‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ pub const Loop = @import("event/loop.zig").Loop;
33
pub const Lock = @import("event/lock.zig").Lock;
44
pub const tcp = @import("event/tcp.zig");
55
pub const Channel = @import("event/channel.zig").Channel;
6+
pub const Group = @import("event/group.zig").Group;
67

78
test "import event tests" {
89
_ = @import("event/locked.zig");
910
_ = @import("event/loop.zig");
1011
_ = @import("event/lock.zig");
1112
_ = @import("event/tcp.zig");
1213
_ = @import("event/channel.zig");
14+
_ = @import("event/group.zig");
1315
}

‎std/event/group.zig‎

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
const std = @import("../index.zig");
2+
const builtin = @import("builtin");
3+
const Lock = std.event.Lock;
4+
const Loop = std.event.Loop;
5+
const AtomicRmwOp = builtin.AtomicRmwOp;
6+
const AtomicOrder = builtin.AtomicOrder;
7+
const assert = std.debug.assert;
8+
9+
/// ReturnType should be `void` or `E!void`
10+
pub fn Group(comptime ReturnType: type) type {
11+
return struct {
12+
coro_stack: Stack,
13+
alloc_stack: Stack,
14+
lock: Lock,
15+
16+
const Self = this;
17+
18+
const Error = switch (@typeInfo(ReturnType)) {
19+
builtin.TypeId.ErrorUnion => |payload| payload.error_set,
20+
else => void,
21+
};
22+
const Stack = std.atomic.Stack(promise->ReturnType);
23+
24+
pub fn init(loop: *Loop) Self {
25+
return Self{
26+
.coro_stack = Stack.init(),
27+
.alloc_stack = Stack.init(),
28+
.lock = Lock.init(loop),
29+
};
30+
}
31+
32+
/// Add a promise to the group. Thread-safe.
33+
pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
34+
const node = try self.lock.loop.allocator.create(Stack.Node{
35+
.next = undefined,
36+
.data = handle,
37+
});
38+
self.alloc_stack.push(node);
39+
}
40+
41+
/// This is equivalent to an async call, but the async function is added to the group, instead
42+
/// of returning a promise. func must be async and have return type `ReturnType`.
43+
/// Thread-safe.
44+
pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
45+
const S = struct {
46+
async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType {
47+
// TODO this is a hack to make the following memory live inside the coro frame
48+
suspend |p| {
49+
var my_node: Stack.Node = undefined;
50+
node.* = &my_node;
51+
resume p;
52+
}
53+
54+
// TODO this allocation elision should be guaranteed because we await it in
55+
// this coro frame
56+
return await (async func(args2) catch unreachable);
57+
}
58+
};
59+
var node: *Stack.Node = undefined;
60+
const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args);
61+
node.* = Stack.Node{
62+
.next = undefined,
63+
.data = handle,
64+
};
65+
self.coro_stack.push(node);
66+
}
67+
68+
/// Wait for all the calls and promises of the group to complete.
69+
/// Thread-safe.
70+
pub async fn wait(self: *Self) ReturnType {
71+
// TODO catch unreachable because the allocation can be grouped with
72+
// the coro frame allocation
73+
const held = await (async self.lock.acquire() catch unreachable);
74+
defer held.release();
75+
76+
while (self.coro_stack.pop()) |node| {
77+
if (Error == void) {
78+
await node.data;
79+
} else {
80+
(await node.data) catch |err| {
81+
self.cancelAll();
82+
return err;
83+
};
84+
}
85+
}
86+
while (self.alloc_stack.pop()) |node| {
87+
const handle = node.data;
88+
self.lock.loop.allocator.destroy(node);
89+
if (Error == void) {
90+
await handle;
91+
} else {
92+
(await handle) catch |err| {
93+
self.cancelAll();
94+
return err;
95+
};
96+
}
97+
}
98+
}
99+
100+
/// Cancel all the outstanding promises. Callers may only call this if wait was never called; wait itself calls it when an awaited promise returns an error.
101+
pub fn cancelAll(self: *Self) void {
102+
while (self.coro_stack.pop()) |node| {
103+
cancel node.data;
104+
}
105+
while (self.alloc_stack.pop()) |node| {
106+
cancel node.data;
107+
self.lock.loop.allocator.destroy(node);
108+
}
109+
}
110+
};
111+
}
112+
113+
test "std.event.Group" {
114+
var da = std.heap.DirectAllocator.init();
115+
defer da.deinit();
116+
117+
const allocator = &da.allocator;
118+
119+
var loop: Loop = undefined;
120+
try loop.initMultiThreaded(allocator);
121+
defer loop.deinit();
122+
123+
const handle = try async<allocator> testGroup(&loop);
124+
defer cancel handle;
125+
126+
loop.run();
127+
}
128+
129+
async fn testGroup(loop: *Loop) void {
130+
var count: usize = 0;
131+
var group = Group(void).init(loop);
132+
group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory");
133+
group.call(increaseByTen, &count) catch @panic("memory");
134+
await (async group.wait() catch @panic("memory"));
135+
assert(count == 11);
136+
137+
var another = Group(error!void).init(loop);
138+
another.add(async somethingElse() catch @panic("memory")) catch @panic("memory");
139+
another.call(doSomethingThatFails) catch @panic("memory");
140+
std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke);
141+
}
142+
143+
async fn sleepALittle(count: *usize) void {
144+
std.os.time.sleep(0, 1000000);
145+
_ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
146+
}
147+
148+
async fn increaseByTen(count: *usize) void {
149+
var i: usize = 0;
150+
while (i < 10) : (i += 1) {
151+
_ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
152+
}
153+
}
154+
155+
async fn doSomethingThatFails() error!void {}
156+
async fn somethingElse() error!void {
157+
return error.ItBroke;
158+
}

0 commit comments

Comments
 (0)