From 072dd18d522ac0f9ef7d9945f07271956dfbd1de Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Fri, 6 Jun 2025 14:29:16 +0300 Subject: [PATCH 01/12] update: adopt SubscriptionEngine Signed-off-by: Dmitriy Khaustov aka xDimon --- src/CMakeLists.txt | 4 +++- src/injector/node_injector.cpp | 2 +- src/se/CMakeLists.txt | 12 ++++++------ src/se/async_dispatcher.cpp | 4 +--- src/se/impl/async_dispatcher_impl.hpp | 11 ++--------- src/se/impl/common.hpp | 10 +++------- src/se/impl/scheduler_impl.hpp | 11 ++--------- src/se/impl/subscriber.hpp | 14 +++++--------- src/se/impl/subscription_engine.hpp | 13 ++++--------- src/se/impl/subscription_manager.hpp | 20 +++++++++++--------- src/se/impl/sync_dispatcher_impl.hpp | 11 ++--------- src/se/impl/thread_handler.hpp | 9 ++++++++- src/se/subscription.cpp | 11 ----------- src/se/subscription.hpp | 2 +- src/se/subscription_fwd.hpp | 11 ++++++----- src/se/sync_dispatcher.cpp | 4 +--- 16 files changed, 56 insertions(+), 93 deletions(-) delete mode 100644 src/se/subscription.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 87a038bb..45ee63fe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,7 +4,9 @@ # SPDX-License-Identifier: Apache-2.0 # -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_SOURCE_DIR}) +include_directories(${CMAKE_SOURCE_DIR}/src) +include_directories(${CMAKE_BINARY_DIR}/generated) # Executables (should contain `main()` function) add_subdirectory(executable) diff --git a/src/injector/node_injector.cpp b/src/injector/node_injector.cpp index 0989844c..65e076b1 100644 --- a/src/injector/node_injector.cpp +++ b/src/injector/node_injector.cpp @@ -60,7 +60,7 @@ namespace { di::bind.to(logsys), di::bind.to(), di::bind.to(), - di::bind.to>(), + di::bind.to>(), di::bind.to([](const auto &injector) { return metrics::Exposer::Configuration{ {boost::asio::ip::address_v4::from_string("127.0.0.1"), 7777} diff --git a/src/se/CMakeLists.txt b/src/se/CMakeLists.txt index fe524f69..1e254899 100644 --- a/src/se/CMakeLists.txt +++ b/src/se/CMakeLists.txt @@ -6,16 +6,16 @@ add_library(se_async async_dispatcher.cpp - subscription.cpp - ) +) target_link_libraries(se_async - ) + fmt::fmt +) add_library(se_sync sync_dispatcher.cpp - subscription.cpp - ) +) target_link_libraries(se_sync - ) + fmt::fmt +) diff --git a/src/se/async_dispatcher.cpp b/src/se/async_dispatcher.cpp index a7426326..04d52089 100644 --- a/src/se/async_dispatcher.cpp +++ b/src/se/async_dispatcher.cpp @@ -12,9 +12,7 @@ namespace jam::se { std::shared_ptr getDispatcher() { - return std::make_shared< - AsyncDispatcher>(); + return std::make_shared>(); } } // namespace jam::se diff --git a/src/se/impl/async_dispatcher_impl.hpp b/src/se/impl/async_dispatcher_impl.hpp index b0283b6a..a7754de0 100644 --- a/src/se/impl/async_dispatcher_impl.hpp +++ b/src/se/impl/async_dispatcher_impl.hpp @@ -9,20 +9,13 @@ #include "common.hpp" #include "dispatcher.hpp" #include "thread_handler.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::se { template - class AsyncDispatcher final : public Dispatcher { + class AsyncDispatcher final : public Dispatcher, NonCopyable, NonMovable { public: - // Disable copying - AsyncDispatcher(const AsyncDispatcher &) = delete; - AsyncDispatcher &operator=(const AsyncDispatcher &) = delete; - - // Disable moving - AsyncDispatcher(AsyncDispatcher &&) = delete; - AsyncDispatcher &operator=(AsyncDispatcher &&) = delete; - static constexpr uint32_t kHandlersCount = kCount; static constexpr uint32_t kPoolThreadsCount = kPoolSize; diff --git a/src/se/impl/common.hpp b/src/se/impl/common.hpp index 0ec374d9..4dc0145a 100644 --- a/src/se/impl/common.hpp +++ b/src/se/impl/common.hpp @@ -11,6 +11,8 @@ #include #include +#include "utils/ctor_limiters.hpp" + namespace jam::se::utils { /** @@ -141,7 +143,7 @@ namespace jam::se::utils { * between threads. It's similar to a manual reset event, where one thread * can wait until another thread signals the event. */ - class WaitForSingleObject final { + class WaitForSingleObject final : NonCopyable, NonMovable { std::condition_variable wait_cv_; ///< Condition variable for waiting std::mutex wait_m_; ///< Mutex for synchronization bool flag_; ///< Flag that represents the state (true = not signaled, false @@ -153,12 +155,6 @@ namespace jam::se::utils { */ WaitForSingleObject() : flag_{true} {} - // Deleted copy and move operations to prevent improper synchronization - WaitForSingleObject(WaitForSingleObject &&) = delete; - WaitForSingleObject(const WaitForSingleObject &) = delete; - WaitForSingleObject &operator=(WaitForSingleObject &&) = delete; - WaitForSingleObject &operator=(const WaitForSingleObject &) = delete; - /** * @brief Waits for the object to be signaled with a timeout * diff --git a/src/se/impl/scheduler_impl.hpp b/src/se/impl/scheduler_impl.hpp index 9e8d6ed2..b02558be 100644 --- a/src/se/impl/scheduler_impl.hpp +++ b/src/se/impl/scheduler_impl.hpp @@ -19,10 +19,11 @@ #include "common.hpp" #include "scheduler.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::se { - class SchedulerBase : public IScheduler { + class SchedulerBase : public IScheduler, NonCopyable, NonMovable { private: using Time = std::chrono::high_resolution_clock; using Timepoint = std::chrono::time_point