diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 12812f74b..64a56db95 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -233,6 +233,7 @@ jobs: --xml coverage.xml \ --gcov-executable gcov \ --exclude capio/tests \ + --gcov-ignore-parse-errors=negative_hits.warn \ ../build - name: "Compute Valid Artifact Name" diff --git a/capio/posix/utils/cache.hpp b/capio/posix/utils/cache.hpp index d7c7e4307..597d644a5 100644 --- a/capio/posix/utils/cache.hpp +++ b/capio/posix/utils/cache.hpp @@ -70,7 +70,8 @@ class ReadCache { _read(buffer, remaining_bytes); buffer = reinterpret_cast(buffer) + remaining_bytes; - if (read_size > _max_line_size) { + // NOTE: if getdents send a request for exactly the correct amount of data. + if (read_size > _max_line_size || is_getdents) { LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, _max_line_size); LOG("Reading exactly requested size"); off64_t end_of_read = is_getdents ? getdents_request(fd, read_size, is64bit, _tid) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 35ff7a843..d9295d420 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -31,11 +31,11 @@ class CapioFile { int _fd = -1; ///< File descriptor for permanent/mmap storage // TODO: check if it is possible to move from int to unsigned int - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = -1; ///< Target close() operations for commitment + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const unsigned int _n_files_expected = 0; ///< Target dirent64 count (if directory) + const unsigned int _n_close_expected = 0; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized @@ -83,7 +83,7 @@ class CapioFile { * @param init_size Initial buffer allocation size. * @param n_close_expected Expected number of close calls. */ - CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, + CapioFile(bool directory, unsigned int n_files_expected, bool permanent, off64_t init_size, int n_close_expected); /** @@ -93,7 +93,7 @@ class CapioFile { * @param init_size Initial buffer allocation size. * @param n_close_expected Expected number of close calls. */ - CapioFile(bool directory, bool permanent, off64_t init_size, int n_close_expected); + CapioFile(bool directory, bool permanent, off64_t init_size, unsigned int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; @@ -228,7 +228,7 @@ class CapioFile { [[nodiscard]] int getCurrentDirectoryFileCount() const; /** @return Expected total files in this directory. */ - [[nodiscard]] int getDirectoryExpectedFileCount() const; + [[nodiscard]] unsigned int getDirectoryExpectedFileCount() const; /** @return Reference to the internal sector map. */ [[nodiscard]] const std::set, compareSectors> &getSectors() const; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 45baf5c40..855dda932 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -1,4 +1,5 @@ #include "server/include/storage/capio_file.hpp" + #include "common/logger.hpp" #include "remote/backend.hpp" #include "server/include/utils/common.hpp" @@ -11,13 +12,13 @@ bool CapioFile::compareSectors::operator()(const std::pair &lh CapioFile::CapioFile() = default; -CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, - const off64_t init_size, const int n_close_expected) +CapioFile::CapioFile(const bool directory, const unsigned int n_files_expected, + const bool permanent, const off64_t init_size, const int n_close_expected) : _buf_size(init_size), _n_files_expected(n_files_expected + 2), _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, - const int n_close_expected) + const unsigned int n_close_expected) : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} @@ -230,7 +231,7 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) { _sectors.emplace(new_start, new_end); } -bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } +bool CapioFile::closed() const { return _n_close_expected <= 0 || _n_close == _n_close_expected; } bool CapioFile::deletable() const { return _n_opens <= 0; } @@ -333,8 +334,11 @@ bool CapioFile::isFirstWrite() const { return this->_first_write; } void CapioFile::registerFirstWrite() { this->_first_write = false; } -void CapioFile::incrementDirectoryFileCount(const int count) { this->_n_files += count; } +void CapioFile::incrementDirectoryFileCount(const int count) { + this->_n_files += count; + this->_data_avail_cv.notify_all(); +} int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } -int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } +unsigned int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index 93639cfc1..ef193a942 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -61,7 +61,8 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p c_file.insertSector(base_offset, data_size); c_file.incrementDirectoryFileCount(); client_manager->registerProducedFile(tid, dir); - if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount()) { + if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount() && + CapioCLEngine::get().getCommitRule(file_path) == capiocl::commitRules::ON_N_FILES) { c_file.setCommitted(); } } diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 7742b865c..834b04d24 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,6 +1,10 @@ -#include "server/include/storage/capio_file.hpp" +#include "storage/capio_file.hpp" +#include "common/dirent.hpp" #include "common/env.hpp" #include "remote/backend.hpp" +#include "storage/manager.hpp" + +extern StorageManager *storage_manager; #include #include @@ -234,7 +238,7 @@ TEST(ServerTest, TesMemcpyCapioFile) { } TEST(ServerTest, TestCloseCapioFile) { - CapioFile file(false, false, 0, -1); + CapioFile file(false, false, 0, 0); EXPECT_TRUE(file.closed()); // TEST for n_close_expected == -1 CapioFile file1(false, false, 0, 10); @@ -362,3 +366,59 @@ TEST(ServerTest, TestGetSectorEnd) { EXPECT_EQ(file.getSectorEnd(120), 1234); EXPECT_EQ(file.getSectorEnd(12000), -1); } + +TEST(ServerTest, TestSimulateDirectorySrteaming) { + + constexpr int NUM_FILES_EXPECTED = 10; + + const std::filesystem::path CAPIO_DIR = "/tmp"; + const std::filesystem::path stream_directory = CAPIO_DIR / "my_streaming_directory"; + + setenv("CAPIO_DIR", CAPIO_DIR.c_str(), 1); + storage_manager->addDirectory(1234, CAPIO_DIR); + storage_manager->addDirectory(1234, stream_directory); + + std::mutex mutex_continue; + + std::thread t([&] { + for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) { + mutex_continue.lock(); + const std::string filename = "file." + std::to_string(i); + storage_manager->updateDirectory(1234, stream_directory / filename); + } + }); + + const auto &file = storage_manager->get(stream_directory); + + long current_offset = 0; + + linux_dirent64 dirent{}; + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + current_offset += sizeof(linux_dirent64); + EXPECT_EQ(strcmp(dirent.d_name, "."), 0); + bzero(&dirent, sizeof(linux_dirent64)); + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + current_offset += sizeof(linux_dirent64); + EXPECT_EQ(strcmp(dirent.d_name, ".."), 0); + bzero(&dirent, sizeof(linux_dirent64)); + + for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) { + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + const std::string expected_filename = "file." + std::to_string(i); + EXPECT_EQ(strcmp(dirent.d_name, expected_filename.c_str()), 0); + bzero(&dirent, sizeof(linux_dirent64)); + mutex_continue.unlock(); + current_offset += sizeof(linux_dirent64); + } + + t.join(); + + storage_manager->remove(stream_directory); + storage_manager->remove(CAPIO_DIR); +} \ No newline at end of file