From 2bff5283879b8d714e344a1253470ef435ad41c6 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 4 Mar 2026 14:03:08 +0000 Subject: [PATCH 1/8] Split declaration and implementation of CapioFile class Tests --- .github/workflows/ci-tests.yaml | 1 + capio/common/logger.hpp | 1 + capio/server/include/handlers/close.hpp | 4 +- capio/server/include/handlers/exig.hpp | 12 +- capio/server/include/handlers/getdents.hpp | 7 +- capio/server/include/handlers/open.hpp | 4 +- capio/server/include/handlers/read.hpp | 15 +- capio/server/include/handlers/stat.hpp | 11 +- capio/server/include/handlers/write.hpp | 11 +- capio/server/include/remote/backend.hpp | 4 +- capio/server/include/remote/handlers/read.hpp | 19 +- capio/server/include/remote/handlers/stat.hpp | 6 +- capio/server/include/storage/capio_file.hpp | 664 ++++++------------ capio/server/include/utils/common.hpp | 2 +- capio/server/include/utils/location.hpp | 3 + capio/server/src/capio_file.cpp | 361 ++++++++++ capio/server/src/storage_manager.cpp | 22 +- capio/tests/unit/server/src/capio_file.cpp | 221 +++++- .../tests/unit/server/src/storage_manager.cpp | 4 +- 19 files changed, 868 insertions(+), 504 deletions(-) create mode 100644 capio/server/src/capio_file.cpp diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 81fecfabf..12812f74b 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -232,6 +232,7 @@ jobs: --exclude-throw-branches \ --xml coverage.xml \ --gcov-executable gcov \ + --exclude capio/tests \ ../build - name: "Compute Valid Artifact Name" diff --git a/capio/common/logger.hpp b/capio/common/logger.hpp index de3f6a649..0245ea2cb 100644 --- a/capio/common/logger.hpp +++ b/capio/common/logger.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/capio/server/include/handlers/close.hpp b/capio/server/include/handlers/close.hpp index a9133327f..18e98a53a 100644 --- a/capio/server/include/handlers/close.hpp +++ b/capio/server/include/handlers/close.hpp @@ -23,8 +23,8 @@ inline void handle_close(int tid, int fd) { c_file.closed()) { LOG("Capio File %s is closed and commit rule is on_close. setting it to complete", path.c_str()); - c_file.setComplete(); - c_file.commit(); + c_file.setCommitted(); + c_file.dump(); } LOG("Deleting capio file %s from tid=%d", path.c_str(), tid); diff --git a/capio/server/include/handlers/exig.hpp b/capio/server/include/handlers/exig.hpp index 443a8a6e3..a3b87ae19 100644 --- a/capio/server/include/handlers/exig.hpp +++ b/capio/server/include/handlers/exig.hpp @@ -14,17 +14,17 @@ inline void handle_exit_group(int tid) { LOG("Handling file %s", path.c_str()); if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_TERMINATION) { CapioFile &c_file = storage_manager->get(path); - if (c_file.directory()) { + if (c_file.isDirectory()) { LOG("file %s is dir", path.c_str()); - long int n_committed = c_file.n_files_expected; - if (n_committed <= c_file.n_files) { + if (const long int n_committed = c_file.getDirectoryExpectedFileCount(); + n_committed <= c_file.getDirectoryContainedFileCount()) { LOG("Setting file %s to complete", path.c_str()); - c_file.setComplete(); + c_file.setCommitted(); } } else { LOG("Setting file %s to complete", path.c_str()); - c_file.setComplete(); - c_file.commit(); + c_file.setCommitted(); + c_file.dump(); } c_file.close(); } diff --git a/capio/server/include/handlers/getdents.hpp b/capio/server/include/handlers/getdents.hpp index df2074f02..d53002391 100644 --- a/capio/server/include/handlers/getdents.hpp +++ b/capio/server/include/handlers/getdents.hpp @@ -19,13 +19,14 @@ inline void request_remote_getdents(int tid, int fd, off64_t count) { off64_t end_of_read = offset + count; off64_t end_of_sector = c_file.getSectorEnd(offset); - if (c_file.complete() && (end_of_read <= end_of_sector || - (end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) { + if (c_file.isCommitted() && + (end_of_read <= end_of_sector || + (end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) { LOG("Handling local read"); send_dirent_to_client(tid, fd, c_file, offset, count); } else if (end_of_read <= end_of_sector) { LOG("?"); - c_file.createBufferIfNeeded(storage_manager->getPath(tid, fd), false); + c_file.createBuffer(storage_manager->getPath(tid, fd), false); client_manager->replyToClient(tid, offset, c_file.getBuffer(), count); storage_manager->setFileOffset(tid, fd, offset + count); } else { diff --git a/capio/server/include/handlers/open.hpp b/capio/server/include/handlers/open.hpp index a6c6a6896..706d36e92 100644 --- a/capio/server/include/handlers/open.hpp +++ b/capio/server/include/handlers/open.hpp @@ -18,9 +18,9 @@ inline void update_file_metadata(const std::filesystem::path &path, int tid, int : storage_manager->add(path, false, get_file_initial_size()); storage_manager->addFileToTid(tid, fd, path, offset); - if (c_file.first_write && is_creat) { + if (c_file.isFirstWrite() && is_creat) { + c_file.registerFirstWrite(); client_manager->registerProducedFile(tid, path); - c_file.first_write = false; write_file_location(path); storage_manager->updateDirectory(tid, path); } diff --git a/capio/server/include/handlers/read.hpp b/capio/server/include/handlers/read.hpp index 07d2f69b6..2cff9afe8 100644 --- a/capio/server/include/handlers/read.hpp +++ b/capio/server/include/handlers/read.hpp @@ -30,7 +30,7 @@ inline void handle_pending_read(int tid, int fd, long int process_offset, long i bytes_read = end_of_sector - process_offset; } - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), bytes_read); storage_manager->setFileOffset(tid, fd, process_offset + bytes_read); @@ -47,13 +47,13 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) { off64_t process_offset = storage_manager->getFileOffset(tid, fd); // if a process is the producer of a file, then the file is always complete for that process - const bool file_complete = c_file.complete() || is_prod; + const bool file_complete = c_file.isCommitted() || is_prod; if (!(file_complete || CapioCLEngine::get().isFirable(path))) { // wait for file to be completed and then do what is done inside handle pending read LOG("Data is not available yet. Starting async thread to wait for file availability"); std::thread t([&c_file, tid, fd, count, process_offset] { - c_file.waitForCompletion(); + c_file.waitForCommit(); handle_pending_read(tid, fd, process_offset, count); }); t.detach(); @@ -85,7 +85,7 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) { const auto read_size = std::min(count, end_of_sector - process_offset); LOG("Requested read within end of sector, and data is available. Serving %ld bytes", read_size); - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), read_size); storage_manager->setFileOffset(tid, fd, process_offset + read_size); } @@ -99,13 +99,14 @@ inline void request_remote_read(int tid, int fd, off64_t count) { off64_t end_of_read = offset + count; off64_t end_of_sector = c_file.getSectorEnd(offset); - if (c_file.complete() && (end_of_read <= end_of_sector || - (end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) { + if (c_file.isCommitted() && + (end_of_read <= end_of_sector || + (end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) { LOG("Handling local read"); handle_local_read(tid, fd, count, true); } else if (end_of_read <= end_of_sector) { LOG("Data is present locally and can be served to client"); - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); client_manager->replyToClient(tid, offset, c_file.getBuffer(), count); storage_manager->setFileOffset(tid, fd, offset + count); diff --git a/capio/server/include/handlers/stat.hpp b/capio/server/include/handlers/stat.hpp index ef4c6a597..629c30305 100644 --- a/capio/server/include/handlers/stat.hpp +++ b/capio/server/include/handlers/stat.hpp @@ -25,11 +25,11 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) { CapioFile &c_file = storage_manager->get(path); // if file is streamable - if (c_file.complete() || CapioCLEngine::get().isFirable(path) || + if (c_file.isCommitted() || CapioCLEngine::get().isFirable(path) || strcmp(std::get<0>(get_file_location(path)), node_name) == 0) { client_manager->replyToClient(tid, c_file.getFileSize()); - client_manager->replyToClient(tid, static_cast(c_file.directory() ? 1 : 0)); + client_manager->replyToClient(tid, static_cast(c_file.isDirectory() ? 1 : 0)); } else { handle_remote_stat_request(tid, path); @@ -72,15 +72,16 @@ inline void reply_stat(int tid, const std::filesystem::path &path) { LOG("File is now present from remote node. retrieving file again."); file_location_opt = get_file_location_opt(path); } - if (c_file.complete() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 || + + if (c_file.isCommitted() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 || CapioCLEngine::get().isFirable(path) || capio_dir == path) { LOG("Sending response to client"); client_manager->replyToClient(tid, c_file.getFileSize()); - client_manager->replyToClient(tid, static_cast(c_file.directory() ? 1 : 0)); + client_manager->replyToClient(tid, static_cast(c_file.isDirectory() ? 1 : 0)); } else { LOG("Delegating backend to reply to remote stats"); // send a request for file. then start a thread to wait for the request completion - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); handle_remote_stat_request(tid, path); } } diff --git a/capio/server/include/handlers/write.hpp b/capio/server/include/handlers/write.hpp index a65c7db7c..89772e0bf 100644 --- a/capio/server/include/handlers/write.hpp +++ b/capio/server/include/handlers/write.hpp @@ -15,10 +15,11 @@ void write_handler(const char *const str) { off64_t end_of_write = offset + count; const std::filesystem::path &path = storage_manager->getPath(tid, fd); CapioFile &c_file = storage_manager->get(path); - off64_t file_shm_size = c_file.getBufferSize(); - SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid); - c_file.createBufferIfNeeded(path, true); + off64_t file_shm_size = c_file.getBufSize(); + SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid); + + c_file.createBuffer(path, true); if (end_of_write > file_shm_size) { c_file.expandBuffer(end_of_write); } @@ -26,8 +27,8 @@ void write_handler(const char *const str) { client_manager->registerProducedFile(tid, path); c_file.insertSector(offset, end_of_write); - if (c_file.first_write) { - c_file.first_write = false; + if (c_file.isFirstWrite()) { + c_file.registerFirstWrite(); write_file_location(path); // TODO: it works only if there is one prod per file storage_manager->updateDirectory(tid, path); diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index 43ad8f5bb..4f52b5caa 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -1,7 +1,9 @@ #ifndef CAPIO_SERVER_REMOTE_BACKEND_HPP #define CAPIO_SERVER_REMOTE_BACKEND_HPP -#include "common/logger.hpp" #include +#include + +#include "common/logger.hpp" class RemoteRequest { private: diff --git a/capio/server/include/remote/handlers/read.hpp b/capio/server/include/remote/handlers/read.hpp index 474241784..aa6a222e1 100644 --- a/capio/server/include/remote/handlers/read.hpp +++ b/capio/server/include/remote/handlers/read.hpp @@ -44,12 +44,13 @@ inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, of const std::filesystem::path &path = storage_manager->getPath(tid, fd); CapioFile &c_file = storage_manager->get(path); off64_t offset = storage_manager->getFileOffset(tid, fd); - c_file.real_file_size = file_size; - c_file.insertSector(offset, offset + nbytes); - c_file.setComplete(complete); + c_file.setRealFileSize(file_size); + c_file.insertSector(offset, offset + nbytes); + c_file.setCommitted(complete); off64_t end_of_sector = c_file.getSectorEnd(offset); - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); + off64_t bytes_read; off64_t end_of_read = offset + count; if (end_of_sector > end_of_read) { @@ -75,7 +76,7 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i const CapioFile &c_file = storage_manager->get(path); // wait that nbytes are written c_file.waitForData(offset + count); - serve_remote_read(path, dest, tid, fd, count, offset, c_file.complete(), is_getdents); + serve_remote_read(path, dest, tid, fd, count, offset, c_file.isCommitted(), is_getdents); } inline void handle_remote_read(const std::filesystem::path &path, const std::string &source, @@ -86,8 +87,8 @@ inline void handle_remote_read(const std::filesystem::path &path, const std::str CapioFile &c_file = storage_manager->get(path); bool data_available = (offset + count <= c_file.getStoredSize()); - if (c_file.complete() || (CapioCLEngine::get().isFirable(path) && data_available)) { - serve_remote_read(path, source, tid, fd, count, offset, c_file.complete(), is_getdents); + if (c_file.isCommitted() || (CapioCLEngine::get().isFirable(path) && data_available)) { + serve_remote_read(path, source, tid, fd, count, offset, c_file.isCommitted(), is_getdents); } else { std::thread t(wait_for_data, path, source, tid, fd, count, offset, is_getdents); t.detach(); @@ -107,9 +108,9 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd, off64_t offset = storage_manager->getFileOffset(tid, fd); CapioFile &c_file = storage_manager->get(path); - c_file.createBufferIfNeeded(path, false); + c_file.createBuffer(path, false); if (nbytes != 0) { - auto file_shm_size = c_file.getBufferSize(); + auto file_shm_size = c_file.getBufSize(); auto file_size_recv = offset + nbytes; if (file_size_recv > file_shm_size) { c_file.expandBuffer(file_size_recv); diff --git a/capio/server/include/remote/handlers/stat.hpp b/capio/server/include/remote/handlers/stat.hpp index c328221f5..80cd4cc47 100644 --- a/capio/server/include/remote/handlers/stat.hpp +++ b/capio/server/include/remote/handlers/stat.hpp @@ -16,7 +16,7 @@ inline void serve_remote_stat(const std::filesystem::path &path, const std::stri const CapioFile &c_file = storage_manager->get(path); off64_t file_size = c_file.getFileSize(); - bool is_dir = c_file.directory(); + bool is_dir = c_file.isDirectory(); serve_remote_stat_request(path, source_tid, file_size, is_dir, dest); } @@ -26,7 +26,7 @@ void wait_for_completion(const std::filesystem::path &path, int source_tid, dest.c_str()); const CapioFile &c_file = storage_manager->get(path); - c_file.waitForCompletion(); + c_file.waitForCommit(); LOG("File %s has been completed. serving stats data", path.c_str()); serve_remote_stat(path, dest, source_tid); } @@ -39,7 +39,7 @@ inline void handle_remote_stat(int source_tid, const std::filesystem::path &path const auto c_file = storage_manager->tryGet(path); if (c_file) { LOG("File %s is present on capio file system", path.c_str()); - if (c_file->get().complete() || CapioCLEngine::get().isFirable(path)) { + if (c_file->get().isCommitted() || CapioCLEngine::get().isFirable(path)) { LOG("file is complete. serving file"); serve_remote_stat(path, dest, source_tid); } else { // wait for completion diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 79406842c..0be89f72d 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -1,477 +1,253 @@ -#ifndef CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP -#define CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP +#ifndef CAPIO_SERVER_CAPIO_FILE_HPP +#define CAPIO_SERVER_CAPIO_FILE_HPP -#include #include +#include +#include #include -#include +#include #include #include -#include -#include -#include - -#include "common/logger.hpp" #include "common/queue.hpp" -#include "remote/backend.hpp" - -/* - * Only the server have all the information - * A process that only read from a file doesn't have the info on the _sectors - * A process that writes only have info on the sector that he wrote - * the file size is in shm because all the processes need this info - * and it's easy to provide it to them using the shm +/** + * @class CapioFile + * @brief Manages file data, sparse sectors, and synchronization for the CAPIO server. */ - -struct compare { - bool operator()(const std::pair &lhs, - const std::pair &rhs) const { - return (lhs.first < rhs.first); - } -}; - class CapioFile { - char *_buf = nullptr; // buffer containing the data - off64_t _buf_size; - bool _directory = false; - // _fd is useful only when the file is memory-mapped - int _fd = -1; - bool _home_node = false; - int _n_links = 1; - long int _n_close = 0; - long int _n_close_expected = -1; - int _n_opens = 0; - bool _permanent = false; - // _sectors stored in memory of the files (only the home node is forced to - // be up to date) - std::set, compare> _sectors; - // vector of (tid, fd) + /** + * @struct compareSectors + * @brief Comparator for the sectors set, ordering by offset. + */ + struct compareSectors { + bool operator()(const std::pair &lhs, + const std::pair &rhs) const; + }; + + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + int _n_links = 1; ///< Number of symbolic links to the file + long int _n_close_expected = -1; ///< Target close() operations for commitment + long int _n_close = 0; ///< Current count of close() operations + int _n_opens = 0; ///< Current count of open() operations + int _n_files = 0; ///< Count of dirent64 stored (if directory) + int _n_files_expected = -1; ///< Target dirent64 count (if directory) + + bool _home_node = false; ///< True if this is the home node + bool _directory = false; ///< True if this instance represents a directory + bool _permanent = false; ///< True if file persists after server exit + bool _committed = false; ///< True if file is finalized + bool _first_write = true; ///< True if no data has been written yet + + /// @brief Set of [start, end] pairs representing valid data regions + std::set, compareSectors> _sectors; + + off64_t _real_file_size = 0; ///< Total logical size of the file + + /// @brief List of {Thread ID, FD} pairs associated with this file std::vector> _threads_fd; - bool _complete = false; // whether the file is completed / committed - /*sync variables*/ - mutable std::mutex _mutex; - mutable std::condition_variable _complete_cv; - mutable std::condition_variable _data_avail_cv; + mutable std::mutex _mutex; ///< Synchronization primitive for thread safety + mutable std::condition_variable _committed_cv; ///< Wait for commitment + mutable std::condition_variable _data_avail_cv; ///< Wait for data at specific offsets - inline off64_t _getStoredSize() const { - auto it = _sectors.rbegin(); - return (it == _sectors.rend()) ? 0 : it->second; - } - - public: - bool first_write = true; - long int n_files = 0; // useful for directories - long int n_files_expected = -1; // useful for directories - /* - * file size in the home node. In a given moment could not be up-to-date. - * This member is useful because a node different from the home node - * could need to known the size of the file but not its content + /** + * @brief Internal helper to calculate stored size without locking. + * @return Logical size based on the furthest sector end. */ + off64_t _getStoredSize() const; - std::size_t real_file_size = 0; + /** + * @brief Reallocates the buffer and copies existing sectors to their correct offsets. + * @param new_p The pointer to the newly allocated memory. + * @param old_p The pointer to the old memory buffer. + */ + void _memcopyCapioFile(char *new_p, char *old_p) const; - CapioFile() : _buf_size(0), _directory(false), _permanent(false) {} + public: + /** @brief Default constructor. Initializes an empty file. */ + CapioFile(); - CapioFile(bool directory, long int n_files_expected, bool permanent, off64_t init_size, - long int n_close_expected) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent), n_files_expected(n_files_expected + 2) {} + /** + * @brief Explicit constructor for directory-specific initialization. + * @param directory Whether the file is a directory. + * @param n_files_expected Expected number of entries. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, + long int n_close_expected); - CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected = -1) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent) {} + /** + * @brief Standard constructor for files. + * @param directory Whether the file is a directory. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; - ~CapioFile() { - START_LOG(gettid(), "call()"); - LOG("Deleting capio_file"); - - if (_permanent && _home_node) { - if (_directory) { - delete[] _buf; - } else { - int res = munmap(_buf, _buf_size); - if (res == -1) { - ERR_EXIT("munmap CapioFile"); - } - } - } else { - delete[] _buf; - } - } - - [[nodiscard]] bool complete() const { - START_LOG(gettid(), "capio_file is complete? %s", this->_complete ? "true" : "false"); - std::lock_guard lg(_mutex); - return this->_complete; - } - - void waitForCompletion() const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for file to be committed"); - std::unique_lock lock(_mutex); - _complete_cv.wait(lock, [this] { return _complete; }); - } - - void waitForData(long offset) const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for data to be available"); - std::unique_lock lock(_mutex); - _data_avail_cv.wait( - lock, [offset, this] { return (offset >= this->_getStoredSize()) || this->_complete; }); - } - - void setComplete(bool complete = true) { - START_LOG(gettid(), "setting capio_file._complete=%s", complete ? "true" : "false"); - std::lock_guard lg(_mutex); - if (this->_complete != complete) { - this->_complete = complete; - if (this->_complete) { - _complete_cv.notify_all(); - _data_avail_cv.notify_all(); - } - } - } - - void addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } - - [[nodiscard]] bool bufToAllocate() const { - std::lock_guard lg(_mutex); - return _buf == nullptr; - } - - void close() { - _n_close++; - _n_opens--; - } - - void commit() { - START_LOG(gettid(), "call()"); - - if (_permanent && !_directory && _home_node) { - off64_t size = getFileSize(); - if (ftruncate(_fd, size) == -1) { - ERR_EXIT("ftruncate commit capio_file"); - } - _buf_size = size; - if (::close(_fd) == -1) { - ERR_EXIT("close commit capio_file"); - } - } - } - - /* - * To be called when a process - * execute a read or a write syscall + /** @brief Destructor. Cleans up allocated buffers and file descriptors. */ + ~CapioFile(); + + /** @return True if the file is committed and read-only. */ + [[nodiscard]] bool isCommitted() const; + + /** @return True if the internal buffer has not yet been allocated. */ + [[nodiscard]] bool bufferToAllocate() const; + + /** @return True if the close count matches expected closes. */ + [[nodiscard]] bool closed() const; + + /** @return True if the file is ready for removal. */ + [[nodiscard]] bool deletable() const; + + /** @return True if this is a directory. */ + [[nodiscard]] bool isDirectory() const; + + /** @return True if no write operations have been performed yet. */ + [[nodiscard]] bool isFirstWrite() const; + + /** @brief Blocks the calling thread until setCommitted() is called. */ + void waitForCommit() const; + + /** + * @brief Blocks until the requested offset is within a valid sector. + * @param offset The file offset to wait for. + */ + void waitForData(long offset) const; + + /** + * @brief Marks the file as committed and notifies waiting threads. + * @param commit The new status (defaults to true). + */ + void setCommitted(bool commit = true); + + /** + * @brief Maps a Thread ID to a specific File Descriptor. + * @param tid Thread ID. + * @param fd File descriptor. + */ + void addFd(int tid, int fd); + + /** + * @brief Removes a Thread ID/FD mapping. + * @param tid Thread ID. + * @param fd File descriptor. */ - void createBuffer(const std::filesystem::path &path, bool home_node) { - START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), - home_node ? "true" : "false"); - std::lock_guard lock(_mutex); - // TODO: will use malloc in order to be able to use realloc - _home_node = home_node; - if (_permanent && home_node) { - if (_directory) { - std::filesystem::create_directory(path); - std::filesystem::permissions(path, std::filesystem::perms::owner_all); - _buf = new char[_buf_size]; - } else { - LOG("creating mem mapped file"); - _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); - if (_fd == -1) { - ERR_EXIT("open %s CapioFile constructor", path.c_str()); - } - if (ftruncate(_fd, _buf_size) == -1) { - ERR_EXIT("ftruncate CapioFile constructor"); - } - _buf = - (char *) mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0); - if (_buf == MAP_FAILED) { - ERR_EXIT("mmap CapioFile constructor"); - } - } - } else { - _buf = new char[_buf_size]; - } - } - - void createBufferIfNeeded(const std::filesystem::path &path, bool home_node) { - if (bufToAllocate()) { - createBuffer(path, home_node); - } - } - - void memcopyCapioFile(char *new_p, char *old_p) const { - for (auto §or : _sectors) { - off64_t lbound = sector.first; - off64_t ubound = sector.second; - off64_t sector_length = ubound - lbound; - memcpy(new_p + lbound, old_p + lbound, sector_length); - } - } - - char *expandBuffer(off64_t data_size) { // TODO: use realloc - off64_t double_size = _buf_size * 2; - off64_t new_size = data_size > double_size ? data_size : double_size; - char *new_buf = new char[new_size]; - std::lock_guard lock(_mutex); - // memcpy(new_p, old_p, file_shm_size); //TODO memcpy only the - // sector - // stored in CapioFile - memcopyCapioFile(new_buf, _buf); - delete[] _buf; - _buf = new_buf; - _buf_size = new_size; - return new_buf; - } - - char *getBuffer() { return _buf; } - - [[nodiscard]] off64_t getBufferSize() const { return _buf_size; } - - [[nodiscard]] const std::vector> &getFds() const { return _threads_fd; } - - [[nodiscard]] off64_t getFileSize() const { - std::lock_guard lock(_mutex); - if (!_sectors.empty()) { - return _sectors.rbegin()->second; - } else { - return 0; - } - } - - /* - * Returns the offset to the end of the sector - * if the offset parameter is inside the - * sector, -1 otherwise - * + void removeFd(int tid, int fd); + + /** @brief Increments the internal open counter. */ + void open(); + + /** @brief Increments the _n_close counter, while decrementing the _n_open counter. */ + void close(); + + /** + * @brief Initializes the memory buffer or mmap area. + * @param path Path to the file. + * @param home_node Whether this node is the home for the file. */ - [[nodiscard]] off64_t getSectorEnd(off64_t offset) const { - START_LOG(gettid(), "call(offset=%ld)", offset); - - off64_t sector_end = -1; - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - - if (!_sectors.empty() && it != _sectors.begin()) { - --it; - if (offset <= it->second) { - sector_end = it->second; - } - } - - return sector_end; - } - - [[nodiscard]] const std::set, compare> &getSectors() const { - return _sectors; - } - - /* - * get the size of the data stored in this node - * If the node is the home node then this is equals to - * the real size of the file + void createBuffer(const std::filesystem::path &path, bool home_node); + + /** + * @brief Resizes the internal buffer to accommodate more data. + * @param data_size Required additional size. + * @return Pointer to the expanded buffer. + */ + char *expandBuffer(off64_t data_size); + + /** @brief Dump _buf buffer to the file system. */ + void dump(); + + /** + * @brief Tracks a new data range in the file. + * @param new_start Starting offset of the data. + * @param new_end Ending offset of the data. + */ + void insertSector(off64_t new_start, off64_t new_end); + + /** + * @brief Fetches data from a remote CAPIO node. + * @param dest Destination node identifier. + * @param offset File offset to read from. + * @param buffer_size Amount of data to fetch. + */ + void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const; + + /** + * @brief Transfers data from an SPSC queue into the file buffer. + * @param queue Source queue. + * @param offset Destination offset in this file. + * @param num_bytes Number of bytes to transfer. */ - [[nodiscard]] off64_t getStoredSize() const { - std::lock_guard lock(_mutex); - return this->_getStoredSize(); - } - - /* - * Insert the new sector automatically modifying the - * existent _sectors if needed. - * - * Params: - * off64_t new_start: the beginning of the sector to insert - * off64_t new_end: the beginning of the sector to insert - * - * new_start must be > new_end otherwise the behaviour - * in undefined - * + void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const; + + /** @brief Explicitly sets the total file size. */ + void setRealFileSize(off64_t size); + + /** @brief Marks that at least one write has occurred. */ + void registerFirstWrite(); + + /** + * @brief Increases the count of files contained in this directory. + * @param count the number to increase the internal counter */ - void insertSector(off64_t new_start, off64_t new_end) { - START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); - - auto p = std::make_pair(new_start, new_end); - std::lock_guard lock(_mutex); - - if (_sectors.empty()) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - return; - } - auto it_lbound = _sectors.upper_bound(p); - if (it_lbound == _sectors.begin()) { - if (new_end < it_lbound->first) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } else { - auto it = it_lbound; - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } else { - --it_lbound; - auto it = it_lbound; - if (p.first <= it_lbound->second) { - // new sector starts inside a sector - p.first = it_lbound->first; - } else { // in this way the sector will not be deleted - ++it_lbound; - } - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } - - [[nodiscard]] bool closed() const { - return _n_close_expected == -1 || _n_close == _n_close_expected; - } - - [[nodiscard]] bool deletable() const { return _n_opens <= 0; } - - [[nodiscard]] bool directory() const { return _directory; } - - void open() { _n_opens++; } - - /* - * From the manual: - * - * Adjust the file offset to the next location in the file - * greater than or equal to offset containing data. If - * offset points to data, then the file offset is set to - * offset. - * - * Fails if offset points past the end of the file. - * + void incrementDirFileCnt(int count = 1); + + /** @return Pointer to the raw memory buffer. */ + char *getBuffer() const; + + /** @return Vector of TID/FD pairs. */ + [[nodiscard]] const std::vector> &getFds() const; + + /** @return The physical size of the current buffer. */ + [[nodiscard]] off64_t getBufSize() const; + + /** @return The logical size of the file. */ + [[nodiscard]] off64_t getRealFileSize() const; + + /** @return The total size, accounting for holes and metadata. */ + [[nodiscard]] off64_t getFileSize() const; + + /** @return Size of data currently residing on this node. */ + [[nodiscard]] off64_t getStoredSize() const; + + /** @return Count of files currently indexed in this directory. */ + [[nodiscard]] int getDirectoryContainedFileCount() const; + + /** @return Expected total files in this directory. */ + [[nodiscard]] int getDirectoryExpectedFileCount() const; + + /** @return Reference to the internal sector map. */ + [[nodiscard]] const std::set, compareSectors> &getSectors() const; + + /** + * @brief Finds the end of the sector containing the offset. + * @param offset Position to check. + * @return End offset of the sector, or -1 if in a hole. */ - off64_t seekData(off64_t offset) { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return it->first; - } - --it; - if (offset <= it->second) { - return offset; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return it->first; - } - } - } - - /* - * From the manual: - * - * Adjust the file offset to the next hole in the file - * greater than or equal to offset. If offset points into - * the middle of a hole, then the file offset is set to - * offset. If there is no hole past offset, then the file - * offset is adjusted to the end of the file (i.e., there is - * an implicit hole at the end of any file). - * - * - * Fails if offset points past the end of the file. - * + [[nodiscard]] off64_t getSectorEnd(off64_t offset) const; + + /** + * @brief Finds the next data segment. + * @param offset Start searching from here. + * @return Offset of data, or error if beyond end of file. */ - [[nodiscard]] off64_t seekHole(off64_t offset) const { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return offset; - } - --it; - if (offset <= it->second) { - return it->second; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return offset; - } - } - } - - void removeFd(int tid, int fd) { - auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); - if (it != _threads_fd.end()) { - _threads_fd.erase(it); - } - } + off64_t seekData(off64_t offset); /** - * Save data inside the capio_file buffer - * @param buffer - * @return + * @brief Finds the next hole in the file. + * @param offset Start searching from here. + * @return Offset of the hole. */ - void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) { - std::unique_lock lock(_mutex); - backend->recv_file(_buf + offset, dest, buffer_size); - _data_avail_cv.notify_all(); - } - - void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) { - START_LOG(gettid(), "call()"); - - std::unique_lock lock(_mutex); - queue.read(_buf + offset, num_bytes); - _data_avail_cv.notify_all(); - } + [[nodiscard]] off64_t seekHole(off64_t offset) const; }; -#endif // CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP \ No newline at end of file +#endif // CAPIO_SERVER_CAPIO_FILE_HPP \ No newline at end of file diff --git a/capio/server/include/utils/common.hpp b/capio/server/include/utils/common.hpp index 0b2d0ef38..2682fd2f8 100644 --- a/capio/server/include/utils/common.hpp +++ b/capio/server/include/utils/common.hpp @@ -51,7 +51,7 @@ inline void send_dirent_to_client(int tid, int fd, CapioFile &c_file, off64_t of } const auto &path_to_check = storage_manager->getPath(tid, fd); - if (!c_file.complete() && CapioCLEngine::get().isFirable(path_to_check)) { + if (!c_file.isCommitted() && CapioCLEngine::get().isFirable(path_to_check)) { LOG("File %s has mode no_update and not enough data is available", path_to_check.c_str()); std::thread t(wait_for_dirent_data, (last_entry + 1) * sizeof(linux_dirent64), tid, fd, count, std::ref(c_file)); diff --git a/capio/server/include/utils/location.hpp b/capio/server/include/utils/location.hpp index c43489a7e..5d573f950 100644 --- a/capio/server/include/utils/location.hpp +++ b/capio/server/include/utils/location.hpp @@ -1,12 +1,15 @@ #ifndef CAPIO_SERVER_UTILS_LOCATIONS_HPP #define CAPIO_SERVER_UTILS_LOCATIONS_HPP +#include "remote/backend.hpp" + #include #include #include "utils/types.hpp" extern char *node_name; +extern Backend *backend; constexpr char CAPIO_SERVER_FILES_LOCATION_NAME[] = "files_location_%s.txt"; constexpr char CAPIO_SERVER_INVALIDATE_FILE_PATH_CHAR = '#'; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp new file mode 100644 index 000000000..c2bed8dde --- /dev/null +++ b/capio/server/src/capio_file.cpp @@ -0,0 +1,361 @@ +#include "common/logger.hpp" +#include "remote/backend.hpp" +#include "server/include/storage/capio_file.hpp" +#include "utils/common.hpp" + +#include + +bool CapioFile::compareSectors::operator()(const std::pair &lhs, + const std::pair &rhs) const { + return (lhs.first < rhs.first); +} + +CapioFile::CapioFile() = default; + +CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, + const off64_t init_size, const long int n_close_expected) + : _buf_size(init_size), _n_close_expected(n_close_expected), + _n_files_expected(n_files_expected + 2), _directory(directory), _permanent(permanent) {} + +CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, + const long int n_close_expected) + : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), + _permanent(permanent) {} + +CapioFile::~CapioFile() { + START_LOG(gettid(), "call()"); + LOG("Deleting capio_file"); + + if (_permanent && _home_node) { + if (_directory) { + delete[] _buf; + } else { + if (munmap(_buf, _buf_size) == -1) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "WARN: unable to unmap CapioFile: " + std::string(strerror(errno))); + } + } + } else { + delete[] _buf; + } +} + +bool CapioFile::isCommitted() const { + START_LOG(gettid(), "capio_file is complete? %s", this->_committed ? "true" : "false"); + std::lock_guard lg(_mutex); + return this->_committed; +} + +void CapioFile::waitForData(long offset) const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for data to be available"); + std::unique_lock lock(_mutex); + _data_avail_cv.wait(lock, + [offset, this] { return this->_getStoredSize() >= offset || _committed; }); +} + +void CapioFile::setCommitted(bool commit) { + START_LOG(gettid(), "setting capio_file._complete=%s", commit ? "true" : "false"); + std::lock_guard lg(_mutex); + if (this->_committed != commit) { + this->_committed = commit; + if (this->_committed) { + _committed_cv.notify_all(); + _data_avail_cv.notify_all(); + } + } +} + +void CapioFile::addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } + +void CapioFile::waitForCommit() const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for file to be committed"); + std::unique_lock lock(_mutex); + _committed_cv.wait(lock, [this] { return _committed; }); +} + +void CapioFile::close() { + _n_close++; + _n_opens--; +} + +void CapioFile::dump() { + START_LOG(gettid(), "call()"); + + if (_permanent && !_directory && _home_node) { + off64_t size = getFileSize(); + if (ftruncate(_fd, size) == -1) { + ERR_EXIT("ftruncate commit capio_file"); + } + _buf_size = size; + if (::close(_fd) == -1) { + ERR_EXIT("close commit capio_file"); + } + } +} + +void CapioFile::createBuffer(const std::filesystem::path &path, const bool home_node) { + START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), home_node ? "true" : "false"); + if (bufferToAllocate()) { + std::lock_guard lock(_mutex); + _home_node = home_node; + if (_permanent && home_node) { + if (_directory) { + std::filesystem::create_directory(path); + std::filesystem::permissions(path, std::filesystem::perms::owner_all); + _buf = new char[_buf_size]; + } else { + LOG("creating mem mapped file"); + _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); + if (_fd == -1) { + ERR_EXIT("open %s CapioFile constructor", path.c_str()); + } + if (ftruncate(_fd, _buf_size) == -1) { + ERR_EXIT("ftruncate CapioFile constructor"); + } + _buf = static_cast( + mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0)); + if (_buf == MAP_FAILED) { + ERR_EXIT("mmap CapioFile constructor"); + } + } + } else { + _buf = new char[_buf_size]; + } + } +} + +void CapioFile::_memcopyCapioFile(char *new_p, char *old_p) const { + for (auto §or : _sectors) { + off64_t lbound = sector.first; + off64_t ubound = sector.second; + off64_t sector_length = ubound - lbound; + memcpy(new_p + lbound, old_p + lbound, sector_length); + } +} + +char *CapioFile::expandBuffer(const off64_t data_size) { + const off64_t double_size = _buf_size * 2; + const off64_t new_size = std::max(data_size, double_size); + const auto new_buf = new char[new_size]; + std::lock_guard lock(_mutex); + _memcopyCapioFile(new_buf, _buf); + delete[] _buf; + _buf = new_buf; + _buf_size = new_size; + return new_buf; +} + +char *CapioFile::getBuffer() const { return _buf; } + +off64_t CapioFile::getBufSize() const { return _buf_size; } + +const std::vector> &CapioFile::getFds() const { return _threads_fd; } + +off64_t CapioFile::getFileSize() const { + std::lock_guard lock(_mutex); + if (!_sectors.empty()) { + return _sectors.rbegin()->second; + } else { + return 0; + } +} + +off64_t CapioFile::getSectorEnd(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + + off64_t sector_end = -1; + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + + if (!_sectors.empty() && it != _sectors.begin()) { + --it; + if (offset <= it->second) { + sector_end = it->second; + } + } + + return sector_end; +} + +const std::set, CapioFile::compareSectors> & +CapioFile::getSectors() const { + return _sectors; +} + +off64_t CapioFile::getStoredSize() const { + std::lock_guard lock(_mutex); + return this->_getStoredSize(); +} + +void CapioFile::insertSector(off64_t new_start, off64_t new_end) { + START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); + + auto p = std::make_pair(new_start, new_end); + std::lock_guard lock(_mutex); + + if (_sectors.empty()) { + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + return; + } + auto it_lbound = _sectors.upper_bound(p); + if (it_lbound == _sectors.begin()) { + if (new_end < it_lbound->first) { + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } else { + auto it = it_lbound; + bool end_before = false; + bool end_inside = false; + while (it != _sectors.end() && !end_before && !end_inside) { + end_before = p.second < it->first; + if (!end_before) { + end_inside = p.second <= it->second; + if (!end_inside) { + ++it; + } + } + } + + if (end_inside) { + p.second = it->second; + ++it; + } + _sectors.erase(it_lbound, it); + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } + } else { + --it_lbound; + auto it = it_lbound; + if (p.first <= it_lbound->second) { + // new sector starts inside a sector + p.first = it_lbound->first; + } else { // in this way the sector will not be deleted + ++it_lbound; + } + bool end_before = false; + bool end_inside = false; + while (it != _sectors.end() && !end_before && !end_inside) { + end_before = p.second < it->first; + if (!end_before) { + end_inside = p.second <= it->second; + if (!end_inside) { + ++it; + } + } + } + + if (end_inside) { + p.second = it->second; + ++it; + } + _sectors.erase(it_lbound, it); + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } +} + +bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } + +bool CapioFile::deletable() const { return _n_opens <= 0; } + +bool CapioFile::isDirectory() const { return _directory; } + +void CapioFile::open() { _n_opens++; } + +off64_t CapioFile::seekData(off64_t offset) { + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return it->first; + } + --it; + if (offset <= it->second) { + return offset; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return it->first; + } + } +} + +off64_t CapioFile::seekHole(off64_t offset) const { + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return offset; + } + --it; + if (offset <= it->second) { + return it->second; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return offset; + } + } +} + +void CapioFile::removeFd(int tid, int fd) { + auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); + if (it != _threads_fd.end()) { + _threads_fd.erase(it); + } +} + +void CapioFile::readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const { + std::unique_lock lock(_mutex); + backend->recv_file(_buf + offset, dest, buffer_size); + _data_avail_cv.notify_all(); +} + +void CapioFile::readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const { + START_LOG(gettid(), "call()"); + + std::unique_lock lock(_mutex); + queue.read(_buf + offset, num_bytes); + _data_avail_cv.notify_all(); +} + +off64_t CapioFile::_getStoredSize() const { + const auto it = _sectors.rbegin(); + return (it == _sectors.rend()) ? 0 : it->second; +} + +bool CapioFile::bufferToAllocate() const { + std::lock_guard lg(_mutex); + return _buf == nullptr; +} + +off64_t CapioFile::getRealFileSize() const { return this->_real_file_size; } + +void CapioFile::setRealFileSize(const off64_t size) { this->_real_file_size = size; } + +bool CapioFile::isFirstWrite() const { return this->_first_write; } + +void CapioFile::registerFirstWrite() { this->_first_write = false; } + +void CapioFile::incrementDirFileCnt(const int count) { this->_n_files += count; } + +int CapioFile::getDirectoryContainedFileCount() const { return this->_n_files; } + +int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } \ No newline at end of file diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index adb139b8e..d64a471c6 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -12,7 +12,6 @@ #include "utils/common.hpp" #include "utils/location.hpp" #include "utils/shared_mutex.hpp" -#include "utils/types.hpp" extern char *node_name; @@ -39,18 +38,18 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p ld.d_reclen = sizeof(linux_dirent64); CapioFile &c_file = get(dir); - c_file.createBufferIfNeeded(dir, true); + c_file.createBuffer(dir, true); void *file_shm = c_file.getBuffer(); const off64_t file_size = c_file.getStoredSize(); const off64_t data_size = file_size + ld.d_reclen; - const size_t file_shm_size = c_file.getBufferSize(); + const size_t file_shm_size = c_file.getBufSize(); ld.d_off = data_size; if (data_size > file_shm_size) { file_shm = c_file.expandBuffer(data_size); } - ld.d_type = (c_file.directory() ? DT_DIR : DT_REG); + ld.d_type = (c_file.isDirectory() ? DT_DIR : DT_REG); memcpy((char *) file_shm + file_size, &ld, sizeof(ld)); const off64_t base_offset = file_size; @@ -59,10 +58,11 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p reinterpret_cast(static_cast(file_shm) + file_size)->d_name); c_file.insertSector(base_offset, data_size); - ++c_file.n_files; + c_file.incrementDirFileCnt(); + client_manager->registerProducedFile(tid, dir); - if (c_file.n_files == c_file.n_files_expected) { - c_file.setComplete(); + if (c_file.getDirectoryContainedFileCount() == c_file.getDirectoryExpectedFileCount()) { + c_file.setCommitted(); } } StorageManager::StorageManager() { @@ -305,8 +305,8 @@ off64_t StorageManager::addDirectory(const pid_t tid, const std::filesystem::pat if (!get_file_location_opt(path)) { CapioFile &c_file = add(path, true, CAPIO_DEFAULT_DIR_INITIAL_SIZE); - if (c_file.first_write) { - c_file.first_write = false; + if (c_file.isFirstWrite()) { + c_file.registerFirstWrite(); // TODO: it works only if there is one prod per file if (is_capio_dir(path)) { add_file_location(path, node_name, -1); @@ -326,8 +326,8 @@ off64_t StorageManager::addDirectory(const pid_t tid, const std::filesystem::pat void StorageManager::updateDirectory(const pid_t tid, const std::filesystem::path &file_path) { START_LOG(gettid(), "call(file_path=%s)", file_path.c_str()); const auto &dir = get_parent_dir_path(file_path); - if (CapioFile &c_file = get(dir.c_str()); c_file.first_write) { - c_file.first_write = false; + if (CapioFile &c_file = get(dir.c_str()); c_file.isFirstWrite()) { + c_file.registerFirstWrite(); write_file_location(dir); } addDirectoryEntry(tid, file_path, dir, REGULAR_ENTRY); diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index c1e6d4c62..29bc25c79 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,8 +1,8 @@ -#ifndef CAPIO_CAPIO_FILE_HPP -#define CAPIO_CAPIO_FILE_HPP #include "server/include/storage/capio_file.hpp" #include "common/env.hpp" + #include +#include TEST(ServerTest, TestInsertSingleSector) { CapioFile c_file; @@ -59,4 +59,219 @@ TEST(ServerTest, TestInsertTwoOverlappingSectorsNested) { EXPECT_EQ(sectors.size(), 1); EXPECT_NE(sectors.find({1L, 4L}), sectors.end()); } -#endif // CAPIO_CAPIO_FILE_HPP + +TEST(ServerTest, TestDestructionOfPermanentCapioFile) { + auto *c_file = new CapioFile(false, true, 1000, 1); + c_file->createBuffer("test.dat", true); + delete c_file; + EXPECT_TRUE(std::filesystem::exists("test.dat")); + std::filesystem::remove("test.dat"); +} + +TEST(ServerTest, TestDestructionOfPermanentCapioFileDirectory) { + auto *c_file = new CapioFile(true, true, 1000, 1); + c_file->createBuffer("mydirectory", true); + delete c_file; + EXPECT_TRUE(std::filesystem::exists("mydirectory")); + EXPECT_TRUE(std::filesystem::is_directory("mydirectory")); + std::filesystem::remove("mydirectory"); +} + +TEST(ServerTest, TestCapioFileWaitForDataMultithreaded) { + CapioFile file; + + SPSCQueue queue("test_queue", get_cache_lines(), get_cache_line_size(), "test_wf"); + + std::mutex _lock; + _lock.lock(); + + std::thread t([&_lock, &file, &queue] { + _lock.lock(); + file.expandBuffer(1000); + file.registerFirstWrite(); + + EXPECT_NE(file.getBuffer(), nullptr); + char buffer[1000]; + for (std::size_t i = 0; i < 1000; ++i) { + buffer[i] = 33 + (i % 93); + } + + queue.write(buffer, 1000); + file.insertSector(0, 1000); + file.readFromQueue(queue, 0, 1000); + }); + + _lock.unlock(); + file.waitForData(1000); + + auto stored_size = file.getFileSize(); + EXPECT_EQ(stored_size, 1000); + + const auto buf = file.getBuffer(); + EXPECT_NE(buf, nullptr); + for (std::size_t i = 0; i < 1000; ++i) { + EXPECT_EQ(buf[i], 33 + (i % 93)); + } + + t.join(); +} + +TEST(ServerTest, TestCapioFileWaitForCompletion) { + CapioFile file; + + std::mutex _lock; + _lock.lock(); + + std::thread t([&] { + _lock.lock(); + file.expandBuffer(1000); + file.registerFirstWrite(); + + EXPECT_NE(file.getBuffer(), nullptr); + char buffer[1000]; + for (std::size_t i = 0; i < 1000; ++i) { + buffer[i] = 33 + (i % 93); + } + + memcpy(file.getBuffer(), buffer, 1000); + + file.insertSector(0, 1000); + file.setCommitted(); + }); + + _lock.unlock(); + file.waitForCommit(); + + auto stored_size = file.getFileSize(); + EXPECT_EQ(stored_size, 1000); + + const auto buf = file.getBuffer(); + EXPECT_NE(buf, nullptr); + for (std::size_t i = 0; i < 1000; ++i) { + EXPECT_EQ(buf[i], 33 + (i % 93)); + } + + t.join(); +} + +TEST(ServerTest, TestCommitCapioFile) { + auto file = new CapioFile(false, true, 1000, 1); + file->createBuffer("test.dat", true); + EXPECT_EQ(std::filesystem::file_size("test.dat"), 1000); + file->close(); + file->dump(); + EXPECT_EQ(std::filesystem::file_size("test.dat"), 0); + delete file; + EXPECT_TRUE(std::filesystem::exists("test.dat")); + std::filesystem::remove("test.dat"); +} + +TEST(ServerTest, TestCommitAndDeleteDirectory) { + EXPECT_FALSE(std::filesystem::exists("mydir")); + auto file = new CapioFile(true, true, 1000, 1); + file->createBuffer("mydir", true); + EXPECT_TRUE(std::filesystem::exists("mydir")); + EXPECT_TRUE(std::filesystem::is_directory("mydir")); + delete file; + EXPECT_TRUE(std::filesystem::exists("mydir")); + std::filesystem::remove("mydir"); +} + +TEST(ServerTest, TesMemcpyCapioFile) { + CapioFile file; + + file.createBuffer("test.dat", true); + + // NOTE: here we only simulate a write operation on the file, without actually writing to _buf + file.insertSector(0, 100); + file.insertSector(100, 200); + + file.expandBuffer(2000); + + EXPECT_EQ(file.getStoredSize(), 200); + EXPECT_EQ(file.getBufSize(), 2000); +} + +TEST(ServerTest, TestCloseCapioFile) { + CapioFile file(false, false, 0, -1); + EXPECT_TRUE(file.closed()); // TEST for n_close_expected == -1 + + CapioFile file1(false, false, 0, 10); + EXPECT_FALSE(file1.closed()); + for (std::size_t i = 0; i < 10; ++i) { + file1.open(); + EXPECT_FALSE(file1.closed()); + file1.close(); + } + EXPECT_TRUE(file1.closed()); +} + +TEST(ServerTest, TestCapioFileSeekData) { + CapioFile file; + + EXPECT_EQ(file.seekData(100), -1); + EXPECT_EQ(file.seekData(0), 0); + + file.insertSector(0, 1000); + EXPECT_EQ(file.seekData(100), 100); + EXPECT_EQ(file.seekData(2000), -1); + file.insertSector(2000, 3000); + EXPECT_NE(file.seekData(1500), 1500); // return here the closest offset... + + CapioFile file1; + file1.insertSector(200, 300); + EXPECT_EQ(file1.seekData(1), 200); +} + +TEST(ServerTest, TestCapioFileSeekHole) { + CapioFile file; + + EXPECT_EQ(file.seekHole(100), -1); + EXPECT_EQ(file.seekHole(0), 0); + file.insertSector(0, 1000); + EXPECT_EQ(file.seekHole(100), 1000); + EXPECT_EQ(file.seekHole(2000), -1); + file.insertSector(2000, 3000); + EXPECT_EQ(file.seekHole(1500), 1500); // return here the closest offset... + + CapioFile file1; + file1.insertSector(200, 300); + EXPECT_EQ(file1.seekHole(1), 1); +} + +TEST(ServerTest, TestAddAndRemoveFD) { + CapioFile file; + file.addFd(12345, 4); + file.addFd(12345, 5); + + file.removeFd(12345, 6); + EXPECT_EQ(file.getFds().size(), 2); + file.removeFd(12345, 5); + EXPECT_EQ(file.getFds().size(), 1); + file.removeFd(12345, 4); + EXPECT_EQ(file.getFds().size(), 0); +} + +TEST(ServerTest, TestSetGetRealFileSize) { + CapioFile file; + EXPECT_EQ(file.getRealFileSize(), 0); + file.setRealFileSize(1234); + EXPECT_EQ(file.getRealFileSize(), 1234); +} + +TEST(ServerTest, TestDeletePermanentDirectory) { + const auto file = new CapioFile(true, true, 1000, 1); + file->createBuffer("testDir", true); + delete file; + EXPECT_TRUE(std::filesystem::exists("testDir")); + EXPECT_TRUE(std::filesystem::is_directory("testDir")); + std::filesystem::remove("testDir"); +} + +TEST(ServerTest, TestFileSetCommitToFalse) { + CapioFile file; + file.setCommitted(); + EXPECT_TRUE(file.isCommitted()); + file.setCommitted(false); + EXPECT_FALSE(file.isCommitted()); +} diff --git a/capio/tests/unit/server/src/storage_manager.cpp b/capio/tests/unit/server/src/storage_manager.cpp index 6f550a215..e86229e8f 100644 --- a/capio/tests/unit/server/src/storage_manager.cpp +++ b/capio/tests/unit/server/src/storage_manager.cpp @@ -48,12 +48,12 @@ TEST(StorageManagerTestEnvironment, testInitDirectory) { const auto &dir = storage_manager->get("myDirectory"); - EXPECT_EQ(dir.getBufferSize(), CAPIO_DEFAULT_DIR_INITIAL_SIZE); + EXPECT_EQ(dir.getBufSize(), CAPIO_DEFAULT_DIR_INITIAL_SIZE); storage_manager->updateDirectory(1, "myDirectory"); const auto &dir1 = storage_manager->get("myDirectory"); - EXPECT_FALSE(dir1.first_write); + EXPECT_FALSE(dir1.isFirstWrite()); } TEST(StorageManagerTestEnvironment, testAddDirectoryFailure) { From b14cedef56bbc27ecd243523e4b7853020cf1992 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 12 Mar 2026 13:44:23 +0000 Subject: [PATCH 2/8] Fixed read cache for getdents requests Added notification of data being delivered --- capio/posix/utils/cache.hpp | 13 +++++- capio/server/include/storage/capio_file.hpp | 2 +- capio/server/src/capio_file.cpp | 44 +++++++++++++++++---- capio/server/src/storage_manager.cpp | 3 +- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/capio/posix/utils/cache.hpp b/capio/posix/utils/cache.hpp index d7c7e4307..618711930 100644 --- a/capio/posix/utils/cache.hpp +++ b/capio/posix/utils/cache.hpp @@ -70,8 +70,17 @@ class ReadCache { _read(buffer, remaining_bytes); buffer = reinterpret_cast(buffer) + remaining_bytes; - if (read_size > _max_line_size) { - LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, _max_line_size); + // NOTE: if getdents send a request for exactly the correct amount of data. + if (read_size > _max_line_size || is_getdents) { + DBG(capio_syscall(SYS_gettid), [&] { + if (is_getdents) { + LOG("NOTE: requesting getdents data. sending request for exact amount!"); + } else { + LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, + _max_line_size); + } + }()); + LOG("Reading exactly requested size"); off64_t end_of_read = is_getdents ? getdents_request(fd, read_size, is64bit, _tid) : read_request(fd, read_size, _tid); diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 0be89f72d..8d07d4fc6 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -29,7 +29,7 @@ class CapioFile { off64_t _buf_size = 0; ///< Allocated size of _buf int _fd = -1; ///< File descriptor for permanent/mmap storage int _n_links = 1; ///< Number of symbolic links to the file - long int _n_close_expected = -1; ///< Target close() operations for commitment + long int _n_close_expected = 0; ///< Target close() operations for commitment long int _n_close = 0; ///< Current count of close() operations int _n_opens = 0; ///< Current count of open() operations int _n_files = 0; ///< Count of dirent64 stored (if directory) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index c2bed8dde..d3a4ce119 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -1,6 +1,6 @@ +#include "server/include/storage/capio_file.hpp" #include "common/logger.hpp" #include "remote/backend.hpp" -#include "server/include/storage/capio_file.hpp" #include "utils/common.hpp" #include @@ -257,7 +257,14 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) { } } -bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } +bool CapioFile::closed() const { + START_LOG(gettid(), "call()"); + LOG("_n_close_expected = %d", _n_close_expected); + LOG("_n_close = %d", _n_close); + LOG("_n_opens = %d", _n_opens); + + return _n_close_expected == 0 || _n_close == _n_close_expected; +} bool CapioFile::deletable() const { return _n_opens <= 0; } @@ -346,16 +353,37 @@ bool CapioFile::bufferToAllocate() const { return _buf == nullptr; } -off64_t CapioFile::getRealFileSize() const { return this->_real_file_size; } +off64_t CapioFile::getRealFileSize() const { + std::unique_lock lock(_mutex); + return this->_real_file_size; +} -void CapioFile::setRealFileSize(const off64_t size) { this->_real_file_size = size; } +void CapioFile::setRealFileSize(const off64_t size) { + START_LOG(gettid(), "call(size=%ld)", size); + std::unique_lock lock(_mutex); + this->_real_file_size = size; +} -bool CapioFile::isFirstWrite() const { return this->_first_write; } +bool CapioFile::isFirstWrite() const { + std::unique_lock lock(_mutex); + return this->_first_write; +} -void CapioFile::registerFirstWrite() { this->_first_write = false; } +void CapioFile::registerFirstWrite() { + std::unique_lock lock(_mutex); + this->_first_write = false; +} -void CapioFile::incrementDirFileCnt(const int count) { this->_n_files += count; } +void CapioFile::incrementDirFileCnt(const int count) { + START_LOG(gettid(), "call(count=%d)", count); + std::unique_lock lock(_mutex); + this->_n_files += count; + this->_data_avail_cv.notify_all(); +} -int CapioFile::getDirectoryContainedFileCount() const { return this->_n_files; } +int CapioFile::getDirectoryContainedFileCount() const { + std::unique_lock lock(_mutex); + return this->_n_files; +} int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } \ No newline at end of file diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index d64a471c6..9bdb1a736 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -61,7 +61,8 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p c_file.incrementDirFileCnt(); client_manager->registerProducedFile(tid, dir); - if (c_file.getDirectoryContainedFileCount() == c_file.getDirectoryExpectedFileCount()) { + if (c_file.getDirectoryContainedFileCount() == c_file.getDirectoryExpectedFileCount() && + CapioCLEngine::get().getCommitRule(file_path) == capiocl::commitRules::ON_N_FILES) { c_file.setCommitted(); } } From 574058a9f15af1842309c1184ceabe34636e65ad Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 12:56:20 +0000 Subject: [PATCH 3/8] Added log messages to cli --- capio/server/src/client_manager.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/capio/server/src/client_manager.cpp b/capio/server/src/client_manager.cpp index aef6d9538..54c21f95d 100644 --- a/capio/server/src/client_manager.cpp +++ b/capio/server/src/client_manager.cpp @@ -7,6 +7,7 @@ #include "common/queue.hpp" #include "utils/capiocl_adapter.hpp" #include "utils/common.hpp" +#include "utils/location.hpp" ClientManager::ClientDataBuffers::ClientDataBuffers(const std::string &clientToServerName, const std::string &serverToClientName, @@ -52,6 +53,10 @@ void ClientManager::registerClient(pid_t tid, const std::string &app_name, const }); t.detach(); } + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("Registered client: ") + node_name + + "::" + app_name + + "::" + std::to_string(tid)); } void ClientManager::unlockClonedChild(const pid_t tid) { @@ -74,6 +79,9 @@ void ClientManager::removeClient(const pid_t tid) { if (const auto response_buffer = responses.find(tid); response_buffer != responses.end()) { responses.erase(response_buffer); } + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("Removed client: ") + node_name + + "::" + app_name + + "::" + std::to_string(tid)); } void ClientManager::replyToClient(const pid_t tid, const off64_t offset) { From c1ab1a12eea9162adda5dda067de9e65d60a35ac Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 13:40:47 +0000 Subject: [PATCH 4/8] Added log messages to cli --- capio/server/src/client_manager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capio/server/src/client_manager.cpp b/capio/server/src/client_manager.cpp index 54c21f95d..4edbbe148 100644 --- a/capio/server/src/client_manager.cpp +++ b/capio/server/src/client_manager.cpp @@ -54,7 +54,7 @@ void ClientManager::registerClient(pid_t tid, const std::string &app_name, const t.detach(); } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("Registered client: ") + node_name + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[+Client]") + node_name + "::" + app_name + "::" + std::to_string(tid)); } @@ -79,7 +79,7 @@ void ClientManager::removeClient(const pid_t tid) { if (const auto response_buffer = responses.find(tid); response_buffer != responses.end()) { responses.erase(response_buffer); } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("Removed client: ") + node_name + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[~Client]") + node_name + "::" + app_name + "::" + std::to_string(tid)); } From 1ef694232fa2670dde89867cf8c3e06d144e49e7 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 13:45:35 +0000 Subject: [PATCH 5/8] Added log messages to cli --- capio/server/src/client_manager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capio/server/src/client_manager.cpp b/capio/server/src/client_manager.cpp index 4edbbe148..71d4faa59 100644 --- a/capio/server/src/client_manager.cpp +++ b/capio/server/src/client_manager.cpp @@ -54,7 +54,7 @@ void ClientManager::registerClient(pid_t tid, const std::string &app_name, const t.detach(); } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[+Client]") + node_name + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[ClientManager] + ") + node_name + "::" + app_name + "::" + std::to_string(tid)); } @@ -79,7 +79,7 @@ void ClientManager::removeClient(const pid_t tid) { if (const auto response_buffer = responses.find(tid); response_buffer != responses.end()) { responses.erase(response_buffer); } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[~Client]") + node_name + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, std::string("[ClientManager] ~ ") + node_name + "::" + app_name + "::" + std::to_string(tid)); } From 6493a88121739fead71707e858ced3457f38a12b Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 21:35:17 +0000 Subject: [PATCH 6/8] SHM canary help --- capio/common/shm.hpp | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/capio/common/shm.hpp b/capio/common/shm.hpp index a8da727ab..572b8403b 100644 --- a/capio/common/shm.hpp +++ b/capio/common/shm.hpp @@ -53,14 +53,7 @@ class CapioShmCanary { } _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); if (_shm_id == -1) { - LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data()); -#ifndef __CAPIO_POSIX - auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)]; - sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << message << std::endl; - delete[] message; -#endif - ERR_EXIT("ERR: shm canary flag already exists"); + ERR_EXIT(CAPIO_SHM_CANARY_ERROR, _canary_name.data()); } }; @@ -105,7 +98,7 @@ inline void *get_shm(const std::string &shm_name) { // if we are not creating a new object, mode is equals to 0 int fd = shm_open(shm_name.c_str(), O_RDWR, 0); // to be closed - struct stat sb {}; + struct stat sb{}; if (fd == -1) { ERR_EXIT("get_shm shm_open %s", shm_name.c_str()); } @@ -130,7 +123,7 @@ inline void *get_shm_if_exist(const std::string &shm_name) { // if we are not creating a new object, mode is equals to 0 int fd = shm_open(shm_name.c_str(), O_RDWR, 0); // to be closed - struct stat sb {}; + struct stat sb{}; if (fd == -1) { if (errno == ENOENT) { return nullptr; From 054073c84637aead1a1add030742d34c67a91ac9 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 22:17:37 +0000 Subject: [PATCH 7/8] Added NoBackend option --- capio/server/capio_server.cpp | 1 - .../server/include/remote/backend/include.hpp | 1 + capio/server/include/remote/backend/mpi.hpp | 1 + .../include/remote/backend/no_backend.hpp | 44 +++++++++++++++++++ capio/server/include/remote/listener.hpp | 5 +++ 5 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 capio/server/include/remote/backend/no_backend.hpp diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index 59f22f4b6..4040aeea9 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -93,7 +93,6 @@ static constexpr std::array build_request_handle START_LOG(gettid(), "call()"); - MPI_Comm_size(MPI_COMM_WORLD, &n_servers); setup_signal_handlers(); backend->handshake_servers(); diff --git a/capio/server/include/remote/backend/include.hpp b/capio/server/include/remote/backend/include.hpp index 6291a9500..ce298b767 100644 --- a/capio/server/include/remote/backend/include.hpp +++ b/capio/server/include/remote/backend/include.hpp @@ -5,4 +5,5 @@ */ #include "mpi.hpp" +#include "no_backend.hpp" #endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP diff --git a/capio/server/include/remote/backend/mpi.hpp b/capio/server/include/remote/backend/mpi.hpp index cda73bc91..f24c47428 100644 --- a/capio/server/include/remote/backend/mpi.hpp +++ b/capio/server/include/remote/backend/mpi.hpp @@ -26,6 +26,7 @@ class MPIBackend : public Backend { LOG("Mpi has multithreading support? %s (%d)", provided == MPI_THREAD_MULTIPLE ? "yes" : "no", provided); MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &n_servers); LOG("node_rank=%d", &rank); if (provided != MPI_THREAD_MULTIPLE) { LOG("Error: The threading support level is not MPI_THREAD_MULTIPLE (is %d)", provided); diff --git a/capio/server/include/remote/backend/no_backend.hpp b/capio/server/include/remote/backend/no_backend.hpp new file mode 100644 index 000000000..d8760f7a5 --- /dev/null +++ b/capio/server/include/remote/backend/no_backend.hpp @@ -0,0 +1,44 @@ +#ifndef CAPIO_NO_BACKEND_HPP +#define CAPIO_NO_BACKEND_HPP + +class NoBackend : public Backend { + + public: + NoBackend(int argc, char **argv) { + START_LOG(gettid(), "call()"); + n_servers = 1; + node_name = new char[HOST_NAME_MAX]; + gethostname(node_name, HOST_NAME_MAX); + } + + ~NoBackend() override = default; + + const std::set get_nodes() override { return {}; } + + void handshake_servers() override {} + + RemoteRequest read_next_request() override { + START_LOG(gettid(), "call()"); + LOG("Halting thread execution as NoBackend was chosen"); + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + return {nullptr, ""}; + } + + void send_file(char *shm, long int nbytes, const std::string &target) override { + START_LOG(gettid(), "call(%.50s, %ld, %s)", shm, nbytes, target.c_str()); + } + + void send_request(const char *message, int message_len, const std::string &target) override { + START_LOG(gettid(), "call(message=%s, message_len=%d, target=%s)", message, message_len, + target.c_str()); + } + + void recv_file(char *shm, const std::string &source, long int bytes_expected) override { + START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), + bytes_expected); + } +}; + +#endif // CAPIO_NO_BACKEND_HPP diff --git a/capio/server/include/remote/listener.hpp b/capio/server/include/remote/listener.hpp index 61f89efbf..b965323a1 100644 --- a/capio/server/include/remote/listener.hpp +++ b/capio/server/include/remote/listener.hpp @@ -46,6 +46,11 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char * << std::endl; return new MPISYNCBackend(argc, argv); } + + if (backend_name == "none") { + return new NoBackend(argc, argv); + } + LOG("Backend %s does not exist in CAPIO. Reverting back to the default MPI backend", backend_name.c_str()); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " Backend " << backend_name From 6d8006ff7308265b7a9182c2c54fc80f05170a15 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 22:42:58 +0000 Subject: [PATCH 8/8] Bugfix in no_backend --- capio/server/include/remote/backend/no_backend.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capio/server/include/remote/backend/no_backend.hpp b/capio/server/include/remote/backend/no_backend.hpp index d8760f7a5..02e42e004 100644 --- a/capio/server/include/remote/backend/no_backend.hpp +++ b/capio/server/include/remote/backend/no_backend.hpp @@ -13,7 +13,7 @@ class NoBackend : public Backend { ~NoBackend() override = default; - const std::set get_nodes() override { return {}; } + const std::set get_nodes() override { return {node_name}; } void handshake_servers() override {}