This repository has been archived by the owner on Sep 27, 2024. It is now read-only.
forked from cekongnetcv/pike
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_epoll.zig
96 lines (79 loc) · 2.99 KB
/
event_epoll.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
const std = @import("std");
const pike = @import("pike.zig");
const Waker = @import("waker.zig").Waker;
const os = std.os;
const mem = std.mem;
pub const Event = struct {
const Self = @This();
handle: pike.Handle,
readers: Waker = .{},
writers: Waker = .{},
pub fn init() !Self {
return Self{
.handle = .{
.inner = try os.eventfd(0, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK),
.wake_fn = wake,
},
};
}
pub fn deinit(self: *Self) void {
os.close(self.handle.inner);
if (self.writers.shutdown()) |task| pike.dispatch(task, .{});
if (self.readers.shutdown()) |task| pike.dispatch(task, .{});
}
pub fn registerTo(self: *const Self, notifier: *const pike.Notifier) !void {
try notifier.register(&self.handle, .{ .read = true, .write = true });
}
fn wake(handle: *pike.Handle, batch: *pike.Batch, opts: pike.WakeOptions) void {
const self = @fieldParentPtr(Self, "handle", handle);
if (opts.write_ready) if (self.writers.notify()) |task| batch.push(task);
if (opts.read_ready) if (self.readers.notify()) |task| batch.push(task);
if (opts.shutdown) {
if (self.writers.shutdown()) |task| batch.push(task);
if (self.readers.shutdown()) |task| batch.push(task);
}
}
fn ErrorUnionOf(comptime func: anytype) std.builtin.TypeInfo.ErrorUnion {
return @typeInfo(@typeInfo(@TypeOf(func)).Fn.return_type.?).ErrorUnion;
}
fn call(self: *Self, comptime function: anytype, args: anytype, comptime opts: pike.CallOptions) !ErrorUnionOf(function).payload {
while (true) {
const result = @call(.{ .modifier = .always_inline }, function, args) catch |err| switch (err) {
error.WouldBlock => {
if (comptime opts.write) {
try self.writers.wait(.{ .use_lifo = true });
} else if (comptime opts.read) {
try self.readers.wait(.{});
}
continue;
},
else => return err,
};
return result;
}
}
fn write(self: *Self, amount: u64) callconv(.Async) !void {
const num_bytes = try self.call(os.write, .{
self.handle.inner,
mem.asBytes(&amount),
}, .{ .write = true });
if (num_bytes != @sizeOf(@TypeOf(amount))) {
return error.ShortWrite;
}
}
fn read(self: *Self) callconv(.Async) !void {
var counter: u64 = 0;
const num_bytes = try self.call(os.read, .{
self.handle.inner,
mem.asBytes(&counter),
}, .{ .read = true });
if (num_bytes != @sizeOf(@TypeOf(counter))) {
return error.ShortRead;
}
}
pub fn post(self: *Self) callconv(.Async) !void {
var frame = async self.read();
try self.write(1);
try await frame;
}
};