
Commit b6eb404

committed Jul 10, 2018
organize std.event into directories
1 parent ccef60a commit b6eb404


7 files changed: +1277 -1222 lines changed

 

CMakeLists.txt

Lines changed: 5 additions & 0 deletions
@@ -458,6 +458,11 @@ set(ZIG_STD_FILES
     "elf.zig"
     "empty.zig"
     "event.zig"
+    "event/channel.zig"
+    "event/lock.zig"
+    "event/locked.zig"
+    "event/loop.zig"
+    "event/tcp.zig"
     "fmt/errol/enum3.zig"
     "fmt/errol/index.zig"
     "fmt/errol/lookup.zig"

std/event.zig

Lines changed: 12 additions & 1222 deletions
Large diffs are not rendered by default.
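
The remaining 12 lines of std/event.zig are collapsed in this view, but given the five new files below, the file plausibly shrinks to a re-export index. A sketch under that assumption (not the rendered diff; the actual contents are collapsed):

// Assumed shape of std/event.zig after this commit -- a sketch, not the rendered diff.
pub const Channel = @import("event/channel.zig").Channel;
pub const Lock = @import("event/lock.zig").Lock;
pub const Locked = @import("event/locked.zig").Locked;
pub const Loop = @import("event/loop.zig").Loop;
pub const tcp = @import("event/tcp.zig");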

std/event/channel.zig

Lines changed: 254 additions & 0 deletions
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;

/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
/// when buffer is empty, consumers suspend and are resumed by producers
/// when buffer is full, producers suspend and are resumed by consumers
pub fn Channel(comptime T: type) type {
    return struct {
        loop: *Loop,

        getters: std.atomic.QueueMpsc(GetNode),
        putters: std.atomic.QueueMpsc(PutNode),
        get_count: usize,
        put_count: usize,
        dispatch_lock: u8, // TODO make this a bool
        need_dispatch: u8, // TODO make this a bool

        // simple fixed size ring buffer
        buffer_nodes: []T,
        buffer_index: usize,
        buffer_len: usize,

        const SelfChannel = this;
        const GetNode = struct {
            ptr: *T,
            tick_node: *Loop.NextTickNode,
        };
        const PutNode = struct {
            data: T,
            tick_node: *Loop.NextTickNode,
        };

        /// call destroy when done
        pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
            const buffer_nodes = try loop.allocator.alloc(T, capacity);
            errdefer loop.allocator.free(buffer_nodes);

            const self = try loop.allocator.create(SelfChannel{
                .loop = loop,
                .buffer_len = 0,
                .buffer_nodes = buffer_nodes,
                .buffer_index = 0,
                .dispatch_lock = 0,
                .need_dispatch = 0,
                .getters = std.atomic.QueueMpsc(GetNode).init(),
                .putters = std.atomic.QueueMpsc(PutNode).init(),
                .get_count = 0,
                .put_count = 0,
            });
            errdefer loop.allocator.destroy(self);

            return self;
        }

        /// must be called when all calls to put and get have suspended and no more calls occur
        pub fn destroy(self: *SelfChannel) void {
            while (self.getters.get()) |get_node| {
                cancel get_node.data.tick_node.data;
            }
            while (self.putters.get()) |put_node| {
                cancel put_node.data.tick_node.data;
            }
            self.loop.allocator.free(self.buffer_nodes);
            self.loop.allocator.destroy(self);
        }

        /// puts a data item in the channel. The promise completes when the value has been added to the
        /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
        pub async fn put(self: *SelfChannel, data: T) void {
            // TODO should be able to group memory allocation failure before first suspend point
            // so that the async invocation catches it
            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;

            suspend |handle| {
                var my_tick_node = Loop.NextTickNode{
                    .next = undefined,
                    .data = handle,
                };
                var queue_node = std.atomic.QueueMpsc(PutNode).Node{
                    .data = PutNode{
                        .tick_node = &my_tick_node,
                        .data = data,
                    },
                    .next = undefined,
                };
                self.putters.put(&queue_node);
                _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                self.loop.onNextTick(dispatch_tick_node_ptr);
            }
        }

        /// await this function to get an item from the channel. If the buffer is empty, the promise will
        /// complete when the next item is put in the channel.
        pub async fn get(self: *SelfChannel) T {
            // TODO should be able to group memory allocation failure before first suspend point
            // so that the async invocation catches it
            var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
            _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;

            // TODO integrate this function with named return values
            // so we can get rid of this extra result copy
            var result: T = undefined;
            suspend |handle| {
                var my_tick_node = Loop.NextTickNode{
                    .next = undefined,
                    .data = handle,
                };
                var queue_node = std.atomic.QueueMpsc(GetNode).Node{
                    .data = GetNode{
                        .ptr = &result,
                        .tick_node = &my_tick_node,
                    },
                    .next = undefined,
                };
                self.getters.put(&queue_node);
                _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                self.loop.onNextTick(dispatch_tick_node_ptr);
            }
            return result;
        }

        async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
            // resumed by onNextTick
            suspend |handle| {
                var tick_node = Loop.NextTickNode{
                    .data = handle,
                    .next = undefined,
                };
                tick_node_ptr.* = &tick_node;
            }

            // set the "need dispatch" flag
            _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);

            lock: while (true) {
                // set the lock flag
                const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                if (prev_lock != 0) return;

                // clear the need_dispatch flag since we're about to do it
                _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

                while (true) {
                    one_dispatch: {
                        // later we correct these extra subtractions
                        var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);

                        // transfer self.buffer to self.getters
                        while (self.buffer_len != 0) {
                            if (get_count == 0) break :one_dispatch;

                            const get_node = &self.getters.get().?.data;
                            get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
                            self.loop.onNextTick(get_node.tick_node);
                            self.buffer_len -= 1;

                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }

                        // direct transfer self.putters to self.getters
                        while (get_count != 0 and put_count != 0) {
                            const get_node = &self.getters.get().?.data;
                            const put_node = &self.putters.get().?.data;

                            get_node.ptr.* = put_node.data;
                            self.loop.onNextTick(get_node.tick_node);
                            self.loop.onNextTick(put_node.tick_node);

                            get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }

                        // transfer self.putters to self.buffer
                        while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
                            const put_node = &self.putters.get().?.data;

                            self.buffer_nodes[self.buffer_index] = put_node.data;
                            self.loop.onNextTick(put_node.tick_node);
                            self.buffer_index +%= 1;
                            self.buffer_len += 1;

                            put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                        }
                    }

                    // undo the extra subtractions
                    _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
                    _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                    // clear need-dispatch flag
                    const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                    if (need_dispatch != 0) continue;

                    const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                    assert(my_lock != 0);

                    // we have to check again now that we unlocked
                    if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;

                    return;
                }
            }
        }
    };
}

test "std.event.Channel" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    var loop: Loop = undefined;
    // TODO make a multi threaded test
    try loop.initSingleThreaded(allocator);
    defer loop.deinit();

    const channel = try Channel(i32).create(&loop, 0);
    defer channel.destroy();

    const handle = try async<allocator> testChannelGetter(&loop, channel);
    defer cancel handle;

    const putter = try async<allocator> testChannelPutter(channel);
    defer cancel putter;

    loop.run();
}

async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
    errdefer @panic("test failed");

    const value1_promise = try async channel.get();
    const value1 = await value1_promise;
    assert(value1 == 1234);

    const value2_promise = try async channel.get();
    const value2 = await value2_promise;
    assert(value2 == 4567);
}

async fn testChannelPutter(channel: *Channel(i32)) void {
    await (async channel.put(1234) catch @panic("out of memory"));
    await (async channel.put(4567) catch @panic("out of memory"));
}
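
The test above exercises only the zero-size rendezvous case, where each put suspends until a getter arrives. A minimal sketch of the buffered case, reusing the helpers from this file; with capacity 2, both puts can land in the ring buffer without waiting for a getter (illustrative, not part of the commit):

test "sketch: buffered std.event.Channel" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    var loop: Loop = undefined;
    try loop.initSingleThreaded(allocator);
    defer loop.deinit();

    // capacity 2: putters fill the buffer instead of suspending on a getter
    const channel = try Channel(i32).create(&loop, 2);
    defer channel.destroy();

    const putter = try async<allocator> testChannelPutter(channel);
    defer cancel putter;

    const getter = try async<allocator> testChannelGetter(&loop, channel);
    defer cancel getter;

    loop.run();
}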

std/event/lock.zig

Lines changed: 204 additions & 0 deletions
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;

/// Thread-safe async/await lock.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub const Lock = struct {
    loop: *Loop,
    shared_bit: u8, // TODO make this a bool
    queue: Queue,
    queue_empty_bit: u8, // TODO make this a bool

    const Queue = std.atomic.QueueMpsc(promise);

    pub const Held = struct {
        lock: *Lock,

        pub fn release(self: Held) void {
            // Resume the next item from the queue.
            if (self.lock.queue.get()) |node| {
                self.lock.loop.onNextTick(node);
                return;
            }

            // We need to release the lock.
            _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

            // There might be a queue item. If we know the queue is empty, we can be done,
            // because the other actor will try to obtain the lock.
            // But if there's a queue item, we are the actor which must loop and attempt
            // to grab the lock again.
            if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
                return;
            }

            while (true) {
                const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                if (old_bit != 0) {
                    // We did not obtain the lock. Great, the queue is someone else's problem.
                    return;
                }

                // Resume the next item from the queue.
                if (self.lock.queue.get()) |node| {
                    self.lock.loop.onNextTick(node);
                    return;
                }

                // Release the lock again.
                _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

                // Find out if we can be done.
                if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
                    return;
                }
            }
        }
    };

    pub fn init(loop: *Loop) Lock {
        return Lock{
            .loop = loop,
            .shared_bit = 0,
            .queue = Queue.init(),
            .queue_empty_bit = 1,
        };
    }

    /// Must be called when not locked. Not thread safe.
    /// All calls to acquire() and release() must complete before calling deinit().
    pub fn deinit(self: *Lock) void {
        assert(self.shared_bit == 0);
        while (self.queue.get()) |node| cancel node.data;
    }

    pub async fn acquire(self: *Lock) Held {
        s: suspend |handle| {
            // TODO explicitly put this memory in the coroutine frame #1194
            var my_tick_node = Loop.NextTickNode{
                .data = handle,
                .next = undefined,
            };

            self.queue.put(&my_tick_node);

            // At this point, we are in the queue, so we might have already been resumed and this coroutine
            // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.

            // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
            // will attempt to grab the lock.
            _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

            while (true) {
                const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                if (old_bit != 0) {
                    // We did not obtain the lock. Trust that our queue entry will resume us, and allow
                    // suspend to complete.
                    break;
                }
                // We got the lock. However we might have already been resumed from the queue.
                if (self.queue.get()) |node| {
                    // Whether this node is us or someone else, we tail resume it.
                    resume node.data;
                    break;
                } else {
                    // We already got resumed, and there are none left in the queue, which means that
                    // we aren't even supposed to hold the lock right now.
                    _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                    _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

                    // There might be a queue item. If we know the queue is empty, we can be done,
                    // because the other actor will try to obtain the lock.
                    // But if there's a queue item, we are the actor which must loop and attempt
                    // to grab the lock again.
                    if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
                        break;
                    } else {
                        continue;
                    }
                }
                unreachable;
            }
        }

        return Held{ .lock = self };
    }
};

test "std.event.Lock" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    var loop: Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    var lock = Lock.init(&loop);
    defer lock.deinit();

    const handle = try async<allocator> testLock(&loop, &lock);
    defer cancel handle;
    loop.run();

    assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
}

async fn testLock(loop: *Loop, lock: *Lock) void {
    // TODO explicitly put next tick node memory in the coroutine frame #1194
    suspend |p| {
        resume p;
    }
    const handle1 = async lockRunner(lock) catch @panic("out of memory");
    var tick_node1 = Loop.NextTickNode{
        .next = undefined,
        .data = handle1,
    };
    loop.onNextTick(&tick_node1);

    const handle2 = async lockRunner(lock) catch @panic("out of memory");
    var tick_node2 = Loop.NextTickNode{
        .next = undefined,
        .data = handle2,
    };
    loop.onNextTick(&tick_node2);

    const handle3 = async lockRunner(lock) catch @panic("out of memory");
    var tick_node3 = Loop.NextTickNode{
        .next = undefined,
        .data = handle3,
    };
    loop.onNextTick(&tick_node3);

    await handle1;
    await handle2;
    await handle3;
}

var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;

async fn lockRunner(lock: *Lock) void {
    suspend; // resumed by onNextTick

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        const lock_promise = async lock.acquire() catch @panic("out of memory");
        const handle = await lock_promise;
        defer handle.release();

        shared_test_index = 0;
        while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
            shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
        }
    }
}

std/event/locked.zig

Lines changed: 42 additions & 0 deletions
const std = @import("../index.zig");
const Lock = std.event.Lock;
const Loop = std.event.Loop; // added here: init() below takes a *Loop, which the original file left unimported

/// Thread-safe async/await lock that protects one piece of data.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub fn Locked(comptime T: type) type {
    return struct {
        lock: Lock,
        private_data: T,

        const Self = this;

        pub const HeldLock = struct {
            value: *T,
            held: Lock.Held,

            pub fn release(self: HeldLock) void {
                self.held.release();
            }
        };

        pub fn init(loop: *Loop, data: T) Self {
            return Self{
                .lock = Lock.init(loop),
                .private_data = data,
            };
        }

        pub fn deinit(self: *Self) void {
            self.lock.deinit();
        }

        pub async fn acquire(self: *Self) HeldLock {
            return HeldLock{
                // TODO guaranteed allocation elision
                .held = await (async self.lock.acquire() catch unreachable),
                .value = &self.private_data,
            };
        }
    };
}
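
locked.zig lands without a test. A minimal usage sketch, assuming a Loop and coroutine scaffolding set up as in the Lock test above; the names shared_counter and bumpCounter are illustrative, not from the commit:

// Sketch: the counter would be created with Locked(i32).init(&loop, 0)
// and this coroutine driven by loop.run(), as in the Lock test.
async fn bumpCounter(shared_counter: *Locked(i32)) void {
    // acquire suspends until the lock is held, then exposes the protected value
    const held = await (async shared_counter.acquire() catch @panic("out of memory"));
    defer held.release();
    held.value.* += 1;
}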

std/event/loop.zig

Lines changed: 577 additions & 0 deletions
Large diffs are not rendered by default.
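
Although the loop.zig diff is collapsed, the call sites in the other files of this commit pin down the surface they rely on: initSingleThreaded/initMultiThreaded, deinit, run, onNextTick, addFd/removeFd/waitFd, NextTickNode, and ResumeNode. A minimal sketch of the resume-on-next-tick pattern that channel.zig and lock.zig use internally (the function name is illustrative):

// Park the current coroutine and have the event loop resume it on the next tick.
async fn yieldToLoop(loop: *Loop) void {
    suspend |handle| {
        var my_tick_node = Loop.NextTickNode{
            .next = undefined,
            .data = handle,
        };
        loop.onNextTick(&my_tick_node);
    }
    // execution resumes here once loop.run() dispatches the node
}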

std/event/tcp.zig

Lines changed: 183 additions & 0 deletions
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const event = std.event;
const mem = std.mem;
const posix = std.os.posix;
const windows = std.os.windows;
const Loop = std.event.Loop;

pub const Server = struct {
    handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,

    loop: *Loop,
    sockfd: ?i32,
    accept_coro: ?promise,
    listen_address: std.net.Address,

    waiting_for_emfile_node: PromiseNode,
    listen_resume_node: event.Loop.ResumeNode,

    const PromiseNode = std.LinkedList(promise).Node;

    pub fn init(loop: *Loop) Server {
        // TODO can't initialize handler coroutine here because we need well defined copy elision
        return Server{
            .loop = loop,
            .sockfd = null,
            .accept_coro = null,
            .handleRequestFn = undefined,
            .waiting_for_emfile_node = undefined,
            .listen_address = undefined,
            .listen_resume_node = event.Loop.ResumeNode{
                .id = event.Loop.ResumeNode.Id.Basic,
                .handle = undefined,
            },
        };
    }

    pub fn listen(
        self: *Server,
        address: *const std.net.Address,
        handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,
    ) !void {
        self.handleRequestFn = handleRequestFn;

        const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
        errdefer std.os.close(sockfd);
        self.sockfd = sockfd;

        try std.os.posixBind(sockfd, &address.os_addr);
        try std.os.posixListen(sockfd, posix.SOMAXCONN);
        self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd));

        self.accept_coro = try async<self.loop.allocator> Server.handler(self);
        errdefer cancel self.accept_coro.?;

        self.listen_resume_node.handle = self.accept_coro.?;
        try self.loop.addFd(sockfd, &self.listen_resume_node);
        errdefer self.loop.removeFd(sockfd);
    }

    /// Stop listening
    pub fn close(self: *Server) void {
        self.loop.removeFd(self.sockfd.?);
        std.os.close(self.sockfd.?);
    }

    pub fn deinit(self: *Server) void {
        if (self.accept_coro) |accept_coro| cancel accept_coro;
        if (self.sockfd) |sockfd| std.os.close(sockfd);
    }

    pub async fn handler(self: *Server) void {
        while (true) {
            var accepted_addr: std.net.Address = undefined;
            if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
                var socket = std.os.File.openHandle(accepted_fd);
                _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
                    error.OutOfMemory => {
                        socket.close();
                        continue;
                    },
                };
            } else |err| switch (err) {
                error.WouldBlock => {
                    suspend; // we will get resumed by epoll_wait in the event loop
                    continue;
                },
                error.ProcessFdQuotaExceeded => {
                    errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
                    suspend |p| {
                        self.waiting_for_emfile_node = PromiseNode.init(p);
                        std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
                    }
                    continue;
                },
                error.ConnectionAborted, error.FileDescriptorClosed => continue,

                error.PageFault => unreachable,
                error.InvalidSyscall => unreachable,
                error.FileDescriptorNotASocket => unreachable,
                error.OperationNotSupported => unreachable,

                error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
                    @panic("TODO handle this error");
                },
            }
        }
    }
};

pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
    var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733

    const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
    errdefer std.os.close(sockfd);

    try std.os.posixConnectAsync(sockfd, &address.os_addr);
    try await try async loop.waitFd(sockfd);
    try std.os.posixGetSockOptConnectError(sockfd);

    return std.os.File.openHandle(sockfd);
}

test "listen on a port, send bytes, receive bytes" {
    if (builtin.os != builtin.Os.linux) {
        // TODO build abstractions for other operating systems
        return;
    }
    const MyServer = struct {
        tcp_server: Server,

        const Self = this;
        async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void {
            const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
            var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
            defer socket.close();
            // TODO guarantee elision of this allocation
            const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
            (await next_handler) catch |err| {
                std.debug.panic("unable to handle connection: {}\n", err);
            };
            suspend |p| {
                cancel p;
            }
        }
        async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void {
            const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733
            var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733

            var adapter = std.io.FileOutStream.init(&socket);
            var stream = &adapter.stream;
            try stream.print("hello from server\n");
        }
    };

    const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
    const addr = std.net.Address.initIp4(ip4addr, 0);

    var loop: Loop = undefined;
    try loop.initSingleThreaded(std.debug.global_allocator);
    var server = MyServer{ .tcp_server = Server.init(&loop) };
    defer server.tcp_server.deinit();
    try server.tcp_server.listen(addr, MyServer.handler);

    const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server);
    defer cancel p;
    loop.run();
}

async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
    errdefer @panic("test failure");

    var socket_file = try await try async connect(loop, address);
    defer socket_file.close();

    var buf: [512]u8 = undefined;
    const amt_read = try socket_file.read(buf[0..]);
    const msg = buf[0..amt_read];
    assert(mem.eql(u8, msg, "hello from server\n"));
    server.close();
}
