Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ jobs:
--xml coverage.xml \
--gcov-executable gcov \
--exclude capio/tests \
--gcov-ignore-parse-errors=negative_hits.warn \
../build
- name: "Compute Valid Artifact Name"
Expand Down
3 changes: 2 additions & 1 deletion capio/posix/utils/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class ReadCache {
_read(buffer, remaining_bytes);
buffer = reinterpret_cast<char *>(buffer) + remaining_bytes;

if (read_size > _max_line_size) {
// NOTE: if getdents send a request for exactly the correct amount of data.
if (read_size > _max_line_size || is_getdents) {
LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, _max_line_size);
LOG("Reading exactly requested size");
off64_t end_of_read = is_getdents ? getdents_request(fd, read_size, is64bit, _tid)
Expand Down
16 changes: 8 additions & 8 deletions capio/server/include/storage/capio_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ class CapioFile {
int _fd = -1; ///< File descriptor for permanent/mmap storage

// TODO: check if it is possible to move from int to unsigned int
std::atomic<int> _n_close = 0; ///< Current count of close() operations
std::atomic<int> _n_opens = 0; ///< Current count of open() operations
std::atomic<int> _n_files = 0; ///< Count of dirent64 stored (if directory)
const int _n_files_expected = -1; ///< Target dirent64 count (if directory)
const int _n_close_expected = -1; ///< Target close() operations for commitment
std::atomic<int> _n_close = 0; ///< Current count of close() operations
std::atomic<int> _n_opens = 0; ///< Current count of open() operations
std::atomic<int> _n_files = 0; ///< Count of dirent64 stored (if directory)
const unsigned int _n_files_expected = 0; ///< Target dirent64 count (if directory)
const unsigned int _n_close_expected = 0; ///< Target close() operations for commitment

bool _home_node = false; ///< True if this is the home node
bool _committed = false; ///< True if file is finalized
Expand Down Expand Up @@ -83,7 +83,7 @@ class CapioFile {
* @param init_size Initial buffer allocation size.
* @param n_close_expected Expected number of close calls.
*/
CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size,
CapioFile(bool directory, unsigned int n_files_expected, bool permanent, off64_t init_size,
int n_close_expected);

/**
Expand All @@ -93,7 +93,7 @@ class CapioFile {
* @param init_size Initial buffer allocation size.
* @param n_close_expected Expected number of close calls.
*/
CapioFile(bool directory, bool permanent, off64_t init_size, int n_close_expected);
CapioFile(bool directory, bool permanent, off64_t init_size, unsigned int n_close_expected);

CapioFile(const CapioFile &) = delete;
CapioFile &operator=(const CapioFile &) = delete;
Expand Down Expand Up @@ -228,7 +228,7 @@ class CapioFile {
[[nodiscard]] int getCurrentDirectoryFileCount() const;

/** @return Expected total files in this directory. */
[[nodiscard]] int getDirectoryExpectedFileCount() const;
[[nodiscard]] unsigned int getDirectoryExpectedFileCount() const;

/** @return Reference to the internal sector map. */
[[nodiscard]] const std::set<std::pair<off64_t, off64_t>, compareSectors> &getSectors() const;
Expand Down
16 changes: 10 additions & 6 deletions capio/server/src/capio_file.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "server/include/storage/capio_file.hpp"

#include "common/logger.hpp"
#include "remote/backend.hpp"
#include "server/include/utils/common.hpp"
Expand All @@ -11,13 +12,13 @@ bool CapioFile::compareSectors::operator()(const std::pair<off64_t, off64_t> &lh

CapioFile::CapioFile() = default;

CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent,
const off64_t init_size, const int n_close_expected)
CapioFile::CapioFile(const bool directory, const unsigned int n_files_expected,
const bool permanent, const off64_t init_size, const int n_close_expected)
: _buf_size(init_size), _n_files_expected(n_files_expected + 2),
_n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {}

CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size,
const int n_close_expected)
const unsigned int n_close_expected)
: _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory),
_permanent(permanent) {}

Expand Down Expand Up @@ -230,7 +231,7 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) {
_sectors.emplace(new_start, new_end);
}

bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; }
bool CapioFile::closed() const { return _n_close_expected <= 0 || _n_close == _n_close_expected; }

