Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ jobs:
--exclude-throw-branches \
--xml coverage.xml \
--gcov-executable gcov \
--exclude capio/tests \
../build

- name: "Compute Valid Artifact Name"
Expand Down
1 change: 1 addition & 0 deletions capio/common/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <cxxabi.h>
#include <fcntl.h>
#include <fstream>
#include <memory>
#include <string>
#include <sys/mman.h>
#include <unistd.h>
Expand Down
13 changes: 3 additions & 10 deletions capio/common/shm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,7 @@ class CapioShmCanary {
}
_shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
if (_shm_id == -1) {
LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
#ifndef __CAPIO_POSIX
auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)];
sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data());
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << message << std::endl;
delete[] message;
#endif
ERR_EXIT("ERR: shm canary flag already exists");
ERR_EXIT(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
}
};

Expand Down Expand Up @@ -105,7 +98,7 @@ inline void *get_shm(const std::string &shm_name) {

// if we are not creating a new object, mode is equals to 0
int fd = shm_open(shm_name.c_str(), O_RDWR, 0); // to be closed
struct stat sb {};
struct stat sb{};
if (fd == -1) {
ERR_EXIT("get_shm shm_open %s", shm_name.c_str());
}
Expand All @@ -130,7 +123,7 @@ inline void *get_shm_if_exist(const std::string &shm_name) {

// if we are not creating a new object, mode is equals to 0
int fd = shm_open(shm_name.c_str(), O_RDWR, 0); // to be closed
struct stat sb {};
struct stat sb{};
if (fd == -1) {
if (errno == ENOENT) {
return nullptr;
Expand Down
13 changes: 11 additions & 2 deletions capio/posix/utils/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,17 @@ class ReadCache {
_read(buffer, remaining_bytes);
buffer = reinterpret_cast<char *>(buffer) + remaining_bytes;

if (read_size > _max_line_size) {
LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, _max_line_size);
// NOTE: if getdents send a request for exactly the correct amount of data.
if (read_size > _max_line_size || is_getdents) {
DBG(capio_syscall(SYS_gettid), [&] {
if (is_getdents) {
LOG("NOTE: requesting getdents data. sending request for exact amount!");
} else {
LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size,
_max_line_size);
}
}());

LOG("Reading exactly requested size");
off64_t end_of_read = is_getdents ? getdents_request(fd, read_size, is64bit, _tid)
: read_request(fd, read_size, _tid);
Expand Down
1 change: 0 additions & 1 deletion capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ static constexpr std::array<CSHandler_t, CAPIO_NR_REQUESTS> build_request_handle

START_LOG(gettid(), "call()");

MPI_Comm_size(MPI_COMM_WORLD, &n_servers);
setup_signal_handlers();
backend->handshake_servers();

Expand Down
4 changes: 2 additions & 2 deletions capio/server/include/handlers/close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ inline void handle_close(int tid, int fd) {
c_file.closed()) {
LOG("Capio File %s is closed and commit rule is on_close. setting it to complete",
path.c_str());
c_file.setComplete();
c_file.commit();
c_file.setCommitted();
c_file.dump();
}

LOG("Deleting capio file %s from tid=%d", path.c_str(), tid);
Expand Down
12 changes: 6 additions & 6 deletions capio/server/include/handlers/exig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ inline void handle_exit_group(int tid) {
LOG("Handling file %s", path.c_str());
if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_TERMINATION) {
CapioFile &c_file = storage_manager->get(path);
if (c_file.directory()) {
if (c_file.isDirectory()) {
LOG("file %s is dir", path.c_str());
long int n_committed = c_file.n_files_expected;
if (n_committed <= c_file.n_files) {
if (const long int n_committed = c_file.getDirectoryExpectedFileCount();
n_committed <= c_file.getDirectoryContainedFileCount()) {
LOG("Setting file %s to complete", path.c_str());
c_file.setComplete();
c_file.setCommitted();
}
} else {
LOG("Setting file %s to complete", path.c_str());
c_file.setComplete();
c_file.commit();
c_file.setCommitted();
c_file.dump();
}
c_file.close();
}
Expand Down
7 changes: 4 additions & 3 deletions capio/server/include/handlers/getdents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ inline void request_remote_getdents(int tid, int fd, off64_t count) {
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.isCommitted() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) {
LOG("Handling local read");
send_dirent_to_client(tid, fd, c_file, offset, count);
} else if (end_of_read <= end_of_sector) {
LOG("?");
c_file.createBufferIfNeeded(storage_manager->getPath(tid, fd), false);
c_file.createBuffer(storage_manager->getPath(tid, fd), false);
client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
} else {
Expand Down
4 changes: 2 additions & 2 deletions capio/server/include/handlers/open.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ inline void update_file_metadata(const std::filesystem::path &path, int tid, int
: storage_manager->add(path, false, get_file_initial_size());
storage_manager->addFileToTid(tid, fd, path, offset);

if (c_file.first_write && is_creat) {
if (c_file.isFirstWrite() && is_creat) {
c_file.registerFirstWrite();
client_manager->registerProducedFile(tid, path);
c_file.first_write = false;
write_file_location(path);
storage_manager->updateDirectory(tid, path);
}
Expand Down
15 changes: 8 additions & 7 deletions capio/server/include/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ inline void handle_pending_read(int tid, int fd, long int process_offset, long i
bytes_read = end_of_sector - process_offset;
}

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), bytes_read);
storage_manager->setFileOffset(tid, fd, process_offset + bytes_read);

Expand All @@ -47,13 +47,13 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
off64_t process_offset = storage_manager->getFileOffset(tid, fd);

// if a process is the producer of a file, then the file is always complete for that process
const bool file_complete = c_file.complete() || is_prod;
const bool file_complete = c_file.isCommitted() || is_prod;

if (!(file_complete || CapioCLEngine::get().isFirable(path))) {
// wait for file to be completed and then do what is done inside handle pending read
LOG("Data is not available yet. Starting async thread to wait for file availability");
std::thread t([&c_file, tid, fd, count, process_offset] {
c_file.waitForCompletion();
c_file.waitForCommit();
handle_pending_read(tid, fd, process_offset, count);
});
t.detach();
Expand Down Expand Up @@ -85,7 +85,7 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
const auto read_size = std::min(count, end_of_sector - process_offset);
LOG("Requested read within end of sector, and data is available. Serving %ld bytes", read_size);

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), read_size);
storage_manager->setFileOffset(tid, fd, process_offset + read_size);
}
Expand All @@ -99,13 +99,14 @@ inline void request_remote_read(int tid, int fd, off64_t count) {
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.isCommitted() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) {
LOG("Handling local read");
handle_local_read(tid, fd, count, true);
} else if (end_of_read <= end_of_sector) {
LOG("Data is present locally and can be served to client");
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);

client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
Expand Down
11 changes: 6 additions & 5 deletions capio/server/include/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) {
CapioFile &c_file = storage_manager->get(path);

// if file is streamable
if (c_file.complete() || CapioCLEngine::get().isFirable(path) ||
if (c_file.isCommitted() || CapioCLEngine::get().isFirable(path) ||
strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {

client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));
client_manager->replyToClient(tid, static_cast<int>(c_file.isDirectory() ? 1 : 0));

} else {
handle_remote_stat_request(tid, path);
Expand Down Expand Up @@ -72,15 +72,16 @@ inline void reply_stat(int tid, const std::filesystem::path &path) {
LOG("File is now present from remote node. retrieving file again.");
file_location_opt = get_file_location_opt(path);
}
if (c_file.complete() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||

if (c_file.isCommitted() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
CapioCLEngine::get().isFirable(path) || capio_dir == path) {
LOG("Sending response to client");
client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));
client_manager->replyToClient(tid, static_cast<int>(c_file.isDirectory() ? 1 : 0));
} else {
LOG("Delegating backend to reply to remote stats");
// send a request for file. then start a thread to wait for the request completion
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
handle_remote_stat_request(tid, path);
}
}
Expand Down
11 changes: 6 additions & 5 deletions capio/server/include/handlers/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ void write_handler(const char *const str) {
off64_t end_of_write = offset + count;
const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t file_shm_size = c_file.getBufferSize();
SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid);

c_file.createBufferIfNeeded(path, true);
off64_t file_shm_size = c_file.getBufSize();
SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid);

c_file.createBuffer(path, true);
if (end_of_write > file_shm_size) {
c_file.expandBuffer(end_of_write);
}
c_file.readFromQueue(data_buf, offset, count);

client_manager->registerProducedFile(tid, path);
c_file.insertSector(offset, end_of_write);
if (c_file.first_write) {
c_file.first_write = false;
if (c_file.isFirstWrite()) {
c_file.registerFirstWrite();
write_file_location(path);
// TODO: it works only if there is one prod per file
storage_manager->updateDirectory(tid, path);
Expand Down
4 changes: 3 additions & 1 deletion capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#ifndef CAPIO_SERVER_REMOTE_BACKEND_HPP
#define CAPIO_SERVER_REMOTE_BACKEND_HPP
#include "common/logger.hpp"
#include <charconv>
#include <set>

#include "common/logger.hpp"

class RemoteRequest {
private:
Expand Down
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/include.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*/

#include "mpi.hpp"
#include "no_backend.hpp"
#endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MPIBackend : public Backend {
LOG("Mpi has multithreading support? %s (%d)",
provided == MPI_THREAD_MULTIPLE ? "yes" : "no", provided);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &n_servers);
LOG("node_rank=%d", &rank);
if (provided != MPI_THREAD_MULTIPLE) {
LOG("Error: The threading support level is not MPI_THREAD_MULTIPLE (is %d)", provided);
Expand Down
44 changes: 44 additions & 0 deletions capio/server/include/remote/backend/no_backend.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#ifndef CAPIO_NO_BACKEND_HPP
#define CAPIO_NO_BACKEND_HPP

class NoBackend : public Backend {

public:
NoBackend(int argc, char **argv) {
START_LOG(gettid(), "call()");
n_servers = 1;
node_name = new char[HOST_NAME_MAX];
gethostname(node_name, HOST_NAME_MAX);
}

~NoBackend() override = default;

const std::set<std::string> get_nodes() override { return {node_name}; }

void handshake_servers() override {}

RemoteRequest read_next_request() override {
START_LOG(gettid(), "call()");
LOG("Halting thread execution as NoBackend was chosen");
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return {nullptr, ""};
}

void send_file(char *shm, long int nbytes, const std::string &target) override {
START_LOG(gettid(), "call(%.50s, %ld, %s)", shm, nbytes, target.c_str());
}

void send_request(const char *message, int message_len, const std::string &target) override {
START_LOG(gettid(), "call(message=%s, message_len=%d, target=%s)", message, message_len,
target.c_str());
}

void recv_file(char *shm, const std::string &source, long int bytes_expected) override {
START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(),
bytes_expected);
}
};

#endif // CAPIO_NO_BACKEND_HPP
19 changes: 10 additions & 9 deletions capio/server/include/remote/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, of
const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t offset = storage_manager->getFileOffset(tid, fd);
c_file.real_file_size = file_size;
c_file.insertSector(offset, offset + nbytes);
c_file.setComplete(complete);

c_file.setRealFileSize(file_size);
c_file.insertSector(offset, offset + nbytes);
c_file.setCommitted(complete);
off64_t end_of_sector = c_file.getSectorEnd(offset);
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);

off64_t bytes_read;
off64_t end_of_read = offset + count;
if (end_of_sector > end_of_read) {
Expand All @@ -75,7 +76,7 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i
const CapioFile &c_file = storage_manager->get(path);
// wait that nbytes are written
c_file.waitForData(offset + count);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.complete(), is_getdents);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.isCommitted(), is_getdents);
}

inline void handle_remote_read(const std::filesystem::path &path, const std::string &source,
Expand All @@ -86,8 +87,8 @@ inline void handle_remote_read(const std::filesystem::path &path, const std::str

CapioFile &c_file = storage_manager->get(path);
bool data_available = (offset + count <= c_file.getStoredSize());
if (c_file.complete() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.complete(), is_getdents);
if (c_file.isCommitted() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.isCommitted(), is_getdents);
} else {
std::thread t(wait_for_data, path, source, tid, fd, count, offset, is_getdents);
t.detach();
Expand All @@ -107,9 +108,9 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd,
off64_t offset = storage_manager->getFileOffset(tid, fd);
CapioFile &c_file = storage_manager->get(path);

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
if (nbytes != 0) {
auto file_shm_size = c_file.getBufferSize();
auto file_shm_size = c_file.getBufSize();
auto file_size_recv = offset + nbytes;
if (file_size_recv > file_shm_size) {
c_file.expandBuffer(file_size_recv);
Expand Down
Loading