Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions src/babylon/concurrent/counter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,6 @@

BABYLON_NAMESPACE_BEGIN

////////////////////////////////////////////////////////////////////////////////
// ConcurrentAdder begin
// Aggregated read: walks every entry exposed by _storage.for_each and
// returns the sum of all of them.
ssize_t ConcurrentAdder::value() const noexcept {
  ssize_t total = 0;
  _storage.for_each([&total](const ssize_t& partial) {
    total += partial;
  });
  return total;
}

// Reset: overwrites every entry exposed by _storage.for_each with zero.
void ConcurrentAdder::reset() noexcept {
  _storage.for_each([](ssize_t& partial) {
    partial = ssize_t {0};
  });
}
// ConcurrentAdder end
////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////
// ConcurrentMaxer begin
// Aggregated read: returns the maximum recorded in the current cycle, or 0
// when the two-argument overload reports that nothing was recorded.
ssize_t ConcurrentMaxer::value() const noexcept {
  ssize_t observed = 0;
  // The presence flag is intentionally dropped; `observed` keeps its 0
  // default when there is no sample.
  static_cast<void>(value(observed));
  return observed;
}

// Aggregated read with presence reporting.
// Visits every slot via _storage.for_each and considers only slots whose
// version equals the current _version.
// Returns true and stores the largest such slot value into max_value when at
// least one slot matches; returns false and leaves max_value untouched
// otherwise.
bool ConcurrentMaxer::value(ssize_t& max_value) const noexcept {
  bool has_result = false;
  ssize_t result = ::std::numeric_limits<ssize_t>::min();
  _storage.for_each([&](const Slot& slot) {
    if (slot.version != _version) {
      return;
    }
    // The first matching slot is accepted unconditionally. With a strict `>`
    // alone, a sample equal to numeric_limits<ssize_t>::min() could never beat
    // the seed and the cycle would wrongly be reported as empty.
    if (!has_result || slot.value > result) {
      result = slot.value;
      has_result = true;
    }
  });
  if (has_result) {
    max_value = result;
  }
  return has_result;
}

// Reset: starts a new cycle by advancing _version by one. Slots are not
// touched here; readers skip slots whose version differs from _version.
void ConcurrentMaxer::reset() noexcept {
  _version += 1;
}
// ConcurrentMaxer end
////////////////////////////////////////////////////////////////////////////////

Expand Down
201 changes: 135 additions & 66 deletions src/babylon/concurrent/counter.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <type_traits> // std::is_integral, std::is_floating_point
#include "babylon/concurrent/thread_local.h" // CompactEnumerableThreadLocal
#include "babylon/environment.h"

Expand All @@ -21,73 +22,178 @@ BABYLON_NAMESPACE_BEGIN
// 实现上针对多写少读的场景做了优化,更适用于典型的计数场景
// 计数操作改为独立发生在对应的线程局部数据上,避免了缓存行竞争
// 但相应的读操作就需要遍历所有线程局部数据并再次累加才能得到结果
class ConcurrentAdder {
template <typename T>
class GenericsConcurrentAdder {
// Compile-time guard on the element type: only integral or floating point
// types of at most 8 bytes are accepted.
// NOTE(review): the size limit presumably backs the assumption in count()
// that a properly aligned store of T is atomic - confirm.
static_assert(
    (::std::is_integral<T>::value || ::std::is_floating_point<T>::value) &&
        sizeof(T) <= 8,
    "GenericsConcurrentAdder only supports integral or "
    "floating point types with size <= 8 bytes");

public:
ConcurrentAdder() noexcept = default;
ConcurrentAdder(ConcurrentAdder&&) noexcept = default;
ConcurrentAdder(const ConcurrentAdder&) = delete;
ConcurrentAdder& operator=(ConcurrentAdder&&) noexcept = default;
ConcurrentAdder& operator=(const ConcurrentAdder&) = delete;
~ConcurrentAdder() noexcept = default;
GenericsConcurrentAdder() noexcept = default;
GenericsConcurrentAdder(GenericsConcurrentAdder&&) noexcept = default;
GenericsConcurrentAdder(const GenericsConcurrentAdder&) = delete;
GenericsConcurrentAdder& operator=(GenericsConcurrentAdder&&) noexcept = default;
GenericsConcurrentAdder& operator=(const GenericsConcurrentAdder&) = delete;
~GenericsConcurrentAdder() noexcept = default;

// 分散计数接口
template <typename T>
inline ConcurrentAdder& operator<<(const T& value) noexcept;
// Distributed counting interface: converts the operand to T with static_cast
// and hands it to count(). Returns *this so calls can be chained.
template <typename U>
GenericsConcurrentAdder& operator<<(const U& value) noexcept {
  const T delta = static_cast<T>(value);
  count(delta);
  return *this;
}

// 汇聚读取接口
ssize_t value() const noexcept;
// Aggregated read: walks every entry exposed by _storage.for_each and
// returns the sum of all of them.
T value() const noexcept {
  T total = 0;
  _storage.for_each([&total](const T& partial) {
    total += partial;
  });
  return total;
}

// 重置接口
void reset() noexcept;
void reset() noexcept {
_storage.for_each([&](T& value) {
value = 0;
});
}

private:
inline void count(ssize_t value) noexcept;
// Adds `value` to the entry returned by _storage.local().
void count(T value) noexcept {
  auto& slot = _storage.local();
  // The calling thread is the only writer of its own slot, so no atomic
  // add is needed; a properly aligned numeric store is itself atomic.
  const T updated = slot + value;
  slot = updated;
}

CompactEnumerableThreadLocal<ssize_t, 64, true> _storage;
CompactEnumerableThreadLocal<T, 64, true> _storage;
};

// 高并发最大值计数器
using ConcurrentAdder = GenericsConcurrentAdder<ssize_t>;

// 高并发最大/小值计数器
// 原理上等价于利用std::atomic
// 计数操作进行loop cas(old, max(new, old))
// 计数操作进行loop cas(old, max(new, old)) / cas(old, min(new, old))
// 读取&重置操作进行exchange(0)
//
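// The implementation is optimized for write-heavy, read-light workloads, which suits typical counting scenarios
// Each recording operation happens independently on the calling thread's local data, avoiding cache line contention
// In exchange, a read has to traverse all thread-local data and compare the per-thread values again to obtain the result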
class ConcurrentMaxer {
namespace internal {
// Strict "greater than" predicate: returns true when lhs should replace rhs
// as the running maximum.
template <typename T>
struct MaxComparer {
  bool operator()(T lhs, T rhs) const {
    return rhs < lhs;
  }
};
// Strict "less than" predicate: returns true when lhs should replace rhs
// as the running minimum.
template <typename T>
struct MinComparer {
  bool operator()(T lhs, T rhs) const {
    return rhs > lhs;
  }
};

template <typename T, bool Max>
class ConcurrentComparer {
// Compile-time guard on the element type: only integral or floating point
// types of at most 8 bytes are accepted.
static_assert(
    (::std::is_integral<T>::value || ::std::is_floating_point<T>::value) &&
        sizeof(T) <= 8,
    "ConcurrentComparer only supports integral or "
    "floating point types with size <= 8 bytes");

// Seed for the aggregated read: the value that every real sample beats or
// ties. For a max tracker this must be lowest(), not min(): for floating
// point types min() is the smallest positive normalized value, so negative
// samples would never exceed it.
constexpr static T EXTREMUM = Max ? ::std::numeric_limits<T>::lowest()
                                  : ::std::numeric_limits<T>::max();
// Strict ordering predicate selected by the Max flag.
using Comparer =
    typename ::std::conditional<Max, MaxComparer<T>, MinComparer<T>>::type;

public:
ConcurrentMaxer() noexcept = default;
ConcurrentMaxer(ConcurrentMaxer&&) = delete;
ConcurrentMaxer(const ConcurrentMaxer&) = delete;
ConcurrentMaxer& operator=(ConcurrentMaxer&&) = delete;
ConcurrentMaxer& operator=(const ConcurrentMaxer&) = delete;
~ConcurrentMaxer() noexcept = default;
ConcurrentComparer() noexcept = default;
ConcurrentComparer(ConcurrentComparer&&) = delete;
ConcurrentComparer(const ConcurrentComparer&) = delete;
ConcurrentComparer& operator=(ConcurrentComparer&&) = delete;
ConcurrentComparer& operator=(const ConcurrentComparer&) = delete;
~ConcurrentComparer() noexcept = default;

// 分散计数接口
inline ConcurrentMaxer& operator<<(ssize_t value) noexcept;
// Distributed recording interface: folds `value` into the slot returned by
// _storage.local(). Returns *this so calls can be chained.
ConcurrentComparer& operator<<(T value) noexcept {
  auto& slot = _storage.local();
  // To avoid the memory barriers that atomic operations would introduce, a
  // lazily observed version number is used instead of a CAS to get the
  // effect of a reset.
  //
  // In theory this has one flaw:
  // 1. an aggregated read has just finished and the version is being advanced
  // 2. a recording in flight at that moment still lands in the old version
  // 3. such samples are not seen by the next aggregated read
  // For statistics use cases the impact is negligible.
  if (ABSL_PREDICT_FALSE(slot.version != _version)) {
    // First write of a new cycle: adopt the version and overwrite the
    // stale value unconditionally.
    slot.version = _version;
    slot.value = value;
  } else if (ABSL_PREDICT_FALSE(_comparer(value, slot.value))) {
    slot.value = value;
  }
  return *this;
}

// 汇聚读取接口
// 如果周期内有计数发生,返回计数最大值
// 如果周期内没有计数发生,返回0
ssize_t value() const noexcept;
// Aggregated read: returns the extremum recorded in the current cycle, or 0
// when the two-argument overload reports that nothing was recorded.
T value() const noexcept {
  T observed = 0;
  // The presence flag is intentionally dropped; `observed` keeps its 0
  // default when there is no sample.
  static_cast<void>(value(observed));
  return observed;
}

// 汇聚读取接口
// 如果周期内有计数发生,返回true,并将max_value填充为计数最大值
// 如果周期内没有计数发生,返回false,此时不修改max_value的值
bool value(ssize_t& max_value) const noexcept;
// 如果周期内有计数发生,返回true,并将compare_value填充为计数最大值
// 如果周期内没有计数发生,返回false,此时不修改compare_value的值
// Aggregated read with presence reporting.
// Visits every slot via _storage.for_each and considers only slots whose
// version equals the current _version.
// Returns true and stores the extremum selected by _comparer into
// compare_value when at least one slot matches; returns false and leaves
// compare_value untouched otherwise.
bool value(T& compare_value) const noexcept {
  bool has_result = false;
  T result = EXTREMUM;
  _storage.for_each([&](const Slot& slot) {
    if (slot.version != _version) {
      return;
    }
    // The first matching slot is accepted unconditionally. _comparer is a
    // strict ordering, so a sample that merely equals the seed could never
    // win and the cycle would wrongly be reported as empty.
    if (!has_result || _comparer(slot.value, result)) {
      result = slot.value;
      has_result = true;
    }
  });
  if (has_result) {
    compare_value = result;
  }
  return has_result;
}

// 重置接口,开启一个新的周期
void reset() noexcept;
// Reset: starts a new cycle by advancing _version by one. Slots are not
// touched here; readers skip slots whose version differs from _version.
void reset() noexcept {
  _version += 1;
}

private:
struct Slot {
size_t version {SIZE_MAX};
ssize_t value;
T value;
};

CompactEnumerableThreadLocal<Slot, 64, true> _storage;
size_t _version {0};
Comparer _comparer;
};
} // namespace internal

// Concurrent maximum tracker for element type T: a named wrapper that
// selects the Max = true flavor of internal::ConcurrentComparer and adds no
// members of its own.
template <typename T>
class GenericsConcurrentMaxer : public internal::ConcurrentComparer<T, true> {};

using ConcurrentMaxer = GenericsConcurrentMaxer<ssize_t>;

// Concurrent minimum tracker for element type T: a named wrapper that
// selects the Max = false flavor of internal::ConcurrentComparer and adds no
// members of its own.
template <typename T>
class GenericsConcurrentMiner : public internal::ConcurrentComparer<T, false> {};

using ConcurrentMiner = GenericsConcurrentMiner<ssize_t>;

// 高并发求和计数器
// 原理上等价于利用锁同步
Expand Down Expand Up @@ -207,43 +313,6 @@ class ConcurrentSampler {
::std::atomic<uint32_t> _version {0};
};

// Distributed counting interface: converts the operand to ssize_t with
// static_cast and hands it to count(). Returns *this so calls can be chained.
template <typename T>
inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentAdder&
ConcurrentAdder::operator<<(const T& value) noexcept {
  const ssize_t delta = static_cast<ssize_t>(value);
  count(delta);
  return *this;
}

// Adds `value` to the entry returned by _storage.local().
inline ABSL_ATTRIBUTE_ALWAYS_INLINE void ConcurrentAdder::count(
    ssize_t value) noexcept {
  auto& slot = _storage.local();
  // The calling thread is the only writer of its own slot, so no atomic
  // add is needed; a properly aligned numeric store is itself atomic.
  const ssize_t updated = slot + value;
  slot = updated;
}

// Distributed recording interface: folds `value` into the slot returned by
// _storage.local(). Returns *this so calls can be chained.
inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentMaxer&
ConcurrentMaxer::operator<<(ssize_t value) noexcept {
  auto& slot = _storage.local();
  // To avoid the memory barriers that atomic operations would introduce, a
  // lazily observed version number is used instead of a CAS to get the
  // effect of a reset.
  //
  // In theory this has one flaw:
  // 1. an aggregated read has just finished and the version is being advanced
  // 2. a recording in flight at that moment still lands in the old version
  // 3. such samples are not seen by the next aggregated read
  // For statistics use cases the impact is negligible.
  if (ABSL_PREDICT_FALSE(slot.version != _version)) {
    // First write of a new cycle: adopt the version and overwrite the
    // stale value unconditionally.
    slot.version = _version;
    slot.value = value;
  } else if (ABSL_PREDICT_FALSE(value > slot.value)) {
    slot.value = value;
  }
  return *this;
}

inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentSummer&
ConcurrentSummer::operator<<(ssize_t value) noexcept {
return operator<<({value, 1});
Expand Down
Loading
Loading