Skip to content

Commit 22b7312

Browse files
authored Jul 2, 2018
Merge pull request #1173 from ziglang/event-loop-channel
add event loop Channel abstraction
·
0.15.10.3.0
2 parents 3546352 + 2da9993 commit 22b7312

File tree

6 files changed

+465
-52
lines changed

6 files changed

+465
-52
lines changed
 

‎src-self-hosted/main.zig‎

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const std = @import("std");
22
const builtin = @import("builtin");
33

4+
const event = std.event;
45
const os = std.os;
56
const io = std.io;
67
const mem = std.mem;
@@ -43,6 +44,9 @@ const Command = struct {
4344
};
4445

4546
pub fn main() !void {
47+
// This allocator needs to be thread-safe because we use it for the event.Loop
48+
// which multiplexes coroutines onto kernel threads.
49+
// libc allocator is guaranteed to have this property.
4650
const allocator = std.heap.c_allocator;
4751

4852
var stdout_file = try std.io.getStdOut();
@@ -380,8 +384,10 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
380384
const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1);
381385
defer allocator.free(zig_lib_dir);
382386

387+
var loop = try event.Loop.init(allocator);
388+
383389
var module = try Module.create(
384-
allocator,
390+
&loop,
385391
root_name,
386392
root_source_file,
387393
Target.Native,
@@ -471,9 +477,35 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
471477
module.emit_file_type = emit_type;
472478
module.link_objects = link_objects;
473479
module.assembly_files = assembly_files;
480+
module.link_out_file = flags.single("out-file");
474481

475482
try module.build();
476-
try module.link(flags.single("out-file"));
483+
const process_build_events_handle = try async<loop.allocator> processBuildEvents(module, true);
484+
defer cancel process_build_events_handle;
485+
loop.run();
486+
}
487+
488+
/// Consumes build events from `module.events` for as long as `watch` is true.
///
/// The first `Ok` event is reported on stderr, after which `module.loop` is
/// stopped and the coroutine returns. `Error` events are reported and then
/// panic; `Fail` events panic immediately (printing compile errors is still
/// a TODO). When `watch` is false the coroutine returns without reading
/// anything from the channel.
async fn processBuildEvents(module: *Module, watch: bool) void {
    // `watch` never changes inside the body, so test it once up front.
    if (!watch) return;

    while (true) {
        // TODO directly awaiting async should guarantee memory allocation elision
        const event_promise = async module.events.get() catch unreachable;
        const build_event = await event_promise;

        switch (build_event) {
            Module.Event.Error => |err| {
                std.debug.warn("build failed: {}\n", @errorName(err));
                @panic("TODO error return trace");
            },
            Module.Event.Fail => |errs| {
                @panic("TODO print compile error messages");
            },
            Module.Event.Ok => {
                std.debug.warn("Build succeeded\n");
                // for now we stop after 1
                module.loop.stop();
                return;
            },
        }
    }
}
478510

479511
fn cmdBuildExe(allocator: *Allocator, args: []const []const u8) !void {
@@ -780,4 +812,3 @@ const CliPkg = struct {
780812
self.children.deinit();
781813
}
782814
};
783-

‎src-self-hosted/module.zig‎

Lines changed: 101 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ const warn = std.debug.warn;
1111
const Token = std.zig.Token;
1212
const ArrayList = std.ArrayList;
1313
const errmsg = @import("errmsg.zig");
14+
const ast = std.zig.ast;
15+
const event = std.event;
1416