bool CapioFile::deletable() const { return _n_opens <= 0; }

Expand Down Expand Up @@ -333,8 +334,11 @@ bool CapioFile::isFirstWrite() const { return this->_first_write; }

void CapioFile::registerFirstWrite() { this->_first_write = false; }

void CapioFile::incrementDirectoryFileCount(const int count) { this->_n_files += count; }
void CapioFile::incrementDirectoryFileCount(const int count) {
this->_n_files += count;
this->_data_avail_cv.notify_all();
}

int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; }

int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; }
unsigned int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; }
3 changes: 2 additions & 1 deletion capio/server/src/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p
c_file.insertSector(base_offset, data_size);
c_file.incrementDirectoryFileCount();
client_manager->registerProducedFile(tid, dir);
if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount()) {
if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount() &&
CapioCLEngine::get().getCommitRule(file_path) == capiocl::commitRules::ON_N_FILES) {
c_file.setCommitted();
}
}
Expand Down
64 changes: 62 additions & 2 deletions capio/tests/unit/server/src/capio_file.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "server/include/storage/capio_file.hpp"
#include "storage/capio_file.hpp"
#include "common/dirent.hpp"
#include "common/env.hpp"
#include "remote/backend.hpp"
#include "storage/manager.hpp"

extern StorageManager *storage_manager;

#include <gtest/gtest.h>
#include <thread>
Expand Down Expand Up @@ -234,7 +238,7 @@ TEST(ServerTest, TesMemcpyCapioFile) {
}

TEST(ServerTest, TestCloseCapioFile) {
CapioFile file(false, false, 0, -1);
CapioFile file(false, false, 0, 0);
EXPECT_TRUE(file.closed()); // TEST for n_close_expected == -1

CapioFile file1(false, false, 0, 10);
Expand Down Expand Up @@ -362,3 +366,59 @@ TEST(ServerTest, TestGetSectorEnd) {
EXPECT_EQ(file.getSectorEnd(120), 1234);
EXPECT_EQ(file.getSectorEnd(12000), -1);
}

TEST(ServerTest, TestSimulateDirectorySrteaming) {

constexpr int NUM_FILES_EXPECTED = 10;

const std::filesystem::path CAPIO_DIR = "/tmp";
const std::filesystem::path stream_directory = CAPIO_DIR / "my_streaming_directory";

setenv("CAPIO_DIR", CAPIO_DIR.c_str(), 1);
storage_manager->addDirectory(1234, CAPIO_DIR);
storage_manager->addDirectory(1234, stream_directory);

std::mutex mutex_continue;

std::thread t([&] {
for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) {
mutex_continue.lock();
const std::string filename = "file." + std::to_string(i);
storage_manager->updateDirectory(1234, stream_directory / filename);
}
});

const auto &file = storage_manager->get(stream_directory);

long current_offset = 0;

linux_dirent64 dirent{};

file.waitForData(current_offset + sizeof(linux_dirent64));
memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64));
current_offset += sizeof(linux_dirent64);
EXPECT_EQ(strcmp(dirent.d_name, "."), 0);
bzero(&dirent, sizeof(linux_dirent64));

file.waitForData(current_offset + sizeof(linux_dirent64));
memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64));
current_offset += sizeof(linux_dirent64);
EXPECT_EQ(strcmp(dirent.d_name, ".."), 0);
bzero(&dirent, sizeof(linux_dirent64));

for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) {

file.waitForData(current_offset + sizeof(linux_dirent64));
memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64));
const std::string expected_filename = "file." + std::to_string(i);
EXPECT_EQ(strcmp(dirent.d_name, expected_filename.c_str()), 0);
bzero(&dirent, sizeof(linux_dirent64));
mutex_continue.unlock();
current_offset += sizeof(linux_dirent64);
}

t.join();

storage_manager->remove(stream_directory);
storage_manager->remove(CAPIO_DIR);
}
Loading