Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/opensearch/transport/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ class Client
# exception (false by default)
# @option arguments Array<Number> :retry_on_status Retry when specific status codes are returned
#
# @option arguments [Number] :retry_backoff Base delay in seconds before retrying (nil by default,
# meaning no delay between retries). When set, retries will
# sleep with exponential backoff: retry_backoff * (retry_backoff_factor ** (attempt - 1))
# with up to 25% random jitter.
# @option arguments [Number] :retry_backoff_factor Multiplier for exponential backoff (2 by default)
#
# @option arguments [Boolean] :reload_on_failure Reload connections after failure (false by default)
#
# @option arguments [Boolean] :ignore_404_on_delete Whether to ignore 404/NotFound error on http DELETE (false by default)
Expand Down
25 changes: 25 additions & 0 deletions lib/opensearch/transport/transport/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def initialize(arguments = {}, &block)
@reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
@retry_on_status = Array(options[:retry_on_status]).map(&:to_i)

@retry_backoff = options[:retry_backoff]
@retry_backoff_factor = options[:retry_backoff_factor] || 2
end

# Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
Expand Down Expand Up @@ -304,6 +307,7 @@ def perform_request(method, path, params = {}, body = nil, _headers = nil, opts
raise e unless response && @retry_on_status.include?(response.status)
log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
if tries <= (max_retries || DEFAULT_MAX_RETRIES)
__retry_backoff_sleep(tries)
retry
else
log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
Expand All @@ -316,6 +320,7 @@ def perform_request(method, path, params = {}, body = nil, _headers = nil, opts

if reload_on_failure && (tries < connections.all.size)
log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
__retry_backoff_sleep(tries)
reload_connections! and retry
end

Expand All @@ -324,6 +329,7 @@ def perform_request(method, path, params = {}, body = nil, _headers = nil, opts
raise exception unless max_retries
log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
if tries <= max_retries
__retry_backoff_sleep(tries)
retry
else
log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
Expand Down Expand Up @@ -382,6 +388,25 @@ def host_unreachable_exceptions

private

# Sleeps with exponential backoff before a retry attempt, if `retry_backoff` is configured.
#
# The delay is calculated as: retry_backoff * (retry_backoff_factor ** (tries - 1))
# with up to 25% random jitter to avoid thundering herd effects.
#
# @param tries [Integer] The current attempt number (1-based)
#
# @api private
#
def __retry_backoff_sleep(tries)
return unless @retry_backoff

delay = @retry_backoff * (@retry_backoff_factor**(tries - 1))
jitter = delay * rand * 0.25
sleep_duration = delay + jitter
log_warn "[Retry] Backing off for #{format('%.2f', sleep_duration)}s before retry ##{tries}"
sleep(sleep_duration)
end

USER_AGENT_STR = 'User-Agent'.freeze
USER_AGENT_REGEX = /user-?_?agent/
CONTENT_TYPE_STR = 'Content-Type'.freeze
Expand Down
76 changes: 76 additions & 0 deletions spec/opensearch/transport/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,80 @@
end
end
end

context 'when the client has `retry_backoff` configured' do
let(:client) do
OpenSearch::Transport::Client.new(arguments)
end

let(:arguments) do
{
hosts: ['http://unavailable:9200', 'http://unavailable:9201'],
retry_on_failure: 2,
retry_backoff: 1,
retry_backoff_factor: 2
}
end

context 'when a request fails and is retried' do
before do
allow(client.transport).to receive(:sleep)
end

it 'sleeps with exponential backoff before each retry' do
expect {
client.transport.perform_request('GET', '/info')
}.to raise_exception(OpenSearch::Transport::Transport::Error)

expect(client.transport).to have_received(:sleep).with(a_value_between(1.0, 1.25)).ordered
expect(client.transport).to have_received(:sleep).with(a_value_between(2.0, 2.5)).ordered
end
end

context 'when retry_backoff is not set' do
let(:arguments) do
{
hosts: ['http://unavailable:9200'],
retry_on_failure: 1
}
end

before do
allow(client.transport).to receive(:sleep)
end

it 'does not sleep between retries' do
expect {
client.transport.perform_request('GET', '/info')
}.to raise_exception(OpenSearch::Transport::Transport::Error)

expect(client.transport).not_to have_received(:sleep)
end
end

context 'when retrying on status codes' do
let(:arguments) do
{
hosts: OPENSEARCH_HOSTS,
retry_on_failure: 2,
retry_on_status: [404],
retry_backoff: 0.5,
retry_backoff_factor: 2
}
end

before do
allow(client.transport).to receive(:sleep)
end

it 'sleeps with exponential backoff on retry_on_status retries' do
expect {
client.transport.perform_request('GET', 'myindex/_doc/1?routing=FOOBARBAZ')
}.to raise_exception(OpenSearch::Transport::Transport::Errors::NotFound)

expect(client.transport).to have_received(:sleep).with(a_value_between(0.5, 0.625)).ordered
expect(client.transport).to have_received(:sleep).with(a_value_between(1.0, 1.25)).ordered
end
end
end
end