From a6a643b6a2fa286c92c5c3bef470bbd0b2a0020b Mon Sep 17 00:00:00 2001 From: Koko Bhadra Date: Wed, 11 Mar 2026 20:05:16 -0400 Subject: [PATCH 1/3] Add typed subscriptions, retry middleware, Makefile, and fix security issues - subscription.zig: add nextBlock(), nextLog(), nextTxHash() typed helpers that parse eth_subscription notifications directly into BlockHeader/Log/[32]u8, removing boilerplate from bot code (closes #43) - retry_provider.zig: new RetryingProvider wrapper with exponential backoff, configurable max attempts, jitter, and retryable error policy (closes #41) - provider.zig: expose parseBlockHeader and parseSingleLog as pub for reuse by subscription typed helpers - root.zig: export RetryingProvider and RetryOpts at top level - Makefile: add build/test/fmt/lint/ci/integration-test/bench targets so contributors can run the full CI check locally before pushing - multicall.zig: replace plain overflow-prone additions with std.math.add checked arithmetic in decodeAggregate3Results boundary checks - abi_encode.zig: replace std.debug.assert with error.TooManyValues propagation through encodeValuesIntoNoAlloc and encodeDynamicValueInto recursive chain - .gitignore: exclude docs/plans/ (local planning files) --- .gitignore | 1 + Makefile | 43 ++++++ src/abi_encode.zig | 18 +-- src/multicall.zig | 23 +-- src/provider.zig | 4 +- src/retry_provider.zig | 334 +++++++++++++++++++++++++++++++++++++++++ src/root.zig | 4 + src/subscription.zig | 110 ++++++++++++++ 8 files changed, 517 insertions(+), 20 deletions(-) create mode 100644 Makefile create mode 100644 src/retry_provider.zig diff --git a/.gitignore b/.gitignore index 3e5b0e7..0be3926 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +docs/plans/ .zig-cache/ zig-out/ zig-pkg/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..69228a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,43 @@ +ZIG ?= zig + +.PHONY: build test fmt lint ci integration-test bench bench-u256 bench-keccak clean + +## Build the library (default) +build: + $(ZIG) build + +## Run unit tests (no network required) +test: + $(ZIG) build test + +## Check formatting — mirrors the CI fmt job +fmt: + $(ZIG) fmt --check src/ tests/ + +## Auto-fix formatting +fmt-fix: + $(ZIG) fmt src/ tests/ + +## Run fmt + test — everything CI checks locally (no Anvil required) +lint: fmt test + +## Full CI check: build + fmt + test (matches all CI jobs, still no Anvil) +ci: build fmt test + +## Run integration tests (requires Anvil running on localhost:8545) +integration-test: + $(ZIG) build integration-test + +## Run all benchmarks (ReleaseFast) +bench: + $(ZIG) build bench -Doptimize=ReleaseFast + +bench-u256: + $(ZIG) build bench-u256 -Doptimize=ReleaseFast + +bench-keccak: + $(ZIG) build bench-keccak -Doptimize=ReleaseFast + +## Remove build artifacts +clean: + rm -rf zig-out zig-cache .zig-cache diff --git a/src/abi_encode.zig b/src/abi_encode.zig index 6045506..846689a 100644 --- a/src/abi_encode.zig +++ b/src/abi_encode.zig @@ -136,7 +136,7 @@ fn encodeValuesInto(allocator: std.mem.Allocator, buf: *std.ArrayList(u8), value // First pass: calculate tail offsets for dynamic values // and pre-compute the offset each dynamic value will be at - std.debug.assert(values.len <= max_tuple_values); + if (values.len > max_tuple_values) return error.TooManyValues; var offsets: [max_tuple_values]usize = undefined; for (values, 0..) |val, i| { if (val.isDynamic()) { @@ -157,7 +157,7 @@ fn encodeValuesInto(allocator: std.mem.Allocator, buf: *std.ArrayList(u8), value // Third pass: write tail section directly into buf (no temp allocations) for (values) |val| { if (val.isDynamic()) { - encodeDynamicValueInto(buf, val); + try encodeDynamicValueInto(buf, val); } } } @@ -209,7 +209,7 @@ fn encodeStaticValueNoAlloc(buf: *std.ArrayList(u8), val: AbiValue) void { } /// Encode a dynamic value directly into the output buffer (no temp allocation). -fn encodeDynamicValueInto(buf: *std.ArrayList(u8), val: AbiValue) void { +fn encodeDynamicValueInto(buf: *std.ArrayList(u8), val: AbiValue) EncodeError!void { switch (val) { .bytes => |data| { writeUint256NoAlloc(buf, @intCast(data.len)); @@ -225,20 +225,20 @@ fn encodeDynamicValueInto(buf: *std.ArrayList(u8), val: AbiValue) void { }, .array => |items| { writeUint256NoAlloc(buf, @intCast(items.len)); - encodeValuesIntoNoAlloc(buf, items); + try encodeValuesIntoNoAlloc(buf, items); }, .fixed_array => |items| { - encodeValuesIntoNoAlloc(buf, items); + try encodeValuesIntoNoAlloc(buf, items); }, .tuple => |items| { - encodeValuesIntoNoAlloc(buf, items); + try encodeValuesIntoNoAlloc(buf, items); }, else => unreachable, } } /// Encode values into an ArrayList that already has sufficient capacity. -fn encodeValuesIntoNoAlloc(buf: *std.ArrayList(u8), values: []const AbiValue) void { +fn encodeValuesIntoNoAlloc(buf: *std.ArrayList(u8), values: []const AbiValue) EncodeError!void { const n = values.len; if (n == 0) return; @@ -246,7 +246,7 @@ fn encodeValuesIntoNoAlloc(buf: *std.ArrayList(u8), values: []const AbiValue) vo var tail_offset: usize = head_size; // Calculate offsets for dynamic values - std.debug.assert(values.len <= max_tuple_values); + if (values.len > max_tuple_values) return error.TooManyValues; var offsets: [max_tuple_values]usize = undefined; for (values, 0..) |val, i| { if (val.isDynamic()) { @@ -267,7 +267,7 @@ fn encodeValuesIntoNoAlloc(buf: *std.ArrayList(u8), values: []const AbiValue) vo // Write tails for (values) |val| { if (val.isDynamic()) { - encodeDynamicValueInto(buf, val); + try encodeDynamicValueInto(buf, val); } } } diff --git a/src/multicall.zig b/src/multicall.zig index 658dc2d..6b2a946 100644 --- a/src/multicall.zig +++ b/src/multicall.zig @@ -171,11 +171,12 @@ pub fn decodeAggregate3Results(allocator: std.mem.Allocator, data: []const u8) ! // First word: offset to array data (should be 0x20) const array_offset = readWord(data[0..32]); - if (array_offset + 32 > data.len) return error.InvalidAbiData; + const array_header_end = std.math.add(usize, array_offset, 32) catch return error.InvalidAbiData; + if (array_header_end > data.len) return error.InvalidAbiData; // Array length const array_len = readWord(data[array_offset .. array_offset + 32]); - const array_data_start = array_offset + 32; + const array_data_start = array_offset + 32; // safe: array_header_end <= data.len var results = try allocator.alloc(Result, array_len); errdefer { @@ -188,25 +189,29 @@ pub fn decodeAggregate3Results(allocator: std.mem.Allocator, data: []const u8) ! // Read offsets for each result tuple for (0..array_len) |i| { const offset_pos = array_data_start + i * 32; - if (offset_pos + 32 > data.len) return error.InvalidAbiData; + const offset_end = std.math.add(usize, offset_pos, 32) catch return error.InvalidAbiData; + if (offset_end > data.len) return error.InvalidAbiData; const tuple_offset = readWord(data[offset_pos .. offset_pos + 32]); - const tuple_start = array_data_start + tuple_offset; + const tuple_start = std.math.add(usize, array_data_start, tuple_offset) catch return error.InvalidAbiData; // Each tuple: (bool success, bytes returnData) // word 0: success (bool) // word 1: offset to returnData within the tuple // At that offset: length word + data - if (tuple_start + 64 > data.len) return error.InvalidAbiData; + const tuple_end = std.math.add(usize, tuple_start, 64) catch return error.InvalidAbiData; + if (tuple_end > data.len) return error.InvalidAbiData; const success_word = readWord(data[tuple_start .. tuple_start + 32]); const return_data_offset = readWord(data[tuple_start + 32 .. tuple_start + 64]); - const return_data_abs = tuple_start + return_data_offset; + const return_data_abs = std.math.add(usize, tuple_start, return_data_offset) catch return error.InvalidAbiData; - if (return_data_abs + 32 > data.len) return error.InvalidAbiData; + const return_data_header_end = std.math.add(usize, return_data_abs, 32) catch return error.InvalidAbiData; + if (return_data_header_end > data.len) return error.InvalidAbiData; const return_data_len = readWord(data[return_data_abs .. return_data_abs + 32]); - const return_data_start = return_data_abs + 32; + const return_data_start = return_data_abs + 32; // safe: return_data_header_end <= data.len - if (return_data_start + return_data_len > data.len) return error.InvalidAbiData; + const return_data_end = std.math.add(usize, return_data_start, return_data_len) catch return error.InvalidAbiData; + if (return_data_end > data.len) return error.InvalidAbiData; var return_data: []const u8 = &.{}; if (return_data_len > 0) { diff --git a/src/provider.zig b/src/provider.zig index 0f96947..db23afb 100644 --- a/src/provider.zig +++ b/src/provider.zig @@ -716,7 +716,7 @@ fn parseLogsArray(allocator: std.mem.Allocator, obj: std.json.ObjectMap) ![]cons } /// Parse a single Log from a JSON object. -fn parseSingleLog(allocator: std.mem.Allocator, obj: std.json.ObjectMap) !receipt_mod.Log { +pub fn parseSingleLog(allocator: std.mem.Allocator, obj: std.json.ObjectMap) !receipt_mod.Log { const address = (try parseOptionalAddress(jsonGetString(obj, "address"))) orelse return error.InvalidResponse; const data_str = jsonGetString(obj, "data") orelse "0x"; const data = try parseHexBytes(allocator, data_str); @@ -816,7 +816,7 @@ fn parseLogsResponse(allocator: std.mem.Allocator, raw: []const u8) ![]receipt_m } /// Parse a block header from a raw JSON-RPC response. -fn parseBlockHeader(allocator: std.mem.Allocator, raw: []const u8) !?block_mod.BlockHeader { +pub fn parseBlockHeader(allocator: std.mem.Allocator, raw: []const u8) !?block_mod.BlockHeader { const parsed = std.json.parseFromSlice(std.json.Value, allocator, raw, .{}) catch { return error.InvalidResponse; }; diff --git a/src/retry_provider.zig b/src/retry_provider.zig new file mode 100644 index 0000000..8035575 --- /dev/null +++ b/src/retry_provider.zig @@ -0,0 +1,334 @@ +const std = @import("std"); +const provider_mod = @import("provider.zig"); +const block_mod = @import("block.zig"); +const receipt_mod = @import("receipt.zig"); +const json_rpc = @import("json_rpc.zig"); + +const Provider = provider_mod.Provider; + +/// Controls which errors trigger a retry attempt. +pub const RetryableErrors = enum { + /// Only retry on network/transport errors (connection refused, timed out, reset). + connection_errors, + /// Also retry on RPC-level errors such as rate limits and server errors. + all_rpc_errors, +}; + +/// Configuration for RetryingProvider. +pub const RetryOpts = struct { + /// Maximum number of attempts (1 = no retry). Default: 3. + max_attempts: u32 = 3, + /// Initial backoff delay in milliseconds. Default: 100. + initial_backoff_ms: u64 = 100, + /// Backoff multiplier applied after each failed attempt. Default: 2.0 (exponential). + backoff_multiplier: f64 = 2.0, + /// Maximum backoff delay in milliseconds. Default: 5_000. + max_backoff_ms: u64 = 5_000, + /// Jitter factor 0.0–1.0 added to each backoff to prevent thundering herd. Default: 0.1. + jitter: f64 = 0.1, + /// Which errors trigger a retry. Default: connection errors only. + retryable: RetryableErrors = .connection_errors, +}; + +/// Wraps a Provider and retries failed calls with exponential backoff. +/// +/// Example: +/// var provider = Provider.init(allocator, &transport); +/// var retrying = RetryingProvider.init(&provider, .{}); +/// const block_num = try retrying.getBlockNumber(); +pub const RetryingProvider = struct { + inner: *Provider, + opts: RetryOpts, + prng: std.rand.DefaultPrng, + + /// Initialise a RetryingProvider wrapping the given Provider. + /// Seeds the PRNG from a cryptographically random value. + pub fn init(inner: *Provider, opts: RetryOpts) RetryingProvider { + const seed = std.crypto.random.int(u64); + return .{ + .inner = inner, + .opts = opts, + .prng = std.rand.DefaultPrng.init(seed), + }; + } + + // ======================================================================== + // Chain state + // ======================================================================== + + pub fn getChainId(self: *RetryingProvider) !u64 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getChainId()) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn getBlockNumber(self: *RetryingProvider) !u64 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getBlockNumber()) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Account state + // ======================================================================== + + pub fn getBalance(self: *RetryingProvider, address: [20]u8) !u256 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getBalance(address)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn getTransactionCount(self: *RetryingProvider, address: [20]u8) !u64 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getTransactionCount(address)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn getCode(self: *RetryingProvider, address: [20]u8) ![]u8 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getCode(address)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn getStorageAt(self: *RetryingProvider, address: [20]u8, slot: [32]u8) ![32]u8 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getStorageAt(address, slot)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Gas + // ======================================================================== + + pub fn getGasPrice(self: *RetryingProvider) !u256 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getGasPrice()) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn getMaxPriorityFee(self: *RetryingProvider) !u256 { + var state = RetryState.init(self); + while (true) { + if (self.inner.getMaxPriorityFee()) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Transaction execution + // ======================================================================== + + pub fn call(self: *RetryingProvider, to: [20]u8, data: []const u8) ![]u8 { + var state = RetryState.init(self); + while (true) { + if (self.inner.call(to, data)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn estimateGas(self: *RetryingProvider, to: [20]u8, data: []const u8, from: ?[20]u8) !u64 { + var state = RetryState.init(self); + while (true) { + if (self.inner.estimateGas(to, data, from)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + pub fn sendRawTransaction(self: *RetryingProvider, signed_tx: []const u8) ![32]u8 { + var state = RetryState.init(self); + while (true) { + if (self.inner.sendRawTransaction(signed_tx)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Receipts + // ======================================================================== + + pub fn getTransactionReceipt(self: *RetryingProvider, tx_hash: [32]u8) !?receipt_mod.TransactionReceipt { + var state = RetryState.init(self); + while (true) { + if (self.inner.getTransactionReceipt(tx_hash)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Blocks + // ======================================================================== + + pub fn getBlock(self: *RetryingProvider, block_number: u64) !?block_mod.BlockHeader { + var state = RetryState.init(self); + while (true) { + if (self.inner.getBlock(block_number)) |r| return r else |err| if (!state.next(err)) return err; + } + } + + // ======================================================================== + // Logs + // ======================================================================== + + pub fn getLogs(self: *RetryingProvider, filter: json_rpc.LogFilter) ![]receipt_mod.Log { + var state = RetryState.init(self); + while (true) { + if (self.inner.getLogs(filter)) |r| return r else |err| if (!state.next(err)) return err; + } + } +}; + +// --------------------------------------------------------------------------- +// Internal retry state machine +// --------------------------------------------------------------------------- + +/// Per-call retry state. Tracks attempt count and current backoff. +const RetryState = struct { + provider: *RetryingProvider, + attempt: u32, + backoff_ms: u64, + + fn init(provider: *RetryingProvider) RetryState { + return .{ + .provider = provider, + .attempt = 0, + .backoff_ms = provider.opts.initial_backoff_ms, + }; + } + + /// Called on each error. Returns true if the caller should retry, false if it should propagate. + /// When returning true, sleeps for the appropriate backoff duration. + fn next(self: *RetryState, err: anyerror) bool { + self.attempt += 1; + if (self.attempt >= self.provider.opts.max_attempts) return false; + if (!isRetryable(err, self.provider.opts)) return false; + + const jitter_ms: u64 = @intFromFloat(@as(f64, @floatFromInt(self.backoff_ms)) * self.provider.opts.jitter); + const extra = if (jitter_ms > 0) self.provider.prng.random().int(u64) % jitter_ms else 0; + std.time.sleep((self.backoff_ms + extra) * std.time.ns_per_ms); + + const next_backoff: u64 = @intFromFloat( + @as(f64, @floatFromInt(self.backoff_ms)) * self.provider.opts.backoff_multiplier, + ); + self.backoff_ms = @min(next_backoff, self.provider.opts.max_backoff_ms); + return true; + } +}; + +/// Returns true if the given error should trigger a retry under the given options. +fn isRetryable(err: anyerror, opts: RetryOpts) bool { + return switch (err) { + // Network / transport errors: always retryable. + error.ConnectionRefused, + error.ConnectionTimedOut, + error.ConnectionResetByPeer, + error.BrokenPipe, + error.NetworkUnreachable, + error.WouldBlock, + error.UnexpectedEof, + => true, + // RPC-level errors: only retryable in .all_rpc_errors mode. + error.RpcError => opts.retryable == .all_rpc_errors, + // Parsing, invalid input, etc.: never retryable. + else => false, + }; +} + +// ============================================================================ +// Tests +// ============================================================================ + +test "RetryOpts - defaults" { + const opts = RetryOpts{}; + try std.testing.expectEqual(@as(u32, 3), opts.max_attempts); + try std.testing.expectEqual(@as(u64, 100), opts.initial_backoff_ms); + try std.testing.expectEqual(@as(u64, 5_000), opts.max_backoff_ms); + try std.testing.expectEqual(RetryableErrors.connection_errors, opts.retryable); +} + +test "isRetryable - connection errors" { + const opts = RetryOpts{ .retryable = .connection_errors }; + try std.testing.expect(isRetryable(error.ConnectionRefused, opts)); + try std.testing.expect(isRetryable(error.ConnectionTimedOut, opts)); + try std.testing.expect(isRetryable(error.ConnectionResetByPeer, opts)); + try std.testing.expect(isRetryable(error.BrokenPipe, opts)); + try std.testing.expect(isRetryable(error.UnexpectedEof, opts)); + try std.testing.expect(!isRetryable(error.RpcError, opts)); + try std.testing.expect(!isRetryable(error.InvalidResponse, opts)); + try std.testing.expect(!isRetryable(error.OutOfMemory, opts)); +} + +test "isRetryable - all_rpc_errors" { + const opts = RetryOpts{ .retryable = .all_rpc_errors }; + try std.testing.expect(isRetryable(error.ConnectionRefused, opts)); + try std.testing.expect(isRetryable(error.RpcError, opts)); + try std.testing.expect(!isRetryable(error.InvalidResponse, opts)); +} + +test "RetryState - exhausts attempts then stops" { + // Build a RetryingProvider with max_attempts=2 and no sleep (initial_backoff_ms=0). + // We can't call into a real Provider here; we test RetryState directly. + const opts = RetryOpts{ .max_attempts = 2, .initial_backoff_ms = 0, .jitter = 0 }; + const seed: u64 = 0; + var fake_inner: Provider = undefined; + var rp = RetryingProvider{ + .inner = &fake_inner, + .opts = opts, + .prng = std.rand.DefaultPrng.init(seed), + }; + + var state = RetryState.init(&rp); + + // First error: attempt becomes 1, still < max_attempts=2, retryable error → should retry. + try std.testing.expect(state.next(error.ConnectionRefused)); + // Second error: attempt becomes 2, 2 >= max_attempts=2 → should not retry. + try std.testing.expect(!state.next(error.ConnectionRefused)); +} + +test "RetryState - non-retryable error stops immediately" { + const opts = RetryOpts{ .max_attempts = 5, .initial_backoff_ms = 0, .jitter = 0 }; + var fake_inner: Provider = undefined; + var rp = RetryingProvider{ + .inner = &fake_inner, + .opts = opts, + .prng = std.rand.DefaultPrng.init(0), + }; + + var state = RetryState.init(&rp); + // InvalidResponse is not retryable: should stop immediately. + try std.testing.expect(!state.next(error.InvalidResponse)); +} + +test "RetryState - backoff increases exponentially" { + const opts = RetryOpts{ + .max_attempts = 10, + .initial_backoff_ms = 100, + .backoff_multiplier = 2.0, + .max_backoff_ms = 1_000, + .jitter = 0, // no sleep + .retryable = .connection_errors, + }; + var fake_inner: Provider = undefined; + var rp = RetryingProvider{ + .inner = &fake_inner, + .opts = opts, + .prng = std.rand.DefaultPrng.init(0), + }; + + var state = RetryState.init(&rp); + try std.testing.expectEqual(@as(u64, 100), state.backoff_ms); + + _ = state.next(error.ConnectionRefused); + try std.testing.expectEqual(@as(u64, 200), state.backoff_ms); + + _ = state.next(error.ConnectionRefused); + try std.testing.expectEqual(@as(u64, 400), state.backoff_ms); + + _ = state.next(error.ConnectionRefused); + try std.testing.expectEqual(@as(u64, 800), state.backoff_ms); + + // Capped at max_backoff_ms=1000 + _ = state.next(error.ConnectionRefused); + try std.testing.expectEqual(@as(u64, 1_000), state.backoff_ms); +} diff --git a/src/root.zig b/src/root.zig index b38dce7..7c33bea 100644 --- a/src/root.zig +++ b/src/root.zig @@ -36,6 +36,9 @@ pub const http_transport = @import("http_transport.zig"); pub const ws_transport = @import("ws_transport.zig"); pub const subscription = @import("subscription.zig"); pub const provider = @import("provider.zig"); +pub const retry_provider = @import("retry_provider.zig"); +pub const RetryingProvider = retry_provider.RetryingProvider; +pub const RetryOpts = retry_provider.RetryOpts; // -- Layer 7: ENS -- pub const ens_namehash = @import("ens/namehash.zig"); @@ -105,6 +108,7 @@ test { _ = @import("ws_transport.zig"); _ = @import("subscription.zig"); _ = @import("provider.zig"); + _ = @import("retry_provider.zig"); // Layer 7: Client _ = @import("wallet.zig"); _ = @import("flashbots.zig"); diff --git a/src/subscription.zig b/src/subscription.zig index c71a1f5..33ee88d 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -1,6 +1,10 @@ const std = @import("std"); const WsTransport = @import("ws_transport.zig").WsTransport; const json_rpc = @import("json_rpc.zig"); +const block_mod = @import("block.zig"); +const receipt_mod = @import("receipt.zig"); +const primitives = @import("primitives.zig"); +const provider_mod = @import("provider.zig"); /// Types of Ethereum subscriptions available via eth_subscribe. pub const SubscriptionType = enum { @@ -56,6 +60,8 @@ pub const Subscription = struct { InvalidResponse, ConnectionClosed, OutOfMemory, + InvalidNotification, + NullResult, }; /// Subscribe to events via eth_subscribe. @@ -139,8 +145,76 @@ pub const Subscription = struct { self.id = ""; } } + + /// For new_heads subscriptions: parse and return the next block header. + /// Caller owns the returned BlockHeader's allocated fields (extra_data). + pub fn nextBlock(self: *Subscription, allocator: std.mem.Allocator) !block_mod.BlockHeader { + const raw = try self.next(); + defer self.allocator.free(raw); + + const parsed = std.json.parseFromSlice(std.json.Value, allocator, raw, .{}) catch + return error.InvalidNotification; + defer parsed.deinit(); + + const result_val = getNotificationResult(parsed.value) orelse return error.InvalidNotification; + + // Serialize result back and wrap as {"result":...} for reuse with parseBlockHeader. + const result_json = try std.json.stringifyAlloc(allocator, result_val, .{}); + defer allocator.free(result_json); + + const wrapped = try std.fmt.allocPrint(allocator, "{{\"result\":{s}}}", .{result_json}); + defer allocator.free(wrapped); + + return (try provider_mod.parseBlockHeader(allocator, wrapped)) orelse error.NullResult; + } + + /// For logs subscriptions: parse and return the next log. + /// Caller owns the returned Log's allocated fields (topics, data). + pub fn nextLog(self: *Subscription, allocator: std.mem.Allocator) !receipt_mod.Log { + const raw = try self.next(); + defer self.allocator.free(raw); + + const parsed = std.json.parseFromSlice(std.json.Value, allocator, raw, .{}) catch + return error.InvalidNotification; + defer parsed.deinit(); + + const result_val = getNotificationResult(parsed.value) orelse return error.InvalidNotification; + if (result_val != .object) return error.InvalidNotification; + + return try provider_mod.parseSingleLog(allocator, result_val.object); + } + + /// For new_pending_transactions subscriptions: return the next transaction hash. + pub fn nextTxHash(self: *Subscription) ![32]u8 { + const raw = try self.next(); + defer self.allocator.free(raw); + + const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, raw, .{}) catch + return error.InvalidNotification; + defer parsed.deinit(); + + const result_val = getNotificationResult(parsed.value) orelse return error.InvalidNotification; + if (result_val != .string) return error.InvalidNotification; + + return primitives.hashFromHex(result_val.string) catch error.InvalidNotification; + } }; +// --------------------------------------------------------------------------- +// Notification parsing helpers +// --------------------------------------------------------------------------- + +/// Extract the `params.result` value from an eth_subscription notification. +/// Notification format: {"jsonrpc":"2.0","method":"eth_subscription", +/// "params":{"subscription":"0x...","result":{...}}} +/// Returns null if the JSON does not match the expected notification structure. +fn getNotificationResult(root: std.json.Value) ?std.json.Value { + if (root != .object) return null; + const params_val = root.object.get("params") orelse return null; + if (params_val != .object) return null; + return params_val.object.get("result"); +} + // --------------------------------------------------------------------------- // JSON building helpers // --------------------------------------------------------------------------- @@ -459,6 +533,42 @@ test "formatHash" { try std.testing.expectEqualStrings("0xabababababababababababababababababababababababababababababababababab", &result); } +test "getNotificationResult - new_heads notification" { + const json = + \\{"jsonrpc":"2.0","method":"eth_subscription", + \\ "params":{"subscription":"0xabc","result":{"number":"0x5","hash":"0xdeadbeef"}}} + ; + const allocator = std.testing.allocator; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json, .{}); + defer parsed.deinit(); + const result = getNotificationResult(parsed.value); + try std.testing.expect(result != null); + try std.testing.expect(result.? == .object); + try std.testing.expect(result.?.object.get("number") != null); +} + +test "getNotificationResult - not a notification" { + const json = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xabc\"}"; + const allocator = std.testing.allocator; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json, .{}); + defer parsed.deinit(); + const result = getNotificationResult(parsed.value); + try std.testing.expect(result == null); +} + +test "getNotificationResult - pending tx notification (string result)" { + const json = + \\{"jsonrpc":"2.0","method":"eth_subscription", + \\ "params":{"subscription":"0xabc","result":"0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"}} + ; + const allocator = std.testing.allocator; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json, .{}); + defer parsed.deinit(); + const result = getNotificationResult(parsed.value); + try std.testing.expect(result != null); + try std.testing.expect(result.? == .string); +} + test "Subscription struct layout" { // Verify the struct can be created with expected fields var sub = Subscription{ From 5ec414114ba42f39031622b62aa4b807275e7ea8 Mon Sep 17 00:00:00 2001 From: Koko Bhadra Date: Wed, 11 Mar 2026 22:30:36 -0400 Subject: [PATCH 2/3] Address nitpick comments from code review - abi_encode.zig: add pre-condition doc to writeValuesDirect clarifying that all callers (including recursive paths through writeDynamicValueDirect) must satisfy values.len <= max_tuple_values; the assert is an internal invariant, not a validation gap - retry_provider.zig: document sendRawTransaction retry semantics explaining nonce-protection safety, the lost-response edge case, and the recommended fallback of checking receipt status on nonce errors --- src/abi_encode.zig | 7 +++++++ src/retry_provider.zig | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/abi_encode.zig b/src/abi_encode.zig index 846689a..2d0855a 100644 --- a/src/abi_encode.zig +++ b/src/abi_encode.zig @@ -273,6 +273,13 @@ fn encodeValuesIntoNoAlloc(buf: *std.ArrayList(u8), values: []const AbiValue) En } /// Write values directly into a raw buffer (zero ArrayList overhead). +/// +/// Pre-condition: every slice passed to this function — including nested +/// array/fixed_array/tuple items reached through recursive calls — must +/// satisfy `values.len <= max_tuple_values`. Public callers (`encodeValues`, +/// `encodeFunctionCall`) enforce this for the top-level slice. The assert +/// at the stack-frame boundary is the internal invariant check for all +/// call sites, including the recursive paths in `writeDynamicValueDirect`. fn writeValuesDirect(buf: []u8, values: []const AbiValue) void { const n = values.len; if (n == 0) return; diff --git a/src/retry_provider.zig b/src/retry_provider.zig index 8035575..a816942 100644 --- a/src/retry_provider.zig +++ b/src/retry_provider.zig @@ -138,6 +138,18 @@ pub const RetryingProvider = struct { } } + /// Send a signed transaction and return the transaction hash. + /// + /// Retry semantics: retrying `sendRawTransaction` is generally safe because + /// signed Ethereum transactions are nonce-protected — a duplicate submission + /// of the same signed bytes is a no-op on the node once the first is mined. + /// However, if the first attempt succeeds but the response is lost (network + /// error after the node accepted the tx), a subsequent retry will fail with + /// a non-retryable "nonce already used" error. Callers should treat that + /// error as a signal to check transaction status via `getTransactionReceipt` + /// rather than as a confirmation of failure. + /// + /// See also: `RetryingProvider`, `RetryState`, `RetryOpts`. pub fn sendRawTransaction(self: *RetryingProvider, signed_tx: []const u8) ![32]u8 { var state = RetryState.init(self); while (true) { From f619ae7d7bd98f81226ef5c67db43bfb90338a71 Mon Sep 17 00:00:00 2001 From: Koko Bhadra Date: Thu, 12 Mar 2026 08:25:55 -0400 Subject: [PATCH 3/3] Fix std.rand.DefaultPrng -> std.Random.DefaultPrng for Zig 0.15.2 std.rand doesn't exist in Zig 0.15.2; the correct path is std.Random.DefaultPrng. This was a latent compile error because retry_provider.zig wasn't included in unit_tests.zig. Also adds thread-safety doc comment on the prng field and includes retry_provider in the unit test suite. --- src/retry_provider.zig | 11 ++++++----- tests/unit_tests.zig | 2 ++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/retry_provider.zig b/src/retry_provider.zig index a816942..aa006b9 100644 --- a/src/retry_provider.zig +++ b/src/retry_provider.zig @@ -39,7 +39,8 @@ pub const RetryOpts = struct { pub const RetryingProvider = struct { inner: *Provider, opts: RetryOpts, - prng: std.rand.DefaultPrng, + /// PRNG for jitter; mutated on each retry. Not thread-safe — do not share instances across threads. + prng: std.Random.DefaultPrng, /// Initialise a RetryingProvider wrapping the given Provider. /// Seeds the PRNG from a cryptographically random value. @@ -48,7 +49,7 @@ pub const RetryingProvider = struct { return .{ .inner = inner, .opts = opts, - .prng = std.rand.DefaultPrng.init(seed), + .prng = std.Random.DefaultPrng.init(seed), }; } @@ -287,7 +288,7 @@ test "RetryState - exhausts attempts then stops" { var rp = RetryingProvider{ .inner = &fake_inner, .opts = opts, - .prng = std.rand.DefaultPrng.init(seed), + .prng = std.Random.DefaultPrng.init(seed), }; var state = RetryState.init(&rp); @@ -304,7 +305,7 @@ test "RetryState - non-retryable error stops immediately" { var rp = RetryingProvider{ .inner = &fake_inner, .opts = opts, - .prng = std.rand.DefaultPrng.init(0), + .prng = std.Random.DefaultPrng.init(0), }; var state = RetryState.init(&rp); @@ -325,7 +326,7 @@ test "RetryState - backoff increases exponentially" { var rp = RetryingProvider{ .inner = &fake_inner, .opts = opts, - .prng = std.rand.DefaultPrng.init(0), + .prng = std.Random.DefaultPrng.init(0), }; var state = RetryState.init(&rp); diff --git a/tests/unit_tests.zig b/tests/unit_tests.zig index ad7ab64..0cbae5e 100644 --- a/tests/unit_tests.zig +++ b/tests/unit_tests.zig @@ -56,4 +56,6 @@ test { // Utils _ = eth.units; _ = eth.constants; + // Middleware + _ = eth.retry_provider; }