diff --git a/.vscode/settings.json b/.vscode/settings.json index 636e8a1..2bb6935 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -79,6 +79,8 @@ "cassert": "cpp", "csignal": "cpp", "regex": "cpp", - "*.ipp": "cpp" + "*.ipp": "cpp", + "barrier": "cpp", + "text_encoding": "cpp" }, } \ No newline at end of file diff --git a/ArtNode.h b/ArtNode.h index d7a19ba..c68c2e5 100644 --- a/ArtNode.h +++ b/ArtNode.h @@ -25,6 +25,8 @@ #include #include +#include // for concurrency + #include "Helper.h" namespace ART { @@ -51,6 +53,9 @@ struct ArtNode { // compressed path (prefix) uint8_t prefix[maxPrefixLength]; + // version for concurrency + std::atomic version{0}; + ArtNode(int8_t type) : prefixLength(0), count(0), type(type) {} }; diff --git a/CMakeLists.txt b/CMakeLists.txt index 3315bfd..cac1d6c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,9 @@ project(ART VERSION 1.0 DESCRIPTION "DISC fork of Adaptive Radix Tree" LANGUAGES CXX) -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) # Default to Release only if no build type is set (and not a multi-config generator) if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) @@ -23,4 +25,6 @@ set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g") # Targets add_executable(main main.cpp) add_executable(insert_profiling insert_profiling.cpp) -add_executable(run run.cpp) +add_executable(run_bin run_bin.cpp) +add_executable(run_txt run_txt.cpp) +add_executable(run_cc concurrent/run_cc.cpp) diff --git a/concurrent/ConcurrentART.h b/concurrent/ConcurrentART.h new file mode 100644 index 0000000..6b9fcc3 --- /dev/null +++ b/concurrent/ConcurrentART.h @@ -0,0 +1,382 @@ +/* + * Concurrent Adaptive Radix Tree (ART) implementation with concurrency control. + * This implementation extends the base ART to support concurrent insertions + * and queries using version control mechanisms. 
The insert function is refactored + * according to the implementation in "The ART of Practical Synchronization" paper. + */ + +#pragma once + +#include "../ART.h" +#include "../ArtNode.h" +#include "VersionControl.h" +#include "VersionControl.cpp" +#include +#include + + +namespace ART { + +class ConcurrentART : public ART { + public: + ConcurrentART() : ART() {} + + void insert(uint8_t key[], uintptr_t value) { + ART::insert(key, value); + } + + void insertCC(uint8_t key[], uintptr_t value) { + while (true) { // Retry loop + try { + insert(this, root, &root, key, 0, value, maxPrefixLength, nullptr, 0); + return; // Success - exit retry loop + } catch (const RestartException&) { + // Conflict detected - loop will retry from beginning + continue; + } + } + } + + private: + + + void insert(ConcurrentART* tree, ArtNode* node, ArtNode** nodeRef, uint8_t key[], int depth, uintptr_t value, int maxKeyLength, ArtNode* parent, int parentVersion) { + // Insert the leaf value into the tree + + uint64_t version = readLockOrRestart(node); + + // Handle prefix of inner node + if (node->prefixLength) { + unsigned mismatchPos = + prefixMismatch(node, key, depth, maxKeyLength); + if (mismatchPos != node->prefixLength) { + // Prefix differs, create new node + + if (parent) { + upgradeToWriteLockOrRestart(parent, parentVersion); + } + upgradeToWriteLockOrRestart(node, version, parent); + + Node4* newNode = new Node4(); + newNode->prefixLength = mismatchPos; + memcpy(newNode->prefix, node->prefix, + min(mismatchPos, maxPrefixLength)); + // Break up prefix + if (node->prefixLength < maxPrefixLength) { + newNode->insertNode4(this, nodeRef, + node->prefix[mismatchPos], node); + node->prefixLength -= (mismatchPos + 1); + memmove(node->prefix, node->prefix + mismatchPos + 1, + min(node->prefixLength, maxPrefixLength)); + } else { + node->prefixLength -= (mismatchPos + 1); + uint8_t minKey[maxKeyLength]; + loadKey(getLeafValue(minimum(node)), minKey); + newNode->insertNode4(this, nodeRef, + 
minKey[depth + mismatchPos], node); + memmove(node->prefix, minKey + depth + mismatchPos + 1, + min(node->prefixLength, maxPrefixLength)); + } + newNode->insertNode4(this, nodeRef, key[depth + mismatchPos], + makeLeaf(value)); + *nodeRef = newNode; + writeUnlock(node); + if (parent) { writeUnlock(parent); } + + return; + } + depth += node->prefixLength; + } + + // Recurse + ArtNode** child = findChild(node, key[depth]); + + checkOrRestart(node, version); + + if (!*child) { + // Insert leaf into inner node + ArtNode* newNode = makeLeaf(value); + uint8_t keyByte = key[depth]; + + switch (node->type) { + case NodeType4: { + // Cast node to Node4 to access its members + Node4* node4 = static_cast(node); + // Insert leaf into inner node + if (node4->count < 4) { + + upgradeToWriteLockOrRestart(node, version); + if (parent) { readUnlockOrRestart(parent, parentVersion, node); } + + // Insert element + unsigned pos; + for (pos = 0; (pos < node4->count) && (node4->key[pos] < keyByte); pos++) + ; + // Shift keys and children to the right to make space for the new + // key/child. This preserves the sorted order of keys in the node. 
+ memmove(node4->key + pos + 1, node4->key + pos, node4->count - pos); + memmove(node4->child + pos + 1, node4->child + pos, + (node4->count - pos) * sizeof(uintptr_t)); + node4->key[pos] = keyByte; + node4->child[pos] = newNode; // Use newNode, not child + node4->count++; + + writeUnlock(node); + + } else { + + if (parent) { + upgradeToWriteLockOrRestart(parent, parentVersion); + } + upgradeToWriteLockOrRestart(node, version, parent); + + // Grow to Node16 + Node16* newNode16 = new Node16(); + newNode16->count = 4; + copyPrefix(node4, newNode16); + for (unsigned i = 0; i < 4; i++) + newNode16->key[i] = flipSign(node4->key[i]); + memcpy(newNode16->child, node4->child, node4->count * sizeof(uintptr_t)); + + // Shouldn't be deleted, other threads might still be reading it + // delete node; + + // INSERT NODE 16 PART + + // Flip the sign bit of the key byte for correct ordering in signed + // comparisons + uint8_t keyByteFlipped = flipSign(keyByte); + + // SIMD: Compare keyByteFlipped with all keys in the node in parallel + // _mm_set1_epi8 sets all 16 bytes of an SSE register to keyByteFlipped + // _mm_loadu_si128 loads the node's keys into an SSE register + // _mm_cmplt_epi8 does a signed comparison of each byte + __m128i cmp = _mm_cmplt_epi8( + _mm_set1_epi8(keyByteFlipped), + _mm_loadu_si128(reinterpret_cast<__m128i*>(newNode16->key))); + + // _mm_movemask_epi8 creates a 16-bit mask from the comparison results + // Only consider the bits for the active keys (node16->count) + uint16_t bitfield = + _mm_movemask_epi8(cmp) & (0xFFFF >> (16 - newNode16->count)); + + // Find the position of the first set bit (i.e., where keyByteFlipped < + // key[i]) + unsigned pos = bitfield ? ctz(bitfield) : newNode16->count; + + // Shift keys and children to the right to make space for the new + // key/child. This preserves the sorted order of keys in the node. 
+ memmove(newNode16->key + pos + 1, newNode16->key + pos, newNode16->count - pos); + memmove(newNode16->child + pos + 1, newNode16->child + pos, + (newNode16->count - pos) * sizeof(uintptr_t)); + newNode16->key[pos] = keyByteFlipped; + newNode16->child[pos] = newNode; + newNode16->count++; + *nodeRef = newNode16; + + writeUnlockObsolete(node); + if (parent) { writeUnlock(parent); } + + } + break; + } + case NodeType16: { + // Cast node to Node16 to access its members + Node16* node16 = static_cast(node); + + // Insert leaf into inner node + if (node16->count < 16) { + // Insert element + + upgradeToWriteLockOrRestart(node, version); + if (parent) { readUnlockOrRestart(parent, parentVersion, node); } + + // Flip the sign bit of the key byte for correct ordering in signed + // comparisons + uint8_t keyByteFlipped = flipSign(keyByte); + + // SIMD: Compare keyByteFlipped with all keys in the node in parallel + // _mm_set1_epi8 sets all 16 bytes of an SSE register to keyByteFlipped + // _mm_loadu_si128 loads the node's keys into an SSE register + // _mm_cmplt_epi8 does a signed comparison of each byte + __m128i cmp = _mm_cmplt_epi8( + _mm_set1_epi8(keyByteFlipped), + _mm_loadu_si128(reinterpret_cast<__m128i*>(node16->key))); + + // _mm_movemask_epi8 creates a 16-bit mask from the comparison results + // Only consider the bits for the active keys (node16->count) + uint16_t bitfield = + _mm_movemask_epi8(cmp) & (0xFFFF >> (16 - node16->count)); + + // Find the position of the first set bit (i.e., where keyByteFlipped < + // key[i]) + unsigned pos = bitfield ? ctz(bitfield) : node16->count; + + // Shift keys and children to the right to make space for the new + // key/child. This preserves the sorted order of keys in the node. 
+ memmove(node16->key + pos + 1, node16->key + pos, node16->count - pos); + memmove(node16->child + pos + 1, node16->child + pos, + (node16->count - pos) * sizeof(uintptr_t)); + node16->key[pos] = keyByteFlipped; + node16->child[pos] = newNode; + node16->count++; + + writeUnlock(node); + + } else { + + if (parent) { + upgradeToWriteLockOrRestart(parent, parentVersion); + } + upgradeToWriteLockOrRestart(node, version, parent); + + // Grow to Node48 + Node48* newNode48 = new Node48(); + + memcpy(newNode48->child, node16->child, node16->count * sizeof(uintptr_t)); + for (unsigned i = 0; i < node16->count; i++) + newNode48->childIndex[flipSign(node16->key[i])] = i; + copyPrefix(node16, newNode48); + newNode48->count = node16->count; + + // Shouldn't be deleted, other threads might still be reading it + // delete node; + + // INSERT NODE 48 PART + + // Insert element + unsigned pos = newNode48->count; + if (newNode48->child[pos]) + for (pos = 0; newNode48->child[pos] != NULL; pos++) + ; + // No memmove needed here because Node48 uses a mapping (childIndex) and + // a dense array. + newNode48->child[pos] = newNode; + newNode48->childIndex[keyByte] = pos; + newNode48->count++; + *nodeRef = newNode48; + + writeUnlockObsolete(node); + if (parent) { writeUnlock(parent); } + } + break; + } + case NodeType48: { + // Cast node to Node48 to access its members + Node48* node48 = static_cast(node); + + // Insert leaf into inner node + if (node48->count < 48) { + + upgradeToWriteLockOrRestart(node, version); + if (parent) { readUnlockOrRestart(parent, parentVersion, node); } + + // Insert element + unsigned pos = node48->count; + if (node48->child[pos]) + for (pos = 0; node48->child[pos] != NULL; pos++) + ; + // No memmove needed here because Node48 uses a mapping (childIndex) and + // a dense array. 
+ node48->child[pos] = newNode; + node48->childIndex[keyByte] = pos; + node48->count++; + + writeUnlock(node); + + } else { + // Grow to Node256 + + if (parent) { + upgradeToWriteLockOrRestart(parent, parentVersion); + } + upgradeToWriteLockOrRestart(node, version, parent); + + Node256* newNode256 = new Node256(); + for (unsigned i = 0; i < 256; i++) + if (node48->childIndex[i] != 48) + newNode256->child[i] = node48->child[node48->childIndex[i]]; + newNode256->count = node48->count; + copyPrefix(node48, newNode256); + + // tree->printTree(); + + // Shouldn't be deleted, other threads might still be reading it + // delete node; + + // INSERT NODE 256 PART + + // Insert leaf into inner node + // No memmove needed here because Node256 uses a direct mapping for all + // possible keys. + newNode256->count++; + newNode256->child[keyByte] = newNode; + *nodeRef = newNode256; + + writeUnlockObsolete(node); + if (parent) { writeUnlock(parent); } + } + break; + } + case NodeType256: { + // Cast node to Node256 to access its members + Node256* node256 = static_cast(node); + + upgradeToWriteLockOrRestart(node, version); + if (parent) { readUnlockOrRestart(parent, parentVersion, node); } + + // Insert leaf into inner node + // No memmove needed here because Node256 uses a direct mapping for all + // possible keys. 
+ node256->count++; + node256->child[keyByte] = newNode; + + writeUnlock(node); + + break; + } + } + return; + } + + if (parent) { + readUnlockOrRestart(parent, parentVersion); + } + + if (isLeaf(*child)) { + + upgradeToWriteLockOrRestart(node, version); + + depth++; + // Replace leaf with Node4 and store both leaves in it + uint8_t existingKey[maxKeyLength]; + loadKey(getLeafValue(*child), existingKey); + unsigned newPrefixLength = 0; + while (existingKey[depth + newPrefixLength] == + key[depth + newPrefixLength]) + newPrefixLength++; + + Node4* newNode = new Node4(); + newNode->prefixLength = newPrefixLength; + memcpy(newNode->prefix, key + depth, + min(newPrefixLength, maxPrefixLength)); + + ArtNode* oldLeaf = *child; + + newNode->insertNode4(this, child, + existingKey[depth + newPrefixLength], oldLeaf); + newNode->insertNode4(this, child, key[depth + newPrefixLength], + makeLeaf(value)); + + *child = newNode; + writeUnlock(node); + + return; + } + + insert(tree, *child, child, key, depth + 1, value, maxKeyLength, node, version); + return; + } +}; + +} // namespace ART \ No newline at end of file diff --git a/concurrent/VersionControl.cpp b/concurrent/VersionControl.cpp new file mode 100644 index 0000000..20486ac --- /dev/null +++ b/concurrent/VersionControl.cpp @@ -0,0 +1,90 @@ +/* + * Implementation file for concurrency version control functions. Taken directly from + * "The ART of Practical Synchronization" paper. 
+ */
+
+#include "VersionControl.h"
+
+namespace ART {
+
+// How the functions below interpret ArtNode::version:
+//   bit 0 (value 1)  obsolete flag, tested by isObsolete()
+//   bit 1 (value 2)  write-lock flag, set by setLockedBit()
+//   bits 2 and up    counter; releasing a write lock carries into it
+//
+// Every *OrRestart function signals a conflict by throwing RestartException.
+//
+// NOTE(review): ConcurrentART.h in this patch includes this .cpp file directly
+// and none of these definitions is inline, so linking only works while exactly
+// one translation unit pulls it in - confirm that is the intended build setup.
+
+/**
+ * Waits until the node is not write-locked and returns the version it read.
+ *
+ * @param node node to read; must not be null.
+ * @return the version word observed with the write-lock flag clear.
+ * @throws RestartException if that version has the obsolete flag set.
+ */
+uint64_t readLockOrRestart(ArtNode* node) {
+    const uint64_t version = awaitNodeUnlocked(node);
+    if (isObsolete(version)) {
+        throw RestartException();
+    }
+    return version;
+}
+
+/**
+ * Validates an optimistic read; performs exactly the same check as the
+ * two-argument readUnlockOrRestart().
+ *
+ * @throws RestartException if the node's version no longer equals `version`.
+ */
+void checkOrRestart(ArtNode* node, uint64_t version) {
+    readUnlockOrRestart(node, version);
+}
+
+/**
+ * Ends an optimistic read of `node`.
+ *
+ * @param version value previously returned by readLockOrRestart(node).
+ * @throws RestartException if the node's version changed in the meantime.
+ */
+void readUnlockOrRestart(ArtNode* node, uint64_t version) {
+    // Same check as the three-argument overload, with nothing to release.
+    readUnlockOrRestart(node, version, nullptr);
+}
+
+/**
+ * Ends an optimistic read of `node` while the caller holds a write lock on
+ * another node.
+ *
+ * @param lockedNode node the caller has write-locked; it is released with
+ *                   writeUnlock() before the exception is thrown. May be null,
+ *                   in which case nothing is released.
+ * @throws RestartException if the node's version changed in the meantime.
+ */
+void readUnlockOrRestart(ArtNode* node, uint64_t version, ArtNode* lockedNode) {
+    if (version != node->version.load()) {
+        // Null check added for parity with upgradeToWriteLockOrRestart().
+        if (lockedNode) { writeUnlock(lockedNode); }
+        throw RestartException();
+    }
+}
+
+/**
+ * Turns an optimistic read into a write lock.
+ *
+ * @param version value previously returned by readLockOrRestart(node). The
+ *                exchange compares all 64 bits, so a copy that was narrowed on
+ *                the way here stops matching once the counter outgrows the
+ *                narrower type.
+ *                NOTE(review): ConcurrentART::insert in this patch carries the
+ *                parent's version in an `int` parameter - verify.
+ * @throws RestartException if the node's version is no longer `version`.
+ */
+void upgradeToWriteLockOrRestart(ArtNode* node, uint64_t version) {
+    upgradeToWriteLockOrRestart(node, version, nullptr);
+}
+
+/**
+ * Turns an optimistic read into a write lock while the caller already holds a
+ * write lock on another node.
+ *
+ * @param lockedNode node the caller has write-locked, or null. When non-null it
+ *                   is released with writeUnlock() before the exception is
+ *                   thrown.
+ * @throws RestartException if the node's version is no longer `version`.
+ */
+void upgradeToWriteLockOrRestart(ArtNode* node, uint64_t version, ArtNode* lockedNode) {
+    uint64_t expected = version;
+    if (!node->version.compare_exchange_strong(expected, setLockedBit(version))) {
+        if (lockedNode) { writeUnlock(lockedNode); }
+        throw RestartException();
+    }
+}
+
+/**
+ * Acquires the write lock on `node`, retrying until the exchange succeeds.
+ *
+ * @throws RestartException if the node is found to be obsolete (raised by
+ *         readLockOrRestart()). A lost exchange is retried, not reported.
+ */
+void writeLockOrRestart(ArtNode* node) {
+    for (;;) {
+        // Spins while the node is locked; throws if it has become obsolete.
+        uint64_t expected = readLockOrRestart(node);
+        // Attempt the exchange directly instead of calling
+        // upgradeToWriteLockOrRestart() and catching its exception: a lost race
+        // is the expected case in this loop and needs no stack unwinding.
+        if (node->version.compare_exchange_strong(expected, setLockedBit(expected))) {
+            return;
+        }
+    }
+}
+
+/**
+ * Releases a write lock held on `node`. Adding 2 to a version whose lock flag
+ * is set clears that flag and carries into the counter above it.
+ */
+void writeUnlock(ArtNode* node) {
+    // reset locked bit and overflow into version
+    node->version.fetch_add(2);
+}
+
+/**
+ * Releases a write lock held on `node` and marks the node obsolete in the same
+ * atomic step (adds 3: lock-flag carry plus the obsolete flag).
+ */
+void writeUnlockObsolete(ArtNode* node) {
+    // set obsolete, reset locked, overflow into version
+    node->version.fetch_add(3);
+}
+
+/**
+ * Spins, issuing PAUSE() between reads, until the node's version has the
+ * write-lock flag clear.
+ *
+ * @return the first version read with the lock flag clear; the obsolete flag
+ *         is not examined here.
+ */
+uint64_t awaitNodeUnlocked(ArtNode* node) {
+    uint64_t version = node->version.load();
+    while ((version & 2) == 2) {  // spinlock while locked
+        PAUSE();
+        version = node->version.load();
+    }
+    return version;
+}
+
+/**
+ * Returns `version` with the write-lock flag set. Implemented as an addition,
+ * so the result is only a "locked" form of the input when the input's lock
+ * flag is clear.
+ */
+uint64_t setLockedBit(uint64_t version) {
+    return version + 2;
+}
+
+// Reports whether bit 0 of a version word - the obsolete flag - is set.
+bool isObsolete(uint64_t version) {
+    return (version & 1) == 1;
+}
+
+} // namespace ART
\ No newline at end of file
diff --git a/concurrent/VersionControl.h b/concurrent/VersionControl.h
new file mode 100644
index 0000000..e9c6da8
--- /dev/null
+++ b/concurrent/VersionControl.h
@@ -0,0 +1,46 @@
+/*
+ * Header file for concurrency version control functions. Taken directly from
+ * "The ART of Practical Synchronization" paper.
+ */
+
+#pragma once
+
+#include "../ArtNode.h"
+// NOTE(review): the three #include lines below, and the one in each branch of
+// the #ifdef that follows, carry no header name in this copy of the patch -
+// restore them from the original commit before applying.
+#include
+#include
+#include
+
+// PAUSE() is the hint issued between polls of a locked node's version in
+// awaitNodeUnlocked(). Both branches define it as _mm_pause(); presumably only
+// the header that provides the intrinsic differs per compiler - confirm.
+#ifdef _MSC_VER
+#include
+#define PAUSE() _mm_pause()
+#else
+#include
+#define PAUSE() _mm_pause()
+#endif
+
+namespace ART {
+
+// Thrown by the *OrRestart functions declared below when a version check or
+// lock upgrade fails; ConcurrentART::insertCC catches it and retries the
+// insert from the root.
+class RestartException : public std::exception {
+public:
+    const char* what() const noexcept override {
+        return "Operation needs to restart";
+    }
+};
+
+// Version control function declarations (defined in VersionControl.cpp).
+
+// Waits for the node to be unlocked and returns its version; throws
+// RestartException if that version is marked obsolete.
+uint64_t readLockOrRestart(ArtNode* node);
+// Throws RestartException if the node's version differs from `version`.
+void checkOrRestart(ArtNode* node, uint64_t version);
+// Same check as checkOrRestart().
+void readUnlockOrRestart(ArtNode* node, uint64_t version);
+// As above, but calls writeUnlock(lockedNode) before throwing.
+void readUnlockOrRestart(ArtNode* node, uint64_t version, ArtNode* lockedNode);
+// Atomically replaces `version` with its locked form; throws RestartException
+// if the node's version is no longer `version`.
+void upgradeToWriteLockOrRestart(ArtNode* node, uint64_t version);
+// As above, but releases a non-null lockedNode with writeUnlock() before throwing.
+void upgradeToWriteLockOrRestart(ArtNode* node, uint64_t version, ArtNode* lockedNode);
+// Repeats read-lock + upgrade until the write lock is obtained; still throws
+// RestartException if the node turns out to be obsolete.
+void writeLockOrRestart(ArtNode* node);
+// Adds 2 to the version: clears the lock flag and advances the counter.
+void writeUnlock(ArtNode* node);
+// Adds 3 to the version: as writeUnlock(), and also sets the obsolete flag.
+void writeUnlockObsolete(ArtNode* node);
+
+// Helper functions
+
+// Spins until the node's version has the lock flag (value 2) clear; returns it.
+uint64_t awaitNodeUnlocked(ArtNode* node);
+// Returns version + 2.
+uint64_t setLockedBit(uint64_t version);
+// True when bit 0 of `version` is set.
+bool isObsolete(uint64_t version);
+
+} // namespace ART
\ No newline at end of file
diff --git a/concurrent/run_cc.cpp b/concurrent/run_cc.cpp
new file mode 100644
index 0000000..a8277b3
--- /dev/null
+++ b/concurrent/run_cc.cpp
@@ -0,0 +1,429 @@
+/*
+ * Concurrent ART benchmark with concurrency control (CC) insertions and queries.
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../ART.h" +#include "../ArtNode.h" +#include "../Chain.h" +#include "../Helper.h" +#include "ConcurrentART.h" + +using namespace std; + +struct Config { + bool verbose = false; + int N = 500000000; + string input_file; + int num_threads = 1; + int query_threads = 1; + string results_csv = "results.csv"; +}; + +template +std::vector read_bin(const char* filename) { + std::ifstream inputFile(filename, std::ios::binary); + inputFile.seekg(0, std::ios::end); + const std::streampos fileSize = inputFile.tellg(); + inputFile.seekg(0, std::ios::beg); + std::vector data(fileSize / sizeof(key_type)); + inputFile.read(reinterpret_cast(data.data()), fileSize); + return data; +} + +namespace utils { +namespace executor { + +template +class Workload { +private: + Tree& tree; + const Config& config; + +public: + Workload(Tree& t, const Config& conf) : tree(t), config(conf) {} + + struct WorkerStats { + long long insertion_time = 0; + long long query_time = 0; + size_t keys_processed = 0; + size_t queries_processed = 0; + }; + + // Insert operation for a single thread + void insert_worker(const std::vector& keys, + size_t start_idx, size_t end_idx, + std::barrier<>& sync_point, + WorkerStats& stats) { + + // Wait for all threads to be ready + sync_point.arrive_and_wait(); + + auto start_time = chrono::high_resolution_clock::now(); + + for (size_t i = start_idx; i < end_idx; i++) { + uint8_t key[4]; + ART::loadKey(keys[i], key); + + auto op_start = chrono::high_resolution_clock::now(); + tree.insertCC(key, keys[i]); + auto op_stop = chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + stats.insertion_time += duration.count(); + stats.keys_processed++; + } + + auto end_time = chrono::high_resolution_clock::now(); + if (config.verbose) { + auto total_time = chrono::duration_cast(end_time - start_time); + cout << "Thread 
processed " << stats.keys_processed + << " insertions in " << total_time.count() << " ns" << endl; + } + } + + // Query operation for a single thread + void query_worker(const std::vector& keys, + size_t start_idx, size_t end_idx, + std::barrier<>& sync_point, + WorkerStats& stats) { + + // Wait for all threads to be ready + sync_point.arrive_and_wait(); + + auto start_time = chrono::high_resolution_clock::now(); + + for (size_t i = start_idx; i < end_idx; i++) { + uint8_t key[4]; + ART::loadKey(keys[i], key); + + auto op_start = chrono::high_resolution_clock::now(); + ART::ArtNode* leaf = tree.lookup(key); + auto op_stop = chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + stats.query_time += duration.count(); + stats.queries_processed++; + + if (!leaf) { + cerr << "Thread query " << i << " failed: key " << keys[i] + << " (index " << i << ") returned NULL" << endl; + abort(); + } + if (!ART::isLeaf(leaf)) { + cerr << "Thread query " << i << " failed: key " << keys[i] + << " (index " << i << ") returned non-leaf node" << endl; + abort(); + } + uint64_t found_value = ART::getLeafValue(leaf); + if (found_value != keys[i]) { + cerr << "Thread query " << i << " failed: key " << keys[i] + << " (index " << i << ") expected " << keys[i] + << " but got " << found_value << endl; + abort(); + } + } + + auto end_time = chrono::high_resolution_clock::now(); + if (config.verbose) { + auto total_time = chrono::duration_cast(end_time - start_time); + cout << "Thread processed " << stats.queries_processed + << " queries in " << total_time.count() << " ns" << endl; + } + } + + // Run insertion workload + pair run_insertions(const std::vector& keys) { + WorkerStats total_stats; + + // HYBRID MODE: Always insert first 2 keys sequentially + auto seq_start = chrono::high_resolution_clock::now(); + for (size_t i = 0; i < 2; i++) { + uint8_t key[4]; + ART::loadKey(keys[i], key); + + auto op_start = chrono::high_resolution_clock::now(); 
+ tree.insert(key, keys[i]); // we should use ART's insert here + auto op_stop = chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + total_stats.insertion_time += duration.count(); + total_stats.keys_processed++; + } + auto seq_stop = chrono::high_resolution_clock::now(); + + if (config.verbose) { + auto seq_time = chrono::duration_cast(seq_stop - seq_start); + cout << "Sequential phase: " << seq_time.count() << " ns" << endl; + } + + // Enable concurrent mode and continue with remaining keys + if (config.N > 2) { + + if (config.num_threads == 1) { + // Single-threaded for remaining keys + for (size_t i = 2; i < config.N; i++) { + uint8_t key[4]; + ART::loadKey(keys[i], key); + + auto op_start = chrono::high_resolution_clock::now(); + tree.insertCC(key, keys[i]); + auto op_stop = chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + total_stats.insertion_time += duration.count(); + total_stats.keys_processed++; + } + cout << "Insert has ended" << endl; + + } else { + // Multi-threaded for remaining keys - round-robin assignment + vector threads; + vector worker_stats(config.num_threads); + barrier sync_point(config.num_threads); + + // Create per-thread key lists (round-robin distribution) + vector> thread_keys(config.num_threads); + for (size_t i = 2; i < config.N; i++) { + int thread_id = (i - 2) % config.num_threads; + thread_keys[thread_id].push_back(i); + } + + auto concurrent_start = chrono::high_resolution_clock::now(); + + for (int t = 0; t < config.num_threads; t++) { + threads.emplace_back([this, &keys, &thread_keys, t, &sync_point, &worker_stats]() { + sync_point.arrive_and_wait(); + + auto start_time = chrono::high_resolution_clock::now(); + + for (size_t idx : thread_keys[t]) { + uint8_t key[4]; + ART::loadKey(keys[idx], key); + + auto op_start = chrono::high_resolution_clock::now(); + tree.insertCC(key, keys[idx]); + auto op_stop = 
chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + worker_stats[t].insertion_time += duration.count(); + worker_stats[t].keys_processed++; + } + + auto end_time = chrono::high_resolution_clock::now(); + if (config.verbose) { + auto total_time = chrono::duration_cast(end_time - start_time); + cout << "Thread " << t << " processed " << worker_stats[t].keys_processed + << " insertions in " << total_time.count() << " ns" << endl; + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto concurrent_stop = chrono::high_resolution_clock::now(); + auto concurrent_wall_time = chrono::duration_cast(concurrent_stop - concurrent_start); + cout << "Insert has ended" << endl; + + + // Aggregate worker stats + for (const auto& stats : worker_stats) { + total_stats.keys_processed += stats.keys_processed; + } + + total_stats.insertion_time += concurrent_wall_time.count(); + + if (config.verbose) { + cout << "Inserted " << total_stats.keys_processed << " keys" << endl; + cout << "Concurrent phase wall time: " << concurrent_wall_time.count() << " ns" << endl; + + long long total_cpu_time = 0; + for (const auto& stats : worker_stats) { + total_cpu_time += stats.insertion_time; + } + cout << "Concurrent phase CPU time: " << total_cpu_time << " ns" << endl; + } + } + } + + return {total_stats.insertion_time, total_stats}; + } + + // Run query workload + pair run_queries(const std::vector& keys) { + WorkerStats total_stats; + size_t num_queries = config.N / 100; + + uint64_t minval = 0; + uint64_t maxval = config.N-1; + + if (config.query_threads == 1) { + // Single-threaded queries + auto start_time = chrono::high_resolution_clock::now(); + + for (size_t i = 0; i < num_queries; i++) { + int random = rand() % (maxval - minval + 1) + minval; + uint8_t key[4]; + ART::loadKey(keys[random], key); + + auto op_start = chrono::high_resolution_clock::now(); + ART::ArtNode* leaf = tree.lookup(key); + auto op_stop = 
chrono::high_resolution_clock::now(); + + auto duration = chrono::duration_cast(op_stop - op_start); + total_stats.query_time += duration.count(); + total_stats.queries_processed++; + + if (!leaf) { + cerr << "Query " << random << " failed: key " << keys[random] + << " (index " << random << ") returned NULL" << endl; + abort(); + } + if (!ART::isLeaf(leaf)) { + cerr << "Query " << random << " failed: key " << keys[random] + << " (index " << random << ") returned non-leaf node" << endl; + abort(); + } + uint64_t found_value = ART::getLeafValue(leaf); + if (found_value != keys[random]) { + cerr << "Query " << random << " failed: key " << keys[random] + << " (index " << random << ") expected value " << keys[random] + << " but got " << found_value << endl; + abort(); + } + } + + auto end_time = chrono::high_resolution_clock::now(); + auto wall_time = chrono::duration_cast(end_time - start_time); + + return {wall_time.count(), total_stats}; + } else { + // Multi-threaded queries + vector threads; + vector worker_stats(config.query_threads); + barrier sync_point(config.query_threads); + + size_t queries_per_thread = num_queries / config.query_threads; + + auto concurrent_start = chrono::high_resolution_clock::now(); + + for (int t = 0; t < config.query_threads; t++) { + size_t start_idx = t * queries_per_thread; + size_t end_idx = (t == config.query_threads - 1) ? 
num_queries : (t + 1) * queries_per_thread; + + threads.emplace_back(&Workload::query_worker, this, + ref(keys), + start_idx, end_idx, + ref(sync_point), ref(worker_stats[t])); + } + + for (auto& t : threads) { + t.join(); + } + + auto concurrent_stop = chrono::high_resolution_clock::now(); + auto wall_time = chrono::duration_cast(concurrent_stop - concurrent_start); + + // Aggregate stats + for (const auto& stats : worker_stats) { + total_stats.queries_processed += stats.queries_processed; + } + + if (config.verbose) { + long long total_cpu_time = 0; + for (const auto& stats : worker_stats) { + total_cpu_time += stats.query_time; + } + cout << "Query wall time: " << wall_time.count() << " ns" << endl; + cout << "Query CPU time: " << total_cpu_time << " ns" << endl; + } + + return {wall_time.count(), total_stats}; + } + } + + void run_all(const std::vector& keys) { + if (config.verbose) { + cout << "Running workload with " << config.num_threads + << " insertion threads, " << config.query_threads << " query threads" << endl; + } + + // Run insertions + auto [insertion_time, insert_stats] = run_insertions(keys); + + // Run queries + auto [query_time, query_stats] = run_queries(keys); + + if (config.verbose) { + cout << "Total insertion time: " << insertion_time << " ns" << endl; + cout << "Total query time: " << query_time << " ns" << endl; + + double insertion_throughput = (double)insert_stats.keys_processed / (insertion_time / 1e9); + double query_throughput = (double)query_stats.queries_processed / (query_time / 1e9); + + cout << "Insertion throughput: " << insertion_throughput << " ops/sec" << endl; + cout << "Query throughput: " << query_throughput << " ops/sec" << endl; + } + + // Output CSV format + cout << config.num_threads << "," << config.query_threads << "," + << insertion_time << "," << query_time << "," + << insert_stats.keys_processed << "," << query_stats.queries_processed << endl; + } +}; + +} // namespace executor +} // namespace utils + +int 
main(int argc, char** argv) { + Config config; + + // Parse arguments + for (int i = 1; i < argc;) { + if (string(argv[i]) == "-v") { + config.verbose = true; + i++; + } else if (string(argv[i]) == "-N") { + config.N = atoi(argv[i + 1]); + i += 2; + } else if (string(argv[i]) == "-f") { + config.input_file = argv[i + 1]; + i += 2; + } else if (string(argv[i]) == "-it") { + config.num_threads = atoi(argv[i + 1]); + i += 2; + } else if (string(argv[i]) == "-qt") { + config.query_threads = atoi(argv[i + 1]); + i += 2; + } else { + i++; + } + } + + // Load data + auto keys = read_bin(config.input_file.c_str()); + + if (config.verbose) { + cout << "Loaded " << keys.size() << " keys" << endl; + cout << "Configuration:" << endl; + cout << " Insertion threads: " << config.num_threads << endl; + cout << " Query threads: " << config.query_threads << endl; + } + + ART::ConcurrentART tree; + utils::executor::Workload workload(tree, config); + workload.run_all(keys); + + return 0; +} \ No newline at end of file diff --git a/run.cpp b/run_bin.cpp similarity index 99% rename from run.cpp rename to run_bin.cpp index 2f73b08..7957dd0 100644 --- a/run.cpp +++ b/run_bin.cpp @@ -33,11 +33,6 @@ int main(int argc, char** argv) { string input_file; // required argument string tree_type = "ART"; // default tree type - - // Query 1% of entries - uint64_t minval = 0; - uint64_t maxval = N-1; - // Parse arguments; make sure to increment i by 2 if you consume an argument for (int i = 1; i < argc;) { if (string(argv[i]) == "-v") { @@ -57,6 +52,10 @@ int main(int argc, char** argv) { } } + // Used during querying + uint64_t minval = 0; + uint64_t maxval = N - 1; + // read data auto keys = read_bin(input_file.c_str()); diff --git a/run_txt.cpp b/run_txt.cpp new file mode 100644 index 0000000..a3e5d9d --- /dev/null +++ b/run_txt.cpp @@ -0,0 +1,338 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "ART.h" +#include "ArtNode.h" +#include "Chain.h" +#include 
"Helper.h"
+#include "trees/QuART_lil.h"
+#include "trees/QuART_tail.h"
+#include "trees/QuART_stail.h"
+#include "trees/QuART_lil_can.h"
+#include "trees/QuART_stail_reset.h"
+
+using namespace std;
+
+/**
+ * Reads keys from a text file that stores one key per line.
+ *
+ * Empty lines are skipped, and so are lines from which no key can be
+ * extracted, so that a malformed line never turns into a bogus key.
+ * Prints an error and terminates the process with exit code 1 if the file
+ * cannot be opened.
+ *
+ * @tparam key_type type each line is parsed into
+ * @param filename  path of the text file to read
+ * @return the parsed keys, in file order
+ */
+template <typename key_type>
+std::vector<key_type> read_txt(const char* filename) {
+    std::ifstream inputFile(filename);
+    if (!inputFile) {
+        std::cerr << "Failed to open file: " << filename << std::endl;
+        exit(1);
+    }
+
+    std::vector<key_type> data;
+    std::string line;
+
+    while (std::getline(inputFile, line)) {
+        // Skip empty lines
+        if (line.empty()) continue;
+
+        // Parse the number from the line; keep it only if parsing succeeded
+        std::stringstream ss(line);
+        key_type key;
+        if (ss >> key) {
+            data.push_back(key);
+        }
+    }
+
+    // The stream closes the file when it goes out of scope
+    return data;
+}
+
+/**
+ * Inserts the first N keys into a freshly constructed Tree, then looks up
+ * N / 100 randomly chosen keys among them, timing every single insert and
+ * lookup call and accumulating the results.
+ *
+ * Always prints "<insertion_time>,<query_time>" (nanoseconds, csv) to
+ * stdout; in verbose mode the tree type and both timings are printed in a
+ * readable form first.
+ *
+ * @tparam Tree     tree class providing insert(key, value) and lookup(key)
+ * @param tree_type name of the tree, used only for verbose output
+ * @param keys      keys to insert and query
+ * @param N         number of keys to use; must not exceed keys.size()
+ * @param verbose   whether to print the readable report
+ */
+template <typename Tree, typename key_type>
+static void run_benchmark(const string& tree_type,
+                          const std::vector<key_type>& keys, uint64_t N,
+                          bool verbose) {
+    // Automatic storage: the tree is released when the benchmark returns
+    Tree tree;
+
+    long long insertion_time = 0;
+    for (uint64_t i = 0; i < N; i++) {
+        uint8_t key[4];
+        ART::loadKey(keys[i], key);
+        auto start = chrono::high_resolution_clock::now();
+        tree.insert(key, keys[i]);
+        auto stop = chrono::high_resolution_clock::now();
+        insertion_time +=
+            chrono::duration_cast<chrono::nanoseconds>(stop - start).count();
+    }
+
+    if (verbose) {
+        cout << "Tree type: " << tree_type << endl;
+        cout << "Insertion time: " << insertion_time << " ns" << endl;
+    }
+
+    srand(time(0));
+
+    long long query_time = 0;
+    for (uint64_t i = 0; i < (N / 100); i++) {
+        // N > 0 whenever this loop body runs, so the modulo is well defined
+        // and the index always refers to a key that was inserted above.
+        const uint64_t random = rand() % N;
+        uint8_t key[4];
+        ART::loadKey(keys[random], key);
+        auto start = chrono::high_resolution_clock::now();
+        ART::ArtNode* leaf = tree.lookup(key);
+        auto stop = chrono::high_resolution_clock::now();
+        query_time +=
+            chrono::duration_cast<chrono::nanoseconds>(stop - start).count();
+        assert(ART::isLeaf(leaf) &&
+               ART::getLeafValue(leaf) == keys[random]);
+    }
+
+    if (verbose) {
+        cout << "Query time: " << query_time << " ns" << endl;
+    }
+
+    // Output the times in csv format
+    cout << insertion_time << "," << query_time << endl;
+}
+
+/**
+ * Benchmark driver for keys stored in a text file.
+ *
+ * Options:
+ *   -v          verbose output
+ *   -N <count>  number of keys to insert (clamped to the keys in the file)
+ *   -f <file>   input file with one key per line
+ *   -t <type>   tree type: ART, QuART_tail, QuART_lil, QuART_stail,
+ *               QuART_lil_can or QuART_stail_reset (default: ART)
+ *
+ * Returns 0 on success and 1 on a missing option value or an unknown tree
+ * type.
+ */
+int main(int argc, char** argv) {
+    bool verbose = false;      // optional argument
+    int N = 500000000;         // optional argument
+    string input_file;         // required argument
+    string tree_type = "ART";  // default tree type
+
+    // Parse arguments; options that take a value consume two entries
+    for (int i = 1; i < argc;) {
+        const string arg = argv[i];
+        if (arg == "-v") {
+            verbose = true;
+            i++;
+        } else if (arg == "-N" || arg == "-f" || arg == "-t") {
+            // Never read past the end of argv when the value is missing
+            if (i + 1 >= argc) {
+                cerr << "Missing value for option " << arg << endl;
+                return 1;
+            }
+            if (arg == "-N") {
+                N = atoi(argv[i + 1]);
+            } else if (arg == "-f") {
+                input_file = argv[i + 1];
+            } else {
+                tree_type = argv[i + 1];
+            }
+            i += 2;
+        } else {
+            i++;
+        }
+    }
+
+    // read data
+    // NOTE(review): keys are assumed to be 32-bit because every key buffer
+    // below is 4 bytes wide; confirm against the key type used by run_bin.cpp.
+    const auto keys = read_txt<uint32_t>(input_file.c_str());
+
+    // Use at most as many keys as the file provided; a negative -N means none
+    uint64_t num_keys = N > 0 ? static_cast<uint64_t>(N) : 0;
+    if (num_keys > keys.size()) {
+        num_keys = keys.size();
+    }
+
+    if (tree_type == "ART") {
+        run_benchmark<ART::ART>(tree_type, keys, num_keys, verbose);
+    } else if (tree_type == "QuART_tail") {
+        run_benchmark<ART::QuART_tail>(tree_type, keys, num_keys, verbose);
+    } else if (tree_type == "QuART_lil") {
+        run_benchmark<ART::QuART_lil>(tree_type, keys, num_keys, verbose);
+    } else if (tree_type == "QuART_stail") {
+        run_benchmark<ART::QuART_stail>(tree_type, keys, num_keys, verbose);
+    } else if (tree_type == "QuART_lil_can") {
+        run_benchmark<ART::QuART_lil_can>(tree_type, keys, num_keys, verbose);
+    } else if (tree_type == "QuART_stail_reset") {
+        run_benchmark<ART::QuART_stail_reset>(tree_type, keys, num_keys,
+                                              verbose);
+    } else {
+        cerr << "Unknown tree type: " << tree_type << endl;
+        return 1;
+    }
+    return 0;
+}