diff --git a/src/osiris_log.erl b/src/osiris_log.erl index a4407e2..0e7c50a 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -451,7 +451,8 @@ position = 0 :: non_neg_integer(), filter :: undefined | osiris_bloom:mstate(), filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(), - read_ahead = #ra{} :: #ra{}}). + read_ahead = #ra{} :: #ra{}, + use_sendfile = true :: boolean()}). -record(write, {type = writer :: writer | acceptor, segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -1069,6 +1070,13 @@ check_chunk_has_expected_epoch(Name, ChunkId, Epoch, IdxFiles) -> end end. +use_sendfile(ssl) -> false; +use_sendfile(tcp) -> + case os:type() of + {unix, darwin} -> false; + _ -> true + end. + init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, @@ -1094,6 +1102,7 @@ init_data_reader_at(ChunkId, FilePos, File, chunk_selector = all, position = FilePos, transport = maps:get(transport, Config, tcp), + use_sendfile = use_sendfile(maps:get(transport, Config, tcp)), read_ahead = ra(Config)}, fd = Fd}}; Err -> @@ -1355,6 +1364,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, user_data), next_offset = NextChunkId, transport = maps:get(transport, Options, tcp), + use_sendfile = use_sendfile(maps:get(transport, Options, tcp)), filter = FilterMatcher, read_ahead = ra(Conf)}, fd = Fd}}. @@ -1781,7 +1791,8 @@ send_file(Sock, #?MODULE{mode = #read{type = RType, chunk_selector = Selector, transport = Transport, - filter_size = RaFs}} = State0, + filter_size = RaFs, + use_sendfile = UseSendfile}} = State0, Callback) -> case catch read_header0(State0) of {ok, #{type := ChType, @@ -1829,7 +1840,7 @@ send_file(Sock, %% data read case send(Transport, Sock, [FrameHeader, HeaderData]) of ok -> - case sendfile(Transport, Fd, Sock, + case sendfile(UseSendfile, Transport, Fd, Sock, DataPos, ToSend) of ok -> State = State1#?MODULE{mode = Read}, @@ -2582,22 +2593,22 @@ max_segment_size_reached( CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks. -sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> +sendfile(_UseSendfile, _Transport, _Fd, _Sock, _Pos, 0) -> ok; -sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> +sendfile(true, Transport, Fd, Sock, Pos, ToSend) -> case file:sendfile(Fd, Sock, Pos, ToSend, []) of {ok, 0} -> %% TODO add counter for this? - sendfile(Transport, Fd, Sock, Pos, ToSend); + sendfile(true, Transport, Fd, Sock, Pos, ToSend); {ok, BytesSent} -> - sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent); + sendfile(true, Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent); {error, _} = Err -> Err end; -sendfile(ssl, Fd, Sock, Pos, ToSend) -> +sendfile(false, Transport, Fd, Sock, Pos, ToSend) -> case file:pread(Fd, Pos, ToSend) of {ok, Data} -> - ssl:send(Sock, Data); + send(Transport, Sock, Data); {error, _} = Err -> Err end. diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 740cb1b..26bc1e3 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -2495,6 +2495,16 @@ samples_fail_with_io_error_unix(Config) -> application:unset_env(osiris, max_segment_size_chunks) end. +assert_sendfile_pread(T, ExpectedSendfile, ExpectedPread) -> + case os:type() of + {unix, darwin} -> + ?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)), + ?assertEqual(ExpectedPread + ExpectedSendfile, osiris_tracer:call_count(T, file, pread)); + _ -> + ?assertEqual(ExpectedSendfile, osiris_tracer:call_count(T, file, sendfile)), + ?assertEqual(ExpectedPread, osiris_tracer:call_count(T, file, pread)) + end. + read_ahead_send_file(Config) -> RAL = 4096, %% read ahead limit HS = ?HEADER_SIZE_B, @@ -2511,8 +2521,7 @@ read_ahead_send_file(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 1), R1 end, fun(write, #{w := W0}) -> @@ -2539,8 +2548,7 @@ read_ahead_send_file(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(0, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 0), R1 end, fun(write, #{w := W0}) -> @@ -2610,9 +2618,8 @@ read_ahead_send_file(Config) -> {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), %% we had the header already, no need to read - ?assertEqual(0, osiris_tracer:call_count(T, file, pread)), %% send the data with sendfile - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 0), R1 end ], @@ -2692,15 +2699,13 @@ read_ahead_send_file_filter(Config) -> case FSize =:= DFS of true -> %% just read the header - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), %% no read ahead data yet, used sendfile - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)); + assert_sendfile_pread(T, 1, 1); false -> %% read the header, but the filter data did not fit, %% so read again - ?assertEqual(2, osiris_tracer:call_count(T, file, pread)), %% read the data in the second pread call - ?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)) + assert_sendfile_pread(T, 0, 2) end, R1 end, @@ -2720,15 +2725,13 @@ read_ahead_send_file_filter(Config) -> case FSize =:= DFS of true -> %% just read the header - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), %% data just fit in the read ahead, no sendfile - ?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)); + assert_sendfile_pread(T, 0, 1); false -> %% read the header ahead previously %% (because it could not read the filter at first, %% which triggered the read-ahead) - ?assertEqual(0, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)) + assert_sendfile_pread(T, 1, 0) end, R1 end, @@ -2746,9 +2749,8 @@ read_ahead_send_file_filter(Config) -> {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), %% just read the header - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), %% large chunk, used sendfile - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 1), R1 end, fun(write, #{w := W0}) -> @@ -2762,8 +2764,7 @@ read_ahead_send_file_filter(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 1), R1 end, fun(write, #{w := W0}) -> @@ -2779,8 +2780,7 @@ read_ahead_send_file_filter(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 0, 1), R1 end, fun(write, #{w := W0}) -> @@ -2796,8 +2796,7 @@ read_ahead_send_file_filter(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(0, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 0), R1 end ], @@ -2831,8 +2830,7 @@ read_ahead_send_file_on_off(Config) -> {ok, R1} = osiris_log:send_file(CS, R0), {ok, Read} = recv(SS, byte_size(D) + HS), ?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)), - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)), + assert_sendfile_pread(T, 1, 1), R1 end, fun(write, #{w := W0}) -> @@ -2847,8 +2845,7 @@ read_ahead_send_file_on_off(Config) -> true -> ?assertEqual(1, osiris_tracer:call_count(T)); false -> - ?assertEqual(1, osiris_tracer:call_count(T, file, pread)), - ?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)) + assert_sendfile_pread(T, 1, 1) end, R1 end