diff --git a/src/channel.zig b/src/channel.zig index 0047f60..d8e4f01 100644 --- a/src/channel.zig +++ b/src/channel.zig @@ -1,5 +1,5 @@ -//// Credits to https://github.com/erik-dunteman/chanz/tree/main -//// This is a placeholder until the new async queue in zig 0.16.0. +//! Credits to https://github.com/erik-dunteman/chanz/tree/main for the +//! initial implementation. This has been heavily modified since. const std = @import("std"); @@ -14,20 +14,27 @@ pub fn Chan(comptime T: type) type { return BufferedChan(T, 0); } -pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { +pub fn BufferedChan(comptime T: type, comptime buf_size: u32) type { return struct { const Self = @This(); - const bufType = [bufSize]?T; - buf: bufType = [_]?T{null} ** bufSize, + const bufType = [buf_size]?T; + + /// Ring buffer. + ring_buffer: bufType = [_]?T{null} ** buf_size, + len: u32 = 0, + /// Head is the next item to receive. + head: u32 = 0, + /// Tail is the next free slot in the ring buffer. + tail: u32 = 0, + closed: bool = false, mut: std.Io.Mutex = .init, - allocator: std.mem.Allocator = undefined, + allocator: std.mem.Allocator, io: std.Io, - recvQ: std.ArrayList(*Receiver) = undefined, - sendQ: std.ArrayList(*Sender) = undefined, - len: u32 = 0, + recvQ: std.ArrayList(*Receiver), + sendQ: std.ArrayList(*Sender), - // represents a thread waiting on recv + // Represents a thread waiting on recv. const Receiver = struct { mut: std.Io.Mutex = .init, cond: std.Io.Condition = .init, @@ -39,7 +46,7 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { } }; - // represents a thread waiting on send + // Represents a thread waiting on send. const Sender = struct { mut: std.Io.Mutex = .init, cond: std.Io.Condition = .init, @@ -99,17 +106,14 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { else => T, }; if (@hasDecl(type_info, "deinit")) { - for (self.buf, 0..) |buf, i| { + while (self.len > 0) { + const item = self.pop_ring_buffer() catch break; if (@typeInfo(T) == .pointer) { - if (buf) |b| { - b.deinit(); - } + item.deinit(); } else { - if (buf) |*b| { - @constCast(b).deinit(); - } + var owned = item; + owned.deinit(); } - self.buf[i] = null; } } else { @compileError(@typeName(T) ++ " must implement a 'deinit' method when .drain = true"); @@ -118,16 +122,44 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { } pub fn capacity(self: *Self) u32 { - return self.buf.len; + return self.ring_buffer.len; } pub fn full(self: *Self) bool { - return self.buf.len == self.len; + return self.ring_buffer.len == self.len; + } + + /// Increment the index, going back to 0 if needed. + fn next_ring_buffer_index(index: u32) u32 { + comptime std.debug.assert(buf_size > 0); + return (index + 1) % buf_size; + } + + /// Push item onto the tail. + fn push_ring_buffer(self: *Self, data: T) ChanError!void { + if (self.len >= self.capacity()) { + return ChanError.DataCorruption; + } + self.ring_buffer[self.tail] = data; + self.tail = next_ring_buffer_index(self.tail); + self.len += 1; + } + + /// Read from the head and then clear the slot. + fn pop_ring_buffer(self: *Self) ChanError!T { + if (self.len == 0) { + return ChanError.DataCorruption; + } + const val = self.ring_buffer[self.head] orelse return ChanError.DataCorruption; + self.ring_buffer[self.head] = null; + self.head = next_ring_buffer_index(self.head); + self.len -= 1; + return val; } fn debug_buf(self: *Self) void { std.log.debug("{d} Buffer debug\n", .{std.time.milliTimestamp()}); - for (self.buf, 0..) |item, i| { + for (self.ring_buffer, 0..) |item, i| { if (item) |unwrapped| { std.log.debug("[{d}] = {d}\n", .{ i, unwrapped }); } @@ -140,8 +172,8 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { /// Returns true if sent successfully. pub fn try_send(self: *Self, data: T) ChanError!bool { self.mut.lockUncancelable(self.io); - if ((bufSize == 0 and self.recvQ.items.len > 0) or - (bufSize > 0 and self.len < self.capacity())) + if ((buf_size == 0 and self.recvQ.items.len > 0) or + (buf_size > 0 and self.len < self.capacity())) { try self._send(data); return true; @@ -177,13 +209,12 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { return; } - if (self.len < self.capacity() and bufSize > 0) { - defer self.mut.unlock(self.io); - - // insert into first null spot in buffer - self.buf[self.len] = data; - self.len += 1; - return; + if (comptime buf_size > 0) { + if (self.len < self.capacity()) { + defer self.mut.unlock(self.io); + try self.push_ring_buffer(data); + return; + } } // hold on sender queue. Receivers will signal when they take data. @@ -236,8 +267,8 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { /// BufferedChan - receive if items have been sent pub fn try_recv(self: *Self) ChanError!?T { self.mut.lockUncancelable(self.io); - if ((bufSize == 0 and self.sendQ.items.len > 0) or - (bufSize > 0 and self.len > 0)) + if ((buf_size == 0 and self.sendQ.items.len > 0) or + (buf_size > 0 and self.len > 0)) { const val = try self._recv(); return val; @@ -260,27 +291,17 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type { } // case: value in buffer - const l = self.len; - if (l > 0 and bufSize > 0) { + if (self.len > 0 and buf_size > 0) { defer self.mut.unlock(self.io); - const val = self.buf[0] orelse return ChanError.DataCorruption; - - // advance items in buffer - if (l > 1) { - for (self.buf[1..l], 0..l - 1) |item, i| { - self.buf[i] = item; - } - } - self.buf[l - 1] = null; + const val = try self.pop_ring_buffer(); - // Top up buffer with a waiting sender, if any. In this case - // the buffer remains the same logical length. + // Top up buffer with a waiting sender, if any. The receiver + // still gets the oldest buffered value, and the waiting sender's + // value occupies the newly freed ring slot. if (self.sendQ.items.len > 0) { var sender: *Sender = self.sendQ.orderedRemove(0); const valFromSender: T = sender.get_data_and_signal(self.io); - self.buf[l - 1] = valFromSender; - } else { - self.len -= 1; + try self.push_ring_buffer(valFromSender); } return val; } @@ -651,7 +672,7 @@ test "Channel - close - BufferedChan drains queued items" { chan.close(.{ .drain = true }); try std.testing.expectEqual(@as(usize, 3), deinit_count); - for (chan.buf) |entry| { + for (chan.ring_buffer) |entry| { try std.testing.expect(entry == null); } }