Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions benchmarks/concurrent/01_socket_throughput/bench.cht
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,36 @@
# `client` (reader fd) is owned by main; its defer fires on RETURN.

FN main() RETURNS Void ->
server = TCPServer::listen(14537);
conn = TCPClient::connect("127.0.0.1", 14537);
client = accept(server);
server = TCPServer::listen(14_537);
conn = TCPClient::connect("127.0.0.1", 14_537);
client = server.accept();

# Writer fiber: sends 100 000 × 256-byte messages.
# `conn` is an i32, captured by value into the BG closure.
# The fiber runs concurrently with the reader loop below.
msg = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
w: ~Void = BG {
MUTABLE wi = 0;
WHILE wi < 100000 DO
tcpWrite(conn, msg);
wi += 1;
END
};

# Reader loop: reads until all 25 600 000 bytes received.
# TCP coalesces writes so each tcpRead may return more than 256 bytes.
# Timer covers only the read loop (same scope as C/Rust BENCH_RESULT).
t0 = timestampMs();
MUTABLE total_bytes = 0;
WHILE total_bytes < 25600000 DO
data = tcpRead(client);
total_bytes = total_bytes + data.length();
# Writer fiber: sends 100 000 × 256-byte messages.
# `conn` is an i32, captured by value into the BG closure.
# The fiber runs concurrently with the reader loop below.
msg = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
w: ~Void = BG {
MUTABLE wi = 0;
WHILE wi < 100_000 DO
conn.tcpWrite(msg);
wi += 1;
END
elapsed = timestampMs() - t0;
};

# Reader loop: reads until all 25 600 000 bytes received.
# TCP coalesces writes so each tcpRead may return more than 256 bytes.
# Timer covers only the read loop (same scope as C/Rust BENCH_RESULT).
t0 = timestampMs();
MUTABLE total_bytes = 0;
WHILE total_bytes < 25_600_000 DO
data = client.tcpRead();
total_bytes = total_bytes + data.length();
END
elapsed = timestampMs() - t0;

print("BENCH_RESULT: ${elapsed.toString()} ms");
print("BENCH_RESULT: ${elapsed.toString()} ms");

# Wait for writer to finish, then let defers close both fds.
NEXT w;
RETURN;
# Wait for writer to finish, then let defers close both fds.
NEXT w;
RETURN;
END
115 changes: 61 additions & 54 deletions benchmarks/concurrent/02_concurrent_search/bench.cht
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,81 @@
# zig build-exe zig/bench.zig zig/switch.S zig/onRoot.S \
# --name bench_clear -O ReleaseFast -lc

STRUCT SearchResult { file_idx: Int64, count: Int64 }
STRUCT SearchResult {
file_idx: Int64,
count: Int64
}

# Search a batch of files concurrently. Returns heap-promoted count list.
# Isolating the promise list here prevents per-iteration frame rewind in main.
FN search_batch!(files: String[], start: Int64, end: Int64, data_dir: String, needle: String) RETURNS !Int64[] ->
MUTABLE futures: ~Int64[]@list = [];
FOR i IN (start ..< end) DO
filepath = "${data_dir}/${files[i]}";
futures.append(BG {
content = readFile(filepath);
countOccurrences(content, needle);
});
END
counts: Int64[]@list = NEXT futures;
RETURN counts;
MUTABLE futures: ~Int64[]@list = [];
FOR i IN (start ..< end) DO
filepath = "${data_dir}/${files[i]}";
futures.append(
BG {
content = readFile(filepath);
content.countOccurrences(needle);
}
);
END
counts: Int64[]@list = NEXT futures;
RETURN counts;
END

FN main() RETURNS Void ->
t0 = timestampMs();
needle = "the";
data_dir = "benchmarks/10_concurrent_search/data";

# List all files in the data directory (filenames only, not full paths).
files = listDir(data_dir);
n: Int64 = files.length();

# Process files in batches to bound io_uring SQ pressure.
# io_uring SQ capacity = 256; BATCH_SIZE = 128 keeps headroom.
batch_size: Int64 = 128;
MUTABLE results: SearchResult[]@list = [];
MUTABLE batch_start: Int64 = 0;
t0 = timestampMs();
needle = "the";
data_dir = "benchmarks/10_concurrent_search/data";

