Skip to content
Open
216 changes: 173 additions & 43 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
-export_type([pool/0]).

-define(TIMEOUT, 5000).
-define(DEFAULT_SIZE, 5).
-define(DEFAULT_TYPE, list).
-define(DEFAULT_STRATEGY, lifo).
-define(DEFAULT_OVERFLOW, 10).

-ifdef(pre17).
-type pid_queue() :: queue().
Expand All @@ -37,14 +41,15 @@
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.

-record(state, {
supervisor :: undefined | pid(),
workers :: undefined | pid_queue(),
supervisor :: pid(),
worker_module :: atom(),
workers :: pid_queue(),
waiting :: pid_queue(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
size = ?DEFAULT_SIZE :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer(),
strategy = lifo :: lifo | fifo
max_overflow = ?DEFAULT_OVERFLOW :: non_neg_integer(),
strategy = ?DEFAULT_STRATEGY :: lifo | fifo
}).

-spec checkout(Pool :: pool()) -> pid().
Expand Down Expand Up @@ -147,26 +152,107 @@ status(Pool) ->

init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),

WorkerModule = worker_module(PoolArgs),
Supervisor =
case worker_supervisor(PoolArgs) of
undefined ->
start_supervisor(WorkerModule, WorkerArgs);
Sup when is_pid(Sup) ->
monitor(process, Sup),
Sup
end,
Size = pool_size(PoolArgs),
Workers = init_workers(Supervisor, WorkerModule, Size),

MaxOverflow = max_overflow(PoolArgs),
Overflow = init_overflow(Size, MaxOverflow),

Waiting = queue:new(),
Monitors = ets:new(monitors, [private]),
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).

