diff --git a/narf/histutils.py b/narf/histutils.py index f08514b..5d3fd80 100644 --- a/narf/histutils.py +++ b/narf/histutils.py @@ -19,6 +19,24 @@ narf.clingutils.Declare('#include ') narf.clingutils.Declare('#include ') +class SparseStorage: + """Storage option for HistoBoost selecting a narf::concurrent_sparse_storage + backed by a narf::concurrent_flat_map. Conversion to a python hist.Hist + object is not supported in this mode. + + Parameters + ---------- + fill_fraction : float + Estimated fraction of bins (including under/overflow) that will be + populated. Used to size the underlying concurrent_flat_map so that + most fills hit the initial allocation rather than triggering + on-the-fly expansion. Values outside (0, 1] are accepted; pass a + small number for very sparse fills. + """ + def __init__(self, fill_fraction=0.1): + self.fill_fraction = float(fill_fraction) + + def bool_to_string(b): if b: return "true" @@ -114,7 +132,7 @@ def make_array_interface_view(boost_hist): underflow = [axis.traits.underflow for axis in boost_hist.axes] - acc_type = convert_storage_type(boost_hist._storage_type) + acc_type = convert_storage_type(boost_hist.storage_type) arrview = ROOT.narf.array_interface_view[acc_type, len(shape)](arr, shape, strides, underflow) return arrview @@ -132,9 +150,9 @@ def hist_to_pyroot_boost(hist_hist, tensor_rank = 0, force_atomic = False): scalar_type = ROOT.double dimensions = ROOT.Eigen.Sizes[tuple(tensor_sizes)] - if issubclass(hist_hist._storage_type, bh.storage.Double): + if issubclass(hist_hist.storage_type, bh.storage.Double): cppstoragetype = ROOT.narf.tensor_accumulator[scalar_type, dimensions] - elif issubclass(hist_hist._storage_type, bh.storage.Weight): + elif issubclass(hist_hist.storage_type, bh.storage.Weight): cppstoragetype = ROOT.narf.tensor_accumulator[ROOT.boost.histogram.accumulators.weighted_sum[scalar_type], dimensions] else: raise TypeError("Requested storage type is not supported with tensor weights currently") @@ -143,7 +161,7 @@ def hist_to_pyroot_boost(hist_hist, tensor_rank = 0, force_atomic = False): cppstoragetype = ROOT.narf.atomic_adaptor[cppstoragetype] else: python_axes = hist_hist.axes - cppstoragetype = convert_storage_type(hist_hist._storage_type, force_atomic = force_atomic) + cppstoragetype = convert_storage_type(hist_hist.storage_type, force_atomic = force_atomic) cppaxes = [ROOT.std.move(convert_axis(axis)) for axis in python_axes] @@ -161,6 +179,66 @@ def _histo_boost(df, name, axes, cols, storage = bh.storage.Weight(), force_atom if force_atomic is None: force_atomic = ROOT.ROOT.IsImplicitMTEnabled() + # Sparse storage path: build a narf::sparse_histogram backed by a + # concurrent_flat_map. The result is exposed as a wums.SparseHist. + if isinstance(storage, SparseStorage): + if tensor_axes is not None: + raise NotImplementedError("Tensor weights are not supported with SparseStorage") + coltypes = [df.GetColumnType(col) for col in cols] + for coltype in coltypes[len(axes):]: + traits = ROOT.narf.tensor_traits[coltype] + if traits.is_tensor: + raise NotImplementedError("Tensor weights are not supported with SparseStorage") + cppaxes = [ROOT.std.move(convert_axis(axis)) for axis in axes] + hfill = ROOT.narf.make_histogram_sparse[ROOT.narf.atomic_adaptor[ROOT.double]](storage.fill_fraction, *cppaxes) + helper = ROOT.narf.FillBoostHelperAtomic[type(hfill)](ROOT.std.move(hfill)) + targs = tuple([type(df), type(helper)] + coltypes) + res = ROOT.narf.book_helper[targs](df, ROOT.std.move(helper), cols) + + if not convert_to_hist: + return res + + # Lazily convert the underlying C++ sparse histogram to a wums.SparseHist + # the first time the result is dereferenced. + from wums.sparse_hist import SparseHist + + res._GetPtr = res.GetPtr + res._sparse_hist = None + python_axes_sparse = list(axes) + + def _build_sparse(): + if res._sparse_hist is not None: + return res._sparse_hist + cpp_hist = res._GetPtr() + snapshot = ROOT.narf.sparse_histogram_snapshot(cpp_hist) + n = len(snapshot) + boost_flat = np.empty(n, dtype=np.int64) + vals = np.empty(n, dtype=np.float64) + for i, kv in enumerate(snapshot): + boost_flat[i] = int(kv.first) + vals[i] = float(kv.second) + extents = tuple(int(ax.extent) for ax in python_axes_sparse) + size = int(np.prod(extents)) if extents else 1 + # boost::histogram linearizes column-major (leftmost axis = stride 1), + # but wums.SparseHist expects numpy row-major (C order). Remap by + # un-raveling under F order and re-raveling under C order. + if n and len(extents) > 1: + multi = np.unravel_index(boost_flat, extents, order="F") + flat = np.ravel_multi_index(multi, extents, order="C").astype(np.int64) + else: + flat = boost_flat + res._sparse_hist = SparseHist._from_flat(flat, vals, python_axes_sparse, size) + return res._sparse_hist + + ret_null = lambda: None + res.__deref__ = _build_sparse + res.__follow__ = _build_sparse + res.begin = ret_null + res.end = ret_null + res.GetPtr = _build_sparse + res.GetValue = _build_sparse + return res + #TODO some of this code can be shared with root histogram version #FIXME make this more generic @@ -641,8 +719,23 @@ def pythonize_rdataframe(klass): klass.HistoNDWithBoost = _histond_with_boost klass.SumAndCount = _sum_and_count -def build_quantile_hists(df, cols, condaxes, quantaxes): - """Build histograms which encode conditional quantiles for the provided variables, to be used with define_quantile_ints""" +def build_quantile_hists(df, cols, condaxes, quantaxes, continuous=False): + """Build histograms which encode conditional quantiles for the provided variables, to be used with define_quantile_ints. + + When ``continuous=True`` the original quantile axes are kept as-is in the + returned helper histograms (instead of being replaced by ``Integer`` axes). + ``define_quantile_ints`` detects this from the axis type and uses the + continuous quantile helpers, which return CDF-style values in ``[0, 1]`` + obtained by linearly interpolating between the stored quantile edges. + + Returns a tuple ``(quantile_hists, centers_hist, volume_hist)`` where + ``centers_hist`` gives the multidimensional center (one component per + quantile variable, along an extra ``coord`` ``StrCategory`` axis labelled + with the input quantile axis names when available) of each final + transformed quantile bin, and ``volume_hist`` gives the product of the + per-dimension widths of the same bin, both indexed by the full set of + conditional and (integer or original) quantile axes. + """ arraxes = condaxes + quantaxes @@ -678,6 +771,8 @@ def build_quantile_hists(df, cols, condaxes, quantaxes): quantile_integer_axes = [] quantile_hists = [] + quantile_mins_list = [] + quantile_maxs_list = [] for iax, (col, axis) in enumerate(zip(cols, arraxes)): @@ -707,15 +802,36 @@ def build_quantile_hists(df, cols, condaxes, quantaxes): quantile_edges = np.reshape(quantile_edges, shape_final[:iax+1]) quantile_edges = ak.to_numpy(quantile_edges) + quantile_mins = ak.min(arr[..., col], axis=-1, mask_identity=False) + quantile_mins = np.reshape(quantile_mins, shape_final[:iax+1]) + quantile_mins = ak.to_numpy(quantile_mins) + # replace -infinity from empty values with the previous bin edge # so that the quantile edges are at least still monotonic nquants = quantile_edges.shape[-1] for iquant in range(1, nquants): quantile_edges[..., iquant] = np.where(quantile_edges[..., iquant]==-np.inf, quantile_edges[..., iquant-1], quantile_edges[..., iquant]) + # replace +infinity from empty values with the next bin's min, + # keeping mins monotonic and avoiding negative bin widths; any + # remaining +infinities (trailing empty bins) fall back to the + # corresponding max so the bin collapses to zero width. + for iquant in range(nquants - 2, -1, -1): + quantile_mins[..., iquant] = np.where(quantile_mins[..., iquant]==np.inf, quantile_mins[..., iquant+1], quantile_mins[..., iquant]) + quantile_mins = np.where(quantile_mins==np.inf, quantile_edges, quantile_mins) + + quantile_mins_list.append(quantile_mins) + quantile_maxs_list.append(quantile_edges) + iquantax = iax - ncond - quantile_integer_axis = hist.axis.Integer(0, axis.size, underflow=False, overflow=False, name=f"{axis.name}_int") + if continuous: + # Keep the original (Regular / Variable) quantile axis so that + # subsequent helpers in the chain can be indexed by the + # continuous CDF-style output of the previous helper. + quantile_integer_axis = axis + else: + quantile_integer_axis = hist.axis.Integer(0, axis.size, underflow=False, overflow=False, name=f"{axis.name}_int") quantile_integer_axes.append(quantile_integer_axis) helper_axes = condaxes[:iax+1] + quantile_integer_axes @@ -725,11 +841,57 @@ def build_quantile_hists(df, cols, condaxes, quantaxes): quantile_hists.append(helper_hist) - return quantile_hists + # Compute multidim centers and volumes for the final transformed bins. + # Each per-axis widths/centers array has shape covering only the + # conditional dims plus the quantile dims up to that axis; broadcast to + # the full (cond + all quant) shape and combine. + full_shape = tuple(shape_final) + per_dim_widths = [] + per_dim_centers = [] + for i in range(nquant): + mins_i = quantile_mins_list[i] + maxs_i = quantile_maxs_list[i] + widths_i = maxs_i - mins_i + centers_i = 0.5 * (maxs_i + mins_i) + ndim_extra = nquant - i - 1 + new_shape = widths_i.shape + (1,) * ndim_extra + per_dim_widths.append(np.broadcast_to(widths_i.reshape(new_shape), full_shape)) + per_dim_centers.append(np.broadcast_to(centers_i.reshape(new_shape), full_shape)) + + final_axes = condaxes + quantile_integer_axes + volume_hist = hist.Hist(*final_axes) + if nquant > 0: + volume = per_dim_widths[0].copy() + for w in per_dim_widths[1:]: + volume = volume * w + volume_hist.values(flow=True)[...] = volume + + # Use the input quantile axis names as labels on the coord axis, with + # a unique placeholder for any unnamed axis. + coord_names = [] + for i, ax in enumerate(quantaxes): + name = getattr(ax, "name", "") or "" + if not name or name in coord_names: + name = f"quant_{i}" + coord_names.append(name) + coord_axis = hist.axis.StrCategory(coord_names, name="coord", overflow=False) + centers_hist = hist.Hist(*final_axes, coord_axis) + centers_hist.values(flow=True)[...] = np.stack(per_dim_centers, axis=-1) + else: + centers_hist = hist.Hist(*final_axes) + + return quantile_hists, centers_hist, volume_hist def define_quantile_ints(df, cols, quantile_hists): - """Define transformed columns corresponding to conditional quantile bins (integers)""" + """Define transformed columns corresponding to conditional quantiles. + + By default the helpers return the integer quantile bin index. If the + helper histograms produced by :func:`build_quantile_hists` were built in + continuous mode (their trailing quantile axis is not a plain ``Integer`` + axis), the continuous quantile helpers are used instead, returning a + CDF-style value in ``[0, 1]``. + """ ncols = len(cols) nquant = len(quantile_hists) @@ -738,23 +900,37 @@ def define_quantile_ints(df, cols, quantile_hists): condcols = cols[:ncond] quantcols = cols[ncond:] + # Detect continuous mode from the trailing (quantile) axis of the first + # helper histogram: continuous-mode helper histograms preserve the + # original (Regular / Variable) quantile axis, integer-mode ones use a + # generated Integer axis. + continuous = not isinstance(quantile_hists[0].axes[-1], hist.axis.Integer) + helper_cols_cond = condcols.copy() + suffix = "_quant" if continuous else "_iquant" + for col, quantile_hist in zip(quantcols, quantile_hists): if len(quantile_hist.axes) > 1: helper_hist = narf.hist_to_pyroot_boost(quantile_hist, tensor_rank=1) - quanthelper = ROOT.narf.make_quantile_helper(ROOT.std.move(helper_hist)) + if continuous: + quanthelper = ROOT.narf.make_quantile_helper_continuous(ROOT.std.move(helper_hist)) + else: + quanthelper = ROOT.narf.make_quantile_helper(ROOT.std.move(helper_hist)) else: # special case for static quantiles with no conditional variables vals = quantile_hist.values() arr = ROOT.std.array["double", vals.size](vals) - quanthelper = ROOT.narf.QuantileHelperStatic[vals.size](arr) + if continuous: + quanthelper = ROOT.narf.QuantileHelperStaticContinuous[vals.size](arr) + else: + quanthelper = ROOT.narf.QuantileHelperStatic[vals.size](arr) helper_cols = helper_cols_cond + [col] - outname = f"{col}_iquant" - df = df.Define(outname, quanthelper, helper_cols) + outname = f"{col}{suffix}" + df = narf.rdfutils.flexible_define(df, outname, quanthelper, helper_cols) helper_cols_cond.append(outname) quantile_axes = list(quantile_hists[-1].axes) diff --git a/narf/include/concurrent_flat_map.hpp b/narf/include/concurrent_flat_map.hpp new file mode 100644 index 0000000..e1c0cd2 --- /dev/null +++ b/narf/include/concurrent_flat_map.hpp @@ -0,0 +1,384 @@ +#ifndef NARF_CONCURRENT_FLAT_MAP_HPP +#define NARF_CONCURRENT_FLAT_MAP_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace narf { + +// Lock-free, insert-only, expandable concurrent flat hash map. +// +// Properties: +// - find / insert / emplace / expansion are all lock-free and safe to call +// concurrently from any number of threads. +// - Elements are never erased; once inserted, the address of an element's +// value is stable for the lifetime of the map (pointers returned by +// find/emplace remain valid). +// - The container grows by appending geometrically larger segments; existing +// segments are never rehashed, so concurrent readers are never disturbed. +// +// Requirements on Key: +// - Must be an integer type. +// - The two most significant bits of any user-supplied key must be zero; +// they are reserved for internal slot state (occupied / busy markers). +template > +class concurrent_flat_map { + static_assert(std::is_integral_v, "Key must be an integer type"); + +public: + using key_type = Key; + using mapped_type = Value; + using hasher = Hash; + +private: + using UKey = std::make_unsigned_t; + + static constexpr unsigned kKeyBits = sizeof(UKey) * 8; + static constexpr UKey kOccupiedBit = UKey(1) << (kKeyBits - 1); + static constexpr UKey kBusyBit = UKey(1) << (kKeyBits - 2); + static constexpr UKey kStateMask = kOccupiedBit | kBusyBit; + static constexpr UKey kPayloadMask = ~kStateMask; + static constexpr UKey kEmpty = 0; + + struct Slot { + std::atomic key{kEmpty}; + alignas(Value) unsigned char storage[sizeof(Value)]; + + Value* value_ptr() noexcept { + return std::launder(reinterpret_cast(&storage)); + } + }; + + struct Segment { + const std::size_t capacity; + const std::size_t mask; + std::atomic size{0}; + std::unique_ptr slots; + std::atomic next{nullptr}; + + explicit Segment(std::size_t cap) + : capacity(cap), mask(cap - 1), slots(new Slot[cap]) {} + + ~Segment() { + if constexpr (!std::is_trivially_destructible_v) { + for (std::size_t i = 0; i < capacity; ++i) { + UKey k = slots[i].key.load(std::memory_order_relaxed); + if ((k & kOccupiedBit) && !(k & kBusyBit)) { + slots[i].value_ptr()->~Value(); + } + } + } + } + }; + + static constexpr std::size_t kDefaultInitialCapacity = 64; + static constexpr std::size_t kMaxProbe = 32; + + Segment* head_; + std::atomic tail_; + Hash hash_; + + static UKey encode(Key key) noexcept { + return (static_cast(key) & kPayloadMask) | kOccupiedBit; + } + + static std::size_t round_up_pow2(std::size_t n) noexcept { + std::size_t c = 1; + while (c < n) c <<= 1; + return c; + } + + // Sentinel published in `Segment::next` while a thread is allocating the + // successor segment. Distinct from both nullptr and any valid pointer. + static Segment* growing_sentinel() noexcept { + return reinterpret_cast(static_cast(1)); + } + + // Returns the published successor of `s`, or nullptr if there isn't one + // yet (or if a growth is in progress and not yet visible to readers). + static Segment* observed_next(const Segment* s) noexcept { + Segment* n = s->next.load(std::memory_order_acquire); + return (n == growing_sentinel()) ? nullptr : n; + } + + // Spin until the slot's busy bit clears, then return the stable key. + static UKey wait_not_busy(Slot& slot) noexcept { + UKey k = slot.key.load(std::memory_order_acquire); + while (k & kBusyBit) { + k = slot.key.load(std::memory_order_acquire); + } + return k; + } + + // Allocate (or observe) the segment that follows `current`. Only the thread + // that wins the right to grow performs the allocation; other threads spin + // briefly on a sentinel until the new segment is published. This avoids the + // thundering-herd of speculative allocations that doubling-segment growth + // would otherwise produce under high thread contention. + Segment* ensure_next(Segment* current) { + Segment* next = current->next.load(std::memory_order_acquire); + if (next && next != growing_sentinel()) return next; + + Segment* expected = nullptr; + if (current->next.compare_exchange_strong( + expected, growing_sentinel(), + std::memory_order_acq_rel, std::memory_order_acquire)) { + // Won the race: this thread is the sole allocator. + Segment* fresh = nullptr; + try { + fresh = new Segment(current->capacity * 2); + } catch (...) { + current->next.store(nullptr, std::memory_order_release); + throw; + } + current->next.store(fresh, std::memory_order_release); + + // Best-effort tail advance so future inserters skip filled segments. + Segment* t = tail_.load(std::memory_order_acquire); + while (true) { + Segment* tn = t->next.load(std::memory_order_acquire); + if (!tn || tn == growing_sentinel()) break; + if (tail_.compare_exchange_weak(t, tn, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + t = tn; + } + } + return fresh; + } + + // Lost the race. Either another thread is mid-allocation (sentinel + // observed) or the new segment is already published. + while (true) { + Segment* obs = current->next.load(std::memory_order_acquire); + if (obs && obs != growing_sentinel()) return obs; + std::this_thread::yield(); + } + } + + // Search a single segment for `target`. Returns pointer to value or nullptr. + Value* find_in(Segment* seg, std::size_t h, UKey target) const noexcept { + const std::size_t base = h & seg->mask; + const std::size_t probe_limit = std::min(kMaxProbe, seg->capacity); + for (std::size_t i = 0; i < probe_limit; ++i) { + Slot& slot = seg->slots[(base + i) & seg->mask]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) return nullptr; + if (k & kBusyBit) k = wait_not_busy(slot); + if (k == target) return slot.value_ptr(); + } + return nullptr; + } + + // Try to insert `target` into a single segment. Returns + // {ptr, true} : newly inserted, value constructed from args + // {ptr, false} : key was already present + // {nullptr, false} : segment full along this probe sequence + template + std::pair emplace_in(Segment* seg, std::size_t h, UKey target, + Args&&... args) { + const std::size_t base = h & seg->mask; + const std::size_t probe_limit = std::min(kMaxProbe, seg->capacity); + const UKey busy = (target & kPayloadMask) | kOccupiedBit | kBusyBit; + for (std::size_t i = 0; i < probe_limit; ++i) { + Slot& slot = seg->slots[(base + i) & seg->mask]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) { + UKey expected = kEmpty; + if (slot.key.compare_exchange_strong( + expected, busy, + std::memory_order_acq_rel, std::memory_order_acquire)) { + ::new (static_cast(&slot.storage)) + Value(std::forward(args)...); + slot.key.store(target, std::memory_order_release); + seg->size.fetch_add(1, std::memory_order_relaxed); + return {slot.value_ptr(), true}; + } + k = expected; + } + if (k & kBusyBit) k = wait_not_busy(slot); + if (k == target) return {slot.value_ptr(), false}; + } + return {nullptr, false}; + } + +public: + explicit concurrent_flat_map(std::size_t initial_capacity = kDefaultInitialCapacity) { + const std::size_t cap = round_up_pow2(std::max(initial_capacity, 2)); + head_ = new Segment(cap); + tail_.store(head_, std::memory_order_release); + } + + ~concurrent_flat_map() { + Segment* s = head_; + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + } + + concurrent_flat_map(const concurrent_flat_map&) = delete; + concurrent_flat_map& operator=(const concurrent_flat_map&) = delete; + + // Move constructor: transfers ownership. The moved-from object is left in + // a destroy-only state; calling any other method on it is undefined. + concurrent_flat_map(concurrent_flat_map&& other) noexcept + : head_(other.head_), + tail_(other.tail_.load(std::memory_order_relaxed)), + hash_(std::move(other.hash_)) { + other.head_ = nullptr; + other.tail_.store(nullptr, std::memory_order_relaxed); + } + + concurrent_flat_map& operator=(concurrent_flat_map&& other) noexcept { + if (this != &other) { + Segment* s = head_; + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + head_ = other.head_; + tail_.store(other.tail_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + hash_ = std::move(other.hash_); + other.head_ = nullptr; + other.tail_.store(nullptr, std::memory_order_relaxed); + } + return *this; + } + + // Returns a pointer to the value associated with `key`, or nullptr. + // The returned pointer remains valid for the lifetime of the map. + Value* find(Key key) noexcept { + const UKey target = encode(key); + const std::size_t h = hash_(key); + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + if (Value* v = find_in(seg, h, target)) return v; + } + return nullptr; + } + + const Value* find(Key key) const noexcept { + return const_cast(this)->find(key); + } + + bool contains(Key key) const noexcept { return find(key) != nullptr; } + + // Construct a value in-place if `key` is not yet present. + // Returns {pointer-to-value, inserted?}. + template + std::pair emplace(Key key, Args&&... args) { + const UKey target = encode(key); + const std::size_t h = hash_(key); + + // Look in every existing segment first to honour insert-once semantics. + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + if (Value* v = find_in(seg, h, target)) return {v, false}; + } + + // Then try to insert at the tail, growing as required. + Segment* seg = tail_.load(std::memory_order_acquire); + while (true) { + auto result = emplace_in(seg, h, target, std::forward(args)...); + if (result.first) { + return result; + } + // This probe sequence is saturated in `seg`; another thread may have + // already inserted the same key into a later segment, so re-check + // everything past `seg` before allocating. + Segment* next = observed_next(seg); + if (!next) next = ensure_next(seg); + for (Segment* s = next; s; s = observed_next(s)) { + if (Value* v = find_in(s, h, target)) return {v, false}; + } + seg = next; + } + } + + std::pair insert(Key key, const Value& v) { + return emplace(key, v); + } + std::pair insert(Key key, Value&& v) { + return emplace(key, std::move(v)); + } + + // Total number of inserted elements summed across all segments. Reads each + // segment's atomic counter; safe to call concurrently with other operations + // but the result reflects an instantaneous, possibly racing snapshot. + std::size_t size() const noexcept { + std::size_t total = 0; + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + total += seg->size.load(std::memory_order_acquire); + } + return total; + } + + // Visit every (key, value) pair currently in the map. Not safe to call + // concurrently with insertions if the visitor relies on a stable snapshot; + // the visitor only sees fully-constructed slots and waits past any in-flight + // insert that races with iteration. + template + void for_each(F&& f) { + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + for (std::size_t i = 0; i < seg->capacity; ++i) { + Slot& slot = seg->slots[i]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) continue; + if (k & kBusyBit) k = wait_not_busy(slot); + if (!(k & kOccupiedBit)) continue; + f(static_cast(k & kPayloadMask), *slot.value_ptr()); + } + } + } + + template + void for_each(F&& f) const { + const_cast(this)->for_each(std::forward(f)); + } + + // Remove all elements and shrink back to a single head segment. NOT thread + // safe with respect to any other operation; the caller must establish + // exclusive access (e.g. between processing passes). + void clear() { + Segment* s = head_->next.load(std::memory_order_relaxed); + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + head_->next.store(nullptr, std::memory_order_relaxed); + if constexpr (!std::is_trivially_destructible_v) { + for (std::size_t i = 0; i < head_->capacity; ++i) { + UKey k = head_->slots[i].key.load(std::memory_order_relaxed); + if ((k & kOccupiedBit) && !(k & kBusyBit)) { + head_->slots[i].value_ptr()->~Value(); + } + head_->slots[i].key.store(kEmpty, std::memory_order_relaxed); + } + } else { + for (std::size_t i = 0; i < head_->capacity; ++i) { + head_->slots[i].key.store(kEmpty, std::memory_order_relaxed); + } + } + head_->size.store(0, std::memory_order_relaxed); + tail_.store(head_, std::memory_order_release); + } +}; + +} // namespace narf + +#endif // NARF_CONCURRENT_FLAT_MAP_HPP diff --git a/narf/include/histutils.hpp b/narf/include/histutils.hpp index ad4fc31..c5928d4 100644 --- a/narf/include/histutils.hpp +++ b/narf/include/histutils.hpp @@ -6,8 +6,10 @@ #include "traits.hpp" #include "utils.hpp" #include "atomic_adaptor.hpp" +#include "sparse_histogram.hpp" #include "tensorutils.hpp" #include "tensorevalutils.hpp" +#include "rdfutils.hpp" #include #include #include @@ -616,27 +618,54 @@ namespace narf { // Helper which facilitates conversion from value to quantile for a single variable // The underlying histogram holds a tensor with the bin edges for the quantiles in the last variable, // conditional on all the previous variables - template - class QuantileHelper : public HistHelper { + /// Look up the quantile bin for `val` in the sorted edge array [begin, end). + /// When Continuous is false, return the clamped integer bin index (matching + /// the previous behavior). When Continuous is true, return a CDF-style + /// double in [0, 1] obtained by linearly interpolating between adjacent + /// edges, with edges[i] mapping to i/(N-1) and values outside + /// [edges[0], edges[N-1]] clamped to 0 / 1 respectively. + template + auto quantile_lookup(It begin, It end, const T &val) { + const auto n = std::distance(begin, end); + auto const upper = std::upper_bound(begin, end, val); + auto const iquant = std::distance(begin, upper); + if constexpr (Continuous) { + auto const i = std::clamp(iquant - 1, 0, n - 2); + auto const lo = *(begin + i); + auto const hi = *(begin + i + 1); + double const frac = double(val - lo) / double(hi - lo); + double const res = (double(i) + frac) / double(n - 1); + return std::clamp(res, 0.0, 1.0); + } else { + return std::clamp(iquant, 0, n - 1); + } + } + + template + class QuantileHelperImpl : public HistHelper { using base_t = HistHelper; using hist_t = typename base_t::hist_t; using scalar_t = typename Storage::value_type::tensor_t::Scalar; static constexpr auto nquants = Storage::value_type::size; public: - QuantileHelper(hist_t &&resource) : base_t(std::forward(resource)) {} + QuantileHelperImpl(hist_t &&resource) : base_t(std::forward(resource)) {} - boost::histogram::axis::index_type operator()(const boost::histogram::axis::traits::value_type&... args, const scalar_t &last) { + auto operator()(const boost::histogram::axis::traits::value_type&... args, const scalar_t &last) const { auto const &hist = *base_t::resourceHist_; auto const &edges = narf::get_value(hist, args...).data(); - - // find the quantile bin corresponding to the last argument - auto const upper = std::upper_bound(edges.data(), edges.data()+nquants, last); - auto const iquant = std::distance(edges.data(), upper); - return std::clamp(iquant, 0, nquants-1); + return quantile_lookup(edges.data(), edges.data() + nquants, last); } }; + // MapWrapper alias so container arguments are automatically broadcast / + // zipped element-wise, while scalar arguments call through directly. + template + using QuantileHelper = MapWrapper>; + + template + using QuantileHelperContinuous = MapWrapper>; + // CTAD doesn't work reliably from cppyy so add factory function template QuantileHelper make_quantile_helper(boost::histogram::histogram, Storage> &&h) { @@ -644,25 +673,34 @@ namespace narf { return QuantileHelper(std::forward(h)); } + template + QuantileHelperContinuous make_quantile_helper_continuous(boost::histogram::histogram, Storage> &&h) { + using hist_t = boost::histogram::histogram, Storage>; + return QuantileHelperContinuous(std::forward(h)); + } + // simple version for static quantiles - template - class QuantileHelperStatic { + template + class QuantileHelperStaticImpl { public: using edge_t = std::array; - QuantileHelperStatic(const edge_t &edges) : edges_(edges) {} + QuantileHelperStaticImpl(const edge_t &edges) : edges_(edges) {} - boost::histogram::axis::index_type operator() (double val) { - // find the quantile bin corresponding to the last argument - auto const upper = std::upper_bound(edges_.begin(), edges_.end(), val); - auto const iquant = std::distance(edges_.begin(), upper); - return std::clamp(iquant, 0, N-1); + auto operator() (double val) const { + return quantile_lookup(edges_.begin(), edges_.end(), val); } private: const edge_t edges_; }; + template + using QuantileHelperStatic = MapWrapper>; + + template + using QuantileHelperStaticContinuous = MapWrapper>; + /// Computes the minimum-variance reweighting to approximate a shift /// or smearing in the underlying variables of a multidimensional histogram. /// @@ -670,10 +708,10 @@ namespace narf { /// summing over the contribution from each axis template - class HistShiftHelper { + class HistShiftHelperImpl { public: - HistShiftHelper(const Axes&... axes) : axes_(axes...) {} - HistShiftHelper(Axes&&... axes) : axes_(std::move(axes)...) {} + HistShiftHelperImpl(const Axes&... axes) : axes_(axes...) {} + HistShiftHelperImpl(Axes&&... axes) : axes_(std::move(axes)...) {} template auto operator()(const Args&... args) const { @@ -714,39 +752,6 @@ namespace narf { /// Core computation dispatched over axis indices. template auto compute(const Nominal& orig, - const Shifted& shifted, - const SmearShifted& smear_shifted, - const Weight& nominal_weight, - std::index_sequence idxs) const { - - constexpr bool is_container_any = (is_container> || ...); - - if constexpr(is_container_any) { - auto make_range_of_tuples = [](auto const&...xs){ return make_zip_view(make_view(xs)...); }; - - auto const orig_v = std::apply(make_range_of_tuples, orig); - auto const shifted_v = std::apply(make_range_of_tuples, shifted); - auto const smear_shifted_v = std::apply(make_range_of_tuples, smear_shifted); - - auto compute_elem_impl = [this, &nominal_weight, &idxs](auto &orig_elem, auto &shifted_elem, auto&smear_shifted_elem) { - return compute_impl(orig_elem, shifted_elem, smear_shifted_elem, nominal_weight, idxs); - }; - auto compute_elem = [&compute_elem_impl](auto const &elem_tuple) { return std::apply(compute_elem_impl, elem_tuple); }; - - auto const res_view = make_zip_view(orig_v, shifted_v, smear_shifted_v) - | std::views::transform(compute_elem); - - auto const res = range_to(res_view); - return res; - } - else { - return compute_impl(orig, shifted, smear_shifted, nominal_weight, idxs); - } - } - - /// Core computation dispatched over axis indices. - template - auto compute_impl(const Nominal& orig, const Shifted& shifted, const SmearShifted& smear_shifted, const Weight& nominal_weight, @@ -806,9 +811,11 @@ namespace narf { // will probably have to return u,d,s individually with more complicated logic // in the calling layer to do the matrix multiplication - // Underflow / overflow: no reliable bin geometry, return no correction. + // Underflow / overflow or degenerate bin geometry (e.g. infinite bin + // edge): no reliable bin geometry, return no correction. // (note that a is infinity in this case such that delta and v are also zero) - const bool flow = bin_idx < 0 || bin_idx >= ax.size(); + const bool degenerate = !std::isfinite(a) || !std::isfinite(x_c); + const bool flow = bin_idx < 0 || bin_idx >= ax.size() || degenerate; auto const u = flow ? 0.*x_orig : (x_orig - x_c)/a; auto const delta = (x_shifted - x_orig)/a; @@ -838,12 +845,18 @@ namespace narf { const std::tuple axes_; }; + /// Public helper: MapWrapper around HistShiftHelperImpl so container + /// arguments are automatically broadcast/zipped element-wise, while scalar + /// arguments are passed through directly. + template + using HistShiftHelper = MapWrapper>; + // factory function needed because CTAD doesn't work reliably from cppyy // also the trailing return type is needed because cppyy has issues with auto // return types template HistShiftHelper...> make_hist_shift_helper(Axes&&... axes) { - return HistShiftHelper(std::forward(axes)...); + return HistShiftHelper...>(std::forward(axes)...); } } diff --git a/narf/include/matrix_utils.hpp b/narf/include/matrix_utils.hpp new file mode 100644 index 0000000..8a74729 --- /dev/null +++ b/narf/include/matrix_utils.hpp @@ -0,0 +1,151 @@ +#pragma once + +#include +#include +#include +#include "concurrent_flat_map.hpp" + + +class SymMatrixAtomic { +public: + SymMatrixAtomic() = default; + SymMatrixAtomic(std::size_t n) : n_(n), data_(n*(n+1)/2) {} + + double fetch_add(std::size_t iidx, std::size_t jidx, double val) { + std::atomic& ref = data_[packed_index(iidx, jidx)]; + return ref.fetch_add(val); + } + + void fill_row(std::size_t row, double *rowData) { + const std::size_t offset = packed_index(row, row); + std::fill(rowData, rowData + row, 0.); + std::copy(data_.begin() + offset, data_.begin() + offset + n_ - row, rowData + row); + } + +private: + +/** + * Converts (row, col) indices of a symmetric matrix into a linearized index + * for packed storage of unique elements (upper triangle, row-major). + * + * Storage layout (0-indexed, n=4 example): + * (0,0)(0,1)(0,2)(0,3) | (1,1)(1,2)(1,3) | (2,2)(2,3) | (3,3) + * 0 1 2 3 4 5 6 7 8 9 + * + * Total elements stored: n*(n+1)/2 + * + * @param row Row index (0-based) + * @param col Column index (0-based) + * @return Linearized index into packed storage array + */ + inline std::size_t packed_index(std::size_t row, std::size_t col) { + // Normalize to upper triangle: ensure row <= col + if (row > col) { + std::size_t tmp = row; + row = col; + col = tmp; + } + + // Number of elements in rows 0..(row-1): row*n - row*(row-1)/2 + // Plus offset within current row: (col - row) + return row * n_ - row * (row - 1) / 2 + (col - row); + } + + std::size_t n_{}; + std::vector > data_; + +}; + +class SparseMatrixIndexValues { +public: + const std::vector &idxs0() const { return idxs0_; } + const std::vector &idxs1() const { return idxs1_; } + const std::vector &vals() const { return vals_; } + + void emplace_back(std::size_t idx0, std::size_t idx1, double val) { + idxs0_.emplace_back(idx0); + idxs1_.emplace_back(idx1); + vals_.emplace_back(val); + } + + void reserve(std::size_t i) { + idxs0_.reserve(i); + idxs1_.reserve(i); + vals_.reserve(i); + } + + std::size_t size() const { return vals_.size(); } + +private: + std::vector idxs0_; + std::vector idxs1_; + std::vector vals_; +}; + +class SparseMatrixAtomic { +public: + using map_type = narf::concurrent_flat_map>; + + SparseMatrixAtomic(std::size_t size0, std::size_t size1, + double fill_fraction = 0.025) + : size0_(size0), size1_(size1), + data_(std::max( + static_cast(fill_fraction * static_cast(size0 * size1)), + 16)) {} + + std::atomic &operator() (std::size_t idx0, std::size_t idx1) { + const std::size_t i = globalidx(idx0, idx1); + auto res = data_.emplace(i); + return *res.first; + } + + const std::atomic &operator() (std::size_t idx0, std::size_t idx1) const { + const std::size_t i = globalidx(idx0, idx1); + auto* p = data_.find(i); + return *p; + } + + void fetch_add(std::size_t idx0, std::size_t idx1, double val) { + if (val != 0.) { + auto &elemval = operator()(idx0, idx1); + elemval.fetch_add(val); + } + } + + SparseMatrixIndexValues index_values() const { + SparseMatrixIndexValues res; + res.reserve(data_.size()); + + data_.for_each([&](std::size_t key, const std::atomic& val) { + auto is = idxs(key); + res.emplace_back(is[0], is[1], val.load()); + }); + + return res; + } + + void clear() { data_.clear(); } + + void reserve(std::size_t /*i*/) { /* no-op: map grows on demand */ } + + std::size_t dense_size() const { return size0_*size1_; } + + map_type &data() { return data_; } + +private: + std::size_t globalidx(std::size_t idx0, std::size_t idx1) const { + return idx0*size0_ + idx1; + } + + std::array idxs(std::size_t globalidx) const { + const std::size_t idx0 = globalidx/size0_; + const std::size_t idx1 = globalidx % size0_; + + return std::array{idx0, idx1}; + } + + const std::size_t size0_; + const std::size_t size1_; + map_type data_; + +}; \ No newline at end of file diff --git a/narf/include/rdfutils.hpp b/narf/include/rdfutils.hpp index eaddbbb..bf642f1 100755 --- a/narf/include/rdfutils.hpp +++ b/narf/include/rdfutils.hpp @@ -1,5 +1,7 @@ #pragma once +#include "utils.hpp" + namespace narf { template @@ -8,15 +10,45 @@ namespace narf { private: Callable callable_; - using return_t = decltype(callable_(std::declval()...)); + using return_t = std::decay_t()...))>; public: DefineWrapper(const Callable &callable) : callable_(callable) {} DefineWrapper(Callable &&callable) : callable_(std::move(callable)) {} - return_t operator() (const Args&... args) const { + return_t operator() (const Args&... args) { return callable_(args...); } }; + + template + class MapWrapper { + + private: + Callable callable_; + public: + + MapWrapper(const Callable &callable) : callable_(callable) {} + MapWrapper(Callable &&callable) : callable_(std::move(callable)) {} + + template + requires (sizeof...(CArgs) != 1 || (!std::is_same_v, MapWrapper> && ...)) && + std::is_constructible_v + explicit MapWrapper(CArgs&&... cargs) : callable_(std::forward(cargs)...) {} + + template + auto operator() (const Args&... args) { + if constexpr ((is_container || ...)) { + auto apply_elem = [this](auto const &elem_tuple) { + return std::apply(callable_, elem_tuple); + }; + auto const res_view = make_zip_view(make_view(args)...) | std::views::transform(apply_elem); + return range_to(res_view); + } else { + return callable_(args...); + } + } + }; + } diff --git a/narf/include/sparse_histogram.hpp b/narf/include/sparse_histogram.hpp new file mode 100644 index 0000000..a2e5d20 --- /dev/null +++ b/narf/include/sparse_histogram.hpp @@ -0,0 +1,202 @@ +#ifndef NARF_SPARSE_HISTOGRAM_HPP +#define NARF_SPARSE_HISTOGRAM_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "atomic_adaptor.hpp" +#include "concurrent_flat_map.hpp" + +namespace narf { + +// A boost::histogram-compatible Storage backed by narf::concurrent_flat_map. +// +// Bins are addressed by their linearized index (the same scheme boost +// histogram itself uses for dense storages). Only bins that have been touched +// by a fill consume memory; the underlying lock-free map allows concurrent +// fills from many threads. +// +// Iteration via the standard begin()/end() interface walks every virtual bin +// (including never-touched bins, which materialize on access via the map's +// emplace path). For sparse traversals prefer iterating data() directly with +// for_each. +// +// Conversion of histograms using this storage to python hist.Hist objects is +// not supported in the current implementation. +template > +class concurrent_sparse_storage { +public: + using value_type = T; + using reference = T&; + using const_reference = const T&; + using map_type = concurrent_flat_map; + + static constexpr bool has_threading_support = true; + + concurrent_sparse_storage() = default; + explicit concurrent_sparse_storage(double fill_fraction) + : fill_fraction_(fill_fraction) {} + concurrent_sparse_storage(concurrent_sparse_storage&&) = default; + concurrent_sparse_storage& operator=(concurrent_sparse_storage&&) = default; + concurrent_sparse_storage(const concurrent_sparse_storage&) = delete; + concurrent_sparse_storage& operator=(const concurrent_sparse_storage&) = delete; + + // boost::histogram::histogram calls reset() with the total number of bins + // (including under/overflow) on construction and after axis growth. + // The map is sized to fill_fraction * n to avoid most early expansions + // when an estimate of occupancy is supplied by the caller. + void reset(std::size_t n) { + size_ = n; + const double cap_d = fill_fraction_ * static_cast(n); + std::size_t cap = cap_d > 1.0 ? static_cast(cap_d) : 1; + map_ = map_type{cap}; + } + + double fill_fraction() const noexcept { return fill_fraction_; } + + std::size_t size() const noexcept { return size_; } + + // Insert-on-access; safe for concurrent fills. + reference operator[](std::size_t i) { + return *map_.emplace(i).first; + } + + const_reference operator[](std::size_t i) const { + if (auto* p = map_.find(i)) return *p; + // Materialize a default-constructed cell so the const overload still + // returns a stable reference. This matches the dense_storage contract + // that "every bin index in [0, size()) is addressable". + return *const_cast(map_).emplace(i).first; + } + + // Required by boost::histogram::storage concept, but not used by our fill + // path. Two sparse storages compare equal iff they have the same logical + // size and identical populated entries. + bool operator==(const concurrent_sparse_storage& other) const { + if (size_ != other.size_) return false; + bool eq = true; + map_.for_each([&](std::size_t k, const T& v) { + if (!eq) return; + auto* p = const_cast(other.map_).find(k); + if (!p || !(*p == v)) eq = false; + }); + return eq; + } + + // Random-access iterator over the full virtual bin range. Dereferencing + // materializes the bin (via operator[]). boost::histogram's fill path + // requires random-access semantics so it can do `begin() + idx`. + class iterator { + public: + using iterator_category = std::random_access_iterator_tag; + using value_type = T; + using reference = T&; + using pointer = T*; + using difference_type = std::ptrdiff_t; + + iterator() = default; + iterator(concurrent_sparse_storage* s, std::size_t i) : s_(s), i_(i) {} + + reference operator*() const { return (*s_)[i_]; } + reference operator[](difference_type n) const { + return (*s_)[i_ + static_cast(n)]; + } + + iterator& operator++() { ++i_; return *this; } + iterator operator++(int) { auto t = *this; ++i_; return t; } + iterator& operator--() { --i_; return *this; } + iterator operator--(int) { auto t = *this; --i_; return t; } + + iterator& operator+=(difference_type n) { + i_ = static_cast(static_cast(i_) + n); + return *this; + } + iterator& operator-=(difference_type n) { return *this += -n; } + iterator operator+(difference_type n) const { auto t = *this; t += n; return t; } + iterator operator-(difference_type n) const { auto t = *this; t -= n; return t; } + friend iterator operator+(difference_type n, iterator it) { return it + n; } + difference_type operator-(const iterator& o) const { + return static_cast(i_) - static_cast(o.i_); + } + + bool operator==(const iterator& o) const { return i_ == o.i_; } + bool operator!=(const iterator& o) const { return i_ != o.i_; } + bool operator< (const iterator& o) const { return i_ < o.i_; } + bool operator<=(const iterator& o) const { return i_ <= o.i_; } + bool operator> (const iterator& o) const { return i_ > o.i_; } + bool operator>=(const iterator& o) const { return i_ >= o.i_; } + + private: + concurrent_sparse_storage* s_ = nullptr; + std::size_t i_ = 0; + }; + using const_iterator = iterator; + + iterator begin() { return iterator(this, 0); } + iterator end() { return iterator(this, size_); } + const_iterator begin() const { + return iterator(const_cast(this), 0); + } + const_iterator end() const { + return iterator(const_cast(this), size_); + } + + map_type& data() noexcept { return map_; } + const map_type& data() const noexcept { return map_; } + +private: + double fill_fraction_ = 1.0; + std::size_t size_ = 0; + map_type map_; +}; + +// Convenience factory: builds a boost::histogram::histogram with the +// concurrent sparse storage and the supplied axes. +template +boost::histogram::histogram...>, + concurrent_sparse_storage> +make_histogram_sparse(double fill_fraction, Axes&&... axes) { + return boost::histogram::make_histogram_with( + concurrent_sparse_storage{fill_fraction}, + std::forward(axes)...); +} + +// Helpers to inspect the underlying concurrent_flat_map of a sparse-storage +// boost histogram. Free functions because cppyy does not expose +// boost::histogram::histogram::storage_ directly. +template +typename concurrent_sparse_storage::map_type& +sparse_histogram_data( + boost::histogram::histogram>& h) { + return boost::histogram::unsafe_access::storage(h).data(); +} + +// Snapshot the populated bins of a sparse-storage boost histogram into a +// vector of (linearized_index, value) pairs. Convenience for python +// inspection where passing a python callable to for_each is awkward. +template +std::vector> sparse_histogram_snapshot( + boost::histogram::histogram>& h) { + std::vector> out; + auto& m = boost::histogram::unsafe_access::storage(h).data(); + out.reserve(m.size()); + m.for_each([&](std::size_t k, const T& v) { + if constexpr (requires { v.load(); }) { + out.emplace_back(k, static_cast(v.load())); + } else { + out.emplace_back(k, static_cast(v)); + } + }); + return out; +} + +} // namespace narf + +#endif // NARF_SPARSE_HISTOGRAM_HPP diff --git a/narf/include/tests.hpp b/narf/include/tests.hpp index 36b99d6..c88dc0d 100644 --- a/narf/include/tests.hpp +++ b/narf/include/tests.hpp @@ -1,5 +1,16 @@ #pragma once +#include "concurrent_flat_map.hpp" +#include "histutils.hpp" +#include "matrix_utils.hpp" +#include "rdfutils.hpp" + +#include +#include +#include +#include +#include + namespace narf { ROOT::VecOps::RVec testshift() { boost::histogram::axis::regular a(100, 0., 1.); @@ -35,18 +46,270 @@ namespace narf { void testshiftrw() { std::vector a{0, 1, 2}; std::vector b{3, 4, 5}; - + std::cout << a.front() << std::endl; std::cout << b.front() << std::endl; - + for (auto const &[ael, bel] : make_zip_view(a, b)) { ael = 0; bel = 1; } - + std::cout << a.front() << std::endl; std::cout << b.front() << std::endl; - - + + + } + + // Test SymMatrixAtomic: construction, fetch_add, fill_row, and index symmetry. + // Returns true if all checks pass. + bool testSymMatrixAtomic() { + const std::size_t n = 4; + SymMatrixAtomic mat(n); + + // Fill upper triangle: mat[i][j] = (i+1)*(j+1) for i <= j + for (std::size_t i = 0; i < n; ++i) { + for (std::size_t j = i; j < n; ++j) { + mat.fetch_add(i, j, double((i + 1) * (j + 1))); + } + } + + // Verify via fill_row: rowData[j] == (i+1)*(j+1) for j >= i, else 0 + std::vector rowData(n); + for (std::size_t i = 0; i < n; ++i) { + mat.fill_row(i, rowData.data()); + for (std::size_t j = 0; j < i; ++j) { + if (rowData[j] != 0.0) return false; + } + for (std::size_t j = i; j < n; ++j) { + if (rowData[j] != double((i + 1) * (j + 1))) return false; + } + } + + // Verify symmetry: fetch_add(i,j) and fetch_add(j,i) address the same element + SymMatrixAtomic mat2(n); + mat2.fetch_add(1, 3, 5.0); // upper triangle (1,3) + mat2.fetch_add(3, 1, 3.0); // lower triangle: should map to same element + mat2.fill_row(1, rowData.data()); + if (rowData[3] != 8.0) return false; + + return true; + } + + // Test SparseMatrixAtomic: single-threaded fetch_add, index_values round-trip, + // clear, and multi-threaded concurrent fetch_add. Returns true on success. + bool testSparseMatrixAtomic() { + // ---- single-threaded ---- + { + SparseMatrixAtomic mat(20, 20); + mat.fetch_add(1, 2, 3.0); + mat.fetch_add(1, 2, 4.0); + mat.fetch_add(5, 7, 1.5); + mat.fetch_add(0, 0, 2.0); + mat.fetch_add(0, 0, 0.0); // no-op + if (mat(1, 2).load() != 7.0) return false; + if (mat(5, 7).load() != 1.5) return false; + if (mat(0, 0).load() != 2.0) return false; + + auto iv = mat.index_values(); + if (iv.size() != 3) return false; + double sum = 0.0; + for (std::size_t k = 0; k < iv.size(); ++k) sum += iv.vals()[k]; + if (sum != 10.5) return false; + + mat.clear(); + if (mat.index_values().size() != 0) return false; + // reuse after clear + mat.fetch_add(3, 4, 9.0); + if (mat(3, 4).load() != 9.0) return false; + if (mat.index_values().size() != 1) return false; + } + + // ---- multi-threaded fetch_add: each (i,j) cell receives a known total ---- + { + const std::size_t N = 32; + SparseMatrixAtomic mat(N, N); + const unsigned T = 8; + const unsigned reps = 500; + std::vector threads; + threads.reserve(T); + for (unsigned t = 0; t < T; ++t) { + threads.emplace_back([&, t] { + for (unsigned r = 0; r < reps; ++r) { + for (std::size_t i = 0; i < N; ++i) { + // Use a sparse pattern: only ~half the columns + std::size_t j = (i * 3 + 1) % N; + mat.fetch_add(i, j, 1.0 + 0.01 * t); + } + } + }); + } + for (auto& th : threads) th.join(); + + double per_cell_expected = 0.0; + for (unsigned t = 0; t < T; ++t) per_cell_expected += reps * (1.0 + 0.01 * t); + + auto iv = mat.index_values(); + if (iv.size() != N) return false; + for (std::size_t k = 0; k < iv.size(); ++k) { + std::size_t i = iv.idxs0()[k]; + std::size_t j = iv.idxs1()[k]; + if (j != (i * 3 + 1) % N) return false; + if (std::abs(iv.vals()[k] - per_cell_expected) > 1e-9) return false; + } + } + + return true; + } + + // Test concurrent_flat_map: single-threaded correctness, expansion, and + // multi-threaded concurrent insert / find. Returns true on success. + bool testConcurrentFlatMap() { + // ---- single-threaded correctness, force several expansions ---- + { + concurrent_flat_map map(8); + const std::uint64_t N = 5000; + for (std::uint64_t i = 1; i <= N; ++i) { + auto [p, inserted] = map.emplace(i, i * 7u + 3u); + if (!inserted || !p || *p != i * 7u + 3u) return false; + } + // re-insert: must report not-inserted but return existing value + for (std::uint64_t i = 1; i <= N; ++i) { + auto [p, inserted] = map.emplace(i, std::uint64_t(0)); + if (inserted || !p || *p != i * 7u + 3u) return false; + } + // find every key + for (std::uint64_t i = 1; i <= N; ++i) { + auto* p = map.find(i); + if (!p || *p != i * 7u + 3u) return false; + } + // missing keys + if (map.find(0) != nullptr) return false; + if (map.find(N + 1) != nullptr) return false; + } + + // ---- pointer stability across expansion ---- + { + concurrent_flat_map map(4); + std::vector ptrs; + const std::uint64_t N = 1000; + ptrs.reserve(N); + for (std::uint64_t i = 1; i <= N; ++i) { + ptrs.push_back(map.emplace(i, i).first); + } + for (std::uint64_t i = 1; i <= N; ++i) { + if (ptrs[i - 1] != map.find(i)) return false; + if (*ptrs[i - 1] != i) return false; + } + } + + // ---- multi-threaded insert / find ---- + { + concurrent_flat_map map(16); + const unsigned T = 8; + const std::uint64_t per = 4000; + std::atomic dup_inserts{0}; + std::atomic bad{0}; + std::vector threads; + threads.reserve(T); + for (unsigned t = 0; t < T; ++t) { + threads.emplace_back([&, t] { + for (std::uint64_t i = 0; i < per; ++i) { + // Overlapping key ranges across threads exercise contention. + std::uint64_t key = (i % (per / 2)) + 1 + (t % 2) * (per / 2); + std::uint64_t val = key * 1315423911u; + auto [p, ins] = map.emplace(key, val); + if (!p || *p != val) bad.fetch_add(1); + (void)ins; + auto* f = map.find(key); + if (!f || *f != val) bad.fetch_add(1); + } + // Each thread also inserts a unique key block. + for (std::uint64_t i = 0; i < per; ++i) { + std::uint64_t key = 1000000ull + t * per + i; + auto [p, ins] = map.emplace(key, key ^ 0xdeadbeefu); + if (!ins || !p || *p != (key ^ 0xdeadbeefu)) { + dup_inserts.fetch_add(1); + } + } + }); + } + for (auto& th : threads) th.join(); + if (bad.load() != 0) return false; + if (dup_inserts.load() != 0) return false; + + // Verify all unique-block keys present and correct. + for (unsigned t = 0; t < T; ++t) { + for (std::uint64_t i = 0; i < per; ++i) { + std::uint64_t key = 1000000ull + t * per + i; + auto* p = map.find(key); + if (!p || *p != (key ^ 0xdeadbeefu)) return false; + } + } + } + + return true; + } + + // Test QuantileHelperStatic: edges {0.25, 0.5, 0.75} partition into 4 bins. + // Exercise both scalar and container (RVec) call paths via MapWrapper. + bool testQuantileHelperStatic() { + QuantileHelperStatic<4> helper(std::array{0.25, 0.5, 0.75, 1.0}); + + if (helper(0.1) != 0) return false; + if (helper(0.25) != 1) return false; + if (helper(0.4) != 1) return false; + if (helper(0.6) != 2) return false; + if (helper(0.9) != 3) return false; + + ROOT::VecOps::RVec vals{0.1, 0.4, 0.6, 0.9}; + auto res = helper(vals); + if (res.size() != 4) return false; + if (res[0] != 0 || res[1] != 1 || res[2] != 2 || res[3] != 3) return false; + + // Continuous mode: CDF in [0, 1], edges[i] -> i/(N-1). + QuantileHelperStaticContinuous<4> helper_c(std::array{0.25, 0.5, 0.75, 1.0}); + auto const eps = 1e-9; + if (std::abs(helper_c(0.25) - 0.0) > eps) return false; + if (std::abs(helper_c(0.5) - 1.0/3.0) > eps) return false; + if (std::abs(helper_c(1.0) - 1.0) > eps) return false; + // val below first edge clamps to 0 + if (std::abs(helper_c(0.0) - 0.0) > eps) return false; + // val above last edge clamps to 1 + if (std::abs(helper_c(2.0) - 1.0) > eps) return false; + // midpoint of first interval + if (std::abs(helper_c(0.375) - (1.0/6.0)) > eps) return false; + + // Container path through MapWrapper + auto res_c = helper_c(ROOT::VecOps::RVec{0.25, 0.5, 0.75, 1.0}); + if (res_c.size() != 4) return false; + if (std::abs(res_c[0] - 0.0) > eps) return false; + if (std::abs(res_c[1] - 1.0/3.0) > eps) return false; + if (std::abs(res_c[2] - 2.0/3.0) > eps) return false; + if (std::abs(res_c[3] - 1.0) > eps) return false; + + return true; + } + + // Test MapWrapper: constructs a wrapper over a simple callable and applies + // it over zipped input ranges. + bool testMapWrapper() { + auto add = [](int a, int b) { return a + b; }; + MapWrapper wrapper(add); + + // Scalar passthrough: no container args -> callable invoked directly. + auto add_scalar = [](int x, int y) { return x + y; }; + MapWrapper scalar_wrapper(add_scalar); + if (scalar_wrapper(2, 5) != 7) return false; + + std::vector a{1, 2, 3, 4}; + std::vector b{10, 20, 30, 40}; + auto res = wrapper(a, b); + if (res.size() != 4) return false; + if (res[0] != 11) return false; + if (res[1] != 22) return false; + if (res[2] != 33) return false; + if (res[3] != 44) return false; + return true; } } diff --git a/narf/include/utils.hpp b/narf/include/utils.hpp index e5eaf3b..fc99960 100755 --- a/narf/include/utils.hpp +++ b/narf/include/utils.hpp @@ -1,5 +1,7 @@ #pragma once +#include + namespace narf { template @@ -134,8 +136,8 @@ namespace narf { | std::views::transform([](auto const &it){ return *it; }); } - template