diff --git a/Source/messaging/MessageUnit.cpp b/Source/messaging/MessageUnit.cpp index 6ed5543f3..ef9c8faf9 100644 --- a/Source/messaging/MessageUnit.cpp +++ b/Source/messaging/MessageUnit.cpp @@ -22,6 +22,12 @@ namespace Thunder { +ENUM_CONVERSION_BEGIN(Thunder::Messaging::MessageUnit::OutputMode) + { Thunder::Messaging::MessageUnit::PLUGIN, _TXT("plugin") }, + { Thunder::Messaging::MessageUnit::DIRECT, _TXT("direct") }, + { Thunder::Messaging::MessageUnit::BOTH, _TXT("both") }, +ENUM_CONVERSION_END(Thunder::Messaging::MessageUnit::OutputMode) + namespace Messaging { uint16_t MessageUnit::Serialize(uint8_t* buffer, const uint16_t length, const string& module) @@ -359,30 +365,43 @@ namespace Thunder { */ /* virtual */ void MessageUnit::Push(const Core::Messaging::MessageInfo& messageInfo, const Core::Messaging::IEvent* message) { - //logging messages can happen in Core, meaning, otherside plugin can be not started yet - //those should be just printed - if (_dataBuffer == nullptr) { + const MessageUnit::OutputMode outputMode = _settings.EffectiveOutput(messageInfo); + + const bool sendDirect = (outputMode == MessageUnit::DIRECT) || (outputMode == MessageUnit::BOTH); + const bool sendToPlugin = (outputMode == MessageUnit::PLUGIN) || (outputMode == MessageUnit::BOTH); + + if (sendDirect == true) { _direct.Output(messageInfo, message); - } else { - const uint16_t messageSize = _settings.MessageSize(); - ASSERT(messageSize != 0); - uint8_t* serializationBuffer = static_cast(ALLOCA(messageSize)); - uint16_t length = 0; + } - ASSERT(messageInfo.Type() != Core::Messaging::Metadata::type::INVALID); + if (sendToPlugin == true) { - length = messageInfo.Serialize(serializationBuffer, messageSize); + if (_dataBuffer != nullptr) { + const uint16_t messageSize = _settings.MessageSize(); + ASSERT(messageSize != 0); + uint8_t* serializationBuffer = static_cast(ALLOCA(messageSize)); + uint16_t length = 0; - //only serialize message if the information could fit - if (length != 0) { - length += message->Serialize(serializationBuffer + length, messageSize - length); + ASSERT(messageInfo.Type() != Core::Messaging::Metadata::type::INVALID); - if (_dataBuffer->PushData(length, serializationBuffer) != Core::ERROR_NONE) { - TRACE_L1("Unable to push message data!"); + length = messageInfo.Serialize(serializationBuffer, messageSize); + + //only serialize message if the information could fit + if (length != 0) { + length += message->Serialize(serializationBuffer + length, messageSize - length); + + if (_dataBuffer->PushData(length, serializationBuffer) != Core::ERROR_NONE) { + TRACE_L1("Unable to push message data!"); + } + } + else { + TRACE_L1("Unable to push data, buffer is too small!"); } } - else { - TRACE_L1("Unable to push data, buffer is too small!"); + else if (sendDirect == false) { + // Buffer unavailable (early startup or DirectOutput mode without plugin overrides): + // fall back to direct output if we haven't already sent it directly. + _direct.Output(messageInfo, message); } } } diff --git a/Source/messaging/MessageUnit.h b/Source/messaging/MessageUnit.h index 94c57c78a..2afcb7aa1 100644 --- a/Source/messaging/MessageUnit.h +++ b/Source/messaging/MessageUnit.h @@ -69,6 +69,18 @@ namespace Thunder { FLUSH_ABBREVIATED = 2 }; + // Determines where a message type/category/module is routed. + // PLUGIN - sent to the data buffer (MessageControl plugin) + // DIRECT - printed immediately via DirectOutput (console/syslog) + // BOTH - sent to both destinations + // The default when no explicit override is configured is PLUGIN in normal mode + // and DIRECT in DirectOutput mode (when starting Thunder with -f). + enum OutputMode : uint8_t { + PLUGIN = 0, + DIRECT = 1, + BOTH = 2 + }; + class EXTERNAL Buffer : public Core::IPC::BufferType(~0)> { public: Buffer() @@ -292,10 +304,12 @@ namespace Thunder { , Module() , Category() , Enabled(false) + , Output(MessageUnit::PLUGIN) { Add(_T("module"), &Module); Add(_T("category"), &Category); Add(_T("enabled"), &Enabled); + Add(_T("output"), &Output); } Entry(const string& module, const string& category, const bool enabled) : Entry() @@ -309,20 +323,24 @@ namespace Thunder { , Module(std::move(other.Module)) , Category(std::move(other.Category)) , Enabled(std::move(other.Enabled)) + , Output(std::move(other.Output)) { Add(_T("module"), &Module); Add(_T("category"), &Category); Add(_T("enabled"), &Enabled); + Add(_T("output"), &Output); } Entry(const Entry& other) : Core::JSON::Container() , Module(other.Module) , Category(other.Category) , Enabled(other.Enabled) + , Output(other.Output) { Add(_T("module"), &Module); Add(_T("category"), &Category); Add(_T("enabled"), &Enabled); + Add(_T("output"), &Output); } Entry& operator=(Entry&& other) noexcept @@ -331,6 +349,7 @@ namespace Thunder { Module = std::move(other.Module); Category = std::move(other.Category); Enabled = std::move(other.Enabled); + Output = std::move(other.Output); } return (*this); @@ -342,6 +361,7 @@ namespace Thunder { Module = other.Module; Category = other.Category; Enabled = other.Enabled; + Output = other.Output; } return (*this); @@ -352,6 +372,7 @@ namespace Thunder { Core::JSON::String Module; Core::JSON::String Category; Core::JSON::Boolean Enabled; + Core::JSON::EnumType Output; }; public: @@ -363,15 +384,18 @@ namespace Thunder { Section() : Core::JSON::Container() , Settings() - , Abbreviated(true) { + , Abbreviated(true) + , Output(MessageUnit::PLUGIN) { Add(_T("settings"), &Settings); Add(_T("abbreviated"), &Abbreviated); + Add(_T("output"), &Output); } ~Section() = default; public: Core::JSON::ArrayType Settings; Core::JSON::Boolean Abbreviated; + Core::JSON::EnumType Output; }; public: @@ -513,6 +537,7 @@ namespace Thunder { void Configure(const string& basePath, const string& identifier, const Config& jsonParsed, const bool background, const flush flushMode) { _settings.clear(); + _outputRouting.clear(); string messagingFolder; Core::ParsePathInfo(jsonParsed.Path.Value(), messagingFolder, _permission); @@ -561,7 +586,12 @@ namespace Thunder { ASSERT(false); } - if (IsDirect() == true) { + // Populate _outputRouting first so HasPluginOutput() can be used below. + FromConfig(jsonParsed); + + // In DirectOutput mode (-f), skip creating the data buffer UNLESS specific + // entries explicitly request plugin or both output. + if ((IsDirect() == true) && (HasPluginOutput() == false)) { _dataSize = 0; } else { @@ -577,8 +607,6 @@ namespace Thunder { ASSERT(false); } } - - FromConfig(jsonParsed); } /** @@ -650,6 +678,53 @@ namespace Thunder { return (result); } + + // Returns whether any routing entry requires the data buffer (plugin or both). + // Used to decide whether the buffer should be created in DirectOutput mode. + bool HasPluginOutput() const + { + bool found = false; + + _adminLock.Lock(); + + for (const auto& [key, mode] : _outputRouting) { + if ((mode == MessageUnit::PLUGIN) || (mode == MessageUnit::BOTH)) { + found = true; + break; + } + } + + _adminLock.Unlock(); + + return (found); + } + + // Resolves the effective OutputMode for a given message via exactly three + // hash lookups in priority order: type wildcard, then category, then module. + // Note: per-entry config overrides are stored with either category OR module + // set (never both simultaneously), so these three lookups cover all cases. + OutputMode EffectiveOutput(const Core::Messaging::Metadata& metaData) const + { + OutputMode result = IsDirect() ? MessageUnit::DIRECT : MessageUnit::PLUGIN; + + _adminLock.Lock(); + + if (auto it = _outputRouting.find({metaData.Type(), _T(""), _T("")}); it != _outputRouting.end()) { + result = it->second; + } + + if (auto it = _outputRouting.find({metaData.Type(), metaData.Category(), _T("")}); it != _outputRouting.end()) { + result = it->second; + } + + if (auto it = _outputRouting.find({metaData.Type(), _T(""), metaData.Module()}); it != _outputRouting.end()) { + result = it->second; + } + + _adminLock.Unlock(); + + return (result); + } void Save() const { // Store all config info.. @@ -669,6 +744,16 @@ namespace Thunder { DELIMITER + (entry.Enabled() ? '1' : '0'); } + // 0xFF separates enabled/disabled settings from output-routing entries + settings += DELIMITER + Core::NumberType(0xFF).Text(); + + for (auto& [key, routeMode] : _outputRouting) { + settings += DELIMITER + Core::NumberType(static_cast(key.type)).Text() + + DELIMITER + key.category + + DELIMITER + key.module + + DELIMITER + Core::NumberType(static_cast(routeMode)).Text(); + } + Core::SystemInfo::SetEnvironment(MESSAGE_DISPATCHER_CONFIG_ENV, settings, true); } @@ -687,6 +772,7 @@ namespace Thunder { _metadataSize = 0; _messageSize = 0; _settings.clear(); + _outputRouting.clear(); if (iterator.Next() == true) { _path = iterator.Current().Text(); @@ -715,6 +801,28 @@ namespace Thunder { while (iterator.Next()) { uint8_t type = Core::NumberType(iterator.Current()).Value(); + + if (type == 0xFF) { + // Remaining entries are output-routing records + while (iterator.Next()) { + uint8_t type = Core::NumberType(iterator.Current()).Value(); + if (iterator.Next() == true) { + string category = iterator.Current().Text(); + if (iterator.Next() == true) { + string module = iterator.Current().Text(); + if (iterator.Next() == true) { + uint8_t routeMode = Core::NumberType(iterator.Current()).Value(); + if ((type >= Core::Messaging::Metadata::type::TRACING) && (type <= Core::Messaging::Metadata::type::TELEMETRY) && + (routeMode <= static_cast(MessageUnit::BOTH))) { + _outputRouting[{static_cast(type), std::move(category), std::move(module)}] = static_cast(routeMode); + } + } + } + } + } + break; + } + if (iterator.Next() == true) { string module = iterator.Current().Text(); if (iterator.Next() == true) { @@ -733,53 +841,103 @@ namespace Thunder { } private: + // Key for the output-routing map. Empty category/module strings act as wildcards + struct RoutingKey { + Core::Messaging::Metadata::type type; + string category; + string module; + + bool operator==(const RoutingKey& rhs) const { + return ((type == rhs.type) && (category == rhs.category) && (module == rhs.module)); + } + }; + + struct RoutingKeyHash { + size_t operator()(const RoutingKey& key) const { + const size_t h1 = std::hash{}(static_cast(key.type)); + const size_t h2 = std::hash{}(key.category); + const size_t h3 = std::hash{}(key.module); + return (h1 ^ (h2 << 1) ^ (h3 << 2)); + } + }; + void FromConfig(const Config& config) { _adminLock.Lock(); if (config.Tracing.IsSet() == true) { + if (config.Tracing.Output.IsSet() == true) { + _outputRouting[{Core::Messaging::Metadata::type::TRACING, _T(""), _T("")}] = config.Tracing.Output.Value(); + } auto it = config.Tracing.Settings.Elements(); while (it.Next() == true) { Core::Messaging::Metadata info(Core::Messaging::Metadata::type::TRACING, it.Current().Category.Value(), it.Current().Module.Value()); if (info.Default() != it.Current().Enabled.Value()) { _settings.emplace_back(info, it.Current().Enabled.Value()); } + if (it.Current().Output.IsSet() == true) { + _outputRouting[{info.Type(), info.Category(), info.Module()}] = it.Current().Output.Value(); + } } } if (config.Logging.IsSet() == true) { + if (config.Logging.Output.IsSet() == true) { + _outputRouting[{Core::Messaging::Metadata::type::LOGGING, _T(""), _T("")}] = config.Logging.Output.Value(); + } auto it = config.Logging.Settings.Elements(); while (it.Next() == true) { Core::Messaging::Metadata info(Core::Messaging::Metadata::type::LOGGING, it.Current().Category.Value(), it.Current().Module.Value()); if (info.Default() != it.Current().Enabled.Value()) { _settings.emplace_back(info, it.Current().Enabled.Value()); } + if (it.Current().Output.IsSet() == true) { + _outputRouting[{info.Type(), info.Category(), info.Module()}] = it.Current().Output.Value(); + } } } if (config.Reporting.IsSet() == true) { + if (config.Reporting.Output.IsSet() == true) { + _outputRouting[{Core::Messaging::Metadata::type::REPORTING, _T(""), _T("")}] = config.Reporting.Output.Value(); + } auto it = config.Reporting.Settings.Elements(); while (it.Next() == true) { Core::Messaging::Metadata info(Core::Messaging::Metadata::type::REPORTING, it.Current().Category.Value(), it.Current().Module.Value()); _settings.emplace_back(info, it.Current().Enabled.Value()); + if (it.Current().Output.IsSet() == true) { + _outputRouting[{info.Type(), info.Category(), info.Module()}] = it.Current().Output.Value(); + } } } if (config.Assertion.IsSet() == true) { + if (config.Assertion.Output.IsSet() == true) { + _outputRouting[{Core::Messaging::Metadata::type::ASSERT, _T(""), _T("")}] = config.Assertion.Output.Value(); + } auto it = config.Assertion.Settings.Elements(); while (it.Next() == true) { Core::Messaging::Metadata info(Core::Messaging::Metadata::type::ASSERT, it.Current().Category.Value(), it.Current().Module.Value()); _settings.emplace_back(info, it.Current().Enabled.Value()); + if (it.Current().Output.IsSet() == true) { + _outputRouting[{info.Type(), info.Category(), info.Module()}] = it.Current().Output.Value(); + } } } if (config.Telemetry.IsSet() == true) { + if (config.Telemetry.Output.IsSet() == true) { + _outputRouting[{Core::Messaging::Metadata::type::TELEMETRY, _T(""), _T("")}] = config.Telemetry.Output.Value(); + } auto it = config.Telemetry.Settings.Elements(); while (it.Next() == true) { Core::Messaging::Metadata info(Core::Messaging::Metadata::type::TELEMETRY, it.Current().Category.Value(), it.Current().Module.Value()); if (info.Default() != it.Current().Enabled.Value()) { _settings.emplace_back(info, it.Current().Enabled.Value()); } + if (it.Current().Output.IsSet() == true) { + _outputRouting[{info.Type(), info.Category(), info.Module()}] = it.Current().Output.Value(); + } } } @@ -820,6 +978,7 @@ namespace Thunder { private: mutable Core::CriticalSection _adminLock; ControlList _settings; + std::unordered_map _outputRouting; string _path; string _identifier; uint16_t _socketPort;