@@ -4,6 +4,8 @@ const assert = std.debug.assert;
 const event = this;
 const mem = std.mem;
 const posix = std.os.posix;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
 
 pub const TcpServer = struct {
     handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
@@ -93,28 +95,68 @@ pub const TcpServer = struct {
 
 pub const Loop = struct {
     allocator: *mem.Allocator,
-    epollfd: i32,
     keep_running: bool,
+    next_tick_queue: std.atomic.QueueMpsc(promise),
+    os_data: OsData,
+
+    const OsData = switch (builtin.os) {
+        builtin.Os.linux => struct {
+            epollfd: i32,
+        },
+        else => struct {},
+    };
+
+    pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
 
-    fn init(allocator: *mem.Allocator) !Loop {
-        const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
-        return Loop{
+    /// The allocator must be thread-safe because we use it for multiplexing
+    /// coroutines onto kernel threads.
+    pub fn init(allocator: *mem.Allocator) !Loop {
+        var self = Loop{
             .keep_running = true,
             .allocator = allocator,
-            .epollfd = epollfd,
+            .os_data = undefined,
+            .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
         };
+        try self.initOsData();
+        errdefer self.deinitOsData();
+
+        return self;
+    }
+
+    /// must call stop before deinit
+    pub fn deinit(self: *Loop) void {
+        self.deinitOsData();
+    }
+
+    const InitOsDataError = std.os.LinuxEpollCreateError;
+
+    fn initOsData(self: *Loop) InitOsDataError!void {
+        switch (builtin.os) {
+            builtin.Os.linux => {
+                self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
+                errdefer std.os.close(self.os_data.epollfd);
+            },
+            else => {},
+        }
+    }
+
+    fn deinitOsData(self: *Loop) void {
+        switch (builtin.os) {
+            builtin.Os.linux => std.os.close(self.os_data.epollfd),
+            else => {},
+        }
     }
 
     pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
         var ev = std.os.linux.epoll_event{
             .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
             .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) },
         };
-        try std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
+        try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
     }
 
     pub fn removeFd(self: *Loop, fd: i32) void {
-        std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
+        std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
    }
     async fn waitFd(self: *Loop, fd: i32) !void {
         defer self.removeFd(fd);
@@ -126,21 +168,250 @@ pub const Loop = struct {
     pub fn stop(self: *Loop) void {
         // TODO make atomic
         self.keep_running = false;
-        // TODO activate an fd in the epoll set
+        // TODO activate an fd in the epoll set which should cancel all the promises
+    }
+
+    /// bring your own linked list node. this means it can't fail.
+    pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+        self.next_tick_queue.put(node);
     }
 
     pub fn run(self: *Loop) void {
         while (self.keep_running) {
-            var events: [16]std.os.linux.epoll_event = undefined;
-            const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
-            for (events[0..count]) |ev| {
-                const p = @intToPtr(promise, ev.data.ptr);
-                resume p;
+            // TODO multiplex the next tick queue and the epoll event results onto a thread pool
+            while (self.next_tick_queue.get()) |node| {
+                resume node.data;
             }
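+            // re-check before blocking in the OS wait: one of the promises
+            // resumed above may have called stop(), and dispatchOsEvents
+            // would otherwise block indefinitely in epoll_wait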
+            if (!self.keep_running) break;
+
+            self.dispatchOsEvents();
+        }
+    }
+
+    fn dispatchOsEvents(self: *Loop) void {
+        switch (builtin.os) {
+            builtin.Os.linux => {
+                var events: [16]std.os.linux.epoll_event = undefined;
+                const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
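+                // each ready event carries the promise handle that addFd
+                // stored in epoll_data.ptr; resume it now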
+                for (events[0..count]) |ev| {
+                    const p = @intToPtr(promise, ev.data.ptr);
+                    resume p;
+                }
+            },
+            else => {},
         }
     }
 };
 
+/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
+/// when buffer is empty, consumers suspend and are resumed by producers
+/// when buffer is full, producers suspend and are resumed by consumers
+pub fn Channel(comptime T: type) type {
+    return struct {
+        loop: *Loop,
+
+        getters: std.atomic.QueueMpsc(GetNode),
+        putters: std.atomic.QueueMpsc(PutNode),
+        get_count: usize,
+        put_count: usize,
+        dispatch_lock: u8, // TODO make this a bool
+        need_dispatch: u8, // TODO make this a bool
+
+        // simple fixed size ring buffer
+        buffer_nodes: []T,
+        buffer_index: usize,
+        buffer_len: usize,
+
+        const SelfChannel = this;
+        const GetNode = struct {
+            ptr: *T,
+            tick_node: *Loop.NextTickNode,
+        };
+        const PutNode = struct {
+            data: T,
+            tick_node: *Loop.NextTickNode,
+        };
+
+        /// call destroy when done
+        pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
+            const buffer_nodes = try loop.allocator.alloc(T, capacity);
+            errdefer loop.allocator.free(buffer_nodes);
+
+            const self = try loop.allocator.create(SelfChannel{
+                .loop = loop,
+                .buffer_len = 0,
+                .buffer_nodes = buffer_nodes,
+                .buffer_index = 0,
+                .dispatch_lock = 0,
+                .need_dispatch = 0,
+                .getters = std.atomic.QueueMpsc(GetNode).init(),
+                .putters = std.atomic.QueueMpsc(PutNode).init(),
+                .get_count = 0,
+                .put_count = 0,
+            });
+            errdefer loop.allocator.destroy(self);
+
+            return self;
+        }
+
+        /// must be called when all calls to put and get have suspended and no more calls occur
+        pub fn destroy(self: *SelfChannel) void {
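+            // any node still queued belongs to a suspended coroutine;
+            // cancel destroys that coroutine's frame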
+            while (self.getters.get()) |get_node| {
+                cancel get_node.data.tick_node.data;
+            }
+            while (self.putters.get()) |put_node| {
+                cancel put_node.data.tick_node.data;
+            }
+            self.loop.allocator.free(self.buffer_nodes);
+            self.loop.allocator.destroy(self);
+        }
+
+        /// puts a data item in the channel. The promise completes when the value has been added to the
+        /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
+        pub async fn put(self: *SelfChannel, data: T) void {
+            // TODO should be able to group memory allocation failure before first suspend point
+            // so that the async invocation catches it
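+            // spawn the dispatch coroutine first; it parks itself immediately and
+            // hands back its tick node, which we schedule from inside the suspend
+            // block below, only after this putter is safely queued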
+            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+            suspend |handle| {
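+                // these nodes live in this coroutine's frame; that is safe
+                // because the frame stays suspended until a dispatcher resumes
+                // the handle stored in my_tick_node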
+                var my_tick_node = Loop.NextTickNode{
+                    .next = undefined,
+                    .data = handle,
+                };
+                var queue_node = std.atomic.QueueMpsc(PutNode).Node{
+                    .data = PutNode{
+                        .tick_node = &my_tick_node,
+                        .data = data,
+                    },
+                    .next = undefined,
+                };
+                self.putters.put(&queue_node);
+                _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+                self.loop.onNextTick(dispatch_tick_node_ptr);
+            }
+        }
+
+        /// await this function to get an item from the channel. If the buffer is empty, the promise will
+        /// complete when the next item is put in the channel.
+        pub async fn get(self: *SelfChannel) T {
+            // TODO should be able to group memory allocation failure before first suspend point
+            // so that the async invocation catches it
+            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+            // TODO integrate this function with named return values
+            // so we can get rid of this extra result copy
+            var result: T = undefined;
+            var debug_handle: usize = undefined;
+            suspend |handle| {
+                debug_handle = @ptrToInt(handle);
+                var my_tick_node = Loop.NextTickNode{
+                    .next = undefined,
+                    .data = handle,
+                };
+                var queue_node = std.atomic.QueueMpsc(GetNode).Node{
+                    .data = GetNode{
+                        .ptr = &result,
+                        .tick_node = &my_tick_node,
+                    },
+                    .next = undefined,
+                };
+                self.getters.put(&queue_node);
+                _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+                self.loop.onNextTick(dispatch_tick_node_ptr);
+            }
+            return result;
+        }
+
+        async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
+            // resumed by onNextTick
+            suspend |handle| {
+                var tick_node = Loop.NextTickNode{
+                    .data = handle,
+                    .next = undefined,
+                };
+                tick_node_ptr.* = &tick_node;
+            }
+
+            // set the "need dispatch" flag
+            _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+
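+            // flag-then-lock protocol: whichever dispatcher wins dispatch_lock
+            // does the work for everyone; losers return, and the re-check after
+            // unlocking below guarantees a flag raised mid-dispatch is not lost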
+            lock: while (true) {
+                // set the lock flag
+                const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+                if (prev_lock != 0) return;
+
+                // clear the need_dispatch flag since we're about to do it
+                _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+                while (true) {
+                    one_dispatch: {
+                        // later we correct these extra subtractions
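+                        // @atomicRmw returns the value from before the operation,
+                        // so these hold the waiter counts as observed atomically;
+                        // the matching Add 1 after this block undoes the off-by-one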
+                        var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+
+                        // transfer self.buffer to self.getters
+                        while (self.buffer_len != 0) {
+                            if (get_count == 0) break :one_dispatch;
+
+                            const get_node = &self.getters.get().?.data;
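+                            // buffer_index is the next write slot, so index minus
+                            // length (wrapping) addresses the oldest element; reduce
+                            // modulo capacity to stay within the allocation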
+                            get_node.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
+                            self.loop.onNextTick(get_node.tick_node);
+                            self.buffer_len -= 1;
+
+                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        }
+
+                        // direct transfer self.putters to self.getters
+                        while (get_count != 0 and put_count != 0) {
+                            const get_node = &self.getters.get().?.data;
+                            const put_node = &self.putters.get().?.data;
+
+                            get_node.ptr.* = put_node.data;
+                            self.loop.onNextTick(get_node.tick_node);
+                            self.loop.onNextTick(put_node.tick_node);
+
+                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        }
+
+                        // transfer self.putters to self.buffer
+                        while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
+                            const put_node = &self.putters.get().?.data;
+
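+                            // write at the logical head, again reduced modulo
+                            // capacity; buffer_index itself only grows (with +%
+                            // wraparound at the usize boundary)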
+                            self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
+                            self.loop.onNextTick(put_node.tick_node);
+                            self.buffer_index +%= 1;
+                            self.buffer_len += 1;
+
+                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        }
+                    }
+
+                    // undo the extra subtractions
+                    _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+                    _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+                    // clear need-dispatch flag
+                    const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                    if (need_dispatch != 0) continue;
+
+                    const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                    assert(my_lock != 0);
+
+                    // we have to check again now that we unlocked
+                    if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
+
+                    return;
+                }
+            }
+        }
+    };
+}
+
 pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
     var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
 
@@ -199,6 +470,7 @@ test "listen on a port, send bytes, receive bytes" {
     defer cancel p;
     loop.run();
 }
+
 async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
     errdefer @panic("test failure");
@@ -211,3 +483,43 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
     assert(mem.eql(u8, msg, "hello from server\n"));
     loop.stop();
 }
+
+test "std.event.Channel" {
+    var da = std.heap.DirectAllocator.init();
+    defer da.deinit();
+
+    const allocator = &da.allocator;
+
+    var loop = try Loop.init(allocator);
+    defer loop.deinit();
+
+    const channel = try Channel(i32).create(&loop, 0);
+    defer channel.destroy();
+
+    const handle = try async<allocator> testChannelGetter(&loop, channel);
+    defer cancel handle;
+
+    const putter = try async<allocator> testChannelPutter(channel);
+    defer cancel putter;
+
+    loop.run();
+}
+
+async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+    errdefer @panic("test failed");
+
+    const value1_promise = try async channel.get();
+    const value1 = await value1_promise;
+    assert(value1 == 1234);
+
+    const value2_promise = try async channel.get();
+    const value2 = await value2_promise;
+    assert(value2 == 4567);
+
+    loop.stop();
+}
+
+async fn testChannelPutter(channel: *Channel(i32)) void {
524
+ await (async channel .put (4567 ) catch @panic ("out of memory" ));
525
+ }