diff --git a/capio/common/constants.hpp b/capio/common/constants.hpp index 82da39e5e..c647656f4 100644 --- a/capio/common/constants.hpp +++ b/capio/common/constants.hpp @@ -156,7 +156,7 @@ constexpr char CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING[] = constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] = "Backend used in CAPIO. The value [backend] can be one of the following implemented backends: " - "\n\t> mpi (default)\n\t> mpisync"; + "\n\t> mpi \n\t> mpisync \n\t> none (default)"; // Cli pre messages constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mCAPIO-SERVER\033[0m "; diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index 59f22f4b6..956a31494 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -30,6 +30,7 @@ #include "common/logger.hpp" #include "common/requests.hpp" #include "common/semaphore.hpp" +#include "remote/backend.hpp" #include "storage/capio_file.hpp" #include "utils/common.hpp" #include "utils/env.hpp" @@ -37,10 +38,7 @@ ClientManager *client_manager; StorageManager *storage_manager; - -int n_servers; -// name of the node -char *node_name; +Backend *backend; #include "handlers.hpp" #include "utils/location.hpp" @@ -93,7 +91,6 @@ static constexpr std::array build_request_handle START_LOG(gettid(), "call()"); - MPI_Comm_size(MPI_COMM_WORLD, &n_servers); setup_signal_handlers(); backend->handshake_servers(); diff --git a/capio/server/include/handlers/getdents.hpp b/capio/server/include/handlers/getdents.hpp index d53002391..20f8299bf 100644 --- a/capio/server/include/handlers/getdents.hpp +++ b/capio/server/include/handlers/getdents.hpp @@ -10,6 +10,7 @@ #include "utils/location.hpp" extern StorageManager *storage_manager; +extern Backend *backend; inline void request_remote_getdents(int tid, int fd, off64_t count) { START_LOG(gettid(), "call(tid=%d, fd=%d, count=%ld)", tid, fd, count); @@ -50,7 +51,7 @@ inline void handle_getdents(int tid, int fd, long int count) { loop_load_file_location(path_to_check); - if (strcmp(std::get<0>(get_file_location(path_to_check)), node_name) == 0) { + if (std::get<0>(get_file_location(path_to_check)) == backend->get_node_name()) { handle_getdents(tid, fd, count); } else { @@ -58,7 +59,7 @@ inline void handle_getdents(int tid, int fd, long int count) { } }); t.detach(); - } else if (is_prod || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 || + } else if (is_prod || std::get<0>(file_location_opt->get()) == backend->get_node_name() || capio_dir == path_to_check) { CapioFile &c_file = storage_manager->get(path_to_check); off64_t offset = storage_manager->getFileOffset(tid, fd); diff --git a/capio/server/include/handlers/read.hpp b/capio/server/include/handlers/read.hpp index 2cff9afe8..e05ffc63e 100644 --- a/capio/server/include/handlers/read.hpp +++ b/capio/server/include/handlers/read.hpp @@ -9,6 +9,8 @@ #include "utils/location.hpp" +extern Backend *backend; + std::mutex local_read_mutex; extern ClientManager *client_manager; @@ -122,7 +124,7 @@ void wait_for_file(const std::filesystem::path &path, int tid, int fd, off64_t c loop_load_file_location(path); // check if the file is local or remote - if (strcmp(std::get<0>(get_file_location(path)), node_name) == 0) { + if (std::get<0>(get_file_location(path)) == backend->get_node_name().c_str()) { handle_local_read(tid, fd, count, false); } else { request_remote_read(tid, fd, count); @@ -146,7 +148,7 @@ inline void handle_read(int tid, int fd, off64_t count) { // launch a thread that checks when the file is created std::thread t(wait_for_file, path, tid, fd, count); t.detach(); - } else if (is_prod || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 || + } else if (is_prod || std::get<0>(file_location_opt->get()) == backend->get_node_name() || capio_dir == path) { LOG("File is local. handling local read"); handle_local_read(tid, fd, count, is_prod); diff --git a/capio/server/include/handlers/stat.hpp b/capio/server/include/handlers/stat.hpp index 8ff9352fe..e7e253061 100644 --- a/capio/server/include/handlers/stat.hpp +++ b/capio/server/include/handlers/stat.hpp @@ -16,6 +16,7 @@ extern StorageManager *storage_manager; extern ClientManager *client_manager; +extern Backend *backend; void wait_for_file_completion(int tid, const std::filesystem::path &path) { START_LOG(gettid(), "call(tid=%d, path=%s)", tid, path.c_str()); @@ -26,7 +27,7 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) { // if file is streamable if (c_file.isCommitted() || CapioCLEngine::get().isFirable(path) || - strcmp(std::get<0>(get_file_location(path)), node_name) == 0) { + std::get<0>(get_file_location(path)) == backend->get_node_name().c_str()) { client_manager->replyToClient(tid, c_file.getFileSize()); client_manager->replyToClient(tid, static_cast(c_file.isDirectory() ? 1 : 0)); @@ -72,7 +73,7 @@ inline void reply_stat(int tid, const std::filesystem::path &path) { LOG("File is now present from remote node. retrieving file again."); file_location_opt = get_file_location_opt(path); } - if (c_file.isCommitted() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 || + if (c_file.isCommitted() || std::get<0>(file_location_opt->get()) == backend->get_node_name() || CapioCLEngine::get().isFirable(path) || capio_dir == path) { LOG("Sending response to client"); client_manager->replyToClient(tid, c_file.getFileSize()); diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index 342c511da..f1c827821 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -1,39 +1,33 @@ #ifndef CAPIO_SERVER_REMOTE_BACKEND_HPP #define CAPIO_SERVER_REMOTE_BACKEND_HPP -#include "common/logger.hpp" #include #include +#include "common/logger.hpp" + class RemoteRequest { - private: char *_buf_recv; int _code; const std::string _source; public: - RemoteRequest(char *buf_recv, const std::string &source) : _source(source) { - START_LOG(gettid(), "call(buf_recv=%s, source=%s)", buf_recv, source.c_str()); - int code; - auto [ptr, ec] = std::from_chars(buf_recv, buf_recv + 4, code); - if (ec == std::errc()) { - this->_code = code; - this->_buf_recv = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; - strcpy(this->_buf_recv, ptr + 1); - LOG("Received request %d from %s : %s", this->_code, this->_source.c_str(), - this->_buf_recv); - } else { - this->_code = -1; - } - }; - + /** + * Instantiate a new RemoteRequest + * @param buf_recv The buffer containing the raw request + * @param source The source that generated the request + */ + RemoteRequest(char *buf_recv, const std::string &source); RemoteRequest(const RemoteRequest &) = delete; RemoteRequest &operator=(const RemoteRequest &) = delete; - ~RemoteRequest() { delete[] _buf_recv; } + ~RemoteRequest(); - [[nodiscard]] auto get_source() const { return this->_source; } - [[nodiscard]] auto get_content() const { return this->_buf_recv; } - [[nodiscard]] auto get_code() const { return this->_code; } + /// Get the source node name of the request + [[nodiscard]] const std::string &get_source() const; + /// Get the content of the request + [[nodiscard]] const char *get_content() const; + /// Get the request code + [[nodiscard]] int get_code() const; }; /** @@ -43,14 +37,21 @@ class RemoteRequest { * functions in a dedicated backend. */ class Backend { + protected: + int n_servers; + std::string node_name; + public: + explicit Backend(unsigned int node_name_max_length); + virtual ~Backend() = default; - /** - * Returns the node names of the CAPIO servers - * @return A set containing the node names of all CAPIO servers - */ - virtual const std::set get_nodes() = 0; + /// Return THIS node name as configured by the derived backend class + [[nodiscard]] const std::string &get_node_name() const; + + /// Get a std::set containing the node names of all CAPIO servers for which a handshake + /// occurred (including current instance node name) + virtual const std::set get_nodes(); /** * Handshake the server applications @@ -67,7 +68,7 @@ class Backend { * Send file * @param shm buffer of data to be sent * @param nbytes length of @param shm - * @param dest target to send files to + * @param target target to send files to */ virtual void send_file(char *shm, long int nbytes, const std::string &target) = 0; @@ -88,7 +89,4 @@ class Backend { virtual void send_request(const char *message, int message_len, const std::string &target) = 0; }; -// FIXME: Remove the inline specifier -inline Backend *backend; - #endif // CAPIO_SERVER_REMOTE_BACKEND_HPP diff --git a/capio/server/include/remote/backend/include.hpp b/capio/server/include/remote/backend/include.hpp index 6291a9500..ce298b767 100644 --- a/capio/server/include/remote/backend/include.hpp +++ b/capio/server/include/remote/backend/include.hpp @@ -5,4 +5,5 @@ */ #include "mpi.hpp" +#include "no_backend.hpp" #endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP diff --git a/capio/server/include/remote/backend/mpi.hpp b/capio/server/include/remote/backend/mpi.hpp index cda73bc91..c4e2cd822 100644 --- a/capio/server/include/remote/backend/mpi.hpp +++ b/capio/server/include/remote/backend/mpi.hpp @@ -1,7 +1,7 @@ #ifndef CAPIO_SERVER_REMOTE_BACKEND_MPI_HPP #define CAPIO_SERVER_REMOTE_BACKEND_MPI_HPP - #include +#include #include "remote/backend.hpp" @@ -11,155 +11,28 @@ class MPIBackend : public Backend { MPI_Request req{}; int rank = -1; - /* - * This structure holds inside the information to convert from hostname to MPI rank*/ + /// This structure holds inside the information to convert from hostname to MPI rank std::set nodes; std::unordered_map rank_nodes_equivalence; static constexpr long MPI_MAX_ELEM_COUNT = 1024L * 1024 * 1024; public: - MPIBackend(int argc, char **argv) { - int node_name_len, provided; - START_LOG(gettid(), "call()"); - LOG("Created a MPI backend"); - MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); - LOG("Mpi has multithreading support? %s (%d)", - provided == MPI_THREAD_MULTIPLE ? "yes" : "no", provided); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - LOG("node_rank=%d", &rank); - if (provided != MPI_THREAD_MULTIPLE) { - LOG("Error: The threading support level is not MPI_THREAD_MULTIPLE (is %d)", provided); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - - node_name = new char[MPI_MAX_PROCESSOR_NAME]; - MPI_Get_processor_name(node_name, &node_name_len); - LOG("Node name = %s, length=%d", node_name, node_name_len); - nodes.emplace(node_name); - rank_nodes_equivalence[std::to_string(rank)] = node_name; - rank_nodes_equivalence[node_name] = std::to_string(rank); - } - - ~MPIBackend() override { - START_LOG(gettid(), "Call()"); - MPI_Finalize(); - } - - inline const std::set get_nodes() override { return nodes; } - - inline void handshake_servers() override { - START_LOG(gettid(), "call()"); - - auto buf = std::unique_ptr(new char[MPI_MAX_PROCESSOR_NAME]); - for (int i = 0; i < n_servers; i += 1) { - if (i != rank) { - // TODO: possible deadlock - MPI_Send(node_name, strlen(node_name), MPI_CHAR, i, 0, MPI_COMM_WORLD); - std::fill(buf.get(), buf.get() + MPI_MAX_PROCESSOR_NAME, 0); - MPI_Recv(buf.get(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, i, 0, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - nodes.emplace(buf.get()); - rank_nodes_equivalence.emplace(buf.get(), std::to_string(i)); - rank_nodes_equivalence.emplace(std::to_string(i), buf.get()); - } - } - } - - RemoteRequest read_next_request() override { - START_LOG(gettid(), "call()"); - MPI_Status status; - char *buff = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; - LOG("initiating a lightweight MPI receive"); - MPI_Request request; - int received = 0; - - // receive from server - MPI_Irecv(buff, CAPIO_SERVER_REQUEST_MAX_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, - &request); - struct timespec sleepTime, returnTime; - sleepTime.tv_sec = 0; - sleepTime.tv_nsec = 200000; - - while (!received) { - MPI_Test(&request, &received, &status); - nanosleep(&sleepTime, &returnTime); - } - int bytes_received; - MPI_Get_count(&status, MPI_CHAR, &bytes_received); - - LOG("receive completed!"); - return {buff, rank_nodes_equivalence[std::to_string(status.MPI_SOURCE)]}; - } - - void send_file(char *shm, long int nbytes, const std::string &target) override { - START_LOG(gettid(), "call(%.50s, %ld, %s)", shm, nbytes, target.c_str()); - int elem_to_snd = 0; - int dest = std::stoi(rank_nodes_equivalence[target]); - for (long int k = 0; k < nbytes; k += elem_to_snd) { - // Compute the maximum amount to send for this chunk - elem_to_snd = static_cast(std::min(nbytes - k, MPI_MAX_ELEM_COUNT)); - - LOG("Sending %d bytes to %d with offset from beginning odf k=%ld", elem_to_snd, dest, - k); - MPI_Isend(shm + k, elem_to_snd, MPI_BYTE, dest, 0, MPI_COMM_WORLD, &req); - LOG("Sent chunk of %d bytes", elem_to_snd); - } - } - - void send_request(const char *message, int message_len, const std::string &target) override { - START_LOG(gettid(), "call(message=%s, message_len=%d, target=%s)", message, message_len, - target.c_str()); - const std::string &mpi_target = rank_nodes_equivalence[target]; - LOG("MPI_rank for target %s is %s", target.c_str(), mpi_target.c_str()); - - MPI_Send(message, message_len + 1, MPI_CHAR, std::stoi(mpi_target), 0, MPI_COMM_WORLD); - } - - inline void recv_file(char *shm, const std::string &source, long int bytes_expected) override { - START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), - bytes_expected); - MPI_Status status; - int bytes_received = 0, count = 0, source_rank = std::stoi(rank_nodes_equivalence[source]); - LOG("Buffer is valid? %s", - shm != nullptr ? "yes" - : "NO! a nullptr was given to recv_file. this will make mpi crash!"); - for (long int k = 0; k < bytes_expected; k += bytes_received) { - - count = static_cast(std::min(bytes_expected - k, MPI_MAX_ELEM_COUNT)); - - LOG("Expected %ld bytes from %s with offset from beginning odf k=%ld", count, - source.c_str(), k); - MPI_Recv(shm + k, count, MPI_BYTE, source_rank, 0, MPI_COMM_WORLD, &status); - LOG("Received chunk"); - MPI_Get_count(&status, MPI_BYTE, &bytes_received); - LOG("Chunk size is %ld bytes", bytes_received); - } - } + MPIBackend(int argc, char **argv); + + ~MPIBackend() override; + const std::set get_nodes() override; + void handshake_servers() override; + RemoteRequest read_next_request() override; + void send_file(char *shm, long int nbytes, const std::string &target) override; + void send_request(const char *message, int message_len, const std::string &target) override; + void recv_file(char *shm, const std::string &source, long int bytes_expected) override; }; -class MPISYNCBackend : public MPIBackend { +class MPISYNCBackend final : public MPIBackend { public: - MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) { - START_LOG(gettid(), "call()"); - LOG("Wrapped MPI backend with MPISYC backend"); - } - - ~MPISYNCBackend() override { - START_LOG(gettid(), "Call()"); - MPI_Finalize(); - } - - RemoteRequest read_next_request() override { - START_LOG(gettid(), "call()"); - MPI_Status status; - char *buff = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; - LOG("initiating a synchronized MPI receive"); - MPI_Recv(buff, CAPIO_SERVER_REQUEST_MAX_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, - &status); // receive from server - - LOG("receive completed!"); - return {buff, rank_nodes_equivalence[std::to_string(status.MPI_SOURCE)]}; - } + MPISYNCBackend(int argc, char *argv[]); + ~MPISYNCBackend() override; + RemoteRequest read_next_request() override; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_MPI_HPP \ No newline at end of file diff --git a/capio/server/include/remote/backend/no_backend.hpp b/capio/server/include/remote/backend/no_backend.hpp new file mode 100644 index 000000000..307d1543b --- /dev/null +++ b/capio/server/include/remote/backend/no_backend.hpp @@ -0,0 +1,15 @@ +#ifndef CAPIO_DEFAULT_HPP +#define CAPIO_DEFAULT_HPP +#include "remote/backend.hpp" + +class NoBackend final : public Backend { + public: + NoBackend(int argc, char **argv); + ~NoBackend() override = default; + void handshake_servers() override; + RemoteRequest read_next_request() override; + void send_file(char *shm, long int nbytes, const std::string &target) override; + void send_request(const char *message, int message_len, const std::string &target) override; + void recv_file(char *shm, const std::string &source, long int bytes_expected) override; +}; +#endif // CAPIO_DEFAULT_HPP diff --git a/capio/server/include/remote/listener.hpp b/capio/server/include/remote/listener.hpp index 61f89efbf..5a7ed9f84 100644 --- a/capio/server/include/remote/listener.hpp +++ b/capio/server/include/remote/listener.hpp @@ -25,12 +25,12 @@ build_server_request_handlers_table() { inline Backend *select_backend(const std::string &backend_name, int argc, char *argv[]) { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); - if (backend_name.empty()) { + if (backend_name.empty() || backend_name == "none") { LOG("backend selected: none"); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO - << "Starting CAPIO with default backend (MPI) as no preferred backend was chosen" + << "Starting CAPIO with default backend (none) as no preferred backend was chosen" << std::endl; - return new MPIBackend(argc, argv); + return new NoBackend(argc, argv); } if (backend_name == "mpi") { @@ -46,21 +46,30 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char * << std::endl; return new MPISYNCBackend(argc, argv); } - LOG("Backend %s does not exist in CAPIO. Reverting back to the default MPI backend", + + LOG("Backend %s does not exist in CAPIO. Reverting back to the default backend (none)", backend_name.c_str()); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " Backend " << backend_name - << " does not exist. Reverting to the default MPI backend" << std::endl; - return new MPIBackend(argc, argv); + << " does not exist. Reverting to the default backend (MPI)" << std::endl; + return new NoBackend(argc, argv); } -[[noreturn]] void capio_remote_listener(Semaphore &internal_server_sem) { +inline void capio_remote_listener(Semaphore &internal_server_sem) { static const std::array server_request_handlers = build_server_request_handlers_table(); - START_LOG(gettid(), "call()"); - internal_server_sem.lock(); + if (typeid(*backend) == typeid(NoBackend)) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO + << " RemoteListener] backend is of type NoBackend. Stopping " + "capio_remote_listener() execution." + << std::endl; + return; + } + + START_LOG(gettid(), "call()"); + while (true) { auto request = backend->read_next_request(); diff --git a/capio/server/include/remote/requests.hpp b/capio/server/include/remote/requests.hpp index b7515fa25..d01e8a196 100644 --- a/capio/server/include/remote/requests.hpp +++ b/capio/server/include/remote/requests.hpp @@ -4,6 +4,7 @@ #include "storage/manager.hpp" extern StorageManager *storage_manager; +extern Backend *backend; inline void serve_remote_stat_request(const std::filesystem::path &path, int source_tid, off64_t file_size, bool is_dir, const std::string &dest) { @@ -39,10 +40,11 @@ inline void handle_remote_stat_request(int tid, const std::filesystem::path &pat std::string dest = std::get<0>(get_file_location(path)); const char *const format = "%04d %d %s %s"; - const int size = - snprintf(nullptr, 0, format, CAPIO_SERVER_REQUEST_STAT, tid, node_name, path.c_str()); + const int size = snprintf(nullptr, 0, format, CAPIO_SERVER_REQUEST_STAT, tid, + backend->get_node_name().c_str(), path.c_str()); const std::unique_ptr message(new char[size + 1]); - sprintf(message.get(), format, CAPIO_SERVER_REQUEST_STAT, tid, node_name, path.c_str()); + sprintf(message.get(), format, CAPIO_SERVER_REQUEST_STAT, tid, backend->get_node_name().c_str(), + path.c_str()); LOG("destination=%s, message=%s", dest.c_str(), message.get()); backend->send_request(message.get(), size + 1, dest); diff --git a/capio/server/include/utils/location.hpp b/capio/server/include/utils/location.hpp index 5d573f950..6667ebf3d 100644 --- a/capio/server/include/utils/location.hpp +++ b/capio/server/include/utils/location.hpp @@ -8,7 +8,6 @@ #include "utils/types.hpp" -extern char *node_name; extern Backend *backend; constexpr char CAPIO_SERVER_FILES_LOCATION_NAME[] = "files_location_%s.txt"; @@ -270,7 +269,7 @@ inline void loop_load_file_location(const std::filesystem::path &path_to_load) { inline void open_files_location() { START_LOG(gettid(), "call()"); - std::string file_location_name = get_file_location_name(node_name); + std::string file_location_name = get_file_location_name(backend->get_node_name()); if ((files_location_fp = fopen(file_location_name.c_str(), "w+")) == nullptr) { ERR_EXIT("Error opening %s file: %d (%s)", file_location_name.c_str(), errno, strerror(errno)); @@ -296,11 +295,11 @@ inline void write_file_location(const std::filesystem::path &path_to_write) { if (offset == -1) { ERR_EXIT("lseek in write_file_location"); } - const std::string line = path_to_write.native() + " " + node_name + "\n"; + const std::string line = path_to_write.native() + " " + backend->get_node_name() + "\n"; if (write(files_location_fd, line.c_str(), line.length()) == -1) { ERR_EXIT("write in write_file_location"); } - add_file_location(path_to_write, node_name, offset); + add_file_location(path_to_write, backend->get_node_name().c_str(), offset); } #endif // CAPIO_SERVER_UTILS_LOCATIONS_HPP diff --git a/capio/server/src/backend.cpp b/capio/server/src/backend.cpp new file mode 100644 index 000000000..dc8c2521d --- /dev/null +++ b/capio/server/src/backend.cpp @@ -0,0 +1,25 @@ +#include "remote/backend.hpp" + +#include + +Backend::Backend(const unsigned int node_name_max_length) + : n_servers(1), node_name(node_name_max_length, '\0') { + // Note: default instantiation of node_name. the content of node_name may be changed by specific + // derived backend classes. + + gethostname(node_name.data(), node_name_max_length); + node_name.resize(strlen(node_name.data())); + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " Backend] Node name: " << node_name + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " Backend] Node Count: " << n_servers + << std::endl; +} + +[[nodiscard]] const std::string &Backend::get_node_name() const { + START_LOG(gettid(), "call()"); + LOG("THIS node_name = %s", node_name.c_str()); + return node_name; +} + +const std::set Backend::get_nodes() { return {node_name}; } \ No newline at end of file diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 855dda932..6b14b7c6f 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -5,6 +5,8 @@ #include "server/include/utils/common.hpp" #include "utils/shared_mutex.hpp" +extern Backend *backend; + bool CapioFile::compareSectors::operator()(const std::pair &lhs, const std::pair &rhs) const { return (lhs.first < rhs.first); diff --git a/capio/server/src/mpi_backend.cpp b/capio/server/src/mpi_backend.cpp new file mode 100644 index 000000000..7399b200d --- /dev/null +++ b/capio/server/src/mpi_backend.cpp @@ -0,0 +1,116 @@ +#include "remote/backend/mpi.hpp" + +MPIBackend::MPIBackend(int argc, char **argv) : Backend(MPI_MAX_PROCESSOR_NAME) { + int node_name_len, provided; + START_LOG(gettid(), "call()"); + LOG("Created a MPI backend"); + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + MPI_Comm_size(MPI_COMM_WORLD, &n_servers); + LOG("Mpi has multithreading support? %s (%d)", provided == MPI_THREAD_MULTIPLE ? "yes" : "no", + provided); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + LOG("node_rank=%d", &rank); + if (provided != MPI_THREAD_MULTIPLE) { + LOG("Error: The threading support level is not MPI_THREAD_MULTIPLE (is %d)", provided); + MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + } + + MPI_Get_processor_name(node_name.data(), &node_name_len); + LOG("Node name = %s, length=%d", node_name.data(), node_name_len); + nodes.emplace(node_name); + rank_nodes_equivalence[std::to_string(rank)] = node_name; + rank_nodes_equivalence[node_name] = std::to_string(rank); +} + +MPIBackend::~MPIBackend() { + START_LOG(gettid(), "Call()"); + MPI_Finalize(); +} + +const std::set MPIBackend::get_nodes() { return nodes; } + +void MPIBackend::handshake_servers() { + START_LOG(gettid(), "call()"); + + auto buf = std::unique_ptr(new char[MPI_MAX_PROCESSOR_NAME]); + for (int i = 0; i < n_servers; i += 1) { + if (i != rank) { + // TODO: possible deadlock + MPI_Send(node_name.c_str(), node_name.length(), MPI_CHAR, i, 0, MPI_COMM_WORLD); + std::fill(buf.get(), buf.get() + MPI_MAX_PROCESSOR_NAME, 0); + MPI_Recv(buf.get(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, i, 0, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + nodes.emplace(buf.get()); + rank_nodes_equivalence.emplace(buf.get(), std::to_string(i)); + rank_nodes_equivalence.emplace(std::to_string(i), buf.get()); + } + } +} + +RemoteRequest MPIBackend::read_next_request() { + START_LOG(gettid(), "call()"); + MPI_Status status; + char *buff = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; + LOG("initiating a lightweight MPI receive"); + MPI_Request request; + int received = 0; + + // receive from server + MPI_Irecv(buff, CAPIO_SERVER_REQUEST_MAX_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, + &request); + struct timespec sleepTime, returnTime; + sleepTime.tv_sec = 0; + sleepTime.tv_nsec = 200000; + + while (!received) { + MPI_Test(&request, &received, &status); + nanosleep(&sleepTime, &returnTime); + } + int bytes_received; + MPI_Get_count(&status, MPI_CHAR, &bytes_received); + + LOG("receive completed!"); + return {buff, rank_nodes_equivalence[std::to_string(status.MPI_SOURCE)]}; +} +void MPIBackend::send_request(const char *message, int message_len, const std::string &target) { + START_LOG(gettid(), "call(message=%s, message_len=%d, target=%s)", message, message_len, + target.c_str()); + const std::string &mpi_target = rank_nodes_equivalence[target]; + LOG("MPI_rank for target %s is %s", target.c_str(), mpi_target.c_str()); + + MPI_Send(message, message_len + 1, MPI_CHAR, std::stoi(mpi_target), 0, MPI_COMM_WORLD); +} + +void MPIBackend::send_file(char *shm, long int nbytes, const std::string &target) { + START_LOG(gettid(), "call(%.50s, %ld, %s)", shm, nbytes, target.c_str()); + int elem_to_snd = 0; + int dest = std::stoi(rank_nodes_equivalence[target]); + for (long int k = 0; k < nbytes; k += elem_to_snd) { + // Compute the maximum amount to send for this chunk + elem_to_snd = static_cast(std::min(nbytes - k, MPI_MAX_ELEM_COUNT)); + + LOG("Sending %d bytes to %d with offset from beginning odf k=%ld", elem_to_snd, dest, k); + MPI_Isend(shm + k, elem_to_snd, MPI_BYTE, dest, 0, MPI_COMM_WORLD, &req); + LOG("Sent chunk of %d bytes", elem_to_snd); + } +} + +void MPIBackend::recv_file(char *shm, const std::string &source, long int bytes_expected) { + START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), + bytes_expected); + MPI_Status status; + int bytes_received = 0, count = 0, source_rank = std::stoi(rank_nodes_equivalence[source]); + LOG("Buffer is valid? %s", + shm != nullptr ? "yes" : "NO! a nullptr was given to recv_file. this will make mpi crash!"); + for (long int k = 0; k < bytes_expected; k += bytes_received) { + + count = static_cast(std::min(bytes_expected - k, MPI_MAX_ELEM_COUNT)); + + LOG("Expected %ld bytes from %s with offset from beginning odf k=%ld", count, + source.c_str(), k); + MPI_Recv(shm + k, count, MPI_BYTE, source_rank, 0, MPI_COMM_WORLD, &status); + LOG("Received chunk"); + MPI_Get_count(&status, MPI_BYTE, &bytes_received); + LOG("Chunk size is %ld bytes", bytes_received); + } +} \ No newline at end of file diff --git a/capio/server/src/mpisync_backend.cpp b/capio/server/src/mpisync_backend.cpp new file mode 100644 index 000000000..54dd86283 --- /dev/null +++ b/capio/server/src/mpisync_backend.cpp @@ -0,0 +1,23 @@ +#include "remote/backend/mpi.hpp" + +MPISYNCBackend::MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) { + START_LOG(gettid(), "call()"); + LOG("Wrapped MPI backend with MPISYC backend"); +} + +MPISYNCBackend::~MPISYNCBackend() { + START_LOG(gettid(), "Call()"); + MPI_Finalize(); +} + +RemoteRequest MPISYNCBackend::read_next_request() { + START_LOG(gettid(), "call()"); + MPI_Status status; + char *buff = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; + LOG("initiating a synchronized MPI receive"); + MPI_Recv(buff, CAPIO_SERVER_REQUEST_MAX_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, + &status); // receive from server + + LOG("receive completed!"); + return {buff, rank_nodes_equivalence[std::to_string(status.MPI_SOURCE)]}; +} \ No newline at end of file diff --git a/capio/server/src/no_backend.cpp b/capio/server/src/no_backend.cpp new file mode 100644 index 000000000..4f80ba4b9 --- /dev/null +++ b/capio/server/src/no_backend.cpp @@ -0,0 +1,29 @@ +#include + +#include "remote/backend/no_backend.hpp" + +NoBackend::NoBackend(int argc, char **argv) : Backend(HOST_NAME_MAX) { + START_LOG(gettid(), "call()"); +} + +RemoteRequest NoBackend::read_next_request() { + START_LOG(gettid(), "call()"); + return {nullptr, ""}; +} + +void NoBackend::send_file(char *shm, const long int nbytes, const std::string &target) { + START_LOG(gettid(), "call(%.50s, %ld, %s)", shm, nbytes, target.c_str()); +} + +void NoBackend::handshake_servers() {} + +void NoBackend::send_request(const char *message, const int message_len, + const std::string &target) { + START_LOG(gettid(), "call(message=%s, message_len=%d, target=%s)", message, message_len, + target.c_str()); +} + +void NoBackend::recv_file(char *shm, const std::string &source, const long int bytes_expected) { + START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), + bytes_expected); +} \ No newline at end of file diff --git a/capio/server/src/remote_request.cpp b/capio/server/src/remote_request.cpp new file mode 100644 index 000000000..a15bab108 --- /dev/null +++ b/capio/server/src/remote_request.cpp @@ -0,0 +1,21 @@ +#include "remote/backend.hpp" + +RemoteRequest::RemoteRequest(char *buf_recv, const std::string &source) : _source(source) { + START_LOG(gettid(), "call(buf_recv=%s, source=%s)", buf_recv, source.c_str()); + int code; + auto [ptr, ec] = std::from_chars(buf_recv, buf_recv + 4, code); + if (ec == std::errc()) { + this->_code = code; + this->_buf_recv = new char[CAPIO_SERVER_REQUEST_MAX_SIZE]; + strcpy(this->_buf_recv, ptr + 1); + LOG("Received request %d from %s : %s", this->_code, this->_source.c_str(), + this->_buf_recv); + } else { + this->_code = -1; + } +} + +RemoteRequest::~RemoteRequest() { delete[] _buf_recv; } +const std::string &RemoteRequest::get_source() const { return this->_source; } +[[nodiscard]] const char *RemoteRequest::get_content() const { return this->_buf_recv; } +[[nodiscard]] int RemoteRequest::get_code() const { return this->_code; } \ No newline at end of file diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index ef193a942..fbbb1bb28 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -14,7 +14,7 @@ #include "utils/shared_mutex.hpp" #include "utils/types.hpp" -extern char *node_name; +extern Backend *backend; void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::path &file_path, const std::string &dir, const ManagerDirEntryType type) { @@ -310,7 +310,7 @@ off64_t StorageManager::addDirectory(const pid_t tid, const std::filesystem::pat c_file.registerFirstWrite(); // TODO: it works only if there is one prod per file if (is_capio_dir(path)) { - add_file_location(path, node_name, -1); + add_file_location(path, backend->get_node_name().c_str(), -1); } else { write_file_location(path); updateDirectory(tid, path); diff --git a/capio/tests/unit/server/CMakeLists.txt b/capio/tests/unit/server/CMakeLists.txt index f564f9b89..42f3f35fa 100644 --- a/capio/tests/unit/server/CMakeLists.txt +++ b/capio/tests/unit/server/CMakeLists.txt @@ -2,6 +2,7 @@ # Target information ##################################### set(TARGET_NAME capio_server_unit_tests) +find_package(MPI REQUIRED) FetchContent_MakeAvailable(capio_cl) @@ -36,7 +37,7 @@ target_include_directories(${TARGET_NAME} PRIVATE ##################################### # Link libraries ##################################### -target_link_libraries(${TARGET_NAME} PRIVATE GTest::gtest_main rt libcapio_cl) +target_link_libraries(${TARGET_NAME} PRIVATE GTest::gtest_main rt libcapio_cl MPI::MPI_CXX) ##################################### # Configure tests diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 834b04d24..4a9abb4d1 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -3,8 +3,10 @@ #include "common/env.hpp" #include "remote/backend.hpp" #include "storage/manager.hpp" +#include "utils/location.hpp" extern StorageManager *storage_manager; +extern Backend *backend; #include #include @@ -323,22 +325,32 @@ TEST(ServerTest, TestFileSetCommitToFalse) { class MockBackend : public Backend { public: + MockBackend() : Backend(HOST_NAME_MAX) {} + void recv_file(char *shm, const std::string &source, const long int bytes_expected) override { for (std::size_t i = 0; i < bytes_expected; ++i) { shm[i] = 33 + (i % 93); } } - const std::set get_nodes() override { return {}; } + const std::set get_nodes() override { return {node_name}; } void handshake_servers() override {} RemoteRequest read_next_request() override { return {nullptr, ""}; } void send_file(char *shm, long int nbytes, const std::string &target) override {} void send_request(const char *message, int message_len, const std::string &target) override {} }; -TEST(ServerTest, TestReadFromNodeMockBackend) { +class MockBackendTestFixture : public ::testing::Test { + protected: + void SetUp() override { + backend = new MockBackend(); + open_files_location(); + } - backend = new MockBackend(); + void TearDown() override { delete backend; } +}; + +TEST_F(MockBackendTestFixture, TestReadFromNodeMockBackend) { CapioFile file1; file1.createBuffer("testDir", true); @@ -352,8 +364,6 @@ TEST(ServerTest, TestReadFromNodeMockBackend) { for (std::size_t i = 0; i < 1000; ++i) { EXPECT_EQ(buf[i], 33 + (i % 93)); } - - delete backend; } TEST(ServerTest, TestGetSectorEnd) { @@ -367,7 +377,7 @@ TEST(ServerTest, TestGetSectorEnd) { EXPECT_EQ(file.getSectorEnd(12000), -1); } -TEST(ServerTest, TestSimulateDirectorySrteaming) { +TEST_F(MockBackendTestFixture, TestSimulateDirectoryStreaming) { constexpr int NUM_FILES_EXPECTED = 10; diff --git a/capio/tests/unit/server/src/main.cpp b/capio/tests/unit/server/src/main.cpp index 7e2bdb089..28cb277e4 100644 --- a/capio/tests/unit/server/src/main.cpp +++ b/capio/tests/unit/server/src/main.cpp @@ -1,11 +1,8 @@ #include -char *node_name; - #include "capiocl.hpp" #include "capiocl/engine.h" #include "client-manager/client_manager.hpp" -#include "common/constants.hpp" #include "storage/manager.hpp" #include "utils/capiocl_adapter.hpp" #include "utils/location.hpp" @@ -13,6 +10,7 @@ char *node_name; capiocl::engine::Engine *capio_cl_engine = nullptr; StorageManager *storage_manager = nullptr; ClientManager *client_manager = nullptr; +Backend *backend = nullptr; const capiocl::engine::Engine &CapioCLEngine::get() { return *capio_cl_engine; } @@ -22,10 +20,6 @@ class ServerUnitTestEnvironment : public testing::Environment { void SetUp() override { capio_cl_engine = new capiocl::engine::Engine(false); - node_name = new char[HOST_NAME_MAX]; - gethostname(node_name, HOST_NAME_MAX); - open_files_location(); - client_manager = new ClientManager(); storage_manager = new StorageManager(); }