Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 71 additions & 50 deletions src/channel.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//// Credits to https://github.com/erik-dunteman/chanz/tree/main
//// This is a placeholder until the new async queue in zig 0.16.0.
//! Credits to https://github.com/erik-dunteman/chanz/tree/main for the
//! initial implementation. This has been heavily modified since.

const std = @import("std");

Expand All @@ -14,20 +14,27 @@ pub fn Chan(comptime T: type) type {
return BufferedChan(T, 0);
}

pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
pub fn BufferedChan(comptime T: type, comptime buf_size: u32) type {
return struct {
const Self = @This();
const bufType = [bufSize]?T;
buf: bufType = [_]?T{null} ** bufSize,
const bufType = [buf_size]?T;

/// Ring buffer.
ring_buffer: bufType = [_]?T{null} ** buf_size,
len: u32 = 0,
/// Head is the next item to receive.
head: u32 = 0,
/// Tail is the next free slot in the ring buffer.
tail: u32 = 0,

closed: bool = false,
mut: std.Io.Mutex = .init,
allocator: std.mem.Allocator = undefined,
allocator: std.mem.Allocator,
io: std.Io,
recvQ: std.ArrayList(*Receiver) = undefined,
sendQ: std.ArrayList(*Sender) = undefined,
len: u32 = 0,
recvQ: std.ArrayList(*Receiver),
sendQ: std.ArrayList(*Sender),

// represents a thread waiting on recv
// Represents a thread waiting on recv.
const Receiver = struct {
mut: std.Io.Mutex = .init,
cond: std.Io.Condition = .init,
Expand All @@ -39,7 +46,7 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
}
};

// represents a thread waiting on send
// Represents a thread waiting on send.
const Sender = struct {
mut: std.Io.Mutex = .init,
cond: std.Io.Condition = .init,
Expand Down Expand Up @@ -99,17 +106,14 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
else => T,
};
if (@hasDecl(type_info, "deinit")) {
for (self.buf, 0..) |buf, i| {
while (self.len > 0) {
const item = self.pop_ring_buffer() catch break;
if (@typeInfo(T) == .pointer) {
if (buf) |b| {
b.deinit();
}
item.deinit();
} else {
if (buf) |*b| {
@constCast(b).deinit();
}
var owned = item;
owned.deinit();
}
self.buf[i] = null;
}
} else {
@compileError(@typeName(T) ++ " must implement a 'deinit' method when .drain = true");
Expand All @@ -118,16 +122,44 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
}

pub fn capacity(self: *Self) u32 {
return self.buf.len;
return self.ring_buffer.len;
}

pub fn full(self: *Self) bool {
return self.buf.len == self.len;
return self.ring_buffer.len == self.len;
}

/// Increment the index, going back to 0 if needed.
fn next_ring_buffer_index(index: u32) u32 {
comptime std.debug.assert(buf_size > 0);
return (index + 1) % buf_size;
}

/// Push item onto the tail.
fn push_ring_buffer(self: *Self, data: T) ChanError!void {
if (self.len >= self.capacity()) {
return ChanError.DataCorruption;
}
self.ring_buffer[self.tail] = data;
self.tail = next_ring_buffer_index(self.tail);
self.len += 1;
}

/// Read from the head and then clear the slot.
fn pop_ring_buffer(self: *Self) ChanError!T {
if (self.len == 0) {
return ChanError.DataCorruption;
}
const val = self.ring_buffer[self.head] orelse return ChanError.DataCorruption;
self.ring_buffer[self.head] = null;
self.head = next_ring_buffer_index(self.head);
self.len -= 1;
return val;
}

fn debug_buf(self: *Self) void {
std.log.debug("{d} Buffer debug\n", .{std.time.milliTimestamp()});
for (self.buf, 0..) |item, i| {
for (self.ring_buffer, 0..) |item, i| {
if (item) |unwrapped| {
std.log.debug("[{d}] = {d}\n", .{ i, unwrapped });
}
Expand All @@ -140,8 +172,8 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
/// Returns true if sent successfully.
pub fn try_send(self: *Self, data: T) ChanError!bool {
self.mut.lockUncancelable(self.io);
if ((bufSize == 0 and self.recvQ.items.len > 0) or
(bufSize > 0 and self.len < self.capacity()))
if ((buf_size == 0 and self.recvQ.items.len > 0) or
(buf_size > 0 and self.len < self.capacity()))
{
try self._send(data);
return true;
Expand Down Expand Up @@ -177,13 +209,12 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
return;
}

if (self.len < self.capacity() and bufSize > 0) {
defer self.mut.unlock(self.io);

// insert into first null spot in buffer
self.buf[self.len] = data;
self.len += 1;
return;
if (comptime buf_size > 0) {
if (self.len < self.capacity()) {
defer self.mut.unlock(self.io);
try self.push_ring_buffer(data);
return;
}
}

// hold on sender queue. Receivers will signal when they take data.
Expand Down Expand Up @@ -236,8 +267,8 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
/// BufferedChan - receive if items have been sent
pub fn try_recv(self: *Self) ChanError!?T {
self.mut.lockUncancelable(self.io);
if ((bufSize == 0 and self.sendQ.items.len > 0) or
(bufSize > 0 and self.len > 0))
if ((buf_size == 0 and self.sendQ.items.len > 0) or
(buf_size > 0 and self.len > 0))
{
const val = try self._recv();
return val;
Expand All @@ -260,27 +291,17 @@ pub fn BufferedChan(comptime T: type, comptime bufSize: u32) type {
}

// case: value in buffer
const l = self.len;
if (l > 0 and bufSize > 0) {
if (self.len > 0 and buf_size > 0) {
defer self.mut.unlock(self.io);
const val = self.buf[0] orelse return ChanError.DataCorruption;

// advance items in buffer
if (l > 1) {
for (self.buf[1..l], 0..l - 1) |item, i| {
self.buf[i] = item;
}
}
self.buf[l - 1] = null;
const val = try self.pop_ring_buffer();

// Top up buffer with a waiting sender, if any. In this case
// the buffer remains the same logical length.
// Top up buffer with a waiting sender, if any. The receiver
// still gets the oldest buffered value, and the waiting sender's
// value occupies the newly freed ring slot.
if (self.sendQ.items.len > 0) {
var sender: *Sender = self.sendQ.orderedRemove(0);
const valFromSender: T = sender.get_data_and_signal(self.io);
self.buf[l - 1] = valFromSender;
} else {
self.len -= 1;
try self.push_ring_buffer(valFromSender);
}
return val;
}
Expand Down Expand Up @@ -651,7 +672,7 @@ test "Channel - close - BufferedChan drains queued items" {
chan.close(.{ .drain = true });

try std.testing.expectEqual(@as(usize, 3), deinit_count);
for (chan.buf) |entry| {
for (chan.ring_buffer) |entry| {
try std.testing.expect(entry == null);
}
}
Expand Down
Loading