Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
require "logstash/pipelines_registry"
require "logstash/persisted_queue_config_validator"
require "logstash/pipeline_resource_usage_validator"
require "logstash/ssl_file_tracker"
require "stud/trap"
require "uri"
require "socket"
Expand All @@ -61,10 +60,11 @@ class LogStash::Agent
# :name [String] - identifier for the agent
# :auto_reload [Boolean] - enable reloading of pipelines
# :reload_interval [Integer] - reload pipelines every X seconds
def initialize(settings = LogStash::SETTINGS, source_loader = nil)
def initialize(settings = LogStash::SETTINGS, source_loader = nil, ssl_file_tracker = nil)
@logger = self.class.logger
@settings = settings
@auto_reload = setting("config.reload.automatic")
@ssl_file_tracker = ssl_file_tracker
@ephemeral_id = SecureRandom.uuid

java_import("org.logstash.health.HealthObserver")
Expand Down Expand Up @@ -123,11 +123,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
dispatcher.fire(:after_initialize)

@running = Concurrent::AtomicBoolean.new(false)
if @auto_reload && setting("ssl.reload.automatic")
java_import org.logstash.common.FileWatchService
@file_watch_service = FileWatchService.create
@ssl_file_tracker = LogStash::SslFileTracker.new(@file_watch_service)
end
end

def execute
Expand Down Expand Up @@ -289,7 +284,7 @@ def shutdown

transition_to_stopped
converge_result = shutdown_pipelines
@file_watch_service&.close
@ssl_file_tracker&.close
stop_collecting_metrics
stop_webserver
converge_result
Expand Down
13 changes: 11 additions & 2 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class LogStash::Runner < Clamp::StrictCommand
SYSTEM_SETTINGS = LogStash::SETTINGS.clone
LogStash::PLUGIN_REGISTRY.setup!

attr_reader :agent, :settings, :source_loader
attr_reader :agent, :settings, :source_loader, :ssl_file_tracker
attr_accessor :bootstrap_checks

def initialize(*args)
Expand Down Expand Up @@ -333,6 +333,14 @@ def execute
signal_usage_error(e.message)
return 1
end

# CPM BootstrapCheck may have flipped `config.reload.automatic`, so evaluate here
# before :after_bootstrap_checks hooks (CPM/monitoring) add their sources.
if @settings.get_value("config.reload.automatic") && @settings.get_value("ssl.reload.automatic")
require "logstash/ssl_file_tracker"
java_import org.logstash.common.FileWatchService
@ssl_file_tracker = LogStash::SslFileTracker.new(FileWatchService.create)
end
@dispatcher.fire(:after_bootstrap_checks)

LogStash::Util::set_thread_name(self.class.name)
Expand Down Expand Up @@ -389,7 +397,7 @@ def execute
@data_path_lock = FileLockFactory.obtainLock(java.nio.file.Paths.get(setting("path.data")).to_absolute_path, ".lock")

@dispatcher.fire(:before_agent)
@agent = create_agent(@settings, @source_loader)
@agent = create_agent(@settings, @source_loader, @ssl_file_tracker)
@dispatcher.fire(:after_agent)

# enable sigint/sigterm before starting the agent
Expand Down Expand Up @@ -433,6 +441,7 @@ def execute
Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil?
Stud::untrap("HUP", sighup_id) unless sighup_id.nil?
FileLockFactory.releaseLock(@data_path_lock) if @data_path_lock
@ssl_file_tracker&.close
@log_fd.close if @log_fd
end # def self.main

Expand Down
56 changes: 52 additions & 4 deletions logstash-core/lib/logstash/ssl_file_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,35 @@ class SslFileTracker
ssl_truststore_path
].freeze

# Settings key suffixes for Elasticsearch SSL connections.
# Used by non-pipeline consumers (ElasticsearchSource, LicenseReader) to discover paths.
SETTINGS_SSL_SUFFIXES = %w[
elasticsearch.ssl.certificate_authority
elasticsearch.ssl.truststore.path
elasticsearch.ssl.keystore.path
elasticsearch.ssl.certificate
elasticsearch.ssl.key
].freeze

# Holds all per-path watch state in one place.
# stamp: latest observed stamp. SHA-256 string for :file paths; mtime (Time) for :symlink paths.
# callback: the FileChangeCallback registered with FileWatchService. nil for polled paths.
# pipeline_ids: Set of pipeline_ids referencing this path. The Java watch is removed only when pipeline_ids is empty.
# file_type: :file for regular files (WatchService-driven), :symlink for symlinks (mtime on each converge).
WatchedFile = Struct.new(:stamp, :callback, :pipeline_ids, :file_type)

# Returns absolute SSL file paths configured under `namespace` in `settings`.
# @param settings [LogStash::Settings]
# @param namespace [String] e.g. "xpack.management"
# @return [Array<String>]
def self.paths_from_settings(settings, namespace)
SETTINGS_SSL_SUFFIXES.filter_map do |suffix|
val = settings.get_value("#{namespace}.#{suffix}") rescue nil
s = val.to_s
s.empty? ? nil : ::File.expand_path(s)
end
end

