diff --git a/CMakeLists.txt b/CMakeLists.txt index 0f6709d..02b0487 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,19 +57,11 @@ FetchContent_Declare( GIT_TAG v1.4.3 ) -FetchContent_Declare( - httplib - GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git - GIT_TAG v0.29.0 -) - - set(JSONCONS_BUILD_TESTS OFF CACHE BOOL "" FORCE) set(JSONCONS_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE) set(JSONCONS_BUILD_FUZZERS OFF CACHE BOOL "" FORCE) -set(HTTPLIB_USE_ZSTD_IF_AVAILABLE OFF CACHE BOOL "" FORCE) -FetchContent_MakeAvailable(jsoncons tomlplusplus httplib) +FetchContent_MakeAvailable(jsoncons tomlplusplus) if (BUILD_PYTHON_BINDINGS) FetchContent_Declare( @@ -143,13 +135,11 @@ target_include_directories(libcapio_cl PUBLIC ${jsoncons_SOURCE_DIR}/include ${CAPIOCL_JSON_SCHEMAS_DIRECTORY} ${TOMLPLUSPLUS_SOURCE_DIR}/include - ${httplib_SOURCE_DIR} ) target_link_libraries(libcapio_cl PUBLIC) target_link_libraries(libcapio_cl PRIVATE tomlplusplus::tomlplusplus - httplib::httplib ) find_library(LIBANL anl) diff --git a/README.md b/README.md index 412738c..f83498a 100644 --- a/README.md +++ b/README.md @@ -187,43 +187,6 @@ engine.print() Serializer.dump(engine, "my_workflow", "my_workflow.json") ``` -## CapioCL Web API Documentation - -This section describes the REST-style Web API exposed by the CapioCL Web Server. -The server provides HTTP endpoints for configuring and querying the CapioCL engine at runtime. -Within the `bruno_webapi_tests` you can find several tests and examples on how to perform -requests to the API webserver using [bruno](https://www.usebruno.com). - -All endpoints communicate using JSON over HTTP. To enable the webserver, users needs to explicitly start it with: - -```cpp -capiocl::engine::Engine engine(); - -// start engine with default parameters -engine.startApiServer(); - -// or by specifying the address and port: -engine.startApiServer("127.0.0.1", 5520); -``` - - -or equivalently in python with: - -```python -engine = py_capio_cl.Engine() - -#start engine with default parameters -engine.startApiServer() - -# or by specifying the address and port: -engine.startApiServer("127.0.0.1", 5520) -``` - -By default, the webserver listens only on local connection at the following address: ```127.0.0.1:5520```. No -authentication -services are currently available, and as such, users should put particular care when allowing connections from external -endpoints. - ## Notes - All GET endpoints expect a JSON body containing the targeted file path. diff --git a/bindings/python_bindings.cpp b/bindings/python_bindings.cpp index 864199d..2edab01 100644 --- a/bindings/python_bindings.cpp +++ b/bindings/python_bindings.cpp @@ -1,13 +1,14 @@ +#include #include #include #include #include +#include #include "capiocl.hpp" #include "capiocl/engine.h" #include "capiocl/monitor.h" #include "capiocl/parser.h" -#include "capiocl/printer.h" #include "capiocl/serializer.h" namespace py = pybind11; @@ -20,6 +21,11 @@ PYBIND11_MODULE(_py_capio_cl, m) { py::register_exception(m, "SerializerException"); py::register_exception(m, "MonitorException"); + m.attr("CAPIO_CL_DEFAULT_WF_NAME") = py::str(capiocl::CAPIO_CL_DEFAULT_WF_NAME); + m.attr("DEFAULT_MCAST_GROUP") = + py::make_tuple(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + std::stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v)); + py::module_ fire_rules = m.def_submodule("fire_rules", "CAPIO-CL fire rules"); fire_rules.attr("UPDATE") = py::str(capiocl::fireRules::UPDATE); fire_rules.attr("NO_UPDATE") = py::str(capiocl::fireRules::NO_UPDATE); @@ -40,9 +46,14 @@ PYBIND11_MODULE(_py_capio_cl, m) { .def("print", &capiocl::engine::Engine::print) .def("contains", &capiocl::engine::Engine::contains, py::arg("path")) .def("size", &capiocl::engine::Engine::size) - .def("add", &capiocl::engine::Engine::add, py::arg("path"), py::arg("producers"), - py::arg("consumers"), py::arg("commit_rule"), py::arg("fire_rule"), - py::arg("permanent"), py::arg("exclude"), py::arg("dependencies")) + .def("add", + py::overload_cast &, + std::vector &, const std::string &, const std::string &, + bool, bool, std::vector &>( + &capiocl::engine::Engine::add), + py::arg("path"), py::arg("producers"), py::arg("consumers"), py::arg("commit_rule"), + py::arg("fire_rule"), py::arg("permanent"), py::arg("exclude"), + py::arg("dependencies")) .def("addProducer", &capiocl::engine::Engine::addProducer, py::arg("path"), py::arg("producer")) .def("addConsumer", &capiocl::engine::Engine::addConsumer, py::arg("path"), @@ -98,8 +109,7 @@ PYBIND11_MODULE(_py_capio_cl, m) { .def("isCommitted", &capiocl::engine::Engine::isCommitted, py::arg("path")) .def("setHomeNode", &capiocl::engine::Engine::setHomeNode, py::arg("path")) .def("getPaths", &capiocl::engine::Engine::getPaths) - .def("startApiServer", &capiocl::engine::Engine::startApiServer, - py::arg("address") = "127.0.0.1", py::arg("port") = 5520) + .def("startApiServer", &capiocl::engine::Engine::startApiServer) .def("__str__", &capiocl::engine::Engine::print) .def("__repr__", [](const capiocl::engine::Engine &e) { @@ -120,4 +130,24 @@ PYBIND11_MODULE(_py_capio_cl, m) { m.def("serialize", &capiocl::serializer::Serializer::dump, py::arg("engine"), py::arg("filename"), py::arg("version") = capiocl::CAPIO_CL_VERSION::V1); + + py::class_(m, "CapioCLEntry") + .def(py::init<>()) + .def_readwrite("producers", &capiocl::engine::CapioCLEntry::producers) + .def_readwrite("consumers", &capiocl::engine::CapioCLEntry::consumers) + .def_readwrite("file_dependencies", &capiocl::engine::CapioCLEntry::file_dependencies) + .def_readwrite("commit_rule", &capiocl::engine::CapioCLEntry::commit_rule) + .def_readwrite("fire_rule", &capiocl::engine::CapioCLEntry::fire_rule) + .def_readwrite("directory_children_count", + &capiocl::engine::CapioCLEntry::directory_children_count) + .def_readwrite("commit_on_close_count", + &capiocl::engine::CapioCLEntry::commit_on_close_count) + .def_readwrite("enable_directory_count_update", + &capiocl::engine::CapioCLEntry::enable_directory_count_update) + .def_readwrite("store_in_memory", &capiocl::engine::CapioCLEntry::store_in_memory) + .def_readwrite("permanent", &capiocl::engine::CapioCLEntry::permanent) + .def_readwrite("excluded", &capiocl::engine::CapioCLEntry::excluded) + .def_readwrite("is_file", &capiocl::engine::CapioCLEntry::is_file) + .def_static("from_json", &capiocl::engine::CapioCLEntry::fromJson, py::arg("in")) + .def("to_json", &capiocl::engine::CapioCLEntry::toJson); } \ No newline at end of file diff --git a/bruno_webapi_tests/add_consumer.bru b/bruno_webapi_tests/add_consumer.bru deleted file mode 100644 index 8a1db14..0000000 --- a/bruno_webapi_tests/add_consumer.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: add_consumer - type: http - seq: 2 -} - -post { - url: http://localhost:5520/consumer - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "consumer" : "sample2" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/add_file_dependency.bru b/bruno_webapi_tests/add_file_dependency.bru deleted file mode 100644 index 20bccbd..0000000 --- a/bruno_webapi_tests/add_file_dependency.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: add_file_dependency - type: http - seq: 4 -} - -post { - url: http://localhost:5520/dependency - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "dependency" : "myFile.dat" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/add_producer.bru b/bruno_webapi_tests/add_producer.bru deleted file mode 100644 index 8165600..0000000 --- a/bruno_webapi_tests/add_producer.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: add_producer - type: http - seq: 3 -} - -post { - url: http://localhost:5520/producer - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "producer" : "sample1" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/add_workflow_name.bru b/bruno_webapi_tests/add_workflow_name.bru deleted file mode 100644 index caad638..0000000 --- a/bruno_webapi_tests/add_workflow_name.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: add_workflow_name - type: http - seq: 1 -} - -post { - url: http://localhost:5520/workflow - body: json - auth: inherit -} - -body:json { - { - "name": "workflow_demo" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/bruno.json b/bruno_webapi_tests/bruno.json deleted file mode 100644 index 04d4d50..0000000 --- a/bruno_webapi_tests/bruno.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "version": "1", - "name": "bruno_webapi_tests", - "type": "collection", - "ignore": [ - "node_modules", - ".git" - ] -} \ No newline at end of file diff --git a/bruno_webapi_tests/collection.bru b/bruno_webapi_tests/collection.bru deleted file mode 100644 index e69de29..0000000 diff --git a/bruno_webapi_tests/get_commit_close_count.bru b/bruno_webapi_tests/get_commit_close_count.bru deleted file mode 100644 index 49a1ed3..0000000 --- a/bruno_webapi_tests/get_commit_close_count.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_commit_close_count - type: http - seq: 18 -} - -get { - url: http://localhost:5520/commit/close-count - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_commit_on_n_files_count.bru b/bruno_webapi_tests/get_commit_on_n_files_count.bru deleted file mode 100644 index dbc6e2b..0000000 --- a/bruno_webapi_tests/get_commit_on_n_files_count.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_commit_on_n_files_count - type: http - seq: 17 -} - -get { - url: http://localhost:5520/commit/file-count - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_commit_rule.bru b/bruno_webapi_tests/get_commit_rule.bru deleted file mode 100644 index 3950caf..0000000 --- a/bruno_webapi_tests/get_commit_rule.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_commit_rule - type: http - seq: 16 -} - -get { - url: http://localhost:5520/commit - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_consumer.bru b/bruno_webapi_tests/get_consumer.bru deleted file mode 100644 index 8b13fba..0000000 --- a/bruno_webapi_tests/get_consumer.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_consumer - type: http - seq: 14 -} - -get { - url: http://localhost:5520/consumer - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_directory.bru b/bruno_webapi_tests/get_directory.bru deleted file mode 100644 index 50c9b5c..0000000 --- a/bruno_webapi_tests/get_directory.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_directory - type: http - seq: 22 -} - -get { - url: http://localhost:5520/directory - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_excluded.bru b/bruno_webapi_tests/get_excluded.bru deleted file mode 100644 index 56ad262..0000000 --- a/bruno_webapi_tests/get_excluded.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_excluded - type: http - seq: 21 -} - -get { - url: http://localhost:5520/exclude - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_file_dependencies.bru b/bruno_webapi_tests/get_file_dependencies.bru deleted file mode 100644 index cff2964..0000000 --- a/bruno_webapi_tests/get_file_dependencies.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_file_dependencies - type: http - seq: 15 -} - -get { - url: http://localhost:5520/dependency - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_fire_rule.bru b/bruno_webapi_tests/get_fire_rule.bru deleted file mode 100644 index fe4845f..0000000 --- a/bruno_webapi_tests/get_fire_rule.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_fire_rule - type: http - seq: 19 -} - -get { - url: http://localhost:5520/fire - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_permanent.bru b/bruno_webapi_tests/get_permanent.bru deleted file mode 100644 index 87f3d53..0000000 --- a/bruno_webapi_tests/get_permanent.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_permanent - type: http - seq: 20 -} - -get { - url: http://localhost:5520/permanent - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_producer.bru b/bruno_webapi_tests/get_producer.bru deleted file mode 100644 index a612e47..0000000 --- a/bruno_webapi_tests/get_producer.bru +++ /dev/null @@ -1,22 +0,0 @@ -meta { - name: get_producer - type: http - seq: 13 -} - -get { - url: http://localhost:5520/producer - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/get_workflow_name.bru b/bruno_webapi_tests/get_workflow_name.bru deleted file mode 100644 index 6270aa1..0000000 --- a/bruno_webapi_tests/get_workflow_name.bru +++ /dev/null @@ -1,16 +0,0 @@ -meta { - name: get_workflow_name - type: http - seq: 12 -} - -get { - url: http://localhost:5520/workflow - body: none - auth: inherit -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_commit_close_count.bru b/bruno_webapi_tests/set_commit_close_count.bru deleted file mode 100644 index bf4e62d..0000000 --- a/bruno_webapi_tests/set_commit_close_count.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_commit_close_count - type: http - seq: 7 -} - -post { - url: http://localhost:5520/commit/close-count - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "count" : 123 - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_commit_on_n_files_count.bru b/bruno_webapi_tests/set_commit_on_n_files_count.bru deleted file mode 100644 index b0a50f2..0000000 --- a/bruno_webapi_tests/set_commit_on_n_files_count.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_commit_on_n_files_count - type: http - seq: 6 -} - -post { - url: http://localhost:5520/commit/file-count - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "count" : 10 - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_commit_rule.bru b/bruno_webapi_tests/set_commit_rule.bru deleted file mode 100644 index c12c188..0000000 --- a/bruno_webapi_tests/set_commit_rule.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_commit_rule - type: http - seq: 5 -} - -post { - url: http://localhost:5520/commit - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "commit" : "on_close" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_directory.bru b/bruno_webapi_tests/set_directory.bru deleted file mode 100644 index 6dd873c..0000000 --- a/bruno_webapi_tests/set_directory.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_directory - type: http - seq: 11 -} - -post { - url: http://localhost:5520/directory - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "directory" : true - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_excluded.bru b/bruno_webapi_tests/set_excluded.bru deleted file mode 100644 index 72584db..0000000 --- a/bruno_webapi_tests/set_excluded.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_excluded - type: http - seq: 10 -} - -post { - url: http://localhost:5520/exclude - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "exclude" : true - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_fire_rule.bru b/bruno_webapi_tests/set_fire_rule.bru deleted file mode 100644 index 6edad97..0000000 --- a/bruno_webapi_tests/set_fire_rule.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_fire_rule - type: http - seq: 8 -} - -post { - url: http://localhost:5520/fire - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "fire" : "no_update" - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/bruno_webapi_tests/set_permanent.bru b/bruno_webapi_tests/set_permanent.bru deleted file mode 100644 index 1505180..0000000 --- a/bruno_webapi_tests/set_permanent.bru +++ /dev/null @@ -1,23 +0,0 @@ -meta { - name: set_permanent - type: http - seq: 9 -} - -post { - url: http://localhost:5520/permanent - body: json - auth: inherit -} - -body:json { - { - "path" : "/tmp/test.txt", - "permanent" : true - } -} - -settings { - encodeUrl: true - timeout: 0 -} diff --git a/capiocl.hpp b/capiocl.hpp index 3944312..aa239b2 100644 --- a/capiocl.hpp +++ b/capiocl.hpp @@ -93,8 +93,8 @@ class CapioClConfigurationException; struct defaults; } // namespace configuration -namespace webapi { -class CapioClWebApiServer; +namespace api { +class CapioClApiServer; } } // namespace capiocl diff --git a/capiocl/webapi.h b/capiocl/api.h similarity index 54% rename from capiocl/webapi.h rename to capiocl/api.h index b322c2c..b9c1e0c 100644 --- a/capiocl/webapi.h +++ b/capiocl/api.h @@ -1,25 +1,29 @@ #ifndef CAPIO_CL_WEBAPI_H #define CAPIO_CL_WEBAPI_H +#include #include #include "capiocl.hpp" +#include "configuration.h" /// @brief Class that exposes a REST Web Server to interact with the current configuration -class capiocl::webapi::CapioClWebApiServer { +class capiocl::api::CapioClApiServer { /// @brief asynchronous running webserver thread std::thread _webApiThread; /// @brief port on which the current server runs - int _port; + const configuration::CapioClConfiguration &capiocl_configuration; + + /// @brief variable to tell the thread to terminate + std::atomic _terminate = false; public: /// @brief default constructor. - CapioClWebApiServer(engine::Engine *engine, const std::string &web_server_address, - int web_server_port); + CapioClApiServer(engine::Engine *engine, configuration::CapioClConfiguration &config); /// @brief Default Destructor - ~CapioClWebApiServer(); + ~CapioClApiServer(); }; #endif // CAPIO_CL_WEBAPI_H diff --git a/capiocl/configuration.h b/capiocl/configuration.h index 2c56d21..2703ea5 100644 --- a/capiocl/configuration.h +++ b/capiocl/configuration.h @@ -29,6 +29,10 @@ struct capiocl::configuration::defaults { static ConfigurationEntry DEFAULT_MONITOR_HOMENODE_PORT; /// @brief Enable File system monitor by default static ConfigurationEntry DEFAULT_MONITOR_FS_ENABLED; + /// @brief IP multicast address for receiving and sending changes in the CapioCL configuration + static ConfigurationEntry DEFAULT_API_MULTICAST_IP; + /// @brief IP multicast port for receiving and sending changes in the CapioCL configuration + static ConfigurationEntry DEFAULT_API_MULTICAST_PORT; }; /// @brief Load configuration and store it from a CAPIO-CL TOML configuration file diff --git a/capiocl/engine.h b/capiocl/engine.h index d3b1ae0..71f6376 100644 --- a/capiocl/engine.h +++ b/capiocl/engine.h @@ -1,14 +1,70 @@ #ifndef CAPIO_CL_ENGINE_H #define CAPIO_CL_ENGINE_H +#include +#include #include #include "capiocl.hpp" +#include "capiocl/api.h" #include "capiocl/monitor.h" #include "capiocl/serializer.h" -#include "capiocl/webapi.h" /// @brief Namespace containing the CAPIO-CL Engine namespace capiocl::engine { + +/// @brief Internal CAPIO-CL Engine storage entity. Each CapioCLEntry is an entry for a given +/// file handled by CAPIO-CL +struct CapioCLEntry final { + // LCOV_EXCL_START + ///@brief Producers of file + std::vector producers; + ///@brief consumers of file + std::vector consumers; + ///@brief Dependencies for Commit + std::vector file_dependencies; + ///@brief Commit rule + std::string commit_rule = commitRules::ON_TERMINATION; + ///@brief Fire rule + std::string fire_rule = fireRules::UPDATE; + ///@brief Expected number of files in directory + long directory_children_count = 0; + ///@brief Expected close count + long commit_on_close_count = 0; + /// @brief whether to update or not directory item count + bool enable_directory_count_update = true; + /// @brief Store in memory or on the file system + bool store_in_memory = false; + /// @brief whether the file should persiste after workflow termination + bool permanent = false; + /// @brief whether to ignore this entry + bool excluded = false; + /// @brief whether this entry is a file or a directory + bool is_file = true; + // LCOV_EXCL_STOP + + /** + * Generate a new CapioClEntry from a JSON input + * @param in string with JSON to be parsed + * @return CapioClEntry with the input values + */ + static CapioCLEntry fromJson(const std::string &in); + + /// @brief Serialize this entry to a JSON object returned as string to be sent over network + [[nodiscard]] std::string toJson() const; + + /// @brief add a new CapioClEntry to this one + CapioCLEntry &operator+=(const CapioCLEntry &rhs); + + /// @brief add a new CapioClEntry to this one + CapioCLEntry operator+(const CapioCLEntry &rhs); + + /// @brief check for equality of rules + bool operator==(const CapioCLEntry &other); + + /// @brief check for inequality + bool operator!=(const CapioCLEntry &other); +}; + /** * @brief Engine for managing CAPIO-CL configuration entries. * The CapioCLEngine class stores and manages configuration rules for files @@ -26,6 +82,11 @@ namespace capiocl::engine { */ class Engine final { friend class serializer::Serializer; + + /// @brief Synchronization variable for locking + mutable std::shared_mutex _shared_mutex; + + /// @brief Whether current engine instance should store all files in memory bool store_all_in_memory = false; ///@brief Configuration imported from CAPIO-CL config TOML file @@ -41,27 +102,7 @@ class Engine final { std::string workflow_name; /// @brief CAPIO-CL APIs Web Server - std::unique_ptr webapi_server; - - // LCOV_EXCL_START - /// @brief Internal CAPIO-CL Engine storage entity. Each CapioCLEntry is an entry for a given - /// file handled by CAPIO-CL - struct CapioCLEntry final { - std::vector producers; - std::vector consumers; - std::vector file_dependencies; - std::string commit_rule = commitRules::ON_TERMINATION; - std::string fire_rule = fireRules::UPDATE; - long directory_children_count = 0; - long commit_on_close_count = 0; - bool enable_directory_count_update = true; // whether to update or not directory item count - bool store_in_memory = false; - bool permanent = false; - bool excluded = false; - bool is_file = true; - }; - - // LCOV_EXCL_STOP + std::unique_ptr webapi_server; /// @brief Hash map used to store the configuration from CAPIO-CL mutable std::unordered_map _capio_cl_entries; @@ -139,6 +180,13 @@ class Engine final { const std::string &fire_rule, bool permanent, bool exclude, std::vector &dependencies); + /** + * Add a new CapioClEntry to the internal Database + * @param path The path of the CapioCLEnty + * @param entry the new entry to add + */ + void add(const std::filesystem::path &path, const CapioCLEntry &entry) const; + /** * @brief Add a new producer to a file entry. * @@ -171,7 +219,7 @@ class Engine final { * * @param path Path of the new file. */ - void newFile(const std::filesystem::path &path); + void newFile(const std::filesystem::path &path) const; /** * @brief Remove a file from the configuration. @@ -296,7 +344,7 @@ class Engine final { std::vector getConsumers(const std::filesystem::path &path) const; /// @brief Get the commit-on-close counter for a file. - long getCommitCloseCount(const std::filesystem::path::iterator::reference &path) const; + long getCommitCloseCount(const std::filesystem::path &path) const; /// @brief Get file dependencies. std::vector @@ -410,11 +458,10 @@ class Engine final { void useDefaultConfiguration(); /** - * Start the thread involved in the handling of dynamic changes to CapioCl configuration - * @param address address to listen to. defaulto to 127.0.0.1 - * @param port Port to listen to. defaults to 5520 + * Start the thread involved in the handling of dynamic changes to CapioCL configuration. + * Runtime parameters to the server are specified within the CapioCLConfiguration */ - void startApiServer(const std::string &address = "127.0.0.1", int port = 5520); + void startApiServer(); }; } // namespace capiocl::engine diff --git a/py_capio_cl/decorators.py b/py_capio_cl/decorators.py index d3a8ae8..cb13784 100644 --- a/py_capio_cl/decorators.py +++ b/py_capio_cl/decorators.py @@ -1,9 +1,13 @@ +import json +import socket from functools import wraps -import requests +from py_capio_cl import CAPIO_CL_DEFAULT_WF_NAME, DEFAULT_MCAST_GROUP +from py_capio_cl import CapioCLEntry def CapioCLRule(path: str, + workflow_name: str = CAPIO_CL_DEFAULT_WF_NAME, committed: str | None = None, fire: str | None = None, close_count: int | None = None, @@ -13,109 +17,64 @@ def CapioCLRule(path: str, is_excluded: bool | None = None, producers: list[str] | None = None, consumers: list[str] | None = None, - file_dependencies: list[str] | None = None + file_dependencies: list[str] | None = None, + multicast_group: tuple[str, int] = DEFAULT_MCAST_GROUP ): if not path: raise RuntimeError("ERROR: cannot specify a CAPIO-CL rule without setting a path!") - def _perform_request(endpoint, payload=None): - response = requests.post(endpoint, json=payload, headers={"content-type": "application/json"}) - json = response.json() - if "OK" not in json["status"]: - print(f"ERR: {json['what']}" if "what" in json else "ERR: no error message in response!") - + rule = CapioCLEntry() if committed: - _perform_request( - endpoint="http://localhost:5520/commit", - payload={ - "path": path, - "commit": committed - }) - + rule.commit_rule = committed if fire: - _perform_request( - endpoint="http://localhost:5520/fire", - payload={ - "path": path, - "fire": fire - }) + rule.fire_rule = fire if close_count: - _perform_request( - endpoint="http://localhost:5520/commit/close-count", - payload={ - "path": path, - "count": close_count - }) + rule.commit_on_close_count = close_count if directory_n_file_expected: - _perform_request( - endpoint="http://localhost:5520/commit/file-count", - payload={ - "path": path, - "count": directory_n_file_expected - }) - - if is_directory is not None: - _perform_request( - endpoint="http://localhost:5520/directory", - payload={ - "path": path, - "directory": is_directory - } - ) - - if is_permanent is not None: - _perform_request( - endpoint="http://localhost:5520/permanent", - payload={ - "path": path, - "permanent": is_permanent - } - ) - if is_excluded is not None: - _perform_request( - endpoint="http://localhost:5520/exclude", - payload={ - "path": path, - "excluded": is_excluded - } - ) + rule.directory_children_count = directory_n_file_expected + + if is_directory: + rule.is_file = not is_directory + + if is_permanent: + rule.permanent = is_permanent + + if is_excluded: + rule.excluded = is_excluded if producers: - for producer in producers: - _perform_request( - endpoint="http://localhost:5520/producer", - payload={ - "path": path, - "producer": producer - } - ) + rule.producers = producers if consumers: - for consumer in consumers: - _perform_request( - endpoint="http://localhost:5520/consumer", - payload={ - "path": path, - "consumer": consumer - } - ) + rule.consumers = consumers if file_dependencies: - for dependency in file_dependencies: - _perform_request( - endpoint="http://localhost:5520/dependency", - payload={ - "path": path, - "dependency": dependency - } - ) + rule.file_dependencies = file_dependencies + + request_body = dict() + request_body["path"] = path + request_body["workflow_name"] = workflow_name + request_body["CapioClEntry"] = json.loads(rule.to_json()) + + message = json.dumps(request_body).encode('utf-8') + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + + ttl = 1 # no broadcast outside current subnet + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) + + try: + sock.sendto(message, multicast_group) + print(f"[[py_capio_cl]] Sent bcast message for path: {path}") + finally: + sock.close() def _capiocl_rule(func): @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) + return wrapper return _capiocl_rule diff --git a/src/Engine.cpp b/src/Engine.cpp index b333d1e..d6951fc 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -9,7 +9,24 @@ #include "capiocl/monitor.h" #include "capiocl/printer.h" +/// @brief Class to implement a shared mutex lock guard +template class shared_lock_guard { + public: + /// @brief Constructor: acquire semaphore shared + explicit shared_lock_guard(SharedMutex &m) : mutex_(m) { mutex_.lock_shared(); } + /// @brief Destructor: release resources + ~shared_lock_guard() { mutex_.unlock_shared(); } + + shared_lock_guard(const shared_lock_guard &) = delete; + shared_lock_guard &operator=(const shared_lock_guard &) = delete; + + private: + /// @brief Reference to mutex + SharedMutex &mutex_; +}; + void capiocl::engine::Engine::print() const { + // First message printer::print(printer::CLI_LEVEL_JSON, ""); printer::print(printer::CLI_LEVEL_JSON, "Composition of expected CAPIO FS: "); @@ -203,22 +220,27 @@ void capiocl::engine::Engine::compute_directory_entry_count( } bool capiocl::engine::Engine::contains(const std::filesystem::path &file) const { + shared_lock_guard slg(_shared_mutex); return std::any_of(_capio_cl_entries.begin(), _capio_cl_entries.end(), [&](auto const &entry) { return fnmatch(entry.first.c_str(), file.c_str(), FNM_NOESCAPE) == 0; }); } -size_t capiocl::engine::Engine::size() const { return this->_capio_cl_entries.size(); } +size_t capiocl::engine::Engine::size() const { + shared_lock_guard slg(_shared_mutex); + return this->_capio_cl_entries.size(); +} void capiocl::engine::Engine::add(std::filesystem::path &path, std::vector &producers, std::vector &consumers, const std::string &commit_rule, const std::string &fire_rule, bool permanent, bool exclude, std::vector &dependencies) { + if (path.empty()) { return; } - + std::lock_guard lg(_shared_mutex); this->_newFile(path); CapioCLEntry &entry = _capio_cl_entries.at(path); @@ -230,16 +252,33 @@ void capiocl::engine::Engine::add(std::filesystem::path &path, std::vector_newFile(path); } + std::lock_guard lg(_shared_mutex); + + if (_capio_cl_entries.find(path) == _capio_cl_entries.end()) { + _capio_cl_entries[path] = entry; + } else { + _capio_cl_entries[path] += entry; + } +} + +void capiocl::engine::Engine::newFile(const std::filesystem::path &path) const { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); +} long capiocl::engine::Engine::getDirectoryFileCount(const std::filesystem::path &path) const { + if (path.empty()) { return 0; } - - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.directory_children_count; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.directory_children_count; + } } this->_newFile(path); return getDirectoryFileCount(path); @@ -247,21 +286,27 @@ long capiocl::engine::Engine::getDirectoryFileCount(const std::filesystem::path void capiocl::engine::Engine::addProducer(const std::filesystem::path &path, std::string &producer) { + if (path.empty()) { return; } - producer.erase(remove_if(producer.begin(), producer.end(), isspace), producer.end()); + { + std::lock_guard lg(_shared_mutex); - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - auto &vec = itm->second.producers; - if (std::find(vec.begin(), vec.end(), producer) == vec.end()) { - vec.emplace_back(producer); - return; - } else { - return; + producer.erase(remove_if(producer.begin(), producer.end(), isspace), producer.end()); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + auto &vec = itm->second.producers; + if (std::find(vec.begin(), vec.end(), producer) == vec.end()) { + vec.emplace_back(producer); + return; + } else { + return; + } } } + this->newFile(path); this->addProducer(path, producer); } @@ -272,14 +317,18 @@ void capiocl::engine::Engine::addConsumer(const std::filesystem::path &path, return; } - consumer.erase(remove_if(consumer.begin(), consumer.end(), isspace), consumer.end()); - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - auto &vec = itm->second.consumers; - if (std::find(vec.begin(), vec.end(), consumer) == vec.end()) { - vec.emplace_back(consumer); + { + std::lock_guard lg(_shared_mutex); + consumer.erase(remove_if(consumer.begin(), consumer.end(), isspace), consumer.end()); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + auto &vec = itm->second.consumers; + if (std::find(vec.begin(), vec.end(), consumer) == vec.end()) { + vec.emplace_back(consumer); + } + return; } - return; } + this->newFile(path); this->addConsumer(path, consumer); } @@ -290,12 +339,15 @@ void capiocl::engine::Engine::addFileDependency(const std::filesystem::path &pat return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - auto &vec = itm->second.file_dependencies; - if (std::find(vec.begin(), vec.end(), file_dependency) == vec.end()) { - vec.emplace_back(file_dependency); + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + auto &vec = itm->second.file_dependencies; + if (std::find(vec.begin(), vec.end(), file_dependency) == vec.end()) { + vec.emplace_back(file_dependency); + } + return; } - return; } this->newFile(path); this->setCommitRule(path, commitRules::ON_FILE); @@ -309,12 +361,17 @@ void capiocl::engine::Engine::setCommitRule(const std::filesystem::path &path, } const auto commit = commitRules::sanitize(commit_rule); - - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.commit_rule = commit; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.commit_rule = commit; + return; + } + } + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); } - this->_newFile(path); this->setCommitRule(path, commit); } @@ -323,11 +380,18 @@ std::string capiocl::engine::Engine::getCommitRule(const std::filesystem::path & return commitRules::ON_TERMINATION; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.commit_rule; + { + shared_lock_guard slg(_shared_mutex); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.commit_rule; + } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return getCommitRule(path); } @@ -336,11 +400,18 @@ std::string capiocl::engine::Engine::getFireRule(const std::filesystem::path &pa return fireRules::NO_UPDATE; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.fire_rule; + { + shared_lock_guard slg(_shared_mutex); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.fire_rule; + } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return getFireRule(path); } @@ -352,11 +423,15 @@ void capiocl::engine::Engine::setFireRule(const std::filesystem::path &path, const auto fire = fireRules::sanitize(fire_rule); - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.fire_rule = fire; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.fire_rule = fire; + return; + } + this->_newFile(path); } - this->_newFile(path); + setFireRule(path, fire); } @@ -365,11 +440,17 @@ bool capiocl::engine::Engine::isFirable(const std::filesystem::path &path) const return true; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.fire_rule == fireRules::NO_UPDATE; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.fire_rule == fireRules::NO_UPDATE; + } } - this->_newFile((path)); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return isFirable(path); } @@ -378,11 +459,14 @@ void capiocl::engine::Engine::setPermanent(const std::filesystem::path &path, bo return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.permanent = value; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.permanent = value; + return; + } + this->_newFile(path); } - this->_newFile(path); setPermanent(path, value); } @@ -391,11 +475,17 @@ bool capiocl::engine::Engine::isPermanent(const std::filesystem::path &path) con return true; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.permanent; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.permanent; + } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return isPermanent(path); } @@ -408,6 +498,8 @@ void capiocl::engine::Engine::setCommitted(const std::filesystem::path &path) co } std::vector capiocl::engine::Engine::getPaths() const { + shared_lock_guard slg(_shared_mutex); + std::vector paths; for (const auto &[k, v] : _capio_cl_entries) { paths.push_back(k); @@ -419,12 +511,14 @@ void capiocl::engine::Engine::setExclude(const std::filesystem::path &path, cons if (path.empty()) { return; } - - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.excluded = value; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.excluded = value; + return; + } + this->_newFile(path); } - this->_newFile(path); setExclude(path, value); } @@ -433,11 +527,14 @@ void capiocl::engine::Engine::setDirectory(const std::filesystem::path &path) { return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.is_file = false; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.is_file = false; + return; + } + this->_newFile(path); } - this->_newFile(path); setDirectory(path); } @@ -446,11 +543,14 @@ void capiocl::engine::Engine::setFile(const std::filesystem::path &path) { return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.is_file = true; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.is_file = true; + return; + } + this->_newFile(path); } - this->_newFile(path); setFile(path); } @@ -459,10 +559,17 @@ bool capiocl::engine::Engine::isFile(const std::filesystem::path &path) const { return true; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.is_file; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.is_file; + } + } + + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); } - this->_newFile(path); return isPermanent(path); } @@ -470,7 +577,6 @@ bool capiocl::engine::Engine::isDirectory(const std::filesystem::path &path) con if (path.empty()) { return true; } - return !isFile(path); } @@ -480,11 +586,14 @@ void capiocl::engine::Engine::setCommitedCloseNumber(const std::filesystem::path return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.commit_on_close_count = num; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.commit_on_close_count = num; + return; + } + this->_newFile(path); } - this->_newFile(path); setCommitedCloseNumber(path, num); } @@ -494,17 +603,26 @@ void capiocl::engine::Engine::setDirectoryFileCount(const std::filesystem::path return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - this->setDirectory(path); - itm->second.directory_children_count = num; - itm->second.enable_directory_count_update = false; - return; + const auto paths = getPaths(); + for (const auto &file_paths : paths) { + this->setDirectory(file_paths); + } + + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.directory_children_count = num; + itm->second.enable_directory_count_update = false; + return; + } + this->_newFile(path); } - this->_newFile(path); this->setDirectoryFileCount(path, num); } void capiocl::engine::Engine::remove(const std::filesystem::path &path) const { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm == _capio_cl_entries.end()) { return; } @@ -513,6 +631,8 @@ void capiocl::engine::Engine::remove(const std::filesystem::path &path) const { std::vector capiocl::engine::Engine::getConsumers(const std::filesystem::path &path) const { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { return itm->second.consumers; } @@ -525,16 +645,23 @@ bool capiocl::engine::Engine::isConsumer(const std::filesystem::path &path, return true; } - for (const auto &[pattern, entry] : _capio_cl_entries) { - if (fnmatch(pattern.c_str(), path.c_str(), FNM_NOESCAPE) == 0) { - const auto &consumers = entry.consumers; - if (std::find(consumers.begin(), consumers.end(), app_name) != consumers.end()) { - return true; + { + shared_lock_guard slg(_shared_mutex); + + for (const auto &[pattern, entry] : _capio_cl_entries) { + if (fnmatch(pattern.c_str(), path.c_str(), FNM_NOESCAPE) == 0) { + const auto &consumers = entry.consumers; + if (std::find(consumers.begin(), consumers.end(), app_name) != consumers.end()) { + return true; + } } } } + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } - this->_newFile(path); return false; } @@ -544,10 +671,18 @@ capiocl::engine::Engine::getProducers(const std::filesystem::path &path) const { return {}; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.producers; + { + shared_lock_guard slg(_shared_mutex); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.producers; + } + } + + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); } - this->_newFile(path); return getProducers(path); } @@ -556,17 +691,23 @@ bool capiocl::engine::Engine::isProducer(const std::filesystem::path &path, if (path.empty()) { return true; } - - for (const auto &[pattern, entry] : _capio_cl_entries) { - if (fnmatch(pattern.c_str(), path.c_str(), FNM_NOESCAPE) == 0) { - const auto &producers = entry.producers; - if (std::find(producers.begin(), producers.end(), app_name) != producers.end()) { - return true; + { + shared_lock_guard slg(_shared_mutex); + + for (const auto &[pattern, entry] : _capio_cl_entries) { + if (fnmatch(pattern.c_str(), path.c_str(), FNM_NOESCAPE) == 0) { + const auto &producers = entry.producers; + if (std::find(producers.begin(), producers.end(), app_name) != producers.end()) { + return true; + } } } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return false; } @@ -584,30 +725,41 @@ void capiocl::engine::Engine::setFileDeps(const std::filesystem::path &path, newFile(itm); } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.file_dependencies = dependencies; - return; + { + std::lock_guard lg(_shared_mutex); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.file_dependencies = dependencies; + return; + } + this->_newFile(path); } - this->_newFile(path); setFileDeps(path, dependencies); } -long capiocl::engine::Engine::getCommitCloseCount( - const std::filesystem::path::iterator::reference &path) const { +long capiocl::engine::Engine::getCommitCloseCount(const std::filesystem::path &path) const { if (path.empty()) { return 0; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.commit_on_close_count; - } + { + shared_lock_guard slg(_shared_mutex); - this->_newFile(path); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.commit_on_close_count; + } + } + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return getCommitCloseCount(path); } std::vector capiocl::engine::Engine::getCommitOnFileDependencies(const std::filesystem::path &path) const { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { return itm->second.file_dependencies; } @@ -619,37 +771,52 @@ void capiocl::engine::Engine::setStoreFileInMemory(const std::filesystem::path & return; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.store_in_memory = true; - return; + { + std::lock_guard lg(_shared_mutex); + + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.store_in_memory = true; + return; + } + this->_newFile(path); } - this->_newFile(path); setStoreFileInMemory(path); } void capiocl::engine::Engine::setAllStoreInMemory() { - this->store_all_in_memory = true; - for (const auto &[fst, snd] : _capio_cl_entries) { - this->setStoreFileInMemory(fst); + { + std::lock_guard lg(_shared_mutex); + this->store_all_in_memory = true; + } + + const auto paths = getPaths(); + for (const auto &path : paths) { + this->setStoreFileInMemory(path); } } void capiocl::engine::Engine::setWorkflowName(const std::string &name) { + std::lock_guard lg(_shared_mutex); this->workflow_name = name; } -const std::string &capiocl::engine::Engine::getWorkflowName() const { return this->workflow_name; } +const std::string &capiocl::engine::Engine::getWorkflowName() const { + shared_lock_guard slg(_shared_mutex); + return this->workflow_name; +} void capiocl::engine::Engine::setStoreFileInFileSystem(const std::filesystem::path &path) { if (path.empty()) { return; } - - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - itm->second.store_in_memory = false; - return; + { + std::lock_guard lg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + itm->second.store_in_memory = false; + return; + } + this->_newFile(path); } - this->_newFile(path); setStoreFileInFileSystem(path); } @@ -658,17 +825,25 @@ bool capiocl::engine::Engine::isStoredInMemory(const std::filesystem::path &path return true; } - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.store_in_memory; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.store_in_memory; + } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return isStoredInMemory(path); } std::vector capiocl::engine::Engine::getFileToStoreInMemory() const { std::vector files; + shared_lock_guard slg(_shared_mutex); + for (const auto &[path, file] : _capio_cl_entries) { if (file.store_in_memory) { files.push_back(path); @@ -691,72 +866,35 @@ bool capiocl::engine::Engine::isExcluded(const std::filesystem::path &path) cons if (path.empty()) { return true; } - - if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { - return itm->second.excluded; + { + shared_lock_guard slg(_shared_mutex); + if (const auto itm = _capio_cl_entries.find(path); itm != _capio_cl_entries.end()) { + return itm->second.excluded; + } } - this->_newFile(path); + { + std::lock_guard lg(_shared_mutex); + this->_newFile(path); + } return isExcluded(path); } -bool capiocl::engine::Engine::operator==(const capiocl::engine::Engine &other) const { +bool capiocl::engine::Engine::operator==(const Engine &other) const { const auto &other_entries = other._capio_cl_entries; if (this->_capio_cl_entries.size() != other_entries.size()) { return false; } - for (const auto &[this_path, this_itm] : this->_capio_cl_entries) { + for (auto &[this_path, this_itm] : this->_capio_cl_entries) { if (other_entries.find(this_path) == other_entries.end()) { return false; } - auto other_itm = other_entries.at(this_path); - - if (this_itm.commit_rule != other_itm.commit_rule || - this_itm.fire_rule != other_itm.fire_rule || - this_itm.permanent != other_itm.permanent || this_itm.excluded != other_itm.excluded || - this_itm.is_file != other_itm.is_file || - this_itm.commit_on_close_count != other_itm.commit_on_close_count || - this_itm.directory_children_count != other_itm.directory_children_count || - this_itm.store_in_memory != other_itm.store_in_memory) { - return false; - } - auto this_producer = this_itm.producers; - auto other_producer = other_itm.producers; - if (this_producer.size() != other_producer.size()) { + if (auto other_itm = other_entries.at(this_path); this_itm != other_itm) { return false; } - for (const auto &entry : this_producer) { - if (std::find(other_producer.begin(), other_producer.end(), entry) == - other_producer.end()) { - return false; - } - } - - auto this_consumer = this_itm.consumers; - auto other_consumer = other_itm.consumers; - if (this_consumer.size() != other_consumer.size()) { - return false; - } - for (const auto &entry : this_consumer) { - if (std::find(other_consumer.begin(), other_consumer.end(), entry) == - other_consumer.end()) { - return false; - } - } - - auto this_deps = this_itm.file_dependencies; - auto other_deps = other_itm.file_dependencies; - if (this_deps.size() != other_deps.size()) { - return false; - } - for (const auto &entry : this_deps) { - if (std::find(other_deps.begin(), other_deps.end(), entry) == other_deps.end()) { - return false; - } - } } return true; } @@ -797,6 +935,150 @@ void capiocl::engine::Engine::useDefaultConfiguration() { monitor.registerMonitorBackend(new monitor::FileSystemMonitor()); } -void capiocl::engine::Engine::startApiServer(const std::string &address, const int port) { - webapi_server = std::make_unique(this, address, port); +void capiocl::engine::Engine::startApiServer() { + webapi_server = std::make_unique(this, configuration); +} + +capiocl::engine::CapioCLEntry capiocl::engine::CapioCLEntry::fromJson(const std::string &in) { + jsoncons::json j = jsoncons::json::parse(in); + CapioCLEntry entry; + + // Mapping JSON keys to struct members + if (j.contains("producers")) { + entry.producers = j["producers"].as>(); + } + if (j.contains("consumers")) { + entry.consumers = j["consumers"].as>(); + } + + if (j.contains("file_dependencies")) { + for (const auto &path_str : j["file_dependencies"].array_range()) { + entry.file_dependencies.emplace_back(path_str.as()); + } + } + + entry.commit_rule = j.get_value_or("commit_rule", entry.commit_rule); + entry.fire_rule = j.get_value_or("fire_rule", entry.fire_rule); + entry.directory_children_count = + j.get_value_or("directory_children_count", entry.directory_children_count); + entry.commit_on_close_count = + j.get_value_or("commit_on_close_count", entry.commit_on_close_count); + entry.enable_directory_count_update = + j.get_value_or("enable_directory_count_update", entry.enable_directory_count_update); + entry.store_in_memory = j.get_value_or("store_in_memory", entry.store_in_memory); + entry.permanent = j.get_value_or("permanent", entry.permanent); + entry.excluded = j.get_value_or("excluded", entry.excluded); + entry.is_file = j.get_value_or("is_file", entry.is_file); + + return entry; +} + +std::string capiocl::engine::CapioCLEntry::toJson() const { + jsoncons::json j; + j["producers"] = producers; + j["consumers"] = consumers; + + jsoncons::json deps = jsoncons::json::array(); + for (const auto &p : file_dependencies) { + deps.push_back(p.string()); + } + j["file_dependencies"] = deps; + + j["commit_rule"] = commit_rule; + j["fire_rule"] = fire_rule; + j["directory_children_count"] = directory_children_count; + j["commit_on_close_count"] = commit_on_close_count; + j["enable_directory_count_update"] = enable_directory_count_update; + j["store_in_memory"] = store_in_memory; + j["permanent"] = permanent; + j["excluded"] = excluded; + j["is_file"] = is_file; + + return j.to_string(); +} + +capiocl::engine::CapioCLEntry &capiocl::engine::CapioCLEntry::operator+=(const CapioCLEntry &rhs) { + auto merge_vec = [](std::vector &dest, const std::vector &src) { + dest.insert(dest.end(), src.begin(), src.end()); + }; + + merge_vec(this->producers, rhs.producers); + merge_vec(this->consumers, rhs.consumers); + + this->file_dependencies.insert(this->file_dependencies.end(), rhs.file_dependencies.begin(), + rhs.file_dependencies.end()); + + this->directory_children_count += rhs.directory_children_count; + this->commit_on_close_count += rhs.commit_on_close_count; + + this->enable_directory_count_update &= rhs.enable_directory_count_update; + this->store_in_memory |= rhs.store_in_memory; + this->permanent |= rhs.permanent; + this->excluded |= rhs.excluded; + this->is_file &= rhs.is_file; + + this->commit_rule = rhs.commit_rule; + this->fire_rule = rhs.fire_rule; + + return *this; +} + +capiocl::engine::CapioCLEntry capiocl::engine::CapioCLEntry::operator+(const CapioCLEntry &rhs) { + CapioCLEntry result = *this; + result += rhs; + return result; +} + +bool capiocl::engine::CapioCLEntry::operator==(const CapioCLEntry &other) { + + if (this->commit_rule != other.commit_rule || this->fire_rule != other.fire_rule || + this->permanent != other.permanent || this->excluded != other.excluded || + this->is_file != other.is_file || + this->commit_on_close_count != other.commit_on_close_count || + this->directory_children_count != other.directory_children_count || + this->store_in_memory != other.store_in_memory || + this->enable_directory_count_update != other.enable_directory_count_update) { + return false; + } + + const auto this_producer = this->producers; + auto other_producer = other.producers; + if (this_producer.size() != other_producer.size()) { + return false; + } + for (const auto &entry : this_producer) { + if (std::find(other_producer.begin(), other_producer.end(), entry) == + other_producer.end()) { + return false; + } + } + + const auto this_consumer = this->consumers; + auto other_consumer = other.consumers; + if (this_consumer.size() != other_consumer.size()) { + return false; + } + for (const auto &entry : this_consumer) { + if (std::find(other_consumer.begin(), other_consumer.end(), entry) == + other_consumer.end()) { + return false; + } + } + + const auto this_deps = this->file_dependencies; + auto other_deps = other.file_dependencies; + if (this_deps.size() != other_deps.size()) { + return false; + } + for (const auto &entry : this_deps) { + if (std::find(other_deps.begin(), other_deps.end(), entry) == other_deps.end()) { + return false; + } + } + + return true; +} + +bool capiocl::engine::CapioCLEntry::operator!=(const CapioCLEntry &other) { + return !(*this == other); } \ No newline at end of file diff --git a/src/api.cpp b/src/api.cpp new file mode 100644 index 0000000..bba49c1 --- /dev/null +++ b/src/api.cpp @@ -0,0 +1,132 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "capiocl/api.h" +#include "capiocl/engine.h" +#include "capiocl/printer.h" + +std::mutex _setupMtx; +std::condition_variable _setupCv; +bool thread_ready = false; + +/// @brief Main WebServer thread function +void server(const std::string &address, const int port, capiocl::engine::Engine *engine, + std::atomic *terminate) { + + constexpr int RECV_BUF_SIZE = 65535; + + const auto &wf_name = engine->getWorkflowName(); + + int fd = socket(AF_INET, SOCK_DGRAM, 0); + + int reuse = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + + sockaddr_in localAddr{}; + localAddr.sin_family = AF_INET; + localAddr.sin_port = htons(port); + localAddr.sin_addr.s_addr = INADDR_ANY; + + bind(fd, reinterpret_cast(&localAddr), sizeof(localAddr)); + + ip_mreq group{}; + group.imr_multiaddr.s_addr = inet_addr(address.c_str()); + group.imr_interface.s_addr = INADDR_ANY; + setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)); + + char buffer[RECV_BUF_SIZE] = {0}; + sockaddr_in srcAddr{}; + socklen_t addrlen = sizeof(srcAddr); + + // timeout of 1 second for termination + timeval tv{}; + tv.tv_sec = 0; + tv.tv_usec = 500; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + thread_ready = true; + _setupCv.notify_all(); + + while (!*terminate) { + // GCOVR_EXCL_START + ssize_t n = recvfrom(fd, buffer, RECV_BUF_SIZE - 1, 0, + reinterpret_cast(&srcAddr), &addrlen); + // GCOVR_EXCL_STOP + + if (n <= 0) { + continue; + } + + buffer[n] = '\0'; + + try { + auto data = jsoncons::json::parse(buffer); // GCOVR_EXCL_LINE + const auto path = + data.get_value_or("path", ""); // GCOVR_EXCL_LINE + if (path.empty()) { + continue; + } + const auto workflow_name = + data.get_value_or("workflow_name", ""); // GCOVR_EXCL_LINE + if (workflow_name.empty()) { + continue; + } + const auto jsonEntry = + data.get_value_or("CapioClEntry", ""); // GCOVR_EXCL_LINE + if (jsonEntry.empty()) { + continue; + } + auto rule = capiocl::engine::CapioCLEntry::fromJson(jsonEntry); + + if (workflow_name == wf_name) { + engine->add(path, rule); + } else { + continue; + } + + } catch (const jsoncons::json_exception &e) { + capiocl::printer::print(capiocl::printer::CLI_LEVEL_ERROR, + "APIServer: Received invalid json: " + std::string(e.what())); + } + } + + close(fd); +} + +capiocl::api::CapioClApiServer::CapioClApiServer(engine::Engine *engine, + configuration::CapioClConfiguration &config) + : capiocl_configuration(config) { + + std::string address; + int port; + try { + config.getParameter("dynamic_api.ip", &address); // GCOVR_EXCL_LINE + } catch (...) { + address = configuration::defaults::DEFAULT_API_MULTICAST_IP.v; + } + + try { + config.getParameter("dynamic_api.port", &port); // GCOVR_EXCL_LINE + } catch (...) { + port = std::stoi(configuration::defaults::DEFAULT_API_MULTICAST_PORT.v); + } + + _webApiThread = std::thread(server, address, port, engine, &_terminate); + + std::unique_lock lock(_setupMtx); + _setupCv.wait(lock, [] { return thread_ready; }); + + printer::print(printer::CLI_LEVEL_INFO, "API server @ " + address + ":" + std::to_string(port)); +} + +capiocl::api::CapioClApiServer::~CapioClApiServer() { + _terminate = true; + _webApiThread.join(); +} diff --git a/src/configuration.cpp b/src/configuration.cpp index abc3324..ad62d09 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -42,6 +42,8 @@ void capiocl::configuration::CapioClConfiguration::loadDefaults() { this->set(defaults::DEFAULT_MONITOR_MCAST_DELAY); this->set(defaults::DEFAULT_MONITOR_FS_ENABLED); this->set(defaults::DEFAULT_MONITOR_MCAST_ENABLED); + this->set(defaults::DEFAULT_API_MULTICAST_PORT); + this->set(defaults::DEFAULT_API_MULTICAST_IP); } void capiocl::configuration::CapioClConfiguration::set(const std::string &key, std::string value) { diff --git a/src/defaults.cpp b/src/defaults.cpp index b27d922..f9c00b2 100644 --- a/src/defaults.cpp +++ b/src/defaults.cpp @@ -19,4 +19,10 @@ ConfigurationEntry capiocl::configuration::defaults::DEFAULT_MONITOR_MCAST_ENABL "monitor.mcast.enabled", "true"}; ConfigurationEntry capiocl::configuration::defaults::DEFAULT_MONITOR_FS_ENABLED{ - "monitor.filesystem.enabled", "true"}; \ No newline at end of file + "monitor.filesystem.enabled", "true"}; + +ConfigurationEntry capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP{"dynamic_api.ip", + "224.224.224.3"}; + +ConfigurationEntry capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT{"dynamic_api.port", + "11223"}; \ No newline at end of file diff --git a/src/webapi.cpp b/src/webapi.cpp deleted file mode 100644 index a401da2..0000000 --- a/src/webapi.cpp +++ /dev/null @@ -1,261 +0,0 @@ -#include "httplib.h" -#include "jsoncons/json.hpp" - -#include "capiocl/engine.h" -#include "capiocl/printer.h" -#include "capiocl/webapi.h" - -template void ok_response(Res &res) { - res.status = 200; - res.set_content(R"({"status" : "OK"})", "application/json"); -} - -template void error_response(Res &res, const std::exception &e) { - res.status = 400; - res.set_content(std::string(R"({"status" : "error", "what" : ")") + - "Invalid request BODY data: " + e.what() + "\"}", - "application/json"); -} - -template void json_response(Res &res, const jsoncons::json &body) { - res.status = 200; - res.set_content(body.as_string(), "application/json"); -} - -template -void process_post_request(const Req &req, Res &res, Fn &&handler) { - try { - jsoncons::json request_body = jsoncons::json::parse(req.body.empty() ? "{}" : req.body); - - handler(request_body); - ok_response(res); - } catch (const std::exception &e) { - error_response(res, e); - } -} - -template -void process_get_request(const Req &req, Res &res, Fn &&handler) { - try { - jsoncons::json request_body = jsoncons::json::parse(req.body.empty() ? "{}" : req.body); - - jsoncons::json reply; - handler(request_body, reply); - json_response(res, reply); - } catch (const std::exception &e) { - error_response(res, e); - } -} - -/// @brief Main WebServer thread function -void server(const std::string &address, const int port, capiocl::engine::Engine *engine) { - - pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - - capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO, - "Starting API server @ " + address + ":" + std::to_string(port)); - - httplib::Server _server; - - _server.Post("/producer", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto producer = request_body["producer"].as(); - engine->addProducer(path, producer); - }); - }); - - _server.Get("/producer", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["producers"] = engine->getProducers(path); - }); - }); - - _server.Post("/consumer", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto consumer = request_body["consumer"].as(); - engine->addConsumer(path, consumer); - }); - }); - - _server.Get("/consumer", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["consumers"] = engine->getConsumers(path); - }); - }); - - _server.Post("/dependency", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto dependency = std::filesystem::path(request_body["dependency"].as()); - engine->addFileDependency(path, dependency); - }); - }); - - _server.Get("/dependency", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - std::vector deps; - for (const auto &file : engine->getCommitOnFileDependencies(path)) { - deps.emplace_back(file); - } - reply["dependencies"] = deps; - }); - }); - - _server.Post("/commit", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto commit_rule = request_body["commit"].as(); - engine->setCommitRule(path, commit_rule); - }); - }); - - _server.Get("/commit", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["commit"] = engine->getCommitRule(path); - }); - }); - - _server.Post("/commit/file-count", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto count = request_body["count"].as(); - engine->setDirectoryFileCount(path, count); - }); - }); - - _server.Get("/commit/file-count", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["count"] = engine->getDirectoryFileCount(path); - }); - }); - - _server.Post("/commit/close-count", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto count = request_body["count"].as(); - engine->setCommitedCloseNumber(path, count); - }); - }); - - _server.Get("/commit/close-count", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["count"] = engine->getCommitCloseCount(path); - }); - }); - - _server.Post("/fire", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - auto fire_rule = request_body["fire"].as(); - engine->setFireRule(path, fire_rule); - }); - }); - - _server.Get("/fire", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["fire"] = engine->getFireRule(path); - }); - }); - - _server.Post("/permanent", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - const auto permanent = request_body["permanent"].as(); - engine->setPermanent(path, permanent); - }); - }); - - _server.Get("/permanent", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["permanent"] = engine->isPermanent(path); - }); - }); - - _server.Post("/exclude", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - const auto excluded = request_body["exclude"].as(); - engine->setExclude(path, excluded); - }); - }); - - _server.Get("/exclude", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["exclude"] = engine->isExcluded(path); - }); - }); - - _server.Post("/directory", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto path = request_body["path"].as(); - if (request_body["directory"].as()) { - engine->setDirectory(path); - } else { - engine->setFile(path); - } - }); - }); - - _server.Get("/directory", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request(req, res, [&](jsoncons::json &request_body, jsoncons::json &reply) { - const auto path = request_body["path"].as(); - reply["directory"] = engine->isDirectory(path); - }); - }); - - _server.Post("/workflow", [&](const httplib::Request &req, httplib::Response &res) { - process_post_request(req, res, [&](jsoncons::json &request_body) { - const auto workflow_name = request_body["name"].as(); - engine->setWorkflowName(workflow_name); - }); - }); - - _server.Get("/workflow", [&](const httplib::Request &req, httplib::Response &res) { - process_get_request( - req, res, [&]([[maybe_unused]] jsoncons::json &request_body, jsoncons::json &reply) { - reply["name"] = engine->getWorkflowName(); - }); - }); - - _server.Get("/terminate", [&]([[maybe_unused]] const httplib::Request &req, - [[maybe_unused]] httplib::Response &res) { - process_get_request(req, res, - [&]([[maybe_unused]] jsoncons::json &request_body, - [[maybe_unused]] jsoncons::json &reply) { - capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO, - "API server stopped"); - _server.stop(); - }); - }); - - _server.listen(address, port); -} - -capiocl::webapi::CapioClWebApiServer::CapioClWebApiServer(engine::Engine *engine, - const std::string &web_server_address, - const int web_server_port) - : _port(web_server_port) { - _webApiThread = std::thread(server, web_server_address, web_server_port, engine); -} - -capiocl::webapi::CapioClWebApiServer::~CapioClWebApiServer() { - - httplib::Client client("http://127.0.0.1:" + std::to_string(_port)); - client.Get("/terminate"); - if (_webApiThread.joinable()) { - _webApiThread.join(); - } else { - return; - } -} diff --git a/tests/cpp/test_apis.hpp b/tests/cpp/test_apis.hpp index c3312b4..770dddf 100644 --- a/tests/cpp/test_apis.hpp +++ b/tests/cpp/test_apis.hpp @@ -3,236 +3,146 @@ #define WEBSERVER_SUITE_NAME TestWebServerAPIS -#include "jsoncons/json.hpp" -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include -enum class HttpMethod { GET, POST, DELETE }; +#include "capiocl.hpp" +#include "capiocl/engine.h" -static size_t curl_write_response_handler(const char *ptr, const size_t size, size_t nmemb, - void *userdata) { - auto *response = static_cast(userdata); - response->append(ptr, size * nmemb); - return size * nmemb; -} +#include -inline jsoncons::json perform_request(const std::string &url, - const std::string &request_params_json_encode, - HttpMethod method = HttpMethod::GET) { - CURL *curl = curl_easy_init(); - if (!curl) { - throw std::runtime_error("curl_easy_init failed"); +inline bool sendMulticast(const std::string &message, const std::string &multicast_ip, int port) { + const int fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) { + perror("socket"); + return false; } - std::string response; - - curl_slist *headers = nullptr; - headers = curl_slist_append(headers, "Content-Type: application/json"); - headers = curl_slist_append(headers, "Accept: application/json"); - - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request_params_json_encode.c_str()); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, request_params_json_encode.size()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_response_handler); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - - switch (method) { - case HttpMethod::GET: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET"); - break; - - case HttpMethod::POST: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); - break; - - case HttpMethod::DELETE: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - break; + const u_char ttl = 3; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0) { + perror("setsockopt (TTL)"); + close(fd); + return false; } - const CURLcode res = curl_easy_perform(curl); - - long http_code = 0; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code); + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr(multicast_ip.c_str()); + addr.sin_port = htons(port); - curl_slist_free_all(headers); - curl_easy_cleanup(curl); + ssize_t nbytes = sendto(fd, message.c_str(), message.size(), 0, + reinterpret_cast(&addr), sizeof(addr)); - if (res != CURLE_OK) { - throw std::runtime_error(curl_easy_strerror(res)); + if (nbytes < 0) { + perror("sendto"); + close(fd); + return false; } - std::cout << "DBG RES: " << response << std::endl; - return jsoncons::json::parse(std::string(response)); + close(fd); + return true; } -TEST(WEBSERVER_SUITE_NAME, testGetAndSetWorkflowName) { +TEST(WEBSERVER_SUITE_NAME, TestSerializationDeserializationCapioCLRule) { + capiocl::engine::CapioCLEntry entry; - // clean environment for wf name - unsetenv("WORKFLOW_NAME"); - - auto engine = capiocl::engine::Engine(); - engine.startApiServer(); + const std::string def_rule = + "{\"commit_on_close_count\":0,\"commit_rule\":\"on_termination\",\"consumers\":[]," + "\"directory_children_count\":0,\"enable_directory_count_update\":true,\"excluded\":false," + "\"file_dependencies\":[],\"fire_rule\":\"update\",\"is_file\":true,\"permanent\":false," + "\"producers\":[],\"store_in_memory\":false}"; - sleep(1); + EXPECT_EQ(def_rule, entry.toJson()); - auto response = perform_request("http://localhost:5520/workflow", "{}", HttpMethod::GET); - - EXPECT_FALSE(response.empty()); - EXPECT_TRUE(response["name"] == capiocl::CAPIO_CL_DEFAULT_WF_NAME); - - perform_request("http://localhost:5520/workflow", R"({"name": "test_workflow_0"})", - HttpMethod::POST); - response = perform_request("http://localhost:5520/workflow", "{}", HttpMethod::GET); - EXPECT_FALSE(response.empty()); - EXPECT_TRUE(response["name"] == "test_workflow_0"); -} - -TEST(WEBSERVER_SUITE_NAME, consumer) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/consumer", - R"({"path" : "/tmp/test.txt", "consumer" : "sample2"})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/consumer", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["consumers"][0] == "sample2"); -} - -TEST(WEBSERVER_SUITE_NAME, producer) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/producer", - R"({"path" : "/tmp/test.txt", "producer" : "sample1"})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/producer", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["producers"][0] == "sample1"); -} + entry.commit_on_close_count = 10; + entry.commit_rule = "on_close"; + entry.consumers = {"aaaaa"}; + entry.producers = {"bbbbb"}; + entry.directory_children_count = 12; + entry.enable_directory_count_update = false; + entry.excluded = true; + entry.file_dependencies = {"cccc"}; + entry.is_file = false; + entry.permanent = true; + entry.store_in_memory = true; -TEST(WEBSERVER_SUITE_NAME, commit) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/commit", - R"({"path" : "/tmp/test.txt","commit" : "on_file"})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/commit", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["commit"] == "on_file"); -} + const std::string def_rule2 = + "{\"commit_on_close_count\":10,\"commit_rule\":\"on_close\",\"consumers\":[\"aaaaa\"]," + "\"directory_children_count\":12,\"enable_directory_count_update\":false,\"excluded\":true," + "\"file_dependencies\":[\"cccc\"],\"fire_rule\":\"update\",\"is_file\":false,\"permanent\":" + "true,\"producers\":[\"bbbbb\"],\"store_in_memory\":true}"; -TEST(WEBSERVER_SUITE_NAME, fire) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/fire", - R"({"path" : "/tmp/test.txt","fire" : "no_update"})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/fire", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["fire"] == "no_update"); -} + EXPECT_EQ(def_rule2, entry.toJson()); -TEST(WEBSERVER_SUITE_NAME, fileDependency) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = perform_request("http://localhost:5520/dependency", - R"({"path" : "/tmp/test.txt", "dependency" : "myFile.dat"})", - HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/dependency", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["dependencies"][0] == "myFile.dat"); -} + auto new_rule = capiocl::engine::CapioCLEntry::fromJson(def_rule2); -TEST(WEBSERVER_SUITE_NAME, on_n_files) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = perform_request("http://localhost:5520/commit/file-count", - R"({"path" : "/tmp/test.txt","count" : 7892})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/commit/file-count", - R"({"path" : "/tmp/test.txt"})", HttpMethod::GET); - EXPECT_EQ(result["count"], 7892); -} + EXPECT_TRUE(new_rule == entry); -TEST(WEBSERVER_SUITE_NAME, close_count) { - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/commit/close-count", - R"({"path" : "/tmp/test.txt","count" : 12345})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/commit/close-count", - R"({"path" : "/tmp/test.txt"})", HttpMethod::GET); - EXPECT_EQ(result["count"], 12345); + entry.enable_directory_count_update = true; + EXPECT_TRUE(entry != new_rule); } -TEST(WEBSERVER_SUITE_NAME, test_error) { +TEST(WEBSERVER_SUITE_NAME, TestWebServerAPIS) { capiocl::engine::Engine engine; engine.startApiServer(); - sleep(1); - - auto result = perform_request("http://localhost:5520/commit", R"({})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "error"); - EXPECT_GT(result["what"].as_string().size(), 0); -} - -TEST(WEBSERVER_SUITE_NAME, boolean_flag) { + EXPECT_FALSE(engine.contains("file.txt")); + + EXPECT_TRUE( + sendMulticast(R"({"path" : "file.txt", "workflow_name" : "notMyWorkflow"})", + capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + EXPECT_TRUE( + sendMulticast("notAValidJson", capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + EXPECT_TRUE( + sendMulticast(R"({"workflow_name" : "notMyWorkflow", "CapioClEntry":{}})", + capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + EXPECT_TRUE( + sendMulticast(R"({"path" : "file.txt", "CapioClEntry":{}})", + capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + EXPECT_TRUE( + sendMulticast(R"({"path" : "file.txt", "workflow_name" : "notMyWorkflow"})", + capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + capiocl::engine::CapioCLEntry entry; + entry.commit_rule = "on_file"; + entry.commit_on_close_count = 10; + entry.fire_rule = "no_update"; + + EXPECT_TRUE(sendMulticast( + R"({ "path" : "file.txt","workflow_name" : "notMyWorkflow", "CapioClEntry":)" + + entry.toJson() + "}", + capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + std::string request = R"({ "path" : "file.txt","workflow_name" : ")"; + request += capiocl::CAPIO_CL_DEFAULT_WF_NAME; + request += R"(", "CapioClEntry":)" + entry.toJson() + "}"; + + EXPECT_TRUE( + sendMulticast(request, capiocl::configuration::defaults::DEFAULT_API_MULTICAST_IP.v, + stoi(capiocl::configuration::defaults::DEFAULT_API_MULTICAST_PORT.v))); + + // timeout to allow server to process the request + while (!engine.contains("file.txt")) { + sleep(1); + } - capiocl::engine::Engine engine; - engine.startApiServer(); - sleep(1); - - auto result = - perform_request("http://localhost:5520/permanent", - R"({"path" : "/tmp/test.txt","permanent" : true})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/permanent", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["permanent"].as_bool()); - - result = perform_request("http://localhost:5520/exclude", - R"({"path" : "/tmp/test.txt","exclude" : true})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/exclude", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["exclude"].as_bool()); - - result = perform_request("http://localhost:5520/directory", - R"({"path" : "/tmp/test.txt","directory" : true})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/directory", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_TRUE(result["directory"].as_bool()); - - result = perform_request("http://localhost:5520/directory", - R"({"path" : "/tmp/test.txt","directory" : false})", HttpMethod::POST); - EXPECT_TRUE(result["status"] == "OK"); - result = perform_request("http://localhost:5520/directory", R"({"path" : "/tmp/test.txt"})", - HttpMethod::GET); - EXPECT_FALSE(result["directory"].as_bool()); + EXPECT_TRUE(engine.contains("file.txt")); + EXPECT_EQ(engine.getCommitRule("file.txt"), entry.commit_rule); + EXPECT_EQ(engine.getCommitCloseCount("file.txt"), entry.commit_on_close_count); + EXPECT_EQ(engine.getFireRule("file.txt"), entry.fire_rule); } #endif // CAPIO_CL_TEST_APIS_HPP diff --git a/tests/cpp/test_engine.hpp b/tests/cpp/test_engine.hpp index 5a81274..db95aa1 100644 --- a/tests/cpp/test_engine.hpp +++ b/tests/cpp/test_engine.hpp @@ -638,4 +638,28 @@ TEST(ENGINE_SUITE_NAME, TestInheritanceFromParentPaths) { EXPECT_TRUE(engine.getFireRule("/test/a/b/c/d") == capiocl::fireRules::NO_UPDATE); } +TEST(ENGINE_SUITE_NAME, TestEngineAddCapioClRule) { + capiocl::engine::Engine engine; + + engine.newFile("/test1"); + + capiocl::engine::CapioCLEntry entry, entry1; + entry.commit_rule = "on_file"; + entry1.commit_rule = "on_commit"; + + engine.add("/test1", entry); + + EXPECT_EQ(entry.commit_rule, engine.getCommitRule("/test1")); + + engine.add("/test2", entry); + + EXPECT_EQ(entry.commit_rule, engine.getCommitRule("/test2")); + + const auto new_entry = entry + entry1; + EXPECT_TRUE(entry1 == new_entry); + EXPECT_FALSE(entry == new_entry); + + // TODO: test all entries of capioCL rule +} + #endif // CAPIO_CL_ENGINE_HPP \ No newline at end of file