WHILE batch_start < n DO
MUTABLE batch_end: Int64 = batch_start + batch_size;
IF batch_end > n THEN batch_end = n; END
# List all files in the data directory (filenames only, not full paths).
files = data_dir.listDir();
n = files.length();

counts = search_batch!(files, batch_start, batch_end, data_dir, needle);
FOR k IN (0 ..< counts.length()) DO
results.append(SearchResult{ file_idx: batch_start + k, count: counts[k] });
END
# Process files in batches to bound io_uring SQ pressure.
# io_uring SQ capacity = 256; BATCH_SIZE = 128 keeps headroom.
batch_size = 128;
MUTABLE results: SearchResult[]@list = [];
MUTABLE batch_start = 0;

batch_start = batch_end;
WHILE batch_start < n DO
MUTABLE batch_end = batch_start + batch_size;
IF batch_end > n THEN
batch_end = n;
END

# Selection sort: find top-10 by swapping into the first 10 positions.
FOR j IN (0 ..< 10) DO
MUTABLE best: Int64 = j;
FOR m IN (j + 1 ..< n) DO
IF results[m].count > results[best].count THEN
best = m;
END
END
IF best != j THEN
tmp = results[j];
results[j] = results[best];
results[best] = tmp;
END
counts = search_batch!(files, batch_start, batch_end, data_dir, needle);
FOR k IN (0 ..< counts.length()) DO
results.append(SearchResult{ file_idx: batch_start + k, count: counts[k] });
END

# Print top 10.
print("Top 10 files by '", needle, "' count:");
FOR p IN (0_i64 ..< 10) DO
r = results[p];
print(" ", files[r.file_idx], " ", r.count);
batch_start = batch_end;
END

# Selection sort: find top-10 by swapping into the first 10 positions.
FOR j IN (0 ..< 10) DO
MUTABLE best = j;
FOR m IN (j + 1 ..< n) DO
IF results[m].count > results[best].count THEN
best = m;
END
END
IF best != j THEN
tmp = results[j];
results[j] = results[best];
results[best] = tmp;
END
END

# Print top 10.
print("Top 10 files by '", needle, "' count:");
FOR p IN (0 ..< 10) DO
r = results[p];
print(" ", files[r.file_idx], " ", r.count);
END

elapsed = timestampMs() - t0;
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
elapsed = timestampMs() - t0;
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
END
34 changes: 19 additions & 15 deletions benchmarks/concurrent/03_atomic_contention/bench.cht
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,28 @@
# Compare to Go's atomic.AddInt64 which bounces the cache line across
# cores under M:N scheduling (~15ns/op on 2 cores, worse on more).

STRUCT Counter { value: Int64 }
STRUCT Counter {
value: Int64
}

FN main() RETURNS Void ->
MUTABLE c = Counter{ value: 0 } @local;
MUTABLE c = Counter{ value: 0 } @local;

t0 = timestampMs();
MUTABLE futures: ~Void[]@list = [];
FOR i IN (0_i64 ..< 1024) DO
futures.append(BG {
FOR j IN (0_i64 ..< 10000) -> c.value += 1;
});
END
t0 = timestampMs();
MUTABLE futures: ~Void[]@list = [];
FOR i IN (0 ..< 1024) DO
futures.append(
BG {
FOR j IN (0 ..< 10_000) -> c.value += 1;
}
);
END

FOR k IN (0_i64 ..< 1024) -> NEXT futures[k];
elapsed = timestampMs() - t0;
FOR k IN (0 ..< 1024) -> NEXT futures[k];
elapsed = timestampMs() - t0;

print("Counter: ${c.value.toString()}");
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
print("Counter: ${c.value.toString()}");
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
END
44 changes: 23 additions & 21 deletions benchmarks/concurrent/04_fanout_fanin/bench.cht
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,36 @@
# Compare to Go (10K goroutines + WaitGroup) and Rust (10K Tokio tasks
# + JoinSet).