def initialize(file_watch_service)
@file_watch_service = file_watch_service
# id includes pipeline_id and xpack service, { id => [file_path] }, tracks which paths each id registered
Expand Down Expand Up @@ -108,7 +130,6 @@ def register_paths(id, paths)
raise
end
end
private :register_paths

# Starts watching all SSL file paths for the pipeline. Paths already watched
# by another pipeline share the same WatchedFile entry and are not re-registered.
Expand Down Expand Up @@ -165,18 +186,19 @@ def deregister(pipeline_id)
end

# Refreshes mtime stamps for symlink paths belonging to the given ids.
# @param ids [Array, Set]
# @param ids [Symbol, String, Set]
# @return [void]
def refresh_symlink_stamps(ids)
return if ids.empty?
target_ids = Set.new(Array(ids).map(&:to_sym))
target_ids = Array(ids).map(&:to_sym)
return if target_ids.empty?
Comment thread
andsel marked this conversation as resolved.

# Collect unique polled paths only for the ids.
polled_paths = @mutex.synchronize do
target_ids.flat_map { |id| @id_paths[id] || [] }
.select { |p| @path_watched[p]&.file_type == :symlink }
.uniq
end
return if polled_paths.empty?

# Stat outside the mutex
new_stamps = polled_paths.to_h { |p| [p, compute_mtime(p)] }.compact
Expand Down Expand Up @@ -208,6 +230,32 @@ def stale_pipeline_ids
@mutex.synchronize { (@stale_ids & @pipeline_ids).to_a }
end

# Atomically refresh the given id's stamps. When the paths were stale, clear its stale flag, and yield.
# If the block raises, the flag is reasserted so the next cycle can retry.
# @param id [Symbol, String]
# @yield called only when id is stale
# @return [Boolean] true if the block ran
def consume_stale(id)
id = id.to_sym
refresh_symlink_stamps(id)

was_stale = @mutex.synchronize { !@stale_ids.delete?(id).nil? }
return false unless was_stale

begin
yield if block_given?
true
rescue
@mutex.synchronize { @stale_ids.add(id) }
raise
end
end

# Closes the underlying FileWatchService. Called on agent shutdown.
def close
@file_watch_service&.close
end

private

# Returns a FileChangeCallback lambda that recomputes the SHA-256 checksum
Expand Down
23 changes: 10 additions & 13 deletions logstash-core/spec/logstash/agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require "stud/temporary"
require "logstash/inputs/generator"
require "logstash/config/source/local"
require "logstash/ssl_file_tracker"
require_relative "../support/mocks_classes"
require "fileutils"
require_relative "../support/helpers"
Expand Down Expand Up @@ -619,20 +620,15 @@ def register
allow(LogStash::WebServer).to receive(:from_settings).with(any_args).and_return(double("WebServer").as_null_object)
end

it "is a SslFileTracker when config.reload.automatic and ssl.reload.automatic are both true" do
agent = described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => true), nil)
expect(agent.ssl_file_tracker).to be_a(LogStash::SslFileTracker)
it "exposes the tracker passed to the constructor" do
tracker = double("ssl_file_tracker").as_null_object
agent = described_class.new(mock_settings({}), nil, tracker)
expect(agent.ssl_file_tracker).to be(tracker)
agent.shutdown
end

it "is nil when ssl.reload.automatic is false even if config.reload.automatic is true" do
agent = described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => false), nil)
expect(agent.ssl_file_tracker).to be_nil
agent.shutdown
end

it "is nil when config.reload.automatic is false" do
agent = described_class.new(mock_settings("config.reload.automatic" => false, "ssl.reload.automatic" => true), nil)
it "is nil when no tracker is passed" do
agent = described_class.new(mock_settings({}), nil)
expect(agent.ssl_file_tracker).to be_nil
agent.shutdown
end
Expand All @@ -646,7 +642,7 @@ def register
end

context "when ssl reload is disabled" do
let(:agent) { described_class.new(mock_settings("config.reload.automatic" => false), nil) }
let(:agent) { described_class.new(mock_settings({}), nil) }

after :each do
agent.shutdown
Expand All @@ -658,7 +654,8 @@ def register
end

context "when ssl reload is enabled" do
let(:agent) { described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => true), nil) }
let(:tracker) { LogStash::SslFileTracker.new(org.logstash.common.FileWatchService.create) }
let(:agent) { described_class.new(mock_settings("config.reload.automatic" => true), nil, tracker) }

after :each do
agent.shutdown
Expand Down
63 changes: 63 additions & 0 deletions logstash-core/spec/logstash/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,69 @@
end
end

