const std = @import("../index.zig");
const assert = std.debug.assert;
const builtin = @import("builtin");
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Lock = std.event.Lock;
const Loop = std.event.Loop;

/// This is a value that starts out unavailable, until put() is called.
/// While it is unavailable, coroutines suspend when they try to get() it,
/// and then are resumed when the value is put().
/// At that point the value remains available forever, and another put() is not allowed.
pub fn Future(comptime T: type) type {
    return struct {
        lock: Lock,
        data: T,
        available: u8, // TODO make this a bool

        const Self = this;
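        // Queue of suspended coroutine handles; declared here but not referenced in this revision.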
        const Queue = std.atomic.QueueMpsc(promise);

        pub fn init(loop: *Loop) Self {
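            // The lock starts out held; put() is what eventually releases it,
            // which is how waiting getters get resumed.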
            return Self{
                .lock = Lock.initLocked(loop),
                .available = 0,
                .data = undefined,
            };
        }

        /// Obtain the value. If it's not available, wait until it becomes available.
        /// Thread-safe.
        pub async fn get(self: *Self) T {
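            // Fast path: the value has already been put, so it is safe to read without the lock.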
            if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) {
                return self.data;
            }
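            // Slow path: the lock is held from init() until put(), so acquiring it here
            // suspends this coroutine until the value is available.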
            const held = await (async self.lock.acquire() catch unreachable);
            defer held.release();

            return self.data;
        }

        /// Make the data become available. May be called only once.
        pub fn put(self: *Self, value: T) void {
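            // Write the data before flipping the flag so that a getter taking the fast path
            // observes a fully initialized value.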
            self.data = value;
            const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            assert(prev == 0); // put() called twice
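            // Release the lock that init() left held, resuming any getters suspended in acquire().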
            Lock.Held.release(Lock.Held{ .lock = &self.lock });
        }
    };
}

test "std.event.Future" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    var loop: Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    const handle = try async<allocator> testFuture(&loop);
    defer cancel handle;

    loop.run();
}

async fn testFuture(loop: *Loop) void {
    var future = Future(i32).init(loop);

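    // Two coroutines wait on the future while a third resolves it with 6,
    // so both getters see 6 and the sum is 12.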
    const a = async waitOnFuture(&future) catch @panic("memory");
    const b = async waitOnFuture(&future) catch @panic("memory");
    const c = async resolveFuture(&future) catch @panic("memory");

    const result = (await a) + (await b);
    cancel c;
    assert(result == 12);
}

async fn waitOnFuture(future: *Future(i32)) i32 {
    return await (async future.get() catch @panic("memory"));
}

async fn resolveFuture(future: *Future(i32)) void {
    future.put(6);
}