STRUCT Work { seed: Int64 }
STRUCT Work {
seed: Int64
}

FN doWork(seed: Int64) RETURNS Int64 ->
MUTABLE x: Int64 = seed;
FOR i IN (0_i64 ..< 100000) -> x = x %* 6364136223846793005_i64 %+ 1442695040888963407_i64;
RETURN x;
MUTABLE x = seed;
FOR i IN (0 ..< 100_000) -> x = x %* 6_364_136_223_846_793_005 %+ 1_442_695_040_888_963_407;
RETURN x;
END

FN main() RETURNS Void ->
t0 = timestampMs();
# Build work items
MUTABLE items: Work[] = [];
FOR i IN (0_i64 ..< 10000) -> items.append(Work{ seed: i });
t0 = timestampMs();
# Build work items
MUTABLE items: Work[] = [];
FOR i IN (0 ..< 10_000) -> items.append(Work{ seed: i });

# Fan-out: scale workers with core count, distributed across schedulers
results = items |> CONCURRENT(parallel: TRUE) SELECT doWork(_.seed);
# Fan-out: scale workers with core count, distributed across schedulers
results = items |> CONCURRENT(parallel: TRUE) SELECT doWork(_.seed);

# Fan-in: sum results
MUTABLE total: Int64 = 0;
FOR j IN (0_i64 ..< 10000) -> total = total %+ results[j];
# Fan-in: sum results
MUTABLE total = 0;
FOR j IN (0 ..< 10_000) -> total = total %+ results[j];

checksum = total MOD 1000000000;
print("Checksum:", checksum);
print("Workers: 10000");
print("Iterations: 100000");
checksum = total MOD 1_000_000_000;
print("Checksum:", checksum);
print("Workers: 10000");
print("Iterations: 100000");

elapsed = timestampMs() - t0;
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
elapsed = timestampMs() - t0;
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
END
60 changes: 31 additions & 29 deletions benchmarks/concurrent/05_backpressure/bench.cht
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,44 @@
# @shared:locked struct — no result list is ever materialized.
# Peak memory: O(capacity), not O(N).

STRUCT Acc { value: Int64 }
STRUCT Acc {
value: Int64
}

FN processItem(val: Int64) RETURNS Int64 ->
MUTABLE x: Int64 = val;
FOR i IN (0_i64 ..< 5000) -> x = x %* 6364136223846793005 %+ 1442695040888963407;
RETURN x;
MUTABLE x = val;
FOR i IN (0 ..< 5000) -> x = x %* 6_364_136_223_846_793_005 %+ 1_442_695_040_888_963_407;
RETURN x;
END

FN main() RETURNS Void ->
acc = Acc{ value: 0 } @shared:locked;
t0 = timestampMs();
acc = Acc{ value: 0 } @shared:locked;
t0 = timestampMs();

gen: ~?Int64[] = BG STREAM {
MUTABLE i: Int64 = 0;
WHILE i < 100000 DO
YIELD i;
i = i + 1;
END
};
gen: ~?Int64[] = BG STREAM {
MUTABLE i = 0;
WHILE i < 100_000 DO
YIELD i;
i = i + 1;
END
};

gen |> CONCURRENT(capacity: 64) EACH {
h = processItem(_);
WITH EXCLUSIVE acc AS a {
a.value = a.value %+ h;
}
};

WITH acc AS a {
checksum = a.value MOD 1000000000;
print("Checksum:", checksum);
gen |> CONCURRENT(capacity: 64) EACH {
h = processItem(_);
WITH EXCLUSIVE acc AS a {
a.value = a.value %+ h;
}
};

WITH acc AS a {
checksum = a.value MOD 1_000_000_000;
print("Checksum:", checksum);
}

elapsed = timestampMs() - t0;
print("Items: 100000");
print("Channel capacity: 64");
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
elapsed = timestampMs() - t0;
print("Items: 100000");
print("Channel capacity: 64");
print("BENCH_RESULT: ${elapsed.toString()} ms");
print("Time: ${elapsed.toString()} ms");
RETURN;
END
Loading
Loading