-
Notifications
You must be signed in to change notification settings - Fork 0
/
message-queue.zig
131 lines (114 loc) · 3.52 KB
/
message-queue.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
const std = @import("std");
const Allocator = std.mem.Allocator;
const edsm = @import("edsm.zig");
const ecap = @import("event-capture.zig");
const esrc = @import("event-sources.zig");
/// This structure decribes a message being sent to stage machines
pub const Message = struct {
/// internal messages
pub const M0: u4 = 0;
pub const M1: u4 = 1;
pub const M2: u4 = 2;
pub const M3: u4 = 3;
pub const M4: u4 = 4;
pub const M5: u4 = 5;
pub const M6: u4 = 6;
pub const M7: u4 = 7;
/// read()/accept() will not block (POLLIN)
pub const D0: u4 = 0;
/// write() will not block/connection established (POLLOUT)
pub const D1: u4 = 1;
/// error happened (POLLERR, POLLHUP, POLLRDHUP)
pub const D2: u4 = 2;
/// timers
pub const T0: u4 = 0;
pub const T1: u4 = 1;
pub const T2: u4 = 2;
/// signals
pub const S0: u4 = 0;
pub const S1: u4 = 1;
pub const S2: u4 = 2;
/// file system events (TODO)
pub const F0: u4 = 0;
pub const F1: u4 = 1;
pub const F2: u4 = 2;
/// message sender (null for messages from OS)
src: ?*edsm.StageMachine,
/// message recipient (null will stop event loop)
dst: ?*edsm.StageMachine,
/// row number for stage reflex matrix
esk: esrc.EventSourceKind,
/// column number for stage reflex matrix
sqn: u4,
/// *EventSource for messages from OS (Tx, Sx, Dx, Fx),
/// otherwise (Mx) pointer to some arbitrary data if needed
ptr: ?*anyopaque,
};
pub const MessageQueueError = error {
IsFull,
};
/// Ring buffer (non-growable) that holds messages
pub const MessageQueue = struct {
cap: u32,
storage: []Message,
index_mask: u32,
r_index: u32,
w_index: u32,
n_items: u32,
pub fn onHeap(a: Allocator, order: u5) !*MessageQueue {
var mq = try a.create(MessageQueue);
mq.cap = @as(u32, 1) << order;
mq.storage = try a.alloc(Message, mq.cap);
mq.index_mask = mq.cap - 1;
mq.r_index = 0;
mq.w_index = mq.cap - 1;
mq.n_items = 0;
return mq;
}
pub fn put(self: *MessageQueue, item: Message) !void {
if (self.n_items == self.cap) return MessageQueueError.IsFull;
self.w_index += 1;
self.w_index &= self.index_mask;
self.storage[self.w_index] = item;
self.n_items += 1;
}
pub fn get(self: *MessageQueue) ?Message {
if (0 == self.n_items) return null;
var item = self.storage[self.r_index];
self.n_items -= 1;
self.r_index += 1;
self.r_index &= self.index_mask;
return item;
}
};
pub const MessageDispatcher = struct {
mq: *MessageQueue,
eq: ecap.EventQueue,
pub fn onStack(a: Allocator, mq_cap_order: u5) !MessageDispatcher {
var mq = try MessageQueue.onHeap(a, mq_cap_order);
var eq = try ecap.EventQueue.onStack(mq);
return MessageDispatcher {
.mq = mq,
.eq = eq,
};
}
/// message processing loop
pub fn loop(self: *MessageDispatcher) !void {
outer: while (true) {
while (true) {
const msg = self.mq.get() orelse break;
if (msg.dst) |sm| {
sm.reactTo(msg);
} else {
if (msg.src) |sm| {
if (sm.current_stage.leave) |bye| {
bye(sm);
}
}
break :outer;
}
}
try self.eq.wait();
}
}
};