Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capio/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ constexpr char CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING[] =

constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] =
"Backend used in CAPIO. The value [backend] can be one of the following implemented backends: "
"\n\t> mpi (default)\n\t> mpisync";
"\n\t> mpi \n\t> mpisync \n\t> none (default)";

// Cli pre messages
constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mCAPIO-SERVER\033[0m ";
Expand Down
7 changes: 2 additions & 5 deletions capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@
#include "common/logger.hpp"
#include "common/requests.hpp"
#include "common/semaphore.hpp"
#include "remote/backend.hpp"
#include "storage/capio_file.hpp"
#include "utils/common.hpp"
#include "utils/env.hpp"
#include "utils/types.hpp"

ClientManager *client_manager;
StorageManager *storage_manager;

int n_servers;
// name of the node
char *node_name;
Backend *backend;

#include "handlers.hpp"
#include "utils/location.hpp"
Expand Down Expand Up @@ -93,7 +91,6 @@ static constexpr std::array<CSHandler_t, CAPIO_NR_REQUESTS> build_request_handle

START_LOG(gettid(), "call()");

MPI_Comm_size(MPI_COMM_WORLD, &n_servers);
setup_signal_handlers();
backend->handshake_servers();

Expand Down
5 changes: 3 additions & 2 deletions capio/server/include/handlers/getdents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "utils/location.hpp"

extern StorageManager *storage_manager;
extern Backend *backend;

inline void request_remote_getdents(int tid, int fd, off64_t count) {
START_LOG(gettid(), "call(tid=%d, fd=%d, count=%ld)", tid, fd, count);
Expand Down Expand Up @@ -50,15 +51,15 @@ inline void handle_getdents(int tid, int fd, long int count) {

loop_load_file_location(path_to_check);

if (strcmp(std::get<0>(get_file_location(path_to_check)), node_name) == 0) {
if (std::get<0>(get_file_location(path_to_check)) == backend->get_node_name()) {
handle_getdents(tid, fd, count);
} else {

request_remote_getdents(tid, fd, count);
}
});
t.detach();
} else if (is_prod || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
} else if (is_prod || std::get<0>(file_location_opt->get()) == backend->get_node_name() ||
capio_dir == path_to_check) {
CapioFile &c_file = storage_manager->get(path_to_check);
off64_t offset = storage_manager->getFileOffset(tid, fd);
Expand Down
6 changes: 4 additions & 2 deletions capio/server/include/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "utils/location.hpp"

extern Backend *backend;

std::mutex local_read_mutex;

extern ClientManager *client_manager;
Expand Down Expand Up @@ -122,7 +124,7 @@ void wait_for_file(const std::filesystem::path &path, int tid, int fd, off64_t c
loop_load_file_location(path);

// check if the file is local or remote
if (strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {
if (std::get<0>(get_file_location(path)) == backend->get_node_name().c_str()) {
handle_local_read(tid, fd, count, false);
} else {
request_remote_read(tid, fd, count);
Expand All @@ -146,7 +148,7 @@ inline void handle_read(int tid, int fd, off64_t count) {
// launch a thread that checks when the file is created
std::thread t(wait_for_file, path, tid, fd, count);
t.detach();
} else if (is_prod || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
} else if (is_prod || std::get<0>(file_location_opt->get()) == backend->get_node_name() ||
capio_dir == path) {
LOG("File is local. handling local read");
handle_local_read(tid, fd, count, is_prod);
Expand Down
5 changes: 3 additions & 2 deletions capio/server/include/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

extern StorageManager *storage_manager;
extern ClientManager *client_manager;
extern Backend *backend;

void wait_for_file_completion(int tid, const std::filesystem::path &path) {
START_LOG(gettid(), "call(tid=%d, path=%s)", tid, path.c_str());
Expand All @@ -26,7 +27,7 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) {

// if file is streamable
if (c_file.isCommitted() || CapioCLEngine::get().isFirable(path) ||
strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {
std::get<0>(get_file_location(path)) == backend->get_node_name().c_str()) {

client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.isDirectory() ? 1 : 0));
Expand Down Expand Up @@ -72,7 +73,7 @@ inline void reply_stat(int tid, const std::filesystem::path &path) {
LOG("File is now present from remote node. retrieving file again.");
file_location_opt = get_file_location_opt(path);
}
if (c_file.isCommitted() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
if (c_file.isCommitted() || std::get<0>(file_location_opt->get()) == backend->get_node_name() ||
CapioCLEngine::get().isFirable(path) || capio_dir == path) {
LOG("Sending response to client");
client_manager->replyToClient(tid, c_file.getFileSize());
Expand Down
58 changes: 28 additions & 30 deletions capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
@@ -1,39 +1,33 @@
#ifndef CAPIO_SERVER_REMOTE_BACKEND_HPP
#define CAPIO_SERVER_REMOTE_BACKEND_HPP
#include "common/logger.hpp"
#include <charconv>
#include <set>

#include "common/logger.hpp"

class RemoteRequest {
private:
char *_buf_recv;
int _code;
const std::string _source;

public:
RemoteRequest(char *buf_recv, const std::string &source) : _source(source) {
START_LOG(gettid(), "call(buf_recv=%s, source=%s)", buf_recv, source.c_str());
int code;
auto [ptr, ec] = std::from_chars(buf_recv, buf_recv + 4, code);
if (ec == std::errc()) {
this->_code = code;
this->_buf_recv = new char[CAPIO_SERVER_REQUEST_MAX_SIZE];
strcpy(this->_buf_recv, ptr + 1);
LOG("Received request %d from %s : %s", this->_code, this->_source.c_str(),
this->_buf_recv);
} else {
this->_code = -1;
}
};

/**
* Instantiate a new RemoteRequest
* @param buf_recv The buffer containing the raw request
* @param source The source that generated the request
*/
RemoteRequest(char *buf_recv, const std::string &source);
RemoteRequest(const RemoteRequest &) = delete;
RemoteRequest &operator=(const RemoteRequest &) = delete;

~RemoteRequest() { delete[] _buf_recv; }
~RemoteRequest();

[[nodiscard]] auto get_source() const { return this->_source; }
[[nodiscard]] auto get_content() const { return this->_buf_recv; }
[[nodiscard]] auto get_code() const { return this->_code; }
/// Get the source node name of the request
[[nodiscard]] const std::string &get_source() const;
/// Get the content of the request
[[nodiscard]] const char *get_content() const;
/// Get the request code
[[nodiscard]] int get_code() const;
};

/**
Expand All @@ -43,14 +37,21 @@ class RemoteRequest {
* functions in a dedicated backend.
*/
class Backend {
protected:
int n_servers;
std::string node_name;

public:
explicit Backend(unsigned int node_name_max_length);

virtual ~Backend() = default;

/**
* Returns the node names of the CAPIO servers
* @return A set containing the node names of all CAPIO servers
*/
virtual const std::set<std::string> get_nodes() = 0;
/// Return THIS node name as configured by the derived backend class
[[nodiscard]] const std::string &get_node_name() const;

/// Get a std::set containing the node names of all CAPIO servers for which a handshake
/// occurred (including current instance node name)
virtual const std::set<std::string> get_nodes();

/**
* Handshake the server applications
Expand All @@ -67,7 +68,7 @@ class Backend {
* Send file
* @param shm buffer of data to be sent
* @param nbytes length of @param shm
* @param dest target to send files to
* @param target target to send files to
*/
virtual void send_file(char *shm, long int nbytes, const std::string &target) = 0;

Expand All @@ -88,7 +89,4 @@ class Backend {
virtual void send_request(const char *message, int message_len, const std::string &target) = 0;
};

// FIXME: Remove the inline specifier
inline Backend *backend;

#endif // CAPIO_SERVER_REMOTE_BACKEND_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/include.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*/

#include "mpi.hpp"
#include "no_backend.hpp"
#endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP
Loading
Loading