diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 3a930d4f79..7e50c356d8 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -38,7 +38,6 @@ require "logstash/pipelines_registry" require "logstash/persisted_queue_config_validator" require "logstash/pipeline_resource_usage_validator" -require "logstash/ssl_file_tracker" require "stud/trap" require "uri" require "socket" @@ -61,10 +60,11 @@ class LogStash::Agent # :name [String] - identifier for the agent # :auto_reload [Boolean] - enable reloading of pipelines # :reload_interval [Integer] - reload pipelines every X seconds - def initialize(settings = LogStash::SETTINGS, source_loader = nil) + def initialize(settings = LogStash::SETTINGS, source_loader = nil, ssl_file_tracker = nil) @logger = self.class.logger @settings = settings @auto_reload = setting("config.reload.automatic") + @ssl_file_tracker = ssl_file_tracker @ephemeral_id = SecureRandom.uuid java_import("org.logstash.health.HealthObserver") @@ -123,11 +123,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) dispatcher.fire(:after_initialize) @running = Concurrent::AtomicBoolean.new(false) - if @auto_reload && setting("ssl.reload.automatic") - java_import org.logstash.common.FileWatchService - @file_watch_service = FileWatchService.create - @ssl_file_tracker = LogStash::SslFileTracker.new(@file_watch_service) - end end def execute @@ -289,7 +284,7 @@ def shutdown transition_to_stopped converge_result = shutdown_pipelines - @file_watch_service&.close + @ssl_file_tracker&.close stop_collecting_metrics stop_webserver converge_result diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 995b6b9e0c..ed69e0bdb6 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -243,7 +243,7 @@ class LogStash::Runner < Clamp::StrictCommand SYSTEM_SETTINGS = LogStash::SETTINGS.clone LogStash::PLUGIN_REGISTRY.setup! - attr_reader :agent, :settings, :source_loader + attr_reader :agent, :settings, :source_loader, :ssl_file_tracker attr_accessor :bootstrap_checks def initialize(*args) @@ -333,6 +333,14 @@ def execute signal_usage_error(e.message) return 1 end + + # CPM BootstrapCheck may have flipped `config.reload.automatic`, so evaluate here + # before :after_bootstrap_checks hooks (CPM/monitoring) add their sources. + if @settings.get_value("config.reload.automatic") && @settings.get_value("ssl.reload.automatic") + require "logstash/ssl_file_tracker" + java_import org.logstash.common.FileWatchService + @ssl_file_tracker = LogStash::SslFileTracker.new(FileWatchService.create) + end @dispatcher.fire(:after_bootstrap_checks) LogStash::Util::set_thread_name(self.class.name) @@ -389,7 +397,7 @@ def execute @data_path_lock = FileLockFactory.obtainLock(java.nio.file.Paths.get(setting("path.data")).to_absolute_path, ".lock") @dispatcher.fire(:before_agent) - @agent = create_agent(@settings, @source_loader) + @agent = create_agent(@settings, @source_loader, @ssl_file_tracker) @dispatcher.fire(:after_agent) # enable sigint/sigterm before starting the agent @@ -433,6 +441,7 @@ def execute Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil? Stud::untrap("HUP", sighup_id) unless sighup_id.nil? FileLockFactory.releaseLock(@data_path_lock) if @data_path_lock + @ssl_file_tracker&.close @log_fd.close if @log_fd end # def self.main diff --git a/logstash-core/lib/logstash/ssl_file_tracker.rb b/logstash-core/lib/logstash/ssl_file_tracker.rb index 587f489517..78a2c86887 100644 --- a/logstash-core/lib/logstash/ssl_file_tracker.rb +++ b/logstash-core/lib/logstash/ssl_file_tracker.rb @@ -42,6 +42,16 @@ class SslFileTracker ssl_truststore_path ].freeze + # Settings key suffixes for Elasticsearch SSL connections. + # Used by non-pipeline consumers (ElasticsearchSource, LicenseReader) to discover paths. + SETTINGS_SSL_SUFFIXES = %w[ + elasticsearch.ssl.certificate_authority + elasticsearch.ssl.truststore.path + elasticsearch.ssl.keystore.path + elasticsearch.ssl.certificate + elasticsearch.ssl.key + ].freeze + # Holds all per-path watch state in one place. # stamp: latest observed stamp. SHA-256 string for :file paths; mtime (Time) for :symlink paths. # callback: the FileChangeCallback registered with FileWatchService. nil for polled paths. @@ -49,6 +59,18 @@ class SslFileTracker # file_type: :file for regular files (WatchService-driven), :symlink for symlinks (mtime on each converge). WatchedFile = Struct.new(:stamp, :callback, :pipeline_ids, :file_type) + # Returns absolute SSL file paths configured under `namespace` in `settings`. + # @param settings [LogStash::Settings] + # @param namespace [String] e.g. "xpack.management" + # @return [Array] + def self.paths_from_settings(settings, namespace) + SETTINGS_SSL_SUFFIXES.filter_map do |suffix| + val = settings.get_value("#{namespace}.#{suffix}") rescue nil + s = val.to_s + s.empty? ? nil : ::File.expand_path(s) + end + end + def initialize(file_watch_service) @file_watch_service = file_watch_service # id includes pipeline_id and xpack service, { id => [file_path] }, tracks which paths each id registered @@ -108,7 +130,6 @@ def register_paths(id, paths) raise end end - private :register_paths # Starts watching all SSL file paths for the pipeline. Paths already watched # by another pipeline share the same WatchedFile entry and are not re-registered. @@ -165,11 +186,11 @@ def deregister(pipeline_id) end # Refreshes mtime stamps for symlink paths belonging to the given ids. - # @param ids [Array, Set] + # @param ids [Symbol, String, Set] # @return [void] def refresh_symlink_stamps(ids) - return if ids.empty? - target_ids = Set.new(Array(ids).map(&:to_sym)) + target_ids = Array(ids).map(&:to_sym) + return if target_ids.empty? # Collect unique polled paths only for the ids. polled_paths = @mutex.synchronize do @@ -177,6 +198,7 @@ def refresh_symlink_stamps(ids) .select { |p| @path_watched[p]&.file_type == :symlink } .uniq end + return if polled_paths.empty? # Stat outside the mutex new_stamps = polled_paths.to_h { |p| [p, compute_mtime(p)] }.compact @@ -208,6 +230,32 @@ def stale_pipeline_ids @mutex.synchronize { (@stale_ids & @pipeline_ids).to_a } end + # Atomically refresh the given id's stamps. When the paths were stale, clear its stale flag, and yield. + # If the block raises, the flag is reasserted so the next cycle can retry. + # @param id [Symbol, String] + # @yield called only when id is stale + # @return [Boolean] true if the block ran + def consume_stale(id) + id = id.to_sym + refresh_symlink_stamps(id) + + was_stale = @mutex.synchronize { !@stale_ids.delete?(id).nil? } + return false unless was_stale + + begin + yield if block_given? + true + rescue + @mutex.synchronize { @stale_ids.add(id) } + raise + end + end + + # Closes the underlying FileWatchService. Called on agent shutdown. + def close + @file_watch_service&.close + end + private # Returns a FileChangeCallback lambda that recomputes the SHA-256 checksum diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 24193e8e54..2896400366 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -19,6 +19,7 @@ require "stud/temporary" require "logstash/inputs/generator" require "logstash/config/source/local" +require "logstash/ssl_file_tracker" require_relative "../support/mocks_classes" require "fileutils" require_relative "../support/helpers" @@ -619,20 +620,15 @@ def register allow(LogStash::WebServer).to receive(:from_settings).with(any_args).and_return(double("WebServer").as_null_object) end - it "is a SslFileTracker when config.reload.automatic and ssl.reload.automatic are both true" do - agent = described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => true), nil) - expect(agent.ssl_file_tracker).to be_a(LogStash::SslFileTracker) + it "exposes the tracker passed to the constructor" do + tracker = double("ssl_file_tracker").as_null_object + agent = described_class.new(mock_settings({}), nil, tracker) + expect(agent.ssl_file_tracker).to be(tracker) agent.shutdown end - it "is nil when ssl.reload.automatic is false even if config.reload.automatic is true" do - agent = described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => false), nil) - expect(agent.ssl_file_tracker).to be_nil - agent.shutdown - end - - it "is nil when config.reload.automatic is false" do - agent = described_class.new(mock_settings("config.reload.automatic" => false, "ssl.reload.automatic" => true), nil) + it "is nil when no tracker is passed" do + agent = described_class.new(mock_settings({}), nil) expect(agent.ssl_file_tracker).to be_nil agent.shutdown end @@ -646,7 +642,7 @@ def register end context "when ssl reload is disabled" do - let(:agent) { described_class.new(mock_settings("config.reload.automatic" => false), nil) } + let(:agent) { described_class.new(mock_settings({}), nil) } after :each do agent.shutdown @@ -658,7 +654,8 @@ def register end context "when ssl reload is enabled" do - let(:agent) { described_class.new(mock_settings("config.reload.automatic" => true, "ssl.reload.automatic" => true), nil) } + let(:tracker) { LogStash::SslFileTracker.new(org.logstash.common.FileWatchService.create) } + let(:agent) { described_class.new(mock_settings("config.reload.automatic" => true), nil, tracker) } after :each do agent.shutdown diff --git a/logstash-core/spec/logstash/runner_spec.rb b/logstash-core/spec/logstash/runner_spec.rb index d4b683ce16..b7aee36b13 100644 --- a/logstash-core/spec/logstash/runner_spec.rb +++ b/logstash-core/spec/logstash/runner_spec.rb @@ -200,6 +200,69 @@ end end + describe "SSL file tracker creation" do + subject(:runner) { LogStash::Runner.new("") } + let(:fake_tracker) { double("SslFileTracker").as_null_object } + let(:config_file) do + path = Stud::Temporary.pathname + File.write(path, "input { generator { count => 1 } } output { null {} }") + path + end + + # Settings are mutated by `-S`, `-f`, and `-t` flags. + # Reset them before and after each example so we neither inherit nor leak global state. + SSL_FILE_TRACKER_TEST_SETTINGS = { + "config.reload.automatic" => false, + "ssl.reload.automatic" => false, + "config.test_and_exit" => false, + "path.config" => nil, + }.freeze + + def reload_settings + SSL_FILE_TRACKER_TEST_SETTINGS.each { |k, v| LogStash::SETTINGS.set_value(k, v) } + end + + before do + require "logstash/ssl_file_tracker" + allow(LogStash::SslFileTracker).to receive(:new).and_return(fake_tracker) + reload_settings + end + + after { reload_settings } + + it "creates the tracker when both config.reload.automatic and ssl.reload.automatic are true" do + runner.run(["-f", config_file, "-Sconfig.reload.automatic=true", "-Sssl.reload.automatic=true"]) + expect(LogStash::SslFileTracker).to have_received(:new) + end + + it "does not create the tracker when config.reload.automatic is false" do + runner.run(["-f", config_file, "-Sssl.reload.automatic=true"]) + expect(LogStash::SslFileTracker).not_to have_received(:new) + end + + it "does not create the tracker when ssl.reload.automatic is false" do + runner.run(["-f", config_file, "-Sconfig.reload.automatic=true"]) + expect(LogStash::SslFileTracker).not_to have_received(:new) + end + + it "creates the tracker when a bootstrap_check flips config.reload.automatic (CPM-style)" do + flip_check = Class.new do + def self.check(settings) + settings.set("config.reload.automatic", true) + end + end + runner.bootstrap_checks = [flip_check] + runner.run(["-f", config_file, "-Sssl.reload.automatic=true"]) + expect(LogStash::SslFileTracker).to have_received(:new) + end + + it "closes the tracker if the runner aborts before the agent takes ownership" do + # -t: config.test_and_exit + expect(fake_tracker).to receive(:close) + runner.run(["-t", "-f", config_file, "-Sconfig.reload.automatic=true", "-Sssl.reload.automatic=true"]) + end + end + describe "--config.test_and_exit" do subject { LogStash::Runner.new("") } let(:args) { ["-t", "-e", pipeline_string] } diff --git a/logstash-core/spec/logstash/ssl_file_tracker_spec.rb b/logstash-core/spec/logstash/ssl_file_tracker_spec.rb index ca50f655af..2d5a0cd686 100644 --- a/logstash-core/spec/logstash/ssl_file_tracker_spec.rb +++ b/logstash-core/spec/logstash/ssl_file_tracker_spec.rb @@ -560,4 +560,71 @@ def two_cert_pipeline(id = :main) it_behaves_like "two pipelines sharing a path" end end + + describe ".paths_from_settings" do + let(:settings) { double("settings") } + let(:namespace) { "xpack.management" } + + it "returns absolute paths for every configured SSL suffix" do + allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.certificate_authority").and_return("/tmp/ca.crt") + allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.truststore.path").and_return("relative/ts.p12") + allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.keystore.path").and_return(nil) + allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.certificate").and_return("") + allow(settings).to receive(:get_value).with("xpack.management.elasticsearch.ssl.key").and_return(nil) + + paths = described_class.paths_from_settings(settings, namespace) + expect(paths).to contain_exactly(File.expand_path("/tmp/ca.crt"), File.expand_path("relative/ts.p12")) + end + + it "swallows settings.get_value errors and returns empty array when nothing configured" do + allow(settings).to receive(:get_value).and_raise(ArgumentError, "unknown setting") + expect(described_class.paths_from_settings(settings, namespace)).to eq([]) + end + end + + describe "#consume_stale" do + it "does not yield when id was never stale" do + ran = false + result = tracker.consume_stale(:".cpm") { ran = true } + expect(ran).to eq(false) + expect(result).to eq(false) + end + + it "yields and clears the stale flag when id is stale" do + tracker.instance_variable_get(:@stale_ids).add(:".cpm") + + ran = false + result = tracker.consume_stale(:".cpm") { ran = true } + + expect(ran).to eq(true) + expect(result).to eq(true) + expect(tracker.instance_variable_get(:@stale_ids)).not_to include(:".cpm") + end + + it "accepts string ids" do + tracker.instance_variable_get(:@stale_ids).add(:".cpm") + expect(tracker.consume_stale(".cpm") {}).to eq(true) + end + + it "re-asserts the stale flag when the block raises" do + tracker.instance_variable_get(:@stale_ids).add(:".cpm") + + expect { + tracker.consume_stale(:".cpm") { raise "rebuild failed" } + }.to raise_error("rebuild failed") + + expect(tracker.instance_variable_get(:@stale_ids)).to include(:".cpm") + end + + it "clears only the given id" do + stale = tracker.instance_variable_get(:@stale_ids) + stale.add(:".cpm") + stale.add(:".cpm_license") + + tracker.consume_stale(:".cpm") {} + + expect(stale).not_to include(:".cpm") + expect(stale).to include(:".cpm_license") + end + end end diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index eeba1568ac..bda52d230d 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -8,6 +8,8 @@ require "logstash/json" require 'helpers/elasticsearch_options' require 'helpers/loggable_try' +require 'logstash/ssl_file_tracker' +require 'helpers/elasticsearch_client_holder' require "license_checker/licensed" module LogStash @@ -22,6 +24,9 @@ class RemoteConfigError < LogStash::Error; end VALID_LICENSES = %w(trial standard gold platinum enterprise) FEATURE_INTERNAL = 'management' FEATURE_EXTERNAL = 'logstash' + NAMESPACE = "xpack.#{FEATURE_INTERNAL}".freeze + SSL_TRACK_ID_CPM = :".cpm" + SSL_TRACK_ID_LICENSE = :".cpm_license" SUPPORTED_PIPELINE_SETTINGS = %w( pipeline.workers pipeline.batch.size @@ -33,14 +38,24 @@ class RemoteConfigError < LogStash::Error; end queue.checkpoint.writes ) - def initialize(settings) + def initialize(settings, ssl_file_tracker = nil) super(settings) - if enabled? - @es_options = es_options_from_settings('management', settings) - setup_license_checker(FEATURE_INTERNAL) - license_check(true) + return unless enabled? + + @es_options = es_options_from_settings('management', settings) + + if ssl_file_tracker + paths = LogStash::SslFileTracker.paths_from_settings(@settings, NAMESPACE) + ssl_file_tracker.register_paths(SSL_TRACK_ID_CPM, paths) + ssl_file_tracker.register_paths(SSL_TRACK_ID_LICENSE, paths) end + + @es_client_holder = LogStash::Helpers::ElasticsearchClientHolder.create(ssl_file_tracker, SSL_TRACK_ID_CPM) { build_client } + setup_license_checker(FEATURE_INTERNAL, + ssl_file_tracker: ssl_file_tracker, + tracking_id: SSL_TRACK_ID_LICENSE) + license_check(true) end def match? @@ -179,7 +194,7 @@ def pipeline_ids end def client - @client ||= build_client + @es_client_holder.get end end diff --git a/x-pack/lib/config_management/hooks.rb b/x-pack/lib/config_management/hooks.rb index c0ca586759..57f27e119a 100644 --- a/x-pack/lib/config_management/hooks.rb +++ b/x-pack/lib/config_management/hooks.rb @@ -35,7 +35,7 @@ def after_bootstrap_checks(runner) logger.debug("Removing the `Logstash::Config::Source::Local` and replacing it with `ElasticsearchSource`") runner.source_loader.remove_source(LogStash::Config::Source::Local) runner.source_loader.remove_source(LogStash::Config::Source::MultiLocal) - source = LogStash::ConfigManagement::ElasticsearchSource.new(runner.settings) + source = LogStash::ConfigManagement::ElasticsearchSource.new(runner.settings, runner.ssl_file_tracker) runner.source_loader.add_source(source) end end diff --git a/x-pack/lib/helpers/elasticsearch_client_holder.rb b/x-pack/lib/helpers/elasticsearch_client_holder.rb new file mode 100644 index 0000000000..24a6755883 --- /dev/null +++ b/x-pack/lib/helpers/elasticsearch_client_holder.rb @@ -0,0 +1,125 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require "logstash/logging/logger" + +module LogStash module Helpers + ## + # Holds an Elasticsearch client. + # @see ElasticsearchClientHolder#create + module ElasticsearchClientHolder + + ## + # Creates an ElasticsearchClientHolder. + # If a `tracker` is specified, an SslRebuildable instance + # connected to the tracker with the provided id is returned; + # Otherwise, a Lazy instance is returned. + # @param tracker [#consume_stale, nil] + # @param id [#to_sym, nil] + # @yieldreturn [LogStash::Outputs::ElasticSearch::HttpClient] + # @return [ElasticsearchClientHolder] + def self.create(tracker=nil, id=nil, &client_factory) + return Lazy.new(&client_factory) if tracker.nil? + + SslRebuildable.new(tracker, id, &client_factory) + end + + ## + # Get a current client for immediate use. + # Consumers MUST NOT cache the returned client. + # @return [LogStash::Outputs::ElasticSearch::HttpClient] + def get + fail NotImplementedError + end + + ## + # close the current client, if it exists and is connected + # @return [void] + def close + fail NotImplementedError + end + + ## + # An ElasticsearchClientHolder that lazily creates the client when it is needed, + # caching the result indefinitely. + # @api internal (see ElasticsearchClientHolder::create) + class Lazy + include ElasticsearchClientHolder + include LogStash::Util::Loggable + + ## + # @yieldreturn [LogStash::Outputs::ElasticSearch::HttpClient] + def initialize(&client_factory) + fail ArgumentError, "client_factory block is required" unless block_given? + @client_factory = client_factory + end + + def get + @client || Util.synchronize(self) do + @client ||= begin + logger.debug("initializing ES client") + @client_factory.call + end + end + end + + def close + Util.synchronize(self) do + @client&.close + end + end + end + + ## + # An ElasticsearchClientHolder that is connected to the provided tracker by the provided id. + # The client is created lazily, and is re-created when the tracker has marked the given id as stale. + # @api internal (see ElasticsearchClientHolder::create) + class SslRebuildable + include ElasticsearchClientHolder + include LogStash::Util::Loggable + + attr_reader :tracker + attr_reader :id + + ## + # @param tracker [#consume_stale] + # @param id [#to_sym] + # @yieldreturn [LogStash::Outputs::ElasticSearch::HttpClient] + def initialize(tracker, id, &client_factory) + @tracker = tracker or fail(ArgumentError, "tracker is required") + @id = id&.to_sym or fail(ArgumentError, "id is required") + @client_factory = client_factory or fail(ArgumentError, "client_factory block is required") + end + + def get + Util.synchronize(self) do + @client ||= begin + logger.debug("initializing rebuildable elasticsearch client `#{@id}`") + @client_factory.call + end + + @tracker.consume_stale(@id) do + begin + old_client = @client + @client = @client_factory.call + logger.debug("rebuilt elasticsearch client `#{@id}` on certificate change") + old_client&.close rescue logger.warn("error closing stale elasticsearch client `#{@id}`", exception: $!.class, message: $!.message) + rescue => e + logger.warn("failed to rebuild elasticsearch client `#{@id}`", exception: e.class, message: e.message) + raise + end + end + + @client + end + end + + def close + Util.synchronize(self) do + @client&.close + end + end + end + end +end end diff --git a/x-pack/lib/license_checker/license_manager.rb b/x-pack/lib/license_checker/license_manager.rb index c56061cc94..b97fdd6dcb 100644 --- a/x-pack/lib/license_checker/license_manager.rb +++ b/x-pack/lib/license_checker/license_manager.rb @@ -35,7 +35,6 @@ def current_xpack_info def fetch_xpack_info xpack_info = @license_reader.fetch_xpack_info - update_xpack_info(xpack_info) end diff --git a/x-pack/lib/license_checker/license_reader.rb b/x-pack/lib/license_checker/license_reader.rb index cc1bcea634..653846c500 100644 --- a/x-pack/lib/license_checker/license_reader.rb +++ b/x-pack/lib/license_checker/license_reader.rb @@ -4,6 +4,7 @@ require 'logstash/logging/logger' require 'logstash/outputs/elasticsearch' +require 'helpers/elasticsearch_client_holder' module LogStash module LicenseChecker @@ -12,12 +13,14 @@ class LicenseReader XPACK_MISSING_STATUS_CODES = [400, 404] - def initialize(settings, feature, options) + def initialize(settings, feature, options, ssl_file_tracker: nil, tracking_id: nil) @namespace = "xpack.#{feature}" @settings = settings es_options = options.merge('resurrect_delay' => 30) @es_options = Helpers::ElasticsearchOptions::es_options_with_product_origin_header(es_options) + + @es_client_holder = LogStash::Helpers::ElasticsearchClientHolder.create(ssl_file_tracker, tracking_id) { build_client } end ## @@ -65,7 +68,7 @@ def fetch_cluster_info ## # @api private def client - @client ||= build_client + @es_client_holder.get end private diff --git a/x-pack/lib/license_checker/licensed.rb b/x-pack/lib/license_checker/licensed.rb index c94a0cff55..ec0d65eecc 100644 --- a/x-pack/lib/license_checker/licensed.rb +++ b/x-pack/lib/license_checker/licensed.rb @@ -30,8 +30,11 @@ module LicenseChecker module Licensed include LogStash::Util::Loggable - def setup_license_checker(feature, refresh_period = 30, refresh_unit = TimeUnit::SECONDS) + def setup_license_checker(feature, refresh_period = 30, refresh_unit = TimeUnit::SECONDS, + ssl_file_tracker: nil, tracking_id: nil) @feature = feature + @ssl_file_tracker = ssl_file_tracker + @ssl_tracking_id = tracking_id license_manager = LogStash::LicenseChecker::LicenseManager.new(license_reader, feature, refresh_period, refresh_unit) xpack_info = license_manager.current_xpack_info @@ -73,7 +76,9 @@ def with_license_check(raise_on_error = false) alias_method :license_check, :with_license_check def license_reader - LogStash::LicenseChecker::LicenseReader.new(@settings, @feature, @es_options) + LogStash::LicenseChecker::LicenseReader.new(@settings, @feature, @es_options, + ssl_file_tracker: @ssl_file_tracker, + tracking_id: @ssl_tracking_id) end def update_license_state(xpack_info, is_serverless) diff --git a/x-pack/lib/monitoring/internal_pipeline_source.rb b/x-pack/lib/monitoring/internal_pipeline_source.rb index 15d782e778..7c27f603a4 100644 --- a/x-pack/lib/monitoring/internal_pipeline_source.rb +++ b/x-pack/lib/monitoring/internal_pipeline_source.rb @@ -5,6 +5,7 @@ require "logstash/config/source/base" require 'license_checker/licensed' require 'helpers/elasticsearch_options' +require 'logstash/ssl_file_tracker' module LogStash module Monitoring class InternalPipelineSource < LogStash::Config::Source::Base @@ -12,14 +13,23 @@ class InternalPipelineSource < LogStash::Config::Source::Base include LogStash::Helpers::ElasticsearchOptions include LogStash::Util::Loggable FEATURE = 'monitoring' + SSL_TRACK_ID_LICENSE = :".monitoring_license" - def initialize(pipeline_config, agent, settings) + def initialize(pipeline_config, agent, settings, ssl_file_tracker = nil) super(pipeline_config.settings) @pipeline_config = pipeline_config @settings = settings @agent = agent @es_options = es_options_from_settings(FEATURE, @settings) - setup_license_checker(FEATURE) + + if ssl_file_tracker + paths = LogStash::SslFileTracker.paths_from_settings(@settings, "xpack.#{FEATURE}") + ssl_file_tracker.register_paths(SSL_TRACK_ID_LICENSE, paths) + end + + setup_license_checker(FEATURE, + ssl_file_tracker: ssl_file_tracker, + tracking_id: SSL_TRACK_ID_LICENSE) end def pipeline_configs diff --git a/x-pack/lib/monitoring/monitoring.rb b/x-pack/lib/monitoring/monitoring.rb index 0ef310344e..1d4ba3b1fd 100644 --- a/x-pack/lib/monitoring/monitoring.rb +++ b/x-pack/lib/monitoring/monitoring.rb @@ -145,7 +145,7 @@ def after_agent(runner) logger.trace("registering the metrics pipeline") LogStash::SETTINGS.set("node.uuid", runner.agent.id) - internal_pipeline_source = LogStash::Monitoring::InternalPipelineSource.new(setup_metrics_pipeline, runner.agent, LogStash::SETTINGS.clone) + internal_pipeline_source = LogStash::Monitoring::InternalPipelineSource.new(setup_metrics_pipeline, runner.agent, LogStash::SETTINGS.clone, runner.ssl_file_tracker) runner.source_loader.add_source(internal_pipeline_source) rescue => e logger.error("Failed to set up the metrics pipeline", :message => e.message, :backtrace => e.backtrace) diff --git a/x-pack/qa/integration/management/cpm_tls_hot_reload_spec.rb b/x-pack/qa/integration/management/cpm_tls_hot_reload_spec.rb new file mode 100644 index 0000000000..ce3f0f1b56 --- /dev/null +++ b/x-pack/qa/integration/management/cpm_tls_hot_reload_spec.rb @@ -0,0 +1,128 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require_relative "../spec_helper" +require "stud/temporary" +require "net/http" +require "json" + +describe "TLS hot-reload: CPM (ElasticsearchSource) detects cert changes and rebuilds clients", :skip_fips do + PIPELINE_ID = "cpm-tls-hot-reload" + + before(:all) do + # generate certs + @cert_dir = Stud::Temporary.directory + + @ca_key, @ca_cert = generate_ca + @ca2_key, @ca2_cert = generate_ca + + es_key, es_cert = generate_leaf(@ca_key, @ca_cert) + + # add CPM pipeline to ES + @elasticsearch_service = elasticsearch_with_tls(es_cert.to_pem, es_key.to_pem, @ca_cert.to_pem) + + tls_client = elasticsearch_client_tls + begin + tls_client.perform_request(:delete, "_logstash/pipeline/#{PIPELINE_ID}") + rescue Elastic::Transport::Transport::Errors::NotFound + end + tls_client.perform_request(:put, "_logstash/pipeline/#{PIPELINE_ID}", {}, + { pipeline: "input { generator { count => 0 } } output { null {} }", + username: "log.stash", + pipeline_metadata: { version: "1" }, + pipeline_settings: { "pipeline.batch.delay": "50" }, + last_modified: Time.now.utc.iso8601 }) + + # write ca.crt + @ca_file = File.join(@cert_dir, "ca.crt") + File.write(@ca_file, @ca_cert.to_pem) + + # start Logstash with CPM + # log.level: debug because the rebuild line in ElasticsearchClientHolder logs at DEBUG. + @logstash_service = logstash_with_empty_default("bin/logstash -w 1", { + :settings => { + "log.level" => "debug", + "ssl.reload.automatic" => true, + "xpack.management.enabled" => true, + "xpack.management.pipeline.id" => [PIPELINE_ID], + "xpack.management.elasticsearch.hosts" => ["https://localhost:9200"], + "xpack.management.elasticsearch.username" => "elastic", + "xpack.management.elasticsearch.password" => elastic_password, + "xpack.management.elasticsearch.ssl.certificate_authority" => @ca_file, + "xpack.management.logstash.poll_interval" => "2s", + "xpack.monitoring.enabled" => false + }, + :belzebuth => { + :wait_condition => /Pipelines running/, + :timeout => 60 + } + }) + end + + after(:all) do + cleanup_tls_certs_from_es_config + elasticsearch_client_tls.perform_request(:delete, "_logstash/pipeline/#{PIPELINE_ID}") rescue nil + @logstash_service&.stop + @elasticsearch_service&.stop + end + + context "CA cert file changes" do + # Appending a second CA to the bundle changes the file's content; SHA-256 changes. + # The original CA stays in the bundle so ES connectivity survives throughout. + # SslFileTracker detects the change and both the ElasticsearchSource client + # and the LicenseReader client are rebuilt independently. + it "detects the change, rebuilds both clients, and the new connection remains functional" do + Stud.try(15.times, [StandardError]) do + stats = logstash_pipeline_stats(PIPELINE_ID) + raise "CPM pipeline not running yet" unless stats + end + + # No reloads before cert rotation + stats = logstash_pipeline_stats(PIPELINE_ID) + expect(stats["reloads"]["successes"]).to eq(0) + expect(stats["reloads"]["failures"]).to eq(0) + + # Rotate: append second CA. Content changes; connection still works. + File.open(@ca_file, "a") { |f| f.write(@ca2_cert.to_pem) } + + # Two rebuild events: the CPM ES client (driven by Agent's reload tick) + # and the LicenseReader client (driven by LicenseManager's scheduler tick). + wait_for_log_count(/rebuilt elasticsearch client.*on certificate change/, 2) + + # Push a pipeline update so CPM must fetch via the new client + elasticsearch_client_tls.perform_request(:put, "_logstash/pipeline/#{PIPELINE_ID}", {}, + { pipeline: "input { generator { count => 0 } } output { sink {} }", + username: "log.stash", + pipeline_metadata: { version: "2" }, + pipeline_settings: { "pipeline.batch.delay": "50" }, + last_modified: Time.now.utc.iso8601 }) + + # Wait for CPM to fetch and apply the update to prove new client can connect + Stud.try(30.times, [StandardError]) do + stats = logstash_pipeline_stats(PIPELINE_ID) + raise "Pipeline not reloaded yet" unless stats && stats["reloads"]["successes"] == 1 + end + + expect(@logstash_service.stdout_lines.join("\n")).not_to match(/\[ERROR\]/) + end + end + + private + + def wait_for_log_count(pattern, count, tries: 30) + Stud.try(tries.times, [StandardError]) do + actual = @logstash_service.stdout_lines.join("\n").scan(pattern).size + raise "Log pattern #{pattern.inspect} seen #{actual} times, want #{count}" unless actual == count + end + end + + def logstash_pipeline_stats(pipeline_id) + uri = URI("http://localhost:9600/_node/stats/pipelines/#{pipeline_id}") + resp = Net::HTTP.get_response(uri) + return nil unless resp.code == "200" + JSON.parse(resp.body).dig("pipelines", pipeline_id) + rescue + nil + end +end diff --git a/x-pack/qa/integration/monitoring/monitoring_tls_hot_reload_spec.rb b/x-pack/qa/integration/monitoring/monitoring_tls_hot_reload_spec.rb new file mode 100644 index 0000000000..8cce3f6737 --- /dev/null +++ b/x-pack/qa/integration/monitoring/monitoring_tls_hot_reload_spec.rb @@ -0,0 +1,127 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require_relative "../spec_helper" +require "stud/temporary" + +# LicenseManager runs fetch_license every 30 seconds via a background scheduler. +# With config.reload.automatic: true, Runner creates the SslFileTracker and passes +# it into InternalPipelineSource, which attaches it to the LicenseManager. The +# scheduler then notices cert changes and rebuilds the client on its next tick. +# With config.reload.automatic: false (the default), no tracker is created, so +# cert changes are silently ignored. +# +# Note: -e (config.string) cannot be combined with an explicitly-set +# config.reload.automatic, because config_reload_automatic? checks set? +# (not the value). Both tests use -f to avoid this constraint. + +describe "Monitoring TLS hot-reload", :skip_fips do + PIPELINE_CONF = "input { generator { count => 0 } } output { null {} }" + + def setup_monitoring_logstash(extra_settings = {}) + @cert_dir = Stud::Temporary.directory + + @ca_key, @ca_cert = generate_ca + @ca2_key, @ca2_cert = generate_ca + + es_key, es_cert = generate_leaf(@ca_key, @ca_cert) + @elasticsearch_service = elasticsearch_with_tls(es_cert.to_pem, es_key.to_pem, @ca_cert.to_pem) + + @ca_file = File.join(@cert_dir, "ca.crt") + File.write(@ca_file, @ca_cert.to_pem) + + config_file = File.join(@cert_dir, "pipeline.conf") + File.write(config_file, PIPELINE_CONF) + + # log.level: debug because the rebuild line in ElasticsearchClientHolder logs at DEBUG. + base_settings = { + "log.level" => "debug", + "xpack.monitoring.enabled" => true, + "xpack.monitoring.allow_legacy_collection" => true, + "xpack.monitoring.elasticsearch.hosts" => ["https://localhost:9200"], + "xpack.monitoring.elasticsearch.username" => "elastic", + "xpack.monitoring.elasticsearch.password" => elastic_password, + "xpack.monitoring.elasticsearch.ssl.certificate_authority" => @ca_file + } + + @logstash_service = logstash_with_empty_default("bin/logstash -f #{config_file} -w 1", { + :settings => base_settings.merge(extra_settings), + :belzebuth => { + :wait_condition => /Pipelines running/, + :timeout => 60 + } + }) + end + + def teardown_monitoring_logstash + cleanup_tls_certs_from_es_config + @logstash_service&.stop + @elasticsearch_service&.stop + end + + def wait_for_log_line(pattern, tries: 30) + Stud.try(tries.times, [StandardError]) do + unless @logstash_service.stdout_lines.join("\n") =~ pattern + raise "Log pattern not yet visible: #{pattern.inspect}" + end + end + end + + context "config.reload.automatic: true" do + before(:all) do + setup_monitoring_logstash( + "config.reload.automatic" => true, + "config.reload.interval" => "2s", + "ssl.reload.automatic" => true + ) + end + + after(:all) { teardown_monitoring_logstash } + + it "cert rotation rebuilds the license reader client and monitoring continues" do + # Wait for initial monitoring docs before rotation to confirm monitoring is active + initial_count = Stud.try(15.times, [StandardError]) do + elasticsearch_client_tls.indices.refresh(index: MONITORING_INDEXES) + resp = elasticsearch_client_tls.count(index: MONITORING_INDEXES) + raise "No monitoring docs yet" unless resp["count"] > 0 + resp["count"] + end + + # Rotate: append second CA. Original CA stays so ES connectivity survives. + File.open(@ca_file, "a") { |f| f.write(@ca2_cert.to_pem) } + + # LicenseManager scheduler fires every 30s + wait_for_log_line(/rebuilt elasticsearch client.*on certificate change/) + + # Doc count must grow to prove the rebuilt client can still write monitoring data to ES + Stud.try(15.times, [StandardError]) do + elasticsearch_client_tls.indices.refresh(index: MONITORING_INDEXES) + count = elasticsearch_client_tls.count(index: MONITORING_INDEXES)["count"] + raise "Doc count not yet exceeded initial (#{count} <= #{initial_count})" unless count > initial_count + end + + expect(@logstash_service.stdout_lines.join("\n")).not_to match(/\[ERROR\]/) + end + end + + context "config.reload.automatic: false" do + before(:all) { setup_monitoring_logstash } + + after(:all) { teardown_monitoring_logstash } + + it "cert rotation is ignored and no error is raised" do + initial_length = @logstash_service.stdout_lines.length + + # Rotate: append second CA. + File.open(@ca_file, "a") { |f| f.write(@ca2_cert.to_pem) } + + # Wait longer than one LicenseManager scheduler cycle (30s) to confirm silence. + sleep 35 + + output_after_rotation = @logstash_service.stdout_lines[initial_length..].join("\n") + expect(output_after_rotation).not_to match(/rebuilt elasticsearch client.*on certificate change/) + expect(output_after_rotation).not_to match(/\[ERROR\]/) + end + end +end diff --git a/x-pack/qa/integration/support/helpers.rb b/x-pack/qa/integration/support/helpers.rb index 39b510049d..6ba2e032f6 100644 --- a/x-pack/qa/integration/support/helpers.rb +++ b/x-pack/qa/integration/support/helpers.rb @@ -9,6 +9,7 @@ require "stud/try" require "open3" require "time" +require_relative "../../../../qa/integration/framework/cert_helpers" VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml") VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION") @@ -183,3 +184,60 @@ def verify_response!(cmd, response) raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}" end end + +# Starts Elasticsearch with HTTP/TLS enabled. +# Returns the Belzebuth process object. +def elasticsearch_with_tls(server_cert_pem, server_key_pem, ca_pem) + es_config_dir = File.join(get_elasticsearch_path, "config") + File.write(File.join(es_config_dir, "test-ca.crt"), ca_pem) + File.write(File.join(es_config_dir, "test-server.crt"), server_cert_pem) + File.write(File.join(es_config_dir, "test-server.key"), server_key_pem) + + temporary_path_data = Stud::Temporary.directory + settings = { + "path.data" => temporary_path_data, + "discovery.type" => "single-node", + "xpack.monitoring.collection.enabled" => true, + "xpack.security.enabled" => true, + "xpack.security.http.ssl.enabled" => true, + "xpack.security.http.ssl.certificate" => "test-server.crt", + "xpack.security.http.ssl.key" => "test-server.key", + "xpack.security.http.ssl.certificate_authorities" => "test-ca.crt", + "action.destructive_requires_name" => false + } + settings_args = settings.map { |k, v| "-E#{k}=#{v}" } + + bootstrap_elastic_password unless bootstrap_password_exists? + + cmd = "bin/elasticsearch #{settings_args.join(' ')}" + puts "Running elasticsearch with TLS: #{cmd}" + response = Belzebuth.run(cmd, { + :directory => get_elasticsearch_path, + :wait_condition => /ClusterStateLicenseService.*license.*valid/, + :timeout => 15 * 60 + }) + raise "Could not start Elasticsearch with TLS: #{response}" unless response.successful? + + tls_client = elasticsearch_client_tls + if tls_client.perform_request(:get, '_license').body['license']['type'] != 'trial' + resp = tls_client.perform_request(:post, '_license/start_trial', "acknowledge" => true) + raise "Trial not started: #{resp.body}" if resp.body["trial_was_started"] != true + end + + response +end + +# This ES client is only used for test. SSL verification is disabled. +def elasticsearch_client_tls + Elasticsearch::Client.new( + :url => "https://elastic:#{elastic_password}@localhost:9200", + :transport_options => { :ssl => { :verify => false } } + ) +end + +def cleanup_tls_certs_from_es_config + es_config_dir = File.join(get_elasticsearch_path, "config") + %w[test-ca.crt test-server.crt test-server.key].each do |f| + FileUtils.rm_f(File.join(es_config_dir, f)) + end +end diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 58a6222497..c0e1490a85 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -10,6 +10,7 @@ require "license_checker/license_manager" require "monitoring/monitoring" require "stud/temporary" +require "logstash/ssl_file_tracker" describe LogStash::ConfigManagement::ElasticsearchSource do let(:system_indices_api) { LogStash::ConfigManagement::SystemIndicesFetcher::SYSTEM_INDICES_API_PATH } @@ -844,4 +845,61 @@ expect { subject.get_es_version }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end end + + describe "ssl_file_tracker injection" do + let(:tracker) { instance_double(LogStash::SslFileTracker) } + let(:license_manager) { instance_double(LogStash::LicenseChecker::LicenseManager).as_null_object } + + before do + allow_any_instance_of(described_class).to receive(:setup_license_checker).and_return(license_manager) + allow_any_instance_of(described_class).to receive(:license_check) + allow(tracker).to receive(:register_paths) + allow(tracker).to receive(:consume_stale).and_return(false) + end + + it "registers both CPM and CPM-license paths with the tracker" do + described_class.new(system_settings, tracker) + expect(tracker).to have_received(:register_paths).with(described_class::SSL_TRACK_ID_CPM, anything) + expect(tracker).to have_received(:register_paths).with(described_class::SSL_TRACK_ID_LICENSE, anything) + end + + it "invalidates the CPM client when the tracker reports stale during pipeline_configs" do + allow(tracker).to receive(:consume_stale).with(described_class::SSL_TRACK_ID_CPM).and_yield + fetcher = double("fetcher", fetch_config: nil, get_pipeline_ids: []) + instance = described_class.new(system_settings, tracker) + allow(instance).to receive(:license_check) + allow(instance).to receive(:get_es_version).and_return(major: 8, minor: 0) + allow(instance).to receive(:get_pipeline_fetcher).and_return(fetcher) + + old_client = double("old_client") + expect(old_client).to receive(:close) + instance.instance_variable_get(:@es_client_holder).instance_variable_set(:@client, old_client) + allow(instance).to receive(:build_client).and_return(double("new_client")) + + instance.pipeline_configs + end + + it "does not invalidate the client when the tracker reports not stale" do + fetcher = double("fetcher", fetch_config: nil, get_pipeline_ids: []) + instance = described_class.new(system_settings, tracker) + allow(instance).to receive(:license_check) + allow(instance).to receive(:get_es_version).and_return(major: 8, minor: 0) + allow(instance).to receive(:get_pipeline_fetcher).and_return(fetcher) + + old_client = double("old_client") + expect(old_client).not_to receive(:close) + instance.instance_variable_get(:@es_client_holder).instance_variable_set(:@client, old_client) + + instance.pipeline_configs + end + + it "threads the tracker and CPM license id into setup_license_checker" do + expect_any_instance_of(described_class).to receive(:setup_license_checker).with( + described_class::FEATURE_INTERNAL, + ssl_file_tracker: tracker, + tracking_id: described_class::SSL_TRACK_ID_LICENSE + ).and_return(license_manager) + described_class.new(system_settings, tracker) + end + end end diff --git a/x-pack/spec/helpers/elasticsearch_client_holder_spec.rb b/x-pack/spec/helpers/elasticsearch_client_holder_spec.rb new file mode 100644 index 0000000000..14b3b66c86 --- /dev/null +++ b/x-pack/spec/helpers/elasticsearch_client_holder_spec.rb @@ -0,0 +1,104 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require "spec_helper" +require "helpers/elasticsearch_client_holder" +require "logstash/ssl_file_tracker" + +describe LogStash::Helpers::ElasticsearchClientHolder do + let(:tracker) { instance_double(LogStash::SslFileTracker) } + let(:tracking_id) { :".cpm" } + let(:factory_calls) { [] } + let(:factory) { ->() { factory_calls << :built; double("client", close: nil) } } + subject(:es_client_holder) { described_class.create(tracker, tracking_id, &factory) } + + describe "#initialize" do + it "raises when no factory block is given" do + expect { described_class.create(tracker, :".cpm") } + .to raise_error(ArgumentError, /client_factory block is required/) + end + end + + describe "::create" do + shared_examples "without factory block" do + it "fails helpfully when no factory block is provided" do + expect { described_class.create(tracker, tracking_id) } + .to raise_error(ArgumentError, /client_factory block is required/) + end + end + + context "when no tracker is given" do + let(:tracker) { nil } + it "returns a Lazy instance" do + client_holder = described_class.create(tracker, tracking_id, &factory) + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder::Lazy + end + include_examples "without factory block" + end + + context "when a tracker and id are both given" do + it "returns an SslRebuildable instance with the provided tracker and id" do + client_holder = described_class.create(tracker, tracking_id, &factory) + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder::SslRebuildable + expect(client_holder.tracker).to eq(tracker) + expect(client_holder.id).to eq(tracking_id) + end + + include_examples "without factory block" + end + + context "when `tracker` is provided and `id` is not" do + let(:tracking_id) { nil } + it "fails helpfully" do + expect { described_class.create(tracker, tracking_id) }.to raise_error(ArgumentError, /id is required/) + end + end + end + + shared_examples "#get memoization" do + it "lazily calls the factory once and caches the result" do + es_client_holder + expect(factory_calls.size).to eq(0) + first = es_client_holder.get + second = es_client_holder.get + expect(factory_calls.size).to eq(1) + expect(second).to be(first) + end + end + + describe "Lazy implementation" do + let(:es_client_holder) { described_class::Lazy.new(&factory) } + + describe "#get" do + include_examples "#get memoization" + end + end + + describe "SslRebuildable implementation" do + let(:es_client_holder) { described_class::SslRebuildable.new(tracker, tracking_id, &factory) } + + describe "#get" do + before(:each) do + # mimic tracking behaviour when the id has NOT been marked stale + allow(tracker).to receive(:consume_stale).with(tracking_id).and_return(nil) + end + include_examples "#get memoization" + + it "reloads the client when it has been invalidated" do + existing = es_client_holder.get + expect(existing).to receive(:close) + + # mimic tracker behaviour when the id has been marked stale + allow(tracker).to receive(:consume_stale).with(tracking_id).and_yield + + expect do + fresh = es_client_holder.get + expect(fresh).to_not be(existing) + end.to change { factory_calls.size }.by(1) + end + end + end +end diff --git a/x-pack/spec/license_checker/license_manager_spec.rb b/x-pack/spec/license_checker/license_manager_spec.rb index d6832dea37..af41a73565 100644 --- a/x-pack/spec/license_checker/license_manager_spec.rb +++ b/x-pack/spec/license_checker/license_manager_spec.rb @@ -6,6 +6,8 @@ require "logstash/json" require "license_checker/license_manager" require 'monitoring/monitoring' +require "logstash/runner" +require "logstash/ssl_file_tracker" class Observer attr_reader :xpack_info @@ -23,6 +25,7 @@ def update(xpack_info, is_serverless) describe LogStash::LicenseChecker::LicenseManager do let(:subject) { described_class.new(license_reader, 'monitoring') } + let(:status) { "active"} let(:type) { 'trial' } diff --git a/x-pack/spec/license_checker/license_reader_spec.rb b/x-pack/spec/license_checker/license_reader_spec.rb index 1e9f9caba6..f691d38ee9 100644 --- a/x-pack/spec/license_checker/license_reader_spec.rb +++ b/x-pack/spec/license_checker/license_reader_spec.rb @@ -6,8 +6,10 @@ require 'support/helpers' require "license_checker/license_reader" require "helpers/elasticsearch_options" +require "helpers/elasticsearch_client_holder" require "monitoring/monitoring" require "logstash/runner" +require "logstash/ssl_file_tracker" describe LogStash::LicenseChecker::LicenseReader do let(:elasticsearch_url) { "https://localhost:9898" } @@ -214,4 +216,41 @@ expect(subject.client.client_settings[:headers]).to include(product_origin_header) end end + + describe "SSL tracker wiring" do + let(:tracker) { instance_double(LogStash::SslFileTracker) } + subject(:license_reader) do + described_class.new(system_settings, 'monitoring', elasticsearch_options, + ssl_file_tracker: tracker, tracking_id: :".cpm_license") + end + + it "passes the tracker and tracking_id to its SslRebuildable" do + client_holder = license_reader.instance_variable_get(:@es_client_holder) + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder::SslRebuildable + expect(client_holder.id).to eq(:".cpm_license") + expect(client_holder.tracker).to eq(tracker) + end + + it "client delegates to the es_client_holder" do + client_holder = license_reader.instance_variable_get(:@es_client_holder) + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder::SslRebuildable + expect(client_holder).to receive(:get) + + subject.client + end + + context "when tracker is not provided" do + let(:tracker) { nil } + it "uses a non-rebuildable elasticsearch client holder" do + client_holder = license_reader.instance_variable_get(:@es_client_holder) + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder + expect(client_holder).to be_a_kind_of LogStash::Helpers::ElasticsearchClientHolder::Lazy + expect(client_holder).to receive(:get) + + subject.client + end + end + end end