diff --git a/src/babylon/concurrent/counter.cpp b/src/babylon/concurrent/counter.cpp index 208612c0..73a165bd 100644 --- a/src/babylon/concurrent/counter.cpp +++ b/src/babylon/concurrent/counter.cpp @@ -4,52 +4,6 @@ BABYLON_NAMESPACE_BEGIN -//////////////////////////////////////////////////////////////////////////////// -// ConcurrentAdder begin -ssize_t ConcurrentAdder::value() const noexcept { - ssize_t sum = 0; - _storage.for_each([&](const ssize_t& value) { - sum += value; - }); - return sum; -} - -void ConcurrentAdder::reset() noexcept { - _storage.for_each([&](ssize_t& value) { - value = 0; - }); -} -// ConcurrentAdder end -//////////////////////////////////////////////////////////////////////////////// - -//////////////////////////////////////////////////////////////////////////////// -// ConcurrentMaxer begin -ssize_t ConcurrentMaxer::value() const noexcept { - ssize_t max_value = 0; - value(max_value); - return max_value; -} - -bool ConcurrentMaxer::value(ssize_t& max_value) const noexcept { - bool has_result = false; - ssize_t result = ::std::numeric_limits::min(); - _storage.for_each([&](const Slot& slot) { - if (slot.version == _version) { - if (slot.value > result) { - result = slot.value; - has_result = true; - } - } - }); - if (has_result) { - max_value = result; - } - return has_result; -} - -void ConcurrentMaxer::reset() noexcept { - ++_version; -} // ConcurrentMaxer end //////////////////////////////////////////////////////////////////////////////// diff --git a/src/babylon/concurrent/counter.h b/src/babylon/concurrent/counter.h index 8b6e7e33..caa51235 100644 --- a/src/babylon/concurrent/counter.h +++ b/src/babylon/concurrent/counter.h @@ -1,5 +1,6 @@ #pragma once +#include // std::is_integral, std::is_floating_point #include "babylon/concurrent/thread_local.h" // CompactEnumerableThreadLocal #include "babylon/environment.h" @@ -21,73 +22,178 @@ BABYLON_NAMESPACE_BEGIN // 实现上针对多写少读的场景做了优化,更适用于典型的计数场景 // 计数操作改为独立发生在对应的线程局部数据上,避免了缓存行竞争 // 但相应的读操作就需要遍历所有线程局部数据并再次累加才能得到结果 -class ConcurrentAdder { +template +class GenericsConcurrentAdder { + static_assert( + (::std::is_integral::value || + ::std::is_floating_point::value) && 8 >= sizeof(T), + "ConcurrentSummer only supports integral or " + "floating point types with size <= 8 bytes"); + public: - ConcurrentAdder() noexcept = default; - ConcurrentAdder(ConcurrentAdder&&) noexcept = default; - ConcurrentAdder(const ConcurrentAdder&) = delete; - ConcurrentAdder& operator=(ConcurrentAdder&&) noexcept = default; - ConcurrentAdder& operator=(const ConcurrentAdder&) = delete; - ~ConcurrentAdder() noexcept = default; + GenericsConcurrentAdder() noexcept = default; + GenericsConcurrentAdder(GenericsConcurrentAdder&&) noexcept = default; + GenericsConcurrentAdder(const GenericsConcurrentAdder&) = delete; + GenericsConcurrentAdder& operator=(GenericsConcurrentAdder&&) noexcept = default; + GenericsConcurrentAdder& operator=(const GenericsConcurrentAdder&) = delete; + ~GenericsConcurrentAdder() noexcept = default; // 分散计数接口 - template - inline ConcurrentAdder& operator<<(const T& value) noexcept; + template + GenericsConcurrentAdder& operator<<(const U& value) noexcept { + count(static_cast(value)); + return *this; + } // 汇聚读取接口 - ssize_t value() const noexcept; + T value() const noexcept { + T sum = 0; + _storage.for_each([&](const T& value) { + sum += value; + }); + return sum; + } // 重置接口 - void reset() noexcept; + void reset() noexcept { + _storage.for_each([&](T& value) { + value = 0; + }); + } private: - inline void count(ssize_t value) noexcept; + void count(T value) noexcept { + auto& local = _storage.local(); + // local的唯一修改者是自己,所以这里不需要使用原子加法 + // 正确对齐的数值类型赋值本身是原子发生的 + local = local + value; + } - CompactEnumerableThreadLocal _storage; + CompactEnumerableThreadLocal _storage; }; -// 高并发最大值计数器 +using ConcurrentAdder = GenericsConcurrentAdder; + +// 高并发最大/小值计数器 // 原理上等价于利用std::atomic -// 计数操作进行loop cas(old, max(new, old)) +// 计数操作进行loop cas(old, max(new, old)) / cas(old, min(new, old)) // 读取&重置操作进行exchange(0) // // 实现上针对多写少读的场景做了优化,更适用于典型的计数场景 // 计数操作改为独立发生在对应的线程局部数据上,避免了缓存行竞争 // 但相应的读操作就需要遍历所有线程局部数据并再次累加才能得到结果 -class ConcurrentMaxer { +namespace internal { +template +struct MaxComparer { + bool operator()(T lhs, T rhs) const { + return lhs > rhs; + } +}; +template +struct MinComparer { + bool operator()(T lhs, T rhs) const { + return lhs < rhs; + } +}; + +template +class ConcurrentComparer { + static_assert( + (::std::is_integral::value || + ::std::is_floating_point::value) && 8 >= sizeof(T), + "ConcurrentSummer only supports integral or " + "floating point types with size <= 8 bytes"); + + constexpr static T EXTREMUM = + Max ? std::numeric_limits::min() : std::numeric_limits::max(); + typedef typename std::conditional< + Max, MaxComparer, MinComparer>::type Comparer; + public: - ConcurrentMaxer() noexcept = default; - ConcurrentMaxer(ConcurrentMaxer&&) = delete; - ConcurrentMaxer(const ConcurrentMaxer&) = delete; - ConcurrentMaxer& operator=(ConcurrentMaxer&&) = delete; - ConcurrentMaxer& operator=(const ConcurrentMaxer&) = delete; - ~ConcurrentMaxer() noexcept = default; + ConcurrentComparer() noexcept = default; + ConcurrentComparer(ConcurrentComparer&&) = delete; + ConcurrentComparer(const ConcurrentComparer&) = delete; + ConcurrentComparer& operator=(ConcurrentComparer&&) = delete; + ConcurrentComparer& operator=(const ConcurrentComparer&) = delete; + ~ConcurrentComparer() noexcept = default; // 分散计数接口 - inline ConcurrentMaxer& operator<<(ssize_t value) noexcept; + ConcurrentComparer& operator<<(T value) noexcept { + auto& local = _storage.local(); + // 为了避免原子操作引入内存屏障,这里采用异步版本感知机制 + // 取代了cas动作来实现重置效果 + // + // 理论上存在一个缺陷,即 + // 1、刚刚完成一次汇聚读取,正在进行版本推进 + // 2、此时进行中的计数动作依然计入了上一个版本 + // 3、下一个汇聚读取周期中,这些样本无法计入 + // 不过对于统计场景,这个影响非常微弱,可忽略不计 + if (ABSL_PREDICT_FALSE(_version != local.version)) { + local.version = _version; + local.value = value; + return *this; + } + if (ABSL_PREDICT_FALSE(_comparer(value, local.value))) { + local.value = value; + } + return *this; + } // 汇聚读取接口 // 如果周期内有计数发生,返回计数最大值 // 如果周期内没有计数发生,返回0 - ssize_t value() const noexcept; + T value() const noexcept { + T compare_value = 0; + value(compare_value); + return compare_value; + } // 汇聚读取接口 - // 如果周期内有计数发生,返回true,并将max_value填充为计数最大值 - // 如果周期内没有计数发生,返回false,此时不修改max_value的值 - bool value(ssize_t& max_value) const noexcept; + // 如果周期内有计数发生,返回true,并将compare_value填充为计数最大值 + // 如果周期内没有计数发生,返回false,此时不修改compare_value的值 + bool value(T& compare_value) const noexcept { + bool has_result = false; + T result = EXTREMUM; + _storage.for_each([&](const Slot& slot) { + if (slot.version == _version) { + if (_comparer(slot.value, result)) { + result = slot.value; + has_result = true; + } + } + }); + if (has_result) { + compare_value = result; + } + return has_result; + } // 重置接口,开启一个新的周期 - void reset() noexcept; + void reset() noexcept { + ++_version; + } private: struct Slot { size_t version {SIZE_MAX}; - ssize_t value; + T value; }; CompactEnumerableThreadLocal _storage; size_t _version {0}; + Comparer _comparer; }; +} // namespace internal + +template +class GenericsConcurrentMaxer : public internal::ConcurrentComparer {}; + +using ConcurrentMaxer = GenericsConcurrentMaxer; + +template +class GenericsConcurrentMiner : public internal::ConcurrentComparer {}; + +using ConcurrentMiner = GenericsConcurrentMiner; // 高并发求和计数器 // 原理上等价于利用锁同步 @@ -207,43 +313,6 @@ class ConcurrentSampler { ::std::atomic _version {0}; }; -template -inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentAdder& -ConcurrentAdder::operator<<(const T& value) noexcept { - count(static_cast(value)); - return *this; -} - -inline ABSL_ATTRIBUTE_ALWAYS_INLINE void ConcurrentAdder::count( - ssize_t value) noexcept { - auto& local = _storage.local(); - // local的唯一修改者是自己,所以这里不需要使用原子加法 - // 正确对齐的数值类型赋值本身是原子发生的 - local = local + value; -} - -inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentMaxer& -ConcurrentMaxer::operator<<(ssize_t value) noexcept { - auto& local = _storage.local(); - // 为了避免原子操作引入内存屏障,这里采用异步版本感知机制 - // 取代了cas动作来实现重置效果 - // - // 理论上存在一个缺陷,即 - // 1、刚刚完成一次汇聚读取,正在进行版本推进 - // 2、此时进行中的计数动作依然计入了上一个版本 - // 3、下一个汇聚读取周期中,这些样本无法计入 - // 不过对于统计场景,这个影响非常微弱,可忽略不计 - if (ABSL_PREDICT_FALSE(_version != local.version)) { - local.version = _version; - local.value = value; - return *this; - } - if (ABSL_PREDICT_FALSE(value > local.value)) { - local.value = value; - } - return *this; -} - inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentSummer& ConcurrentSummer::operator<<(ssize_t value) noexcept { return operator<<({value, 1}); diff --git a/test/concurrent/test_counter.cpp b/test/concurrent/test_counter.cpp index b88b126f..a78d2886 100644 --- a/test/concurrent/test_counter.cpp +++ b/test/concurrent/test_counter.cpp @@ -6,11 +6,13 @@ using ::babylon::ConcurrentAdder; using ::babylon::ConcurrentMaxer; +using ::babylon::ConcurrentMiner; using ::babylon::ConcurrentSampler; using ::babylon::ConcurrentSummer; -TEST(concurrent_adder, caculate_right) { - ConcurrentAdder adder; +template +void test_concurrent_adder_caculate_right() { + ::babylon::GenericsConcurrentAdder adder; ::std::thread a([&] { adder << 10; }); @@ -26,8 +28,20 @@ TEST(concurrent_adder, caculate_right) { ASSERT_EQ(8, adder.value()); } -TEST(concurrent_maxer, caculate_right) { - ConcurrentMaxer maxer; +TEST(concurrent_adder, caculate_right) { + // 保证编译兼容性。 + ConcurrentAdder adder; + + test_concurrent_adder_caculate_right(); + test_concurrent_adder_caculate_right(); + test_concurrent_adder_caculate_right(); + test_concurrent_adder_caculate_right(); + test_concurrent_adder_caculate_right(); +} + +template +void test_concurrent_maxer_caculate_right() { + ::babylon::GenericsConcurrentMaxer maxer; ::std::thread a([&] { maxer << 10; }); @@ -43,9 +57,19 @@ TEST(concurrent_maxer, caculate_right) { ASSERT_EQ(10, maxer.value()); } -TEST(concurrent_maxer, empty_aware) { +TEST(concurrent_maxer, caculate_right) { + // 保证编译兼容性。 ConcurrentMaxer maxer; - ssize_t value = 10086; + + test_concurrent_maxer_caculate_right(); + test_concurrent_maxer_caculate_right(); + test_concurrent_maxer_caculate_right(); +} + +template +void test_concurrent_maxer_empty_aware() { + ::babylon::GenericsConcurrentMaxer maxer; + T value = 10086; auto has_value = maxer.value(value); ASSERT_FALSE(has_value); ASSERT_EQ(10086, value); @@ -69,8 +93,20 @@ TEST(concurrent_maxer, empty_aware) { ASSERT_EQ(0, maxer.value()); } -TEST(concurrent_maxer, resetable) { +TEST(concurrent_maxer, empty_aware) { + // 保证编译兼容性。 ConcurrentMaxer maxer; + + test_concurrent_maxer_empty_aware(); + test_concurrent_maxer_empty_aware(); + test_concurrent_maxer_empty_aware(); + test_concurrent_maxer_empty_aware(); + test_concurrent_maxer_empty_aware(); +} + +template +void test_concurrent_maxer_resetable() { + ::babylon::GenericsConcurrentMaxer maxer; { ::std::thread a([&] { maxer << 10; @@ -104,6 +140,119 @@ TEST(concurrent_maxer, resetable) { ASSERT_EQ(7, maxer.value()); } +TEST(concurrent_maxer, resetable) { + // 保证编译兼容性。 + ConcurrentMaxer maxer; + + test_concurrent_maxer_resetable(); + test_concurrent_maxer_resetable(); + test_concurrent_maxer_resetable(); +} + +template +void test_concurrent_miner_caculate_right() { + ::babylon::GenericsConcurrentMiner miner; + ::std::thread a([&] { + miner << 10; + }); + ::std::thread b([&] { + miner << 3; + }); + ::std::thread c([&] { + miner << -5; + }); + a.join(); + b.join(); + c.join(); + ASSERT_EQ(-5, miner.value()); +} + +TEST(concurrent_miner, caculate_right) { + // 保证编译兼容性。 + ConcurrentMiner miner; + + test_concurrent_miner_caculate_right(); + test_concurrent_miner_caculate_right(); + test_concurrent_miner_caculate_right(); +} + +template +void test_concurrent_miner_empty_aware() { + ::babylon::GenericsConcurrentMiner miner; + T value = 10086; + auto has_value = miner.value(value); + ASSERT_FALSE(has_value); + ASSERT_EQ(10086, value); + ASSERT_EQ(0, miner.value()); + ::std::thread([&] { + miner << 10; + }).join(); + has_value = miner.value(value); + ASSERT_TRUE(has_value); + ASSERT_EQ(10, value); + ASSERT_EQ(10, miner.value()); + miner.reset(); + value = 10010; + has_value = miner.value(value); + ASSERT_FALSE(has_value); + ASSERT_EQ(10010, value); + ASSERT_EQ(0, miner.value()); +} + +TEST(concurrent_miner, empty_aware) { + // 保证编译兼容性。 + ConcurrentMiner miner; + + test_concurrent_miner_empty_aware(); + test_concurrent_miner_empty_aware(); + test_concurrent_miner_empty_aware(); +} + +template +void test_concurrent_miner_resetable() { + ::babylon::GenericsConcurrentMiner miner; + { + ::std::thread a([&] { + miner << 10; + }); + ::std::thread b([&] { + miner << 3; + }); + ::std::thread c([&] { + miner << -5; + }); + a.join(); + b.join(); + c.join(); + } + ASSERT_EQ(-5, miner.value()); + miner.reset(); + { + ::std::thread a([&] { + miner << 3; + }); + ::std::thread b([&] { + miner << 7; + }); + ::std::thread c([&] { + miner << -2; + }); + a.join(); + b.join(); + c.join(); + } + ASSERT_EQ(-2, miner.value()); +} + +TEST(concurrent_miner, resetable) { + // 保证编译兼容性。 + ConcurrentMiner miner; + + test_concurrent_miner_resetable(); + test_concurrent_miner_resetable(); + test_concurrent_miner_resetable(); +} + TEST(concurrent_summary, caculate_right) { ConcurrentSummer summer; ::std::thread a([&] {