Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
41c4f02
roadrunner: bump dep to 0.8.0 for the response body builder
williamthome Jun 28, 2026
d9f8071
roadrunner: lift listener connection limits for high concurrency
williamthome Jun 28, 2026
4a7bef0
roadrunner: streamline the CRUD handler
williamthome Jun 28, 2026
f17862a
roadrunner: count list responses without a length/1 pass
williamthome Jun 28, 2026
5997d2d
roadrunner: run DB queries as prepared statements
williamthome Jun 28, 2026
b103824
roadrunner: scope compression to compressible routes
williamthome Jun 28, 2026
39ca39e
roadrunner: parse the baseline query string once
williamthome Jun 28, 2026
bbbd8c8
roadrunner: key the CRUD cache by the binary id
williamthome Jun 28, 2026
1032a29
roadrunner: pattern-match the item map in add_total
williamthome Jun 28, 2026
c22a87b
roadrunner: queue on DB checkout instead of failing on a full pool
williamthome Jun 28, 2026
a385f35
roadrunner: return 500 not 404 when a CRUD read errors
williamthome Jun 28, 2026
e56feb1
roadrunner: cap h2/h2c concurrent streams to bound memory
williamthome Jun 28, 2026
20326fa
roadrunner: cap h2/h2c in-flight requests to bound gzip memory
williamthome Jun 28, 2026
85f16af
roadrunner: keep default connection caps on h2/h2c listeners
williamthome Jun 28, 2026
79ae160
roadrunner: route DB queries to a per-connection pool without a broker
williamthome Jun 28, 2026
753b760
roadrunner: replace the hand-rolled DB pool with ecpool
williamthome Jun 29, 2026
c9e788a
Benchmark results: roadrunner
github-actions[bot] Jun 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions frameworks/roadrunner/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
{minimum_otp_vsn, "29"}.

{deps, [
{roadrunner, "0.6.0"},
{roadrunner, "0.8.0"},
{epgsql, "4.8.0"},
{pooler, "1.6.0"}
{ecpool, "0.4.2"}
]}.