describe "SSL file tracker creation" do
subject(:runner) { LogStash::Runner.new("") }
let(:fake_tracker) { double("SslFileTracker").as_null_object }
let(:config_file) do
path = Stud::Temporary.pathname
File.write(path, "input { generator { count => 1 } } output { null {} }")
path
end

# Settings are mutated by `-S`, `-f`, and `-t` flags.
# Reset them before and after each example so we neither inherit nor leak global state.
SSL_FILE_TRACKER_TEST_SETTINGS = {
"config.reload.automatic" => false,
"ssl.reload.automatic" => false,
"config.test_and_exit" => false,
"path.config" => nil,
}.freeze

def reload_settings
SSL_FILE_TRACKER_TEST_SETTINGS.each { |k, v| LogStash::SETTINGS.set_value(k, v) }
end

before do
require "logstash/ssl_file_tracker"
allow(LogStash::SslFileTracker).to receive(:new).and_return(fake_tracker)
reload_settings
end

after { reload_settings }

it "creates the tracker when both config.reload.automatic and ssl.reload.automatic are true" do
runner.run(["-f", config_file, "-Sconfig.reload.automatic=true", "-Sssl.reload.automatic=true"])
expect(LogStash::SslFileTracker).to have_received(:new)
end

it "does not create the tracker when config.reload.automatic is false" do
runner.run(["-f", config_file, "-Sssl.reload.automatic=true"])
expect(LogStash::SslFileTracker).not_to have_received(:new)
end

it "does not create the tracker when ssl.reload.automatic is false" do
runner.run(["-f", config_file, "-Sconfig.reload.automatic=true"])
expect(LogStash::SslFileTracker).not_to have_received(:new)
end

it "creates the tracker when a bootstrap_check flips config.reload.automatic (CPM-style)" do
flip_check = Class.new do
def self.check(settings)
settings.set("config.reload.automatic", true)
end
end
runner.bootstrap_checks = [flip_check]
runner.run(["-f", config_file, "-Sssl.reload.automatic=true"])
expect(LogStash::SslFileTracker).to have_received(:new)
end

it "closes the tracker if the runner aborts before the agent takes ownership" do
# -t: config.test_and_exit
expect(fake_tracker).to receive(:close)
runner.run(["-t", "-f", config_file, "-Sconfig.reload.automatic=true", "-Sssl.reload.automatic=true"])
end
end

describe "--config.test_and_exit" do
subject { LogStash::Runner.new("") }
let(:args) { ["-t", "-e", pipeline_string] }
Expand Down
67 changes: 67 additions & 0 deletions logstash-core/spec/logstash/ssl_file_tracker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -560,4 +560,71 @@ def two_cert_pipeline(id = :main)
it_behaves_like "two pipelines sharing a path"
end
end

describe ".paths_from_settings" do
let(:settings) { double("settings") }
let(:namespace) { "xpack.management" }

it "returns absolute paths for every configured SSL suffix" do
allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.certificate_authority").and_return("/tmp/ca.crt")
allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.truststore.path").and_return("relative/ts.p12")
allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.keystore.path").and_return(nil)
allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.certificate").and_return("")
allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.key").and_return(nil)
Comment thread
andsel marked this conversation as resolved.

paths = described_class.paths_from_settings(settings, namespace)
expect(paths).to contain_exactly(File.expand_path("/tmp/ca.crt"), File.expand_path("relative/ts.p12"))
end

it "swallows settings.get_value errors and returns empty array when nothing configured" do
allow(settings).to receive(:get_value).and_raise(ArgumentError, "unknown setting")
expect(described_class.paths_from_settings(settings, namespace)).to eq([])
end
end

describe "#consume_stale" do
it "does not yield when id was never stale" do
ran = false
result = tracker.consume_stale(:".cpm") { ran = true }
expect(ran).to eq(false)
expect(result).to eq(false)
end

it "yields and clears the stale flag when id is stale" do
tracker.instance_variable_get(:@stale_ids).add(:".cpm")

ran = false
result = tracker.consume_stale(:".cpm") { ran = true }

expect(ran).to eq(true)
expect(result).to eq(true)
expect(tracker.instance_variable_get(:@stale_ids)).not_to include(:".cpm")
end

it "accepts string ids" do
tracker.instance_variable_get(:@stale_ids).add(:".cpm")
expect(tracker.consume_stale(".cpm") {}).to eq(true)
end

it "re-asserts the stale flag when the block raises" do
tracker.instance_variable_get(:@stale_ids).add(:".cpm")

expect {
tracker.consume_stale(:".cpm") { raise "rebuild failed" }
}.to raise_error("rebuild failed")

expect(tracker.instance_variable_get(:@stale_ids)).to include(:".cpm")
end

it "clears only the given id" do
stale = tracker.instance_variable_get(:@stale_ids)
stale.add(:".cpm")
stale.add(:".cpm_license")

tracker.consume_stale(:".cpm") {}

expect(stale).not_to include(:".cpm")
expect(stale).to include(:".cpm_license")
end
end
end
Loading
Loading