init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
init(Rest, WorkerArgs, State#state{supervisor = Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
init(Rest, WorkerArgs, State#state{size = Size});
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
init([{strategy, lifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = lifo});
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Workers = prepopulate(Size, Sup),
{ok, State#state{workers = Workers}}.
{ok, #state{
supervisor = Supervisor,
worker_module = WorkerModule,
workers = Workers,
waiting = Waiting,
monitors = Monitors,
size = Size,
overflow = Overflow,
max_overflow = MaxOverflow,
strategy = strategy(PoolArgs)
}}.

start_supervisor(undefined, _WorkerArgs) ->
error({badarg, "worker_module or worker_supervisor is required"});
start_supervisor(WorkerModule, WorkerArgs) ->
start_supervisor(WorkerModule, WorkerArgs, 1).

start_supervisor(WorkerModule, WorkerArgs, Retries) ->
case poolboy_sup:start_link(WorkerModule, WorkerArgs) of
{ok, NewPid} ->
NewPid;
{error, {already_started, Pid}} when Retries > 0 ->
MRef = erlang:monitor(process, Pid),
receive {'DOWN', MRef, _, _, _} -> ok
after ?TIMEOUT -> ok
end,
start_supervisor(WorkerModule, WorkerArgs, Retries - 1);
{error, Error} ->
exit({no_worker_supervisor, Error})
end.

init_workers(Sup, Mod, Size) ->
prepopulate(Size, Sup, Mod).

init_overflow(_Size, _MaxOverflow) ->
0.

worker_module(PoolArgs) ->
Is = is_atom(V = proplists:get_value(worker_module, PoolArgs)),
if not Is -> undefined; true -> V end.

worker_supervisor(PoolArgs) ->
case find_pid(V = proplists:get_value(worker_supervisor, PoolArgs)) of
Res = undefined when Res =:= V -> Res;
Res when is_pid(Res) -> Res;
Res = undefined when Res =/= V -> exit({noproc, V});
Res -> exit({Res, V})
end.

find_pid(undefined) ->
undefined;
find_pid(Name) when is_atom(Name) ->
find_pid({local, Name});
find_pid({local, Name}) ->
whereis(Name);
find_pid({global, Name}) ->
find_pid({via, global, Name});
find_pid({via, Registry, Name}) ->
Registry:whereis_name(Name);
find_pid({Name, Node}) ->
(catch erlang:monitor_node(Node, true)),
try rpc_call(Node, erlang, whereis, [Name], ?TIMEOUT)
catch _:Reason -> Reason
end.

rpc_call(Node, Mod, Fun, Args, Timeout) ->
case rpc:call(Node, Mod, Fun, Args, Timeout) of
{badrpc, Reason} -> exit({Reason, {Node, {Mod, Fun, Args}}});
Result -> Result
end.

pool_size(PoolArgs) ->
Is = is_integer(V = proplists:get_value(size, PoolArgs)),
if not Is -> ?DEFAULT_SIZE; true -> V end.

max_overflow(PoolArgs) ->
Is = is_integer(V = proplists:get_value(max_overflow, PoolArgs)),
if not Is -> ?DEFAULT_OVERFLOW; true -> V end.

-define(IS_STRATEGY(S), lists:member(S, [lifo, fifo])).
strategy(PoolArgs) ->
Is = ?IS_STRATEGY(V = proplists:get_value(strategy, PoolArgs)),
if not Is -> ?DEFAULT_STRATEGY; true -> V end.

handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
Expand Down Expand Up @@ -202,6 +288,7 @@ handle_cast(_Msg, State) ->

handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
#state{supervisor = Sup,
worker_module = Mod,
workers = Workers,
monitors = Monitors,
overflow = Overflow,
Expand All @@ -213,7 +300,8 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{workers = Left}};
{empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
Pid = new_worker(Sup, Mod),
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{overflow = Overflow + 1}};
{empty, _Left} when Block =:= false ->
Expand Down Expand Up @@ -247,6 +335,8 @@ handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.

handle_info({'DOWN', _, process, Pid, Reason}, State = #state{supervisor = Pid}) ->
{stop, Reason, State};
handle_info({'DOWN', MRef, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
[[Pid]] ->
Expand All @@ -257,8 +347,11 @@ handle_info({'DOWN', MRef, _, _, _}, State) ->
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_info({'EXIT', Pid, Reason}, State = #state{supervisor = Pid}) ->
{stop, Reason, State};
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
worker_module = Mod,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
Expand All @@ -270,19 +363,21 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
case queue:member(Pid, State#state.workers) of
true ->
W = filter_worker_by_pid(Pid, State#state.workers),
{noreply, State#state{workers = queue:in(new_worker(Sup), W)}};
{noreply, State#state{workers = queue:in(new_worker(Sup, Mod), W)}};
false ->
{noreply, State}
end
end;

handle_info({nodedown, Node}, State = #state{supervisor = Sup})
when Node == erlang:node(Sup) ->
{stop, nodedown, State};
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, State) ->
terminate(Reason, State = #state{supervisor = Sup}) ->
Workers = queue:to_list(State#state.workers),
ok = lists:foreach(fun (W) -> unlink(W) end, Workers),
true = exit(State#state.supervisor, shutdown),
stop_supervisor(Reason, Sup),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand All @@ -296,37 +391,62 @@ start_pool(StartFun, PoolArgs, WorkerArgs) ->
gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
end.

new_worker(Sup) ->
{ok, Pid} = supervisor:start_child(Sup, []),
new_worker(Sup, Mod) ->
Node = erlang:node(Sup),
{ok, Pid} =
case rpc_call(Node, erlang, process_info, [Sup, registered_name], ?TIMEOUT) of
{registered_name, Name} ->
case function_exported(Node, Name, start_child, 0) of
true -> rpc_call(Node, Name, start_child, [], ?TIMEOUT);
false ->
Args = child_args(Sup, Mod),
supervisor:start_child(Sup, Args)
end;
R when R == undefined; R == [] ->
Args = child_args(Sup, Mod),
supervisor:start_child(Sup, Args)
end,
true = link(Pid),
Pid.

new_worker(Sup, FromPid) ->
Pid = new_worker(Sup),
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.

get_worker_with_strategy(Workers, fifo) ->
queue:out(Workers);
get_worker_with_strategy(Workers, lifo) ->
queue:out_r(Workers).

child_args(Sup, Mod) ->
Node = erlang:node(Sup),
case supervisor:get_childspec(Sup, Mod) of
{ok, #{start := {M,F,A}}} ->
case function_exported(Node, M, F, length(A)) of
true -> []
end;
{ok, {_Id, {M,F,A}, _R, _SD, _T, _M}} ->
case function_exported(Node, M, F, length(A)) of
true -> []
end;
_ -> []
end.

function_exported(Node, Module, Name, Arity) ->
rpc_call(Node, erlang, function_exported, [Module, Name, Arity], ?TIMEOUT).

dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).

filter_worker_by_pid(Pid, Workers) ->
queue:filter(fun (WPid) -> WPid =/= Pid end, Workers).

prepopulate(N, _Sup) when N < 1 ->
prepopulate(N, _Sup, _Mod) when N < 1 ->
queue:new();
prepopulate(N, Sup) ->
prepopulate(N, Sup, queue:new()).
prepopulate(N, Sup, Mod) ->
prepopulate(N, Sup, Mod, queue:new()).

prepopulate(0, _Sup, Workers) ->
prepopulate(0, _Sup, _Mod, Workers) ->
Workers;
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
prepopulate(N, Sup, Mod, Workers) ->
prepopulate(N-1, Sup, Mod, queue:in(new_worker(Sup, Mod), Workers)).

handle_checkin(Pid, State) ->
#state{supervisor = Sup,
Expand All @@ -348,19 +468,20 @@ handle_checkin(Pid, State) ->

handle_worker_exit(Pid, State) ->
#state{supervisor = Sup,
worker_module = Mod,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
{{value, {From, CRef, MRef}}, LeftWaiting} ->
NewWorker = new_worker(State#state.supervisor),
NewWorker = new_worker(Sup, Mod),
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
W = filter_worker_by_pid(Pid, State#state.workers),
Workers = queue:in(new_worker(Sup), W),
Workers = queue:in(new_worker(Sup, Mod), W),
State#state{workers = Workers, waiting = Empty}
end.

Expand All @@ -375,3 +496,12 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
full;
state_name(_State) ->
overflow.

stop_supervisor(Reason, Pid) when is_pid(Pid) ->
case erlang:node(Pid) of
N when N == node() ->
exit(Pid, Reason);
_ when Reason =/= nodedown ->
catch gen_server:stop(Pid, Reason, ?TIMEOUT);
_ -> ok
end.
2 changes: 1 addition & 1 deletion src/poolboy_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-export([start_link/2, init/1]).

start_link(Mod, Args) ->
supervisor:start_link(?MODULE, {Mod, Args}).
supervisor:start_link({local, Mod}, ?MODULE, {Mod, Args}).

init({Mod, Args}) ->
{ok, {{simple_one_for_one, 0, 1},
Expand Down
6 changes: 6 additions & 0 deletions src/poolboy_worker_supervisor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-module(poolboy_worker_supervisor).

-callback start_child() -> {ok, Pid} |
{error, Reason} when
Pid :: pid(),
Reason :: term().