{relx, [
{release, {roadrunner_httparena, "0.1.0"},
[roadrunner_httparena, roadrunner]},
[roadrunner_httparena, ecpool, roadrunner]},
{sys_config, "./config/sys.config"},
{dev_mode, false},
{include_erts, true},
Expand Down
17 changes: 10 additions & 7 deletions frameworks/roadrunner/rebar.lock
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
{"1.2.0",
[{<<"epgsql">>,{pkg,<<"epgsql">>,<<"4.8.0">>},0},
{<<"pooler">>,{pkg,<<"pooler">>,<<"1.6.0">>},0},
{<<"roadrunner">>,{pkg,<<"roadrunner">>,<<"0.6.0">>},0},
[{<<"ecpool">>,{pkg,<<"ecpool">>,<<"0.4.2">>},0},
{<<"epgsql">>,{pkg,<<"epgsql">>,<<"4.8.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},1},
{<<"roadrunner">>,{pkg,<<"roadrunner">>,<<"0.8.0">>},0},
{<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.4.2">>},1}]}.
[
{pkg_hash,[
{<<"ecpool">>, <<"2F466096B81B9C945494CDF6A554ACAD4C31B0AC5A3D2B1929B822BA7C9B9F6E">>},
{<<"epgsql">>, <<"C491B141B8C37BCE7B67F2079F168F339BB374A7CF9A286CB3B40AD1CD3FABE5">>},
{<<"pooler">>, <<"F4F33C94AB3AB82565A2E31CEA9EFE4149A160651F3707A0A2669BC54AAF81C8">>},
{<<"roadrunner">>, <<"8C177B946F19325EF1DF0C28B3963A272CA9FD96291B5E8A8EA20D7C82DED232">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"roadrunner">>, <<"7BBC9DA8E2085EA94FE0A06DB51DBAD762AA016E70972F2A533C3914438C96F6">>},
{<<"telemetry">>, <<"A0CB522801DFFB1C49FE6E30561BADFFC7B6D0E180DB1300DF759FAA22062855">>}]},
{pkg_hash_ext,[
{<<"ecpool">>, <<"4E834C138ABF742F281ECEBEF69D37C6D7BF0A4E8E23965CF3826E027B723ED2">>},
{<<"epgsql">>, <<"00E550006A62FB439FC7E879419443C889C34605E9B9DC406029D8BAA1AD79D8">>},
{<<"pooler">>, <<"748C988FD2928DE9577C882A49621863CAB57809E3E1A88A14C9D3B55C6AB877">>},
{<<"roadrunner">>, <<"FE988A6862CA88EFDD793F7C9FC50FA36C60021DAED03509FCF72274EE67E33B">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"roadrunner">>, <<"76F3389A27CE084FF380F80C6DE73C2448A6702017A2C22ACF34DE606C8239D1">>},
{<<"telemetry">>, <<"928F6495066506077862C0D1646609EED891A4326BEE3126BA54B60AF61FEBB1">>}]}
].
2 changes: 1 addition & 1 deletion frameworks/roadrunner/src/roadrunner_httparena.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
sasl,
ssl,
epgsql,
pooler,
ecpool,
roadrunner
]},
{env, []},
Expand Down
19 changes: 15 additions & 4 deletions frameworks/roadrunner/src/roadrunner_httparena_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ start(_StartType, _StartArgs) ->
{ok, _} = roadrunner:start_listener(httparena_http, #{
port => HttpPort,
routes => Routes,
middlewares => [roadrunner_compress],
%% High-concurrency profiles offer thousands of connections; lift the
%% conservative core defaults (max_clients 150, num_acceptors 10) so the
%% listener keeps every offered connection live instead of capping and
%% forcing the load generator into a reconnect storm.
max_clients => 32768,
num_acceptors => 100,
%% 25 MB headroom for the upload profile (validator goes up to 20 MB).
max_content_length => 26214400,
%% Manual body buffering: handlers read the body themselves via
Expand All @@ -27,8 +32,12 @@ start(_StartType, _StartArgs) ->
{ok, _} = roadrunner:start_listener(httparena_h2c, #{
port => H2cPort,
routes => Routes,
middlewares => [roadrunner_compress],
max_content_length => 26214400,
%% The h2 listeners keep the default connection cap on purpose: h2
%% multiplexes many streams per connection, so a few connections
%% already saturate. Lifting max_clients here (as the h1 listeners
%% do) only multiplies per-stream memory (the `/json` gzip state)
%% into a blow-up with no throughput gain.
%% h2c prior-knowledge: `[http2]` on a plain-TCP listener
%% serves h2 directly (client sends the h2 preface, no
%% `Upgrade: h2c` negotiation).
Expand All @@ -41,7 +50,8 @@ start(_StartType, _StartArgs) ->
{ok, _} = roadrunner:start_listener(httparena_tls, #{
port => TlsPort,
routes => Routes,
middlewares => [roadrunner_compress],
max_clients => 32768,
num_acceptors => 100,
max_content_length => 26214400,
tls => TlsOpts,
body_buffering => manual
Expand All @@ -50,9 +60,10 @@ start(_StartType, _StartArgs) ->
{ok, _} = roadrunner:start_listener(httparena_h2, #{
port => H2Port,
routes => Routes,
middlewares => [roadrunner_compress],
max_content_length => 26214400,
tls => TlsOpts,
%% Default connection cap (see the h2c listener above) — h2/h3
%% multiplex, so raising it only bloats per-stream memory.
%% Listener derives `alpn_preferred_protocols` from
%% this list — `h2` preferred, fall back to `http/1.1`.
%% `http3` co-serves over QUIC on UDP 8443 (same port
Expand Down
93 changes: 36 additions & 57 deletions frameworks/roadrunner/src/roadrunner_httparena_crud.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,13 @@ list(Req) ->
Page = max(1, qs_int(~"page", Qs, 1)),
Limit = clamp(qs_int(~"limit", Qs, 10), 1, 50),
Offset = (Page - 1) * Limit,
ListSql = ~"""
SELECT id, name, category, price, quantity, active, tags,
rating_score, rating_count
FROM items
WHERE category = $1
ORDER BY id
LIMIT $2 OFFSET $3
""",
CountSql = ~"SELECT COUNT(*) FROM items WHERE category = $1",
Items =
case roadrunner_httparena_db:query(ListSql, [Cat, Limit, Offset]) of
case roadrunner_httparena_db:query(list, [Cat, Limit, Offset]) of
{ok, _, Rows} -> [roadrunner_httparena_items:row_to_json(R) || R <- Rows];
_ -> []
end,
Total =
case roadrunner_httparena_db:query(CountSql, [Cat]) of
case roadrunner_httparena_db:query(count, [Cat]) of
{ok, _, [{N}]} -> N;
_ -> 0
end,
Expand All @@ -51,39 +42,40 @@ list(Req) ->
get(Req) ->
Id = id_from_path(Req),
case ets:lookup(?CACHE, Id) of
[{Id, Item}] ->
Resp = roadrunner_resp:json(200, Item),
{roadrunner_resp:add_header(Resp, ~"x-cache", ~"HIT"), Req};
[{Id, Body}] ->
{cached_json(Body, ~"HIT"), Req};
[] ->
case roadrunner_httparena_db:query(read_sql(), [Id]) of
case roadrunner_httparena_db:query(read, [binary_to_integer(Id)]) of
{ok, _, [Row]} ->
Item = roadrunner_httparena_items:row_to_json(Row),
ets:insert(?CACHE, {Id, Item}),
Resp = roadrunner_resp:json(200, Item),
{roadrunner_resp:add_header(Resp, ~"x-cache", ~"MISS"), Req};
_ ->
{roadrunner_resp:not_found(), Req}
Body = iolist_to_binary(json:encode(Item)),
ets:insert(?CACHE, {Id, Body}),
{cached_json(Body, ~"MISS"), Req};
{ok, _, []} ->
{roadrunner_resp:not_found(), Req};
{error, _} ->
{roadrunner_resp:internal_error(), Req}
end
end.

%% Cache stores the already-encoded JSON body, so a cache hit ships the
%% bytes verbatim instead of re-encoding the item map every request;
%% `roadrunner_resp:body/4` folds in the `x-cache` marker in one call.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is 200ms TTL being evaluated?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. You are right, I missed it. Thanks for checking it out. I'll revisit the spec to ensure code is 100% aligned with it.

cached_json(Body, CacheStatus) ->
roadrunner_resp:body(200, ~"application/json", [{~"x-cache", CacheStatus}], Body).

create(Req) ->
{ok, Body, Req2} = roadrunner_req:read_body(Req),
Input = json:decode(Body),
Id = maps:get(~"id", Input),
Sql = ~"""
INSERT INTO items (id, name, category, price, quantity,
active, tags, rating_score, rating_count)
VALUES ($1, $2, $3, $4, $5, true, '[]'::jsonb, 0, 0)
""",
case roadrunner_httparena_db:query(Sql, [
Id,
maps:get(~"name", Input),
maps:get(~"category", Input),
maps:get(~"price", Input),
maps:get(~"quantity", Input)
]) of
#{
~"id" := Id,
~"name" := Name,
~"category" := Category,
~"price" := Price,
~"quantity" := Quantity
} = json:decode(Body),
case roadrunner_httparena_db:query(create, [Id, Name, Category, Price, Quantity]) of
{ok, 1} ->
ets:delete(?CACHE, Id),
ets:delete(?CACHE, integer_to_binary(Id)),
{roadrunner_resp:status(201), Req2};
_ ->
{roadrunner_resp:internal_error(), Req2}
Expand All @@ -92,19 +84,13 @@ create(Req) ->
update(Req) ->
Id = id_from_path(Req),
{ok, Body, Req2} = roadrunner_req:read_body(Req),
Input = json:decode(Body),
Sql = ~"""
UPDATE items
SET name = $2, category = $3, price = $4, quantity = $5
WHERE id = $1
""",
case roadrunner_httparena_db:query(Sql, [
Id,
maps:get(~"name", Input),
maps:get(~"category", Input),
maps:get(~"price", Input),
maps:get(~"quantity", Input)
]) of
#{
~"name" := Name,
~"category" := Category,
~"price" := Price,
~"quantity" := Quantity
} = json:decode(Body),
case roadrunner_httparena_db:query(update, [binary_to_integer(Id), Name, Category, Price, Quantity]) of
{ok, 1} ->
ets:delete(?CACHE, Id),
{roadrunner_resp:status(200), Req2};
Expand All @@ -114,16 +100,9 @@ update(Req) ->
{roadrunner_resp:internal_error(), Req2}
end.

read_sql() ->
~"""
SELECT id, name, category, price, quantity, active, tags,
rating_score, rating_count
FROM items
WHERE id = $1
""".

id_from_path(Req) ->
binary_to_integer(maps:get(~"id", roadrunner_req:bindings(Req))).
#{~"id" := Id} = roadrunner_req:bindings(Req),
Id.

qs_int(Key, Qs, Default) ->
case lists:keyfind(Key, 1, Qs) of
Expand Down
118 changes: 98 additions & 20 deletions frameworks/roadrunner/src/roadrunner_httparena_db.erl
Original file line number Diff line number Diff line change
@@ -1,41 +1,119 @@
-module(roadrunner_httparena_db).

-export([start_pool/0, query/2]).
-behaviour(ecpool_worker).

-export([start_pool/0, connect/1, query/2]).

%% `connect/1` is the ecpool_worker callback, invoked by ecpool via the
%% `Mod:connect/1` MFA when it opens (or reopens) a pooled connection, not
%% by a static call, so xref cannot see the reference.
-ignore_xref([{connect, 1}]).

-define(POOL, httparena_pg).

%% Every SQL the adapter runs, keyed by an atom name. Each statement is
%% parsed once per pooled connection at connect time (named after its key),
%% so the hot path runs `epgsql:prepared_query/3` against a cached
%% `#statement{}` record instead of re-parsing on every request.
statements() ->
#{
read => ~"""
SELECT id, name, category, price, quantity, active, tags,
rating_score, rating_count
FROM items
WHERE id = $1
""",
list => ~"""
SELECT id, name, category, price, quantity, active, tags,
rating_score, rating_count
FROM items
WHERE category = $1
ORDER BY id
LIMIT $2 OFFSET $3
""",
count => ~"SELECT COUNT(*) FROM items WHERE category = $1",
create => ~"""
INSERT INTO items (id, name, category, price, quantity,
active, tags, rating_score, rating_count)
VALUES ($1, $2, $3, $4, $5, true, '[]'::jsonb, 0, 0)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name,
category = EXCLUDED.category,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity
""",
update => ~"""
UPDATE items
SET name = $2, category = $3, price = $4, quantity = $5
WHERE id = $1
""",
async_db => ~"""
SELECT id, name, category, price, quantity, active, tags,
rating_score, rating_count
FROM items
WHERE price BETWEEN $1 AND $2 LIMIT $3
""",
fortunes => ~"SELECT id, message FROM fortune"
}.

%% Start an ecpool pool of `pool_size()` connections. ecpool routes each
%% request to a worker via `gproc_pool` (an ETS lookup, no central checkout
%% broker), so concurrent callers spread across connections instead of
%% serializing through one coordinator. `auto_reconnect` makes a worker
%% reopen (and re-prepare) its connection if the backend drops it.
-spec start_pool() -> ok | disabled.
start_pool() ->
case os:getenv("DATABASE_URL") of
false ->
disabled;
Url ->
ConnMap = parse_url(Url),
Size = pool_size(),
PoolConfig = [
{name, ?POOL},
{init_count, Size},
{max_count, Size},
{start_mfa, {epgsql, connect, [ConnMap]}}
Opts = [
{pool_size, pool_size()},
{pool_type, random},
{auto_reconnect, 2},
{conn_map, ConnMap}
],
{ok, _Pid} = pooler:new_pool(PoolConfig),
{ok, _} = ecpool:start_sup_pool(?POOL, ?MODULE, Opts),
ok
end.

-spec query(iodata(), [term()]) ->
{ok, list(), list()} | {ok, non_neg_integer()} | {error, term()}.
query(Sql, Params) ->
case pooler:take_member(?POOL) of
Conn when is_pid(Conn) ->
try
epgsql:equery(Conn, Sql, Params)
after
pooler:return_member(?POOL, Conn, ok)
end;
error_no_members ->
{error, no_members}
%% ecpool_worker callback: open a connection and parse every statement on it
%% (the prepared statement must exist on each backend connection). The
%% resulting `#statement{}` metadata is connection-independent, so we cache
%% one per name in `persistent_term`; connections racing to put the same
%% value (including after a reconnect) is harmless.
-spec connect([term()]) -> {ok, epgsql:connection()}.
connect(Opts) ->
{conn_map, ConnMap} = lists:keyfind(conn_map, 1, Opts),
{ok, Conn} = epgsql:connect(ConnMap),
ok = maps:foreach(
fun(Name, Sql) -> prepare(Conn, Name, Sql) end,
statements()
),
{ok, Conn}.

prepare(Conn, Name, Sql) ->
{ok, Stmt} = epgsql:parse(Conn, atom_to_list(Name), Sql, []),
Key = {?MODULE, stmt, Name},
case persistent_term:get(Key, undefined) of
undefined -> persistent_term:put(Key, Stmt);
_ -> ok
end.

%% Run the prepared statement on a pool-picked connection. ecpool hands the
%% callback the connection chosen for this request; epgsql serializes
%% commands per connection through its own mailbox, so callers queue on the
%% chosen connection, never on a shared broker. A worker mid-reconnect
%% yields `{error, disconnected}`, which the handlers map to a 5xx.
-spec query(atom(), [term()]) ->
{ok, list(), list()} | {ok, non_neg_integer()} | {error, term()}.
query(Name, Params) ->
Stmt = persistent_term:get({?MODULE, stmt, Name}),
ecpool:with_client(?POOL, fun(Conn) ->
epgsql:prepared_query(Conn, Stmt, Params)
end).

parse_url(Url) ->
Parsed = uri_string:parse(Url),
{User, Pass} = split_userinfo(maps:get(userinfo, Parsed, "")),
Expand Down
Loading