1517
pub const Module = struct {
16-
allocator: *mem.Allocator,
18+
loop: *event.Loop,
1719
name: Buffer,
1820
root_src_path: ?[]const u8,
1921
module: llvm.ModuleRef,
@@ -76,6 +78,51 @@ pub const Module = struct {
7678

7779
kind: Kind,
7880

81+
link_out_file: ?[]const u8,
82+
events: *event.Channel(Event),
83+
84+
// TODO handle some of these earlier and report them in a way other than error codes
85+
pub const BuildError = error{
86+
OutOfMemory,
87+
EndOfStream,
88+
BadFd,
89+
Io,
90+
IsDir,
91+
Unexpected,
92+
SystemResources,
93+
SharingViolation,
94+
PathAlreadyExists,
95+
FileNotFound,
96+
AccessDenied,
97+
PipeBusy,
98+
FileTooBig,
99+
SymLinkLoop,
100+
ProcessFdQuotaExceeded,
101+
NameTooLong,
102+
SystemFdQuotaExceeded,
103+
NoDevice,
104+
PathNotFound,
105+
NoSpaceLeft,
106+
NotDir,
107+
FileSystem,
108+
OperationAborted,
109+
IoPending,
110+
BrokenPipe,
111+
WouldBlock,
112+
FileClosed,
113+
DestinationAddressRequired,
114+
DiskQuota,
115+
InputOutput,
116+
NoStdHandles,
117+
Overflow,
118+
};
119+
120+
pub const Event = union(enum) {
121+
Ok,
122+
Fail: []errmsg.Msg,
123+
Error: BuildError,
124+
};
125+
79126
pub const DarwinVersionMin = union(enum) {
80127
None,
81128
MacOS: []const u8,
@@ -104,7 +151,7 @@ pub const Module = struct {
104151
};
105152

106153
pub fn create(
107-
allocator: *mem.Allocator,
154+
loop: *event.Loop,
108155
name: []const u8,
109156
root_src_path: ?[]const u8,
110157
target: *const Target,
@@ -113,7 +160,7 @@ pub const Module = struct {
113160
zig_lib_dir: []const u8,
114161
cache_dir: []const u8,
115162
) !*Module {
116-
var name_buffer = try Buffer.init(allocator, name);
163+
var name_buffer = try Buffer.init(loop.allocator, name);
117164
errdefer name_buffer.deinit();
118165

119166
const context = c.LLVMContextCreate() orelse return error.OutOfMemory;
@@ -125,8 +172,12 @@ pub const Module = struct {
125172
const builder = c.LLVMCreateBuilderInContext(context) orelse return error.OutOfMemory;
126173
errdefer c.LLVMDisposeBuilder(builder);
127174

128-
const module_ptr = try allocator.create(Module{
129-
.allocator = allocator,
175+
const events = try event.Channel(Event).create(loop, 0);
176+
errdefer events.destroy();
177+
178+
return loop.allocator.create(Module{
179+
.loop = loop,
180+
.events = events,
130181
.name = name_buffer,
131182
.root_src_path = root_src_path,
132183
.module = module,
@@ -171,76 +222,87 @@ pub const Module = struct {
171222
.link_objects = [][]const u8{},
172223
.windows_subsystem_windows = false,
173224
.windows_subsystem_console = false,
174-
.link_libs_list = ArrayList(*LinkLib).init(allocator),
225+
.link_libs_list = ArrayList(*LinkLib).init(loop.allocator),
175226
.libc_link_lib = null,
176227
.err_color = errmsg.Color.Auto,
177228
.darwin_frameworks = [][]const u8{},
178229
.darwin_version_min = DarwinVersionMin.None,
179230
.test_filters = [][]const u8{},
180231
.test_name_prefix = null,
181232
.emit_file_type = Emit.Binary,
233+
.link_out_file = null,
182234
});
183-
errdefer allocator.destroy(module_ptr);
184-
return module_ptr;
185235
}
186236

187237
/// Debugging aid: passes this module's LLVM module handle to `LLVMDumpModule`.
fn dump(self: *Module) void {
    const llvm_module = self.module;
    c.LLVMDumpModule(llvm_module);
}
190240

191241
pub fn destroy(self: *Module) void {
242+
self.events.destroy();
192243
c.LLVMDisposeBuilder(self.builder);
193244
c.LLVMDisposeModule(self.module);
194245
c.LLVMContextDispose(self.context);
195246
self.name.deinit();
196247

197-
self.allocator.destroy(self);
248+
self.a().destroy(self);
198249
}
199250

200251
pub fn build(self: *Module) !void {
201252
if (self.llvm_argv.len != 0) {
202-
var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.allocator, [][]const []const u8{
253+
var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.a(), [][]const []const u8{
203254
[][]const u8{"zig (LLVM option parsing)"},
204255
self.llvm_argv,
205256
});
206257
defer c_compatible_args.deinit();
258+
// TODO this sets global state
207259
c.ZigLLVMParseCommandLineOptions(self.llvm_argv.len + 1, c_compatible_args.ptr);
208260
}
209261

262+
_ = try async<self.a()> self.buildAsync();
263+
}
264+
265+
/// Coroutine that repeatedly runs `addRootSrc` and publishes the outcome on
/// `self.events`.
///
/// A successful pass publishes `Event.Ok` and starts the next pass. A failed
/// pass publishes `Event.Error` carrying the error and ends the coroutine.
async fn buildAsync(self: *Module) void {
    while (true) {
        // TODO directly awaiting async should guarantee memory allocation elision
        // TODO also async before suspending should guarantee memory allocation elision
        const root_src_result = await (async self.addRootSrc() catch unreachable);

        if (root_src_result) |_| {
            await (async self.events.put(Event.Ok) catch unreachable);
        } else |err| {
            await (async self.events.put(Event{ .Error = err }) catch unreachable);
            return;
        }
    }
}
276+
277+
async fn addRootSrc(self: *Module) !void {
210278
const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path");
211-
const root_src_real_path = os.path.real(self.allocator, root_src_path) catch |err| {
279+
const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| {
212280
try printError("unable to get real path '{}': {}", root_src_path, err);
213281
return err;
214282
};
215-
errdefer self.allocator.free(root_src_real_path);
283+
errdefer self.a().free(root_src_real_path);
216284

217-
const source_code = io.readFileAlloc(self.allocator, root_src_real_path) catch |err| {
285+
const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| {
218286
try printError("unable to open '{}': {}", root_src_real_path, err);
219287
return err;
220288
};
221-
errdefer self.allocator.free(source_code);
222-
223-
warn("====input:====\n");
224-
225-
warn("{}", source_code);
289+
errdefer self.a().free(source_code);
226290

227-
warn("====parse:====\n");
228-
229-
var tree = try std.zig.parse(self.allocator, source_code);
291+
var tree = try std.zig.parse(self.a(), source_code);
230292
defer tree.deinit();
231293

232-
var stderr_file = try std.io.getStdErr();
233-
var stderr_file_out_stream = std.io.FileOutStream.init(&stderr_file);
234-
const out_stream = &stderr_file_out_stream.stream;
235-
236-
warn("====fmt:====\n");
237-
_ = try std.zig.render(self.allocator, out_stream, &tree);
238-
239-
warn("====ir:====\n");
240-
warn("TODO\n\n");
241-
242-
warn("====llvm ir:====\n");
243-
self.dump();
294+
//var it = tree.root_node.decls.iterator();
295+
//while (it.next()) |decl_ptr| {
296+
// const decl = decl_ptr.*;
297+
// switch (decl.id) {
298+
// ast.Node.Comptime => @panic("TODO"),
299+
// ast.Node.VarDecl => @panic("TODO"),
300+
// ast.Node.UseDecl => @panic("TODO"),
301+
// ast.Node.FnDef => @panic("TODO"),
302+
// ast.Node.TestDecl => @panic("TODO"),
303+
// else => unreachable,
304+
// }
305+
//}
244306
}
245307

246308
pub fn link(self: *Module, out_file: ?[]const u8) !void {
@@ -263,18 +325,22 @@ pub const Module = struct {
263325
}
264326
}
265327

266-
const link_lib = try self.allocator.create(LinkLib{
328+
const link_lib = try self.a().create(LinkLib{
267329
.name = name,
268330
.path = null,
269331
.provided_explicitly = provided_explicitly,
270-
.symbols = ArrayList([]u8).init(self.allocator),
332+
.symbols = ArrayList([]u8).init(self.a()),
271333
});
272334
try self.link_libs_list.append(link_lib);
273335
if (is_libc) {
274336
self.libc_link_lib = link_lib;
275337
}
276338
return link_lib;
277339
}
340+
341+
/// Shorthand accessor for the allocator that belongs to this module's event loop.
fn a(self: Module) *mem.Allocator {
    const loop = self.loop;
    return loop.allocator;
}
278344
};
279345

280346
fn printError(comptime format: []const u8, args: ...) !void {

‎std/atomic/queue_mpsc.zig‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const std = @import("std");
1+
const std = @import("../index.zig");
22
const assert = std.debug.assert;
33
const builtin = @import("builtin");
44
const AtomicOrder = builtin.AtomicOrder;

‎std/event.zig‎

Lines changed: 325 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ const assert = std.debug.assert;
44
const event = this;
55
const mem = std.mem;
66
const posix = std.os.posix;
7+
const AtomicRmwOp = builtin.AtomicRmwOp;
8+
const AtomicOrder = builtin.AtomicOrder;
79

810
pub const TcpServer = struct {
911
handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
@@ -93,28 +95,68 @@ pub const TcpServer = struct {
9395

9496
pub const Loop = struct {
9597
allocator: *mem.Allocator,
96-
epollfd: i32,
9798
keep_running: bool,
99+
next_tick_queue: std.atomic.QueueMpsc(promise),
100+
os_data: OsData,
101+
102+
const OsData = switch (builtin.os) {
103+
builtin.Os.linux => struct {
104+
epollfd: i32,
105+
},
106+
else => struct {},
107+
};
108+
109+
pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
98110

99-
fn init(allocator: *mem.Allocator) !Loop {
100-
const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
101-
return Loop{
111+
/// The allocator must be thread-safe because we use it for multiplexing
112+
/// coroutines onto kernel threads.
113+
pub fn init(allocator: *mem.Allocator) !Loop {
114+
var self = Loop{
102115
.keep_running = true,
103116
.allocator = allocator,
104-
.epollfd = epollfd,
117+
.os_data = undefined,
118+
.next_tick_queue = std.atomic.QueueMpsc(promise).init(),
105119
};
120+
try self.initOsData();
121+
errdefer self.deinitOsData();
122+
123+
return self;
124+
}
125+
126+
/// Releases the OS-specific state of the loop by delegating to `deinitOsData`.
/// `stop` must have been called before this.
pub fn deinit(self: *Loop) void {
    return self.deinitOsData();
}
130+
131+
const InitOsDataError = std.os.LinuxEpollCreateError;
132+
133+
/// Fills in `self.os_data` for the target operating system.
///
/// On Linux this creates the epoll instance (close-on-exec) and stores its
/// file descriptor in `self.os_data.epollfd`; on every other OS there is
/// nothing to set up. Returns the epoll creation error unchanged.
///
/// This function does not clean up after itself: creating the epoll fd is
/// its final fallible step, so there is never a partially initialised state
/// to undo here. A caller that fails *after* this returns is responsible for
/// calling `deinitOsData`.
fn initOsData(self: *Loop) InitOsDataError!void {
    switch (builtin.os) {
        builtin.Os.linux => {
            // No `errdefer close` here: an errdefer placed as the last
            // statement of its block can never run, because no error can be
            // returned while it is in scope.
            self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
        },
        else => {},
    }
}
142+
143+
/// Tears down whatever `self.os_data` holds for the target operating system.
/// On Linux that is the epoll file descriptor, which gets closed; other
/// targets have nothing to release.
fn deinitOsData(self: *Loop) void {
    switch (builtin.os) {
        builtin.Os.linux => {
            const epollfd = self.os_data.epollfd;
            std.os.close(epollfd);
        },
        else => {},
    }
}
107149

108150
pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
109151
var ev = std.os.linux.epoll_event{
110152
.events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
111153
.data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) },
112154
};
113-
try std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
155+
try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
114156
}
115157

116158
pub fn removeFd(self: *Loop, fd: i32) void {
117-
std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
159+
std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
118160
}
119161
async fn waitFd(self: *Loop, fd: i32) !void {
120162
defer self.removeFd(fd);
@@ -126,21 +168,250 @@ pub const Loop = struct {
126168
pub fn stop(self: *Loop) void {
127169
// TODO make atomic
128170
self.keep_running = false;
129-
// TODO activate an fd in the epoll set
171+
// TODO activate an fd in the epoll set which should cancel all the promises
172+
}
173+
174+
/// Appends `node` to the loop's next-tick queue.
/// The caller provides the linked list node, so nothing is allocated here
/// and the call cannot fail.
pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
    const queue = &self.next_tick_queue;
    queue.put(node);
}
131178

132179
pub fn run(self: *Loop) void {
133180
while (self.keep_running) {
134-
var events: [16]std.os.linux.epoll_event = undefined;
135-
const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
136-
for (events[0..count]) |ev| {
137-
const p = @intToPtr(promise, ev.data.ptr);
138-
resume p;
181+
// TODO multiplex the next tick queue and the epoll event results onto a thread pool
182+
while (self.next_tick_queue.get()) |node| {
183+
resume node.data;
139184
}
185+
if (!self.keep_running) break;
186+
187+
self.dispatchOsEvents();
188+
}
189+
}
190+
191+
/// Waits for OS-level events and resumes the coroutine attached to each one.
///
/// Linux: performs one `linuxEpollWait` on the loop's epoll fd (up to 16
/// events per call, timeout argument -1 — presumably "block until
/// something is ready"; confirm against the epoll wrapper) and resumes the
/// promise stored in each event's `data.ptr`. Other targets: no-op.
fn dispatchOsEvents(self: *Loop) void {
    switch (builtin.os) {
        builtin.Os.linux => {
            var events: [16]std.os.linux.epoll_event = undefined;
            const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);

            var i: usize = 0;
            while (i < count) : (i += 1) {
                // The promise was stashed in the event payload as an integer.
                const waiting = @intToPtr(promise, events[i].data.ptr);
                resume waiting;
            }
        },
        else => {},
    }
}
142204
};
143205

206+
/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
/// when buffer is empty, consumers suspend and are resumed by producers
/// when buffer is full, producers suspend and are resumed by consumers
///
/// `put` and `get` only enqueue the calling coroutine and schedule a
/// `dispatch` pass on the loop; all matching of putters, getters and buffer
/// slots happens inside `dispatch`, which is serialized by `dispatch_lock`.
pub fn Channel(comptime T: type) type {
    return struct {
        loop: *Loop,

        getters: std.atomic.QueueMpsc(GetNode),
        putters: std.atomic.QueueMpsc(PutNode),
        // number of queued getters / putters not yet claimed by dispatch
        get_count: usize,
        put_count: usize,
        dispatch_lock: u8, // TODO make this a bool
        need_dispatch: u8, // TODO make this a bool

        // simple fixed size ring buffer
        buffer_nodes: []T,
        // slot the next buffered item is written to; kept in [0, buffer_nodes.len)
        buffer_index: usize,
        // number of items currently stored in the ring
        buffer_len: usize,

        const SelfChannel = this;
        const GetNode = struct {
            // where dispatch stores the received value
            ptr: *T,
            tick_node: *Loop.NextTickNode,
        };
        const PutNode = struct {
            data: T,
            tick_node: *Loop.NextTickNode,
        };

        /// Allocates a channel with room for `capacity` buffered items using
        /// the loop's allocator. A capacity of 0 gives an unbuffered channel.
        /// call destroy when done
        pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
            const buffer_nodes = try loop.allocator.alloc(T, capacity);
            // runs if the allocation of the channel itself fails below
            errdefer loop.allocator.free(buffer_nodes);

            return loop.allocator.create(SelfChannel{
                .loop = loop,
                .buffer_len = 0,
                .buffer_nodes = buffer_nodes,
                .buffer_index = 0,
                .dispatch_lock = 0,
                .need_dispatch = 0,
                .getters = std.atomic.QueueMpsc(GetNode).init(),
                .putters = std.atomic.QueueMpsc(PutNode).init(),
                .get_count = 0,
                .put_count = 0,
            });
        }

        /// Cancels every coroutine still queued as a getter or putter, then
        /// frees the ring buffer and the channel itself.
        /// must be called when all calls to put and get have suspended and no more calls occur
        pub fn destroy(self: *SelfChannel) void {
            while (self.getters.get()) |get_node| {
                cancel get_node.data.tick_node.data;
            }
            while (self.putters.get()) |put_node| {
                cancel put_node.data.tick_node.data;
            }
            self.loop.allocator.free(self.buffer_nodes);
            self.loop.allocator.destroy(self);
        }

        /// puts a data item in the channel. The promise completes when the value has been added to the
        /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
        pub async fn put(self: *SelfChannel, data: T) void {
            // TODO should be able to group memory allocation failure before first suspend point
            // so that the async invocation catches it
            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;

            suspend |handle| {
                var my_tick_node = Loop.NextTickNode{
                    .next = undefined,
                    .data = handle,
                };
                var queue_node = std.atomic.QueueMpsc(PutNode).Node{
                    .data = PutNode{
                        .tick_node = &my_tick_node,
                        .data = data,
                    },
                    .next = undefined,
                };
                self.putters.put(&queue_node);
                _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                // schedule the dispatch pass that will eventually resume us
                self.loop.onNextTick(dispatch_tick_node_ptr);
            }
        }

        /// await this function to get an item from the channel. If the buffer is empty, the promise will
        /// complete when the next item is put in the channel.
        pub async fn get(self: *SelfChannel) T {
            // TODO should be able to group memory allocation failure before first suspend point
            // so that the async invocation catches it
            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;

            // TODO integrate this function with named return values
            // so we can get rid of this extra result copy
            var result: T = undefined;
            suspend |handle| {
                var my_tick_node = Loop.NextTickNode{
                    .next = undefined,
                    .data = handle,
                };
                var queue_node = std.atomic.QueueMpsc(GetNode).Node{
                    .data = GetNode{
                        .ptr = &result,
                        .tick_node = &my_tick_node,
                    },
                    .next = undefined,
                };
                self.getters.put(&queue_node);
                _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                // schedule the dispatch pass that will eventually fill `result`
                self.loop.onNextTick(dispatch_tick_node_ptr);
            }
            return result;
        }

        /// Matches queued putters, queued getters and the ring buffer.
        ///
        /// Suspends immediately and hands its own tick node back through
        /// `tick_node_ptr`; the caller passes that node to `onNextTick` to run
        /// the pass. Only one pass runs at a time (`dispatch_lock`); a pass
        /// that finds the lock taken just leaves `need_dispatch` set so the
        /// lock holder goes around again.
        async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
            // resumed by onNextTick
            suspend |handle| {
                var tick_node = Loop.NextTickNode{
                    .data = handle,
                    .next = undefined,
                };
                tick_node_ptr.* = &tick_node;
            }

            // set the "need dispatch" flag
            _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);

            lock: while (true) {
                // set the lock flag
                const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                if (prev_lock != 0) return;

                // clear the need_dispatch flag since we're about to do it
                _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

                while (true) {
                    one_dispatch: {
                        // later we correct these extra subtractions
                        var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);

                        // transfer self.buffer to self.getters
                        while (self.buffer_len != 0) {
                            if (get_count == 0) break :one_dispatch;

                            const get_node = &self.getters.get().?.data;
                            // buffer_len != 0 implies a nonzero capacity, so the modulo is safe.
                            // The oldest item sits buffer_len slots behind the write slot, wrapped
                            // around the end of the ring.
                            const capacity = self.buffer_nodes.len;
                            const read_index = (self.buffer_index + capacity - self.buffer_len) % capacity;
                            get_node.ptr.* = self.buffer_nodes[read_index];
                            self.loop.onNextTick(get_node.tick_node);
                            self.buffer_len -= 1;

                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }

                        // direct transfer self.putters to self.getters
                        while (get_count != 0 and put_count != 0) {
                            const get_node = &self.getters.get().?.data;
                            const put_node = &self.putters.get().?.data;

                            get_node.ptr.* = put_node.data;
                            self.loop.onNextTick(get_node.tick_node);
                            self.loop.onNextTick(put_node.tick_node);

                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }

                        // transfer self.putters to self.buffer
                        while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
                            const put_node = &self.putters.get().?.data;

                            self.buffer_nodes[self.buffer_index] = put_node.data;
                            self.loop.onNextTick(put_node.tick_node);
                            // wrap the write slot so it never leaves the ring; the loop
                            // condition guarantees a nonzero capacity here
                            self.buffer_index = (self.buffer_index + 1) % self.buffer_nodes.len;
                            self.buffer_len += 1;

                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }
                    }

                    // undo the extra subtractions
                    _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
                    _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                    // clear need-dispatch flag
                    const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                    if (need_dispatch != 0) continue;

                    const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                    assert(my_lock != 0);

                    // we have to check again now that we unlocked
                    if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;

                    return;
                }
            }
        }
    };
}
414+
144415
pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
145416
var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
146417

@@ -199,6 +470,7 @@ test "listen on a port, send bytes, receive bytes" {
199470
defer cancel p;
200471
loop.run();
201472
}
473+
202474
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
203475
errdefer @panic("test failure");
204476

@@ -211,3 +483,43 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
211483
assert(mem.eql(u8, msg, "hello from server\n"));
212484
loop.stop();
213485
}
486+
487+
// Runs one getter coroutine and one putter coroutine against an unbuffered
// (capacity 0) channel on a single event loop. The getter stops the loop
// once it has received both values.
test "std.event.Channel" {
    var direct_allocator = std.heap.DirectAllocator.init();
    defer direct_allocator.deinit();

    const allocator = &direct_allocator.allocator;

    var loop = try Loop.init(allocator);
    defer loop.deinit();

    const channel = try Channel(i32).create(&loop, 0);
    defer channel.destroy();

    const getter = try async<allocator> testChannelGetter(&loop, channel);
    defer cancel getter;

    const putter = try async<allocator> testChannelPutter(channel);
    defer cancel putter;

    loop.run();
}
507+
508+
// Receives two values from `channel`, checks they are 1234 and then 4567,
// and finally stops `loop`.
async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
    errdefer @panic("test failed");

    const first = await (try async channel.get());
    assert(first == 1234);

    const second = await (try async channel.get());
    assert(second == 4567);

    loop.stop();
}
521+
522+
// Sends 1234 and then 4567 into `channel`, waiting for each put to complete
// before starting the next one.
async fn testChannelPutter(channel: *Channel(i32)) void {
    const first_put = async channel.put(1234) catch @panic("out of memory");
    await first_put;

    const second_put = async channel.put(4567) catch @panic("out of memory");
    await second_put;
}

‎std/fmt/index.zig‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ pub fn formatType(
130130
try output(context, "error.");
131131
return output(context, @errorName(value));
132132
},
133+
builtin.TypeId.Promise => {
134+
return format(context, Errors, output, "promise@{x}", @ptrToInt(value));
135+
},
133136
builtin.TypeId.Pointer => |ptr_info| switch (ptr_info.size) {
134137
builtin.TypeInfo.Pointer.Size.One => switch (@typeInfo(ptr_info.child)) {
135138
builtin.TypeId.Array => |info| {

‎std/heap.zig‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void {
3838
}
3939

4040
/// This allocator makes a syscall directly for every allocation and free.
41+
/// TODO make this thread-safe. The windows implementation will need some atomics.
4142
pub const DirectAllocator = struct {
4243
allocator: Allocator,
4344
heap_handle: ?HeapHandle,

0 commit comments

Comments
 (0)
Please sign in to comment.