Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@
position = 0 :: non_neg_integer(),
filter :: undefined | osiris_bloom:mstate(),
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
read_ahead = #ra{} :: #ra{}}).
read_ahead = #ra{} :: #ra{},
use_sendfile = true :: boolean()}).
Comment thread
kjnilsson marked this conversation as resolved.
-record(write,
{type = writer :: writer | acceptor,
segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()},
Expand Down Expand Up @@ -1069,6 +1070,13 @@ check_chunk_has_expected_epoch(Name, ChunkId, Epoch, IdxFiles) ->
end
end.

use_sendfile(ssl) -> false;
use_sendfile(tcp) ->
case os:type() of
{unix, darwin} -> false;
_ -> true
end.

init_data_reader_at(ChunkId, FilePos, File,
#{dir := Dir, name := Name,
shared := Shared,
Expand All @@ -1094,6 +1102,7 @@ init_data_reader_at(ChunkId, FilePos, File,
chunk_selector = all,
position = FilePos,
transport = maps:get(transport, Config, tcp),
use_sendfile = use_sendfile(maps:get(transport, Config, tcp)),
read_ahead = ra(Config)},
fd = Fd}};
Err ->
Expand Down Expand Up @@ -1355,6 +1364,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
user_data),
next_offset = NextChunkId,
transport = maps:get(transport, Options, tcp),
use_sendfile = use_sendfile(maps:get(transport, Options, tcp)),
filter = FilterMatcher,
read_ahead = ra(Conf)},
fd = Fd}}.
Expand Down Expand Up @@ -1781,7 +1791,8 @@ send_file(Sock,
#?MODULE{mode = #read{type = RType,
chunk_selector = Selector,
transport = Transport,
filter_size = RaFs}} = State0,
filter_size = RaFs,
use_sendfile = UseSendfile}} = State0,
Callback) ->
case catch read_header0(State0) of
{ok, #{type := ChType,
Expand Down Expand Up @@ -1829,7 +1840,7 @@ send_file(Sock,
%% data read
case send(Transport, Sock, [FrameHeader, HeaderData]) of
ok ->
case sendfile(Transport, Fd, Sock,
case sendfile(UseSendfile, Transport, Fd, Sock,
DataPos, ToSend) of
ok ->
State = State1#?MODULE{mode = Read},
Expand Down Expand Up @@ -2582,22 +2593,22 @@ max_segment_size_reached(
CurrentSizeBytes >= MaxSizeBytes orelse
CurrentSizeChunks >= MaxSizeChunks.

sendfile(_Transport, _Fd, _Sock, _Pos, 0) ->
sendfile(_UseSendfile, _Transport, _Fd, _Sock, _Pos, 0) ->
ok;
sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) ->
sendfile(true, Transport, Fd, Sock, Pos, ToSend) ->
case file:sendfile(Fd, Sock, Pos, ToSend, []) of
{ok, 0} ->
%% TODO add counter for this?
sendfile(Transport, Fd, Sock, Pos, ToSend);
sendfile(true, Transport, Fd, Sock, Pos, ToSend);
{ok, BytesSent} ->
sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent);
sendfile(true, Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent);
Comment thread
kjnilsson marked this conversation as resolved.
{error, _} = Err ->
Err
end;
sendfile(ssl, Fd, Sock, Pos, ToSend) ->
sendfile(false, Transport, Fd, Sock, Pos, ToSend) ->
case file:pread(Fd, Pos, ToSend) of
{ok, Data} ->
ssl:send(Sock, Data);
send(Transport, Sock, Data);
{error, _} = Err ->
Err
end.
Expand Down
49 changes: 23 additions & 26 deletions test/osiris_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,16 @@ samples_fail_with_io_error_unix(Config) ->
application:unset_env(osiris, max_segment_size_chunks)
end.

assert_sendfile_pread(T, ExpectedSendfile, ExpectedPread) ->
case os:type() of
{unix, darwin} ->
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)),
?assertEqual(ExpectedPread + ExpectedSendfile, osiris_tracer:call_count(T, file, pread));
_ ->
?assertEqual(ExpectedSendfile, osiris_tracer:call_count(T, file, sendfile)),
?assertEqual(ExpectedPread, osiris_tracer:call_count(T, file, pread))
end.

read_ahead_send_file(Config) ->
RAL = 4096, %% read ahead limit
HS = ?HEADER_SIZE_B,
Expand All @@ -2511,8 +2521,7 @@ read_ahead_send_file(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 1),
R1
end,
fun(write, #{w := W0}) ->
Expand All @@ -2539,8 +2548,7 @@ read_ahead_send_file(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 0),
R1
end,
fun(write, #{w := W0}) ->
Expand Down Expand Up @@ -2610,9 +2618,8 @@ read_ahead_send_file(Config) ->
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
%% we had the header already, no need to read
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
%% send the data with sendfile
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 0),
R1
end
],
Expand Down Expand Up @@ -2692,15 +2699,13 @@ read_ahead_send_file_filter(Config) ->
case FSize =:= DFS of
true ->
%% just read the header
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
%% no read ahead data yet, used sendfile
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile));
assert_sendfile_pread(T, 1, 1);
false ->
%% read the header, but the filter data did not fit,
%% so read again
?assertEqual(2, osiris_tracer:call_count(T, file, pread)),
%% read the data in the second pread call
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile))
assert_sendfile_pread(T, 0, 2)
end,
R1
end,
Expand All @@ -2720,15 +2725,13 @@ read_ahead_send_file_filter(Config) ->
case FSize =:= DFS of
true ->
%% just read the header
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
%% data just fit in the read ahead, no sendfile
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile));
assert_sendfile_pread(T, 0, 1);
false ->
%% read the header ahead previously
%% (because it could not read the filter at first,
%% which triggered the read-ahead)
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile))
assert_sendfile_pread(T, 1, 0)
end,
R1
end,
Expand All @@ -2746,9 +2749,8 @@ read_ahead_send_file_filter(Config) ->
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
%% just read the header
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
%% large chunk, used sendfile
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 1),
R1
end,
fun(write, #{w := W0}) ->
Expand All @@ -2762,8 +2764,7 @@ read_ahead_send_file_filter(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 1),
R1
end,
fun(write, #{w := W0}) ->
Expand All @@ -2779,8 +2780,7 @@ read_ahead_send_file_filter(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 0, 1),
R1
end,
fun(write, #{w := W0}) ->
Expand All @@ -2796,8 +2796,7 @@ read_ahead_send_file_filter(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 0),
R1
end
],
Expand Down Expand Up @@ -2831,8 +2830,7 @@ read_ahead_send_file_on_off(Config) ->
{ok, R1} = osiris_log:send_file(CS, R0),
{ok, Read} = recv(SS, byte_size(D) + HS),
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
assert_sendfile_pread(T, 1, 1),
R1
end,
fun(write, #{w := W0}) ->
Expand All @@ -2847,8 +2845,7 @@ read_ahead_send_file_on_off(Config) ->
true ->
?assertEqual(1, osiris_tracer:call_count(T));
false ->
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile))
assert_sendfile_pread(T, 1, 1)
end,
R1
end
Expand Down
Loading