diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..fad45ba --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,585 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "c_linked_list" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4964518bd3b4a8190e832886cdc0da9794f12e8e6c1613a9e90ff331c4c8724b" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi 0.3.9", +] + +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + +[[package]] +name = "env_logger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + +[[package]] +name = "get_if_addrs" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abddb55a898d32925f3148bd281174a68eeb68bbfd9a5938a57b18f506ee4ef7" +dependencies = [ + "c_linked_list", + "get_if_addrs-sys", + "libc", + "winapi 0.2.8", +] + +[[package]] +name = "get_if_addrs-sys" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d04f9fb746cf36b191c00f3ede8bde9c8e64f9f4b05ae2694a9ccf5e3f5ab48" +dependencies = [ + "gcc", + "libc", +] + +[[package]] +name = "getrandom" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi 0.3.9", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50" + +[[package]] +name = "lock_api" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi 0.3.9", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "molecular-rust" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "derive_more", + "env_logger", + "get_if_addrs", + "hostname", + "lazy_static", + "log", + "regex", + "serde_json", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + +[[package]] +name = "proc-macro2" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + +[[package]] +name = "ryu" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "semver" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" + +[[package]] +name = "serde" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" + +[[package]] +name = "serde_json" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "syn" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi", + "winapi 0.3.9", +] + +[[package]] +name = "tokio" +version = "1.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", + "serde", +] + +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..14cfeb6 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "molecular-rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.19" +env_logger = "0.9.0" +log= "0.4.14" +serde_json = "1.0.0" +anyhow = "1.0.53" +tokio = { version = "1.16.1", features = ["full" ,"time"] } +regex = "1.5.4" +lazy_static = "1.3.0" +derive_more = "0.99.17" +thiserror = "1.0" +uuid = { version = "0.8", features = ["serde", "v4"] } +hostname = "0.3.1" +get_if_addrs = "0.5.3" \ No newline at end of file diff --git a/makefile b/makefile new file mode 100644 index 0000000..beec8ab --- /dev/null +++ b/makefile @@ -0,0 +1,8 @@ +#Run library test while suppressing the warnings +test: + RUSTFLAGS=-Awarnings cargo test --lib -- --nocapture +test-broker: + RUSTFLAGS=-Awarnings cargo test --lib -- broker::tests --nocapture +test-services: + RUSTFLAGS=-Awarnings cargo test --lib -- services::tests --nocapture +.PHONY: test test-broker test-services \ No newline at end of file diff --git a/src/bin/entrypoint.rs b/src/bin/entrypoint.rs new file mode 100644 index 0000000..f328e4d --- /dev/null +++ b/src/bin/entrypoint.rs @@ -0,0 +1 @@ +fn main() {} diff --git a/src/broker.rs b/src/broker.rs new file mode 100644 index 0000000..c6d0b31 --- /dev/null +++ b/src/broker.rs @@ -0,0 +1,701 @@ +use std::{ + any::Any, + collections::HashMap, + sync::{mpsc::Receiver, Arc}, +}; + +use anyhow::{bail, Result}; +use log::{debug, info, warn}; + +use crate::{ + context::Context, + errors::ServiceBrokerError, + registry::{self, endpoint_list, node, ActionEndpoint, EndpointTrait, EventEndpoint, Payload}, + strategies::{RoundRobinStrategy, Strategy}, + utils, +}; +use chrono::{DateTime, Duration, Local, NaiveDateTime, Utc}; +use serde_json::Value; +use tokio::{ + sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, RwLock, + }, + task, +}; + +use crate::{registry::Logger, service::ServiceSpec, Registry, Service}; + +#[derive(Debug)] +struct RetryPolicy { + enabled: bool, + retries: usize, + delay: usize, + max_delay: usize, + factor: usize, + /* + check : + */ +} +impl Default for RetryPolicy { + fn default() -> Self { + Self { + enabled: false, + retries: 5, + delay: 100, + max_delay: 1000, + factor: 2, + } + } +} + +#[derive(Debug)] +pub struct BrokerOptions { + transporter: String, + heartbeat_frequency: Duration, + heartbeat_timeout: Duration, + offline_check_frequency: Duration, + offline_timeout: Duration, + neighbours_checkout_timeout: Duration, + wait_for_dependencies_timeout: Duration, + namespace: String, + request_timeout: Duration, + mcall_timeout: Duration, + retry_policy: RetryPolicy, + pub max_call_level: usize, + pub metrics: bool, + metrics_rate: f32, + wait_for_neighbours_interval: Duration, + dont_wait_for_neighbours: bool, + strategy_factory: RwLock, + pub metadata : Value + /* + discover_node_id : fn()->String, + metrics bool + metric + middleware + loglevel + logformat + transporter factory + */ +} + +impl Default for BrokerOptions { + fn default() -> Self { + Self { + transporter: "TCP".to_string(), + heartbeat_frequency: Duration::seconds(5), + heartbeat_timeout: Duration::seconds(15), + offline_check_frequency: Duration::seconds(20), + offline_timeout: Duration::minutes(10), + dont_wait_for_neighbours: true, + neighbours_checkout_timeout: Duration::seconds(2), + wait_for_dependencies_timeout: Duration::seconds(2), + namespace: "".to_string(), + request_timeout: Duration::seconds(3), + mcall_timeout: Duration::seconds(5), + retry_policy: RetryPolicy::default(), + metrics: false, + metrics_rate: 1.0, + max_call_level: 100, + wait_for_neighbours_interval: Duration::milliseconds(200), + strategy_factory: RwLock::new(RoundRobinStrategy::new()),metadata:Value::Null + } + } +} + +#[derive(Debug)] + +pub struct ServiceBroker { + reciever: UnboundedReceiver, + pub(crate) sender: UnboundedSender, + started: bool, + namespace: Option, + metdata: Payload, + pub node_id: String, + pub instance: String, + services: Vec, + pub transit: Option, + pub logger: Arc, + pub options: BrokerOptions, + + /* + local bus + + logger + metricss + middlewere + cacher + serializer + error generator + validator + tracer + transporter + */ + registry: Option, +} +#[derive(Debug)] + +pub struct Transit {} + +impl ServiceBroker { + fn start(&mut self) { + let time = Utc::now(); + self.started = true; + } + + fn stop(&mut self) { + todo!("handle stopping the broker") + } + + fn add_local_service(&mut self, service: Service) { + self.services.push(service); + } + fn register_local_service(&mut self, service: ServiceSpec) -> anyhow::Result<()> { + match &mut self.registry { + Some(registry) => registry.register_local_service(service), + None => todo!(), + } + } + + async fn destroy_service(&mut self, name: &str, version: &str) -> Result<()> { + let service_index = self.get_local_service_index(name, version); + if service_index.is_none() { + bail!( + "no service with the name {} and version {} found", + name, + version + ); + } + let service_index = service_index.unwrap(); + let mut full_name = "".to_string(); + + { + let service = self.services.get_mut(service_index).unwrap(); + full_name = service.full_name.clone(); + service.stop().await; + } + { + self.services.remove(service_index); + } + match &mut self.registry { + Some(registry) => registry.unregister_service(&full_name, Some(&self.node_id)), + None => todo!(), + } + + self.services_changed(true); + Ok(()) + } + + fn services_changed(&self, local_service: bool) { + if self.started && local_service { + todo!("notifify remote nodes") + } + } + fn get_local_service_index(&self, name: &str, version: &str) -> Option { + self.services.iter().position(|s| { + if s.name == name && s.version == version { + return true; + } + false + }) + } + + fn wait_for_services(&self, service_names: Vec, timeout: i64, interval: i64) { + info!("Waiting for service(s) {:?}", service_names); + let start_time = Local::now(); + let check = async { + let service_statuses = service_names.iter().map(|service_name| { + let status = self + .registry + .as_ref() + .unwrap() + .has_services(service_name, None); + return ServiceStatus { + name: service_name, + available: status, + }; + }); + + let available_services: Vec = service_statuses + .clone() + .filter(|service| service.available) + .collect(); + if available_services.len() == service_names.len() { + info!("Service(s) {:?} are available.", service_names); + return; + } + + let unavailable_services: Vec = service_statuses + .filter(|service| !service.available) + .collect(); + + debug!("{} {:?} of {} services are availablle. {} {} are still unavailable. Waiting for further..." , + available_services.len(), + available_services.iter().map(|service_status| service_status.name).collect::(), + service_names.len(), + unavailable_services.len(), + unavailable_services.iter().map(|service_status|service_status.name).collect::()); + + if Local::now() - start_time > Duration::milliseconds(timeout) { + //TODO: reject the future. + return; + } + //TODO: add the setTimeout thing + // Delay::new() + }; + } + + async fn call( + &self, + action_name: &str, + params: Payload, + opts: CallOptions, + sender: oneshot::Sender>, + ) -> anyhow::Result<()> { + let ctx = Context::new(&self, "test_service".to_string()); + let ctx = ctx.child_action_context(&self, params.clone(), Some(opts.clone()), action_name); + let endpoint = self.find_next_action_endpoint(action_name, &opts, &ctx)?; + let endpoint = endpoint.clone(); + if endpoint.is_local() { + debug!( + "Call action locally. {{ action: {} , request_id : {:?} }}", + ctx.action(), + ctx.request_id + ) + } else { + debug!( + "Call action on remote node. {{ action: {} , node_id : {} , request_id : {:?} }}", + ctx.action(), + ctx.node_id(), + ctx.request_id + ) + } + task::spawn( async move { + let result = (endpoint.action.handler)(ctx, Some(params)); + let _ = sender.send(Ok(result)); + }); + Ok(()) + } + + fn find_next_action_endpoint( + &self, + action_name: &str, + opts: &CallOptions, + ctx: &Context, + ) -> anyhow::Result<&ActionEndpoint> { + if let Some(node_id) = &opts.node_id { + let ep = self + .registry + .as_ref() + .unwrap() + .get_action_endpoint_by_node_id(action_name, node_id); + match ep { + Some(ep) => Ok(ep), + None => { + warn!("Service {} is not found on {} node.", action_name, node_id); + + bail!(ServiceBrokerError::ServiceNotFound { + action_name: action_name.to_string(), + node_id: node_id.to_string() + }) + } + } + } else { + //Get endpoint list by action name. + let ep_list = self + .registry + .as_ref() + .unwrap() + .get_action_endpoints(action_name); + match ep_list { + Some(ep_list) => { + let ep = ep_list.next(ctx, &self.options.strategy_factory); + match ep { + Some(ep) => Ok(ep), + None => { + warn!("Service {} is not available.", action_name); + bail!(ServiceBrokerError::ServiceNotAvailable { + action_name: action_name.to_string(), + node_id: "".to_string() + }); + } + } + } + None => { + warn!("Service {} is not registered.", action_name); + + bail!(ServiceBrokerError::ServiceNotFound { + action_name: action_name.to_string(), + node_id: "".to_string() + }) + } + } + } + } + + fn get_local_action_endpoint( + &self, + action_name: &str, + ctx: &Context, + ) -> anyhow::Result<&ActionEndpoint> { + //Find action endpoints by name. + let ep_list = self + .registry + .as_ref() + .unwrap() + .get_action_endpoints(action_name); + let available = match ep_list { + Some(endpoint_list) => !endpoint_list.has_local(), + None => false, + }; + if !available { + bail!(ServiceBrokerError::ServiceNotFound { + action_name: action_name.to_string(), + node_id: self.node_id.clone() + }) + } + //Get local endpoint. + match ep_list + .unwrap() + .next_local(ctx, &self.options.strategy_factory) + { + Some(ep) => Ok(ep), + None => { + bail!(ServiceBrokerError::ServiceNotAvailable { + action_name: action_name.to_string(), + node_id: self.node_id.clone() + }) + } + } + } + + fn get_local_node_info(&self) -> anyhow::Result { + self.registry.as_ref().unwrap().get_local_node_info(false) + } + + fn emit_local_services(&self, ctx: Context) { + todo!("event catalog implementation left") + + // self.registry. + } + + fn get_cpu_usage() { + todo!("get cpu usageI") + } + + fn generate_uid() -> String { + utils::generate_uuid() + } +} + +#[derive(Debug)] +pub enum ServiceBrokerMessage { + AddLocalService(Service), + RegisterLocalService(ServiceSpec), + WaitForServices { + dependencies: Vec, + timeout: i64, + interval: i64, + }, + Broadcast { + event_name: String, + data: Value, + opts: Value, + }, + Emit { + event_name: String, + data: Value, + opts: Value, + }, + Call { + action_name: String, + params: Payload, + opts: CallOptions, + result_channel: oneshot::Sender>, + }, + Close, +} +impl PartialEq for ServiceBrokerMessage { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::AddLocalService(l0), Self::AddLocalService(r0)) => l0 == r0, + (Self::RegisterLocalService(l0), Self::RegisterLocalService(r0)) => l0 == r0, + ( + Self::WaitForServices { + dependencies: l_dependencies, + timeout: l_timeout, + interval: l_interval, + }, + Self::WaitForServices { + dependencies: r_dependencies, + timeout: r_timeout, + interval: r_interval, + }, + ) => { + l_dependencies == r_dependencies + && l_timeout == r_timeout + && l_interval == r_interval + } + ( + Self::Broadcast { + event_name: l_event_name, + data: l_data, + opts: l_opts, + }, + Self::Broadcast { + event_name: r_event_name, + data: r_data, + opts: r_opts, + }, + ) => l_event_name == r_event_name && l_data == r_data && l_opts == r_opts, + ( + Self::Emit { + event_name: l_event_name, + data: l_data, + opts: l_opts, + }, + Self::Emit { + event_name: r_event_name, + data: r_data, + opts: r_opts, + }, + ) => l_event_name == r_event_name && l_data == r_data && l_opts == r_opts, + ( + Self::Call { + action_name: l_action_name, + params: l_params, + opts: l_opts, + result_channel: l_result_channel, + }, + Self::Call { + action_name: r_action_name, + params: r_params, + opts: r_opts, + result_channel: r_result_channel, + }, + ) => l_action_name == r_action_name && l_params == r_params && l_opts == r_opts, + (Self::Close, Self::Close) => true, + _ => false, + } + } +} + +#[derive(Debug)] +pub struct HandlerResult { + // pub(crate) data: u32, + pub(crate) data : Box, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CallOptions { + meta: Payload, + node_id: Option, +} + +struct ServiceStatus<'a> { + name: &'a str, + available: bool, +} + +#[cfg(test)] +mod tests { + use crate::{ + registry::{Action, Visibility}, + service::{Schema, SchemaMerged}, + Registry, Service, + }; + use tokio::{sync::mpsc, task}; + + use super::*; + fn test_merge_func() { + println!("test merge function"); + } + fn test_started_func() { + println!("test start func"); + } + fn test_created_func() { + println!("test created func"); + } + fn test_stop_func() { + println!("test stop func"); + } + fn action_func(context: Context, payload: Option) -> HandlerResult { + + let data = fibonacci(40); + + HandlerResult {data: Box::new(data) } + } + + fn get_test_broker( + recv: mpsc::UnboundedReceiver, + sender: mpsc::UnboundedSender, + ) -> ServiceBroker { + ServiceBroker { + reciever: recv, + started: false, + namespace: None, + metdata: Payload {}, + sender, + node_id: "test_node".to_string(), + instance: "test_instance".to_string(), + services: Vec::new(), + transit: None, + logger: Arc::new(Logger {}), + options: BrokerOptions::default(), + registry: None, + } + } + fn get_test_schema(dependencies: Option>) -> Schema { + let merged = SchemaMerged::MergedFn(test_merge_func); + let schema = Schema { + mixins: None, + actions: None, + events: None, + merged, + name: "test_service".to_string(), + version: None, + settings: HashMap::new(), + metadata: None, + created: Some(test_created_func), + started: Some(test_started_func), + stopped: Some(test_stop_func), + dependencies: dependencies, + }; + + schema + } + fn fibonacci(n: u32) -> u32 { + match n { + 0 => 1, + 1 => 1, + _ => fibonacci(n - 1) + fibonacci(n - 2), + } + } + fn get_test_service( + schema: Option, + settings: Option>, + actions: Option>, + broker_sender: Option>, + ) -> Service { + let name = "test_service".to_string(); + let version = "1.0".to_string(); + let full_name = Service::get_versioned_full_name(&name, Some(&version)); + let settings = match settings { + Some(settings) => settings, + None => HashMap::new(), + }; + let schema = match schema { + Some(schema) => schema, + None => get_test_schema(None), + }; + + let original_schema = get_test_schema(None); + let broker_sender = match broker_sender { + Some(sender) => sender, + None => { + let (sender, recv) = mpsc::unbounded_channel::(); + sender + } + }; + let service = Service { + name, + full_name, + version, + settings, + schema, + original_schema: Some(original_schema), + metadata: HashMap::new(), + actions: actions, + events: None, + broker_sender, + }; + service + } + #[test] + fn broker_call() { + let (sender, recv) = mpsc::unbounded_channel::(); + let (_, fake_recv) = mpsc::unbounded_channel(); + let mut broker_original = get_test_broker(recv, sender.clone()); + let broker = get_test_broker(fake_recv, sender.clone()); + let broker_arc = Arc::new(broker); + let action = Action { + name: "action_func".to_string(), + visibility: Visibility::Public, + handler: action_func, + }; + let service = get_test_service(None, None, Some(vec![action]), Some(sender.clone())); + let registry = Registry::new(Arc::clone(&broker_arc), sender.clone()); + let actions = registry.actions.clone(); + broker_original.registry = Some(registry); + broker_original.start(); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let join_handle = rt.spawn(async move { + while let Some(message) = broker_original.reciever.recv().await { + match message { + ServiceBrokerMessage::AddLocalService(service) => { + broker_original.add_local_service(service) + } + ServiceBrokerMessage::RegisterLocalService(service_spec) => broker_original + .register_local_service(service_spec) + .unwrap(), + ServiceBrokerMessage::Call { + action_name, + params, + opts, + result_channel, + } => { + let result = broker_original + .call(&action_name, params, opts, result_channel) + .await; + } + ServiceBrokerMessage::Close => return, + _ => {} + } + } + }); + // let sender = sender.clone(); + task::spawn(async move { + service.init().await; + service.start().await; + println!("{:?}", actions); + let start = Local::now(); + let sender = sender.clone(); + let mut jhs = Vec::new(); + for _ in 0..10 { + let sender = sender.clone(); + let jh = task::spawn(async move { + let (one_sender, recv) = oneshot::channel(); + + let _ = sender.send(ServiceBrokerMessage::Call { + action_name: "action_func".to_string(), + params: Payload {}, + opts: CallOptions { + meta: Payload {}, + node_id: Some("test_node".to_string()), + }, + result_channel: one_sender, + }); + let _result = recv.await; + println!("{:?}" , _result); + }); + jhs.push(jh); + } + for jh in jhs{ + jh.await; + } + let end = Local::now(); + let __ = sender.send(ServiceBrokerMessage::Close); + println!("{:?}", end - start); + drop(sender); + }); + + let _ = join_handle.await; + }); + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..0c42bcf --- /dev/null +++ b/src/context.rs @@ -0,0 +1,237 @@ +use std::sync::Arc; + +use anyhow::{bail, Result}; +use chrono::NaiveDateTime; +use serde_json::{json, Value}; +use tokio::sync::{ + mpsc::{Sender, UnboundedSender}, + oneshot, +}; + +use crate::{ + broker::{BrokerOptions, CallOptions}, + registry::{service_item::ServiceItem, Action, EndpointTrait, EndpointType, Event, Payload}, + utils, HandlerResult, ServiceBroker, ServiceBrokerMessage, +}; +#[derive(Debug, Clone)] + +struct ContextOptions { + timeout: Option, + retries: Option, +} + +#[derive(Debug)] +pub struct Context { + id: String, + pub request_id: Option, + broker_sender: UnboundedSender, + action: Option, + event: Option, + parent_id: Option, + event_groups: Option>, + event_type: EventType, + pub params: Option, + meta: Payload, + caller: Option, + locals: Option, + node_id: Option, + + tracing: bool, + level: usize, + + service: String, + + options: ContextOptions, + parent_ctx: Option>, + /* + tracing + span + span_stack + need_ack + ack_id + startTime = null; + startHrTime = null; + stopTime = null; + duration = null; + error = null; + */ + cached_result: bool, +} +#[derive(Debug)] +enum EventType { + Emit, + Broadcast, +} + +impl Context { + pub fn new(broker: &ServiceBroker, service: String) -> Self { + let id = utils::generate_uuid(); + let request_id = id.clone(); + let meta = Payload {}; + + Self { + id, + request_id: Some(request_id), + broker_sender: broker.sender.clone(), + action: None, + event: None, + parent_id: None, + event_groups: None, + event_type: EventType::Emit, + meta, + caller: None, + locals: None, + node_id: Some(broker.node_id.clone()), + tracing: false, + level: 1, + options: ContextOptions { + timeout: None, + retries: None, + }, + parent_ctx: None, + cached_result: false, + service: service, + params: None, + } + } + + pub fn child_action_context( + &self, + broker: &ServiceBroker, + params: Payload, + opts: Option, + action: &str, + ) -> Self { + // let parent_ctx = self.clone(); + let meta = self.meta.to_owned(); + if broker.options.metrics { + //TODO: + //meta = meta.add("tracing" , true) + } + if opts.is_some() { + //TODO: + //copy the options meta + } + let id = utils::generate_uuid(); + let mut request_id = id.clone(); + if let Some(id) = self.request_id.clone() { + request_id = id; + } + + let mut caller = action.to_string(); + + let service: Vec<&str> = caller.split('.').collect(); + let service = service.get(0).unwrap().to_string(); + Self { + id, + request_id: Some(request_id), + broker_sender: self.broker_sender.clone(), + action: Some(action.to_string()), + event: None, + parent_id: Some(self.id.clone()), + event_groups: None, + event_type: EventType::Emit, + meta, + caller: Some(caller), + locals: self.locals.to_owned(), + node_id: self.node_id.clone(), + tracing: self.tracing, + level: self.level + 1, + options: self.options.to_owned(), + parent_ctx: None, + cached_result: false, + service: service, + params: Some(params), + } + } + + pub fn action(&self) -> &str { + self.action.as_ref().unwrap() + } + pub fn node_id(&self) -> &String { + self.node_id.as_ref().unwrap() + } + + async fn call( + &self, + broker_options: &BrokerOptions, + action_name: &str, + params: Payload, + mut opts: CallOptions, + ) -> Result<()> { + // if let Some(opts) = opts { + //TODO:set the parent context + //opts.insert("parentCtx".to_string(), self.clone()); + // } + // let opts = Value::Object(opts.unwrap().to_owned()); + if self.options.timeout > Some(0) { + //TODO: callculate time difference for distributed distance + } + + if broker_options.max_call_level > 0 && self.level >= broker_options.max_call_level { + bail!("Max call level error") + } + + let (sender, recv) = oneshot::channel::>(); + + let _result = self.broker_sender.send(ServiceBrokerMessage::Call { + action_name: action_name.to_string(), + params, + opts, + result_channel: sender, + }); + let result = recv.await?; + + //TODO:merge meta of the context object + + Ok(()) + } + + async fn emit(&self, event_name: &str, data: Value, opts: Option) { + let mut opts: Value = match opts { + Some(opts) => { + let value = json!({ "groups": opts }); + value + } + _ => json!({}), + }; + if let Some(groups) = opts.get("groups") { + if !groups.is_array() { + opts = json!({ "groups": vec![groups] }); + }; + } + //TODO:Set the parent context + let _result = self.broker_sender.send(ServiceBrokerMessage::Emit { + event_name: event_name.to_string(), + data, + opts, + }); + } + + async fn broadcast(&self, event_name: &str, data: Value, opts: Option) { + let mut opts: Value = match opts { + Some(opts) => { + let value = json!({ "groups": opts }); + value + } + _ => json!({}), + }; + if let Some(groups) = opts.get("groups") { + if !groups.is_array() { + opts = json!({ "groups": vec![groups] }); + }; + } + //TODO:Set the parent context + let _result = self.broker_sender.send(ServiceBrokerMessage::Broadcast { + event_name: event_name.to_string(), + data, + opts, + }); + } + fn start_span() { + todo!("while implementing tracing") + } + fn finish_span() { + todo!("while implementing tracing") + } +} diff --git a/src/errors/mod.rs b/src/errors/mod.rs new file mode 100644 index 0000000..790bf1f --- /dev/null +++ b/src/errors/mod.rs @@ -0,0 +1,4 @@ +mod registry; +mod service_broker; +pub use registry::RegistryError; +pub use service_broker::ServiceBrokerError; diff --git a/src/errors/registry.rs b/src/errors/registry.rs new file mode 100644 index 0000000..ce9f62d --- /dev/null +++ b/src/errors/registry.rs @@ -0,0 +1,8 @@ +use thiserror::Error; +#[derive(Error, Debug)] +pub enum RegistryError { + #[error("No local node found")] + NoLocalNodeFound, + #[error("No service found")] + NoServiceItemFound, +} diff --git a/src/errors/service_broker.rs b/src/errors/service_broker.rs new file mode 100644 index 0000000..ee83608 --- /dev/null +++ b/src/errors/service_broker.rs @@ -0,0 +1,14 @@ +use thiserror::Error; +#[derive(Error, Debug)] +pub enum ServiceBrokerError { + #[error("Service {action_name:?} is not registered locally.")] + ServiceNotFound { + action_name: String, + node_id: String, + }, + #[error("Service {action_name:?} is not available locally.")] + ServiceNotAvailable { + action_name: String, + node_id: String, + }, +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1da9a74 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,16 @@ +pub mod broker; +pub mod context; +pub mod errors; +pub mod logger; +pub mod packet; +pub mod registry; +pub mod service; +pub mod strategies; +pub mod utils; +pub use broker::HandlerResult; +pub use broker::ServiceBroker; +pub use broker::ServiceBrokerMessage; +pub use registry::Registry; +pub use service::Service; + +const INTERNAL_PREFIX: char = '$'; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..80703e2 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,5 @@ +trait LoggerTrait { + fn init(); + fn stop(); + fn get_log_handler(); +} diff --git a/src/packet.rs b/src/packet.rs new file mode 100644 index 0000000..8af8a94 --- /dev/null +++ b/src/packet.rs @@ -0,0 +1,18 @@ +enum PacketType { + Unknown, + Event, + Request, + Response, + Discover, + Info, + Disconnect, + Heartbeat, + Ping, + Pongs, +} + +impl From for String { + fn from(p: PacketType) -> Self { + "as".to_string() + } +} diff --git a/src/registry/action_catalog.rs b/src/registry/action_catalog.rs new file mode 100644 index 0000000..8930496 --- /dev/null +++ b/src/registry/action_catalog.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; + +use super::*; + +#[derive(PartialEq, Eq, Default, Debug, Clone)] +pub struct ActionCatalog { + actions: ActionsMap, +} + +impl ActionCatalog { + pub fn add(&mut self, node: &Node, service: &ServiceItem, action: Action) { + let list = self.actions.get_mut(&action.name); + match list { + Some(list) => list.add(node, service, action), + None => { + let name = action.name.clone(); + let mut list = EndpointList::new(name, None); + let name = action.name.clone(); + list.add(node, service, action); + self.actions.insert(name, list); + } + } + } + pub fn get(&self, action_name: &str) -> Option<&EndpointList> { + self.actions.get(action_name) + } + fn is_available(&self, action_name: &str) -> bool { + match self.actions.get(action_name) { + Some(el) => el.has_available(), + None => false, + } + } + fn remove_by_service(&mut self, service: &ServiceItem) { + self.actions.iter_mut().for_each(|item| { + let (_, el) = item; + el.remove_by_service(service); + }); + } + pub fn remove(&mut self, action_name: &str, node_id: &str) { + let list = self.actions.get_mut(action_name); + if let Some(el) = list { + el.remove_by_node_id(node_id); + } + } + pub fn list(&self, opts: ListOptions) -> Vec<&EndpointList> { + let res: HashMap<&String, &EndpointList> = self + .actions + .iter() + .filter(|item| { + let (name, ep_list) = item; + if opts.skip_internal && get_internal_service_regex_match(name) { + return false; + } + if opts.only_local && !ep_list.has_local() { + return false; + } + if opts.only_available && !ep_list.has_available() { + return false; + } + if ep_list.count() > 0 { + let ep = ep_list.endpoints.get(0); + if let Some(ep) = ep { + if ep.action.visibility == Visibility::Protected { + return false; + } + } + } + true + }) + .collect(); + let res = res.values().map(|ep| ep.to_owned()).collect(); + res + } +} diff --git a/src/registry/action_endpoint.rs b/src/registry/action_endpoint.rs new file mode 100644 index 0000000..9d971c6 --- /dev/null +++ b/src/registry/action_endpoint.rs @@ -0,0 +1,45 @@ +use super::*; + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ActionEndpoint { + endpoint: Endpoint, + pub action: Action, + pub name: String, +} + +impl EndpointTrait for ActionEndpoint { + type Data = Action; + fn update(&mut self, data: Self::Data) { + self.action = data; + } + fn new(node_id: &str, service: &ServiceItem, data: Self::Data) -> Self { + let endpoint = Endpoint::new(node_id, service); + let name = format!("{}:{}", endpoint.node_id, data.name); + Self { + endpoint, + name, + action: data, + } + } + + fn node(&self) -> &str { + &self.endpoint.node_id + } + + fn is_local(&self) -> bool { + self.endpoint.local + } + fn is_available(&self) -> bool { + self.endpoint.state + } + fn id(&self) -> &str { + &self.endpoint.node_id + } + fn service_name(&self) -> &str { + &self.endpoint.service + } + + fn ep_type(&self) -> EndpointType { + EndpointType::Action + } +} diff --git a/src/registry/endpoint_list.rs b/src/registry/endpoint_list.rs new file mode 100644 index 0000000..e0d0569 --- /dev/null +++ b/src/registry/endpoint_list.rs @@ -0,0 +1,169 @@ +use tokio::sync::RwLock; + +use crate::INTERNAL_PREFIX; + +use super::*; +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct EndpointList { + pub name: String, + group: Option, + internal: bool, + pub endpoints: Vec, + local_endpoints: Vec, +} + +impl EndpointList { + pub fn new(name: String, group: Option) -> Self { + let internal = name.starts_with(INTERNAL_PREFIX); + let endpoints = Vec::new(); + let local_endpoints = Vec::new(); + + Self { + name, + group, + endpoints, + local_endpoints, + internal, + } + } + + pub fn add(&mut self, node: &Node, service: &ServiceItem, data: E::Data) { + let entry = self + .endpoints + .iter_mut() + .find(|x| x.service_name() == service.unique_name()); + if let Some(entry) = entry { + entry.update(data); + return; + } + + let ep = E::new(&node.id, service, data); + + self.endpoints.push(ep.clone()); + if ep.is_local() { + self.local_endpoints.push(ep) + } + } + fn get_first(&self) -> Option<&E> { + self.endpoints.get(0) + } + + fn select<'a, S: Strategy>( + &self, + list: Vec<&'a E>, + ctx: &Context, + strategy: &RwLock, + ) -> Option<&'a E> { + let mut strategy = strategy.blocking_write(); + let ret = strategy.select(list, Some(ctx)); + ret + } + + pub fn next(&self, ctx: &Context, strategy: &RwLock) -> Option<&E> { + if self.endpoints.is_empty() { + return None; + } + + if self.internal && self.has_local() { + return self.next_local(ctx, strategy); + } + + if self.endpoints.len() == 1 { + let ep = self.endpoints.get(0); + if let Some(ep) = ep { + ep.is_available(); + return Some(ep); + } + return None; + } + + //TODO: Search local item based on registr opts + let ep_list: Vec<&E> = self + .endpoints + .iter() + .filter(|ep| ep.is_available()) + .collect(); + if ep_list.is_empty() { + return None; + } + + self.select(ep_list, ctx, strategy) + } + pub fn next_local(&self, ctx: &Context, strategy: &RwLock) -> Option<&E> { + if self.local_endpoints.is_empty() { + return None; + } + + if self.local_endpoints.len() == 1 { + let ep = self.local_endpoints.get(0); + if let Some(ep) = ep { + ep.is_available(); + return Some(ep); + } + return None; + } + + //TODO: Search local item based on registr opts + let ep_list: Vec<&E> = self + .endpoints + .iter() + .filter(|ep| ep.is_available()) + .collect(); + if ep_list.is_empty() { + return None; + } + + self.select(ep_list, ctx, strategy) + } + + pub fn has_available(&self) -> bool { + for ep in self.endpoints.iter() { + if ep.is_available() { + return true; + } + } + false + } + pub fn has_local(&self) -> bool { + !self.local_endpoints.is_empty() + } + + fn update_local_endpoints(&mut self) { + let mut local: Vec = Vec::new(); + for ep in &self.endpoints { + if ep.is_local() { + let e = ep.clone(); + local.push(e); + } + } + std::mem::swap(&mut local, &mut self.local_endpoints); + drop(local); + } + + pub fn count(&self) -> usize { + self.endpoints.len() + } + pub fn get_endpoint_by_node_id(&self, node_id: &str) -> Option<&E> { + self.endpoints + .iter() + .find(|e| e.id() == node_id && e.is_available()) + } + fn has_node_id(&self, node_id: &str) -> bool { + self.endpoints.iter().any(|e| e.id() == node_id) + } + pub fn remove_by_service(&mut self, service: &ServiceItem) { + self.endpoints.retain(|ep| { + let delete = ep.service_name() == service.unique_name(); + !delete + }); + self.update_local_endpoints(); + } + + pub fn remove_by_node_id(&mut self, node_id: &str) { + self.endpoints.retain(|ep| { + let delete = ep.id() == node_id; + !delete + }); + self.update_local_endpoints(); + } +} diff --git a/src/registry/event_endpoint.rs b/src/registry/event_endpoint.rs new file mode 100644 index 0000000..fe74f55 --- /dev/null +++ b/src/registry/event_endpoint.rs @@ -0,0 +1,43 @@ +use super::*; + +#[derive(Clone)] +pub struct EventEndpoint { + endpoint: Endpoint, + event: Event, +} + +impl EndpointTrait for EventEndpoint { + type Data = Event; + fn update(&mut self, data: Self::Data) { + self.event = data; + } + fn new(node_id: &str, service: &ServiceItem, data: Self::Data) -> Self { + let endpoint = Endpoint::new(node_id, service); + Self { + endpoint, + + event: data, + } + } + + fn node(&self) -> &str { + &self.endpoint.node_id + } + + fn is_local(&self) -> bool { + self.endpoint.local + } + fn is_available(&self) -> bool { + self.endpoint.state + } + fn id(&self) -> &str { + &self.endpoint.node_id + } + fn service_name(&self) -> &str { + &self.endpoint.service + } + + fn ep_type(&self) -> EndpointType { + EndpointType::Event + } +} diff --git a/src/registry/mod.rs b/src/registry/mod.rs new file mode 100644 index 0000000..1566041 --- /dev/null +++ b/src/registry/mod.rs @@ -0,0 +1,134 @@ +pub mod action_catalog; +pub mod action_endpoint; +pub mod endpoint_list; +pub mod event_endpoint; +pub mod node; +pub mod node_catalog; +pub mod registry; +pub mod service_catalog; +pub mod service_item; + +use std::collections::HashMap; +use std::sync::Arc; + +use super::service::Service; +use crate::{context::Context, strategies::Strategy, HandlerResult}; +use action_catalog::ActionCatalog; +pub use action_endpoint::ActionEndpoint; +pub use endpoint_list::EndpointList; +pub use event_endpoint::EventEndpoint; +use lazy_static::lazy_static; +pub use node::{Client, Node}; +use node_catalog::NodeCatalog; + +use regex::Regex; +pub use registry::Registry; +use service_catalog::ServiceCatalog; +use service_item::ServiceItem; +// pub use event_endpoint::EventEndpoint; + +type ActionsMap = HashMap>; + +fn get_internal_service_regex_match(text: &str) -> bool { + lazy_static! { + static ref RE: Regex = Regex::new(r"^\$").unwrap(); + } + RE.is_match(text) +} + +#[derive(PartialEq, Eq, Debug)] +pub struct Logger {} + +pub type ActionHandler = fn(Context, Option) -> HandlerResult; + +#[derive(Default, Debug, PartialEq, Eq, Clone)] +pub struct Payload {} + +#[derive(PartialEq, Eq, Clone, Debug)] + +pub struct Action { + pub name: String, + pub(crate) visibility: Visibility, + pub handler: ActionHandler, + // service: Option, +} + +impl Action { + pub fn new(name: String, handler: ActionHandler) -> Self { + Self { + name, + visibility: Visibility::Protected, + handler, + // service: None, + } + } + // pub fn set_service(mut self, service: Service) -> Action { + // self.service = Some(service); + // self + // } +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub enum Visibility { + Published, + Public, + Protected, + Private, +} +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Event { + pub event_name: String, +} + +///Endpoint trait for endpoint list +pub trait EndpointTrait { + ///Data is eiter an Action struct or Event structs + type Data; + fn new(node_id: &str, service: &ServiceItem, data: Self::Data) -> Self; + fn node(&self) -> &str; + fn update(&mut self, data: Self::Data); + fn is_local(&self) -> bool; + fn is_available(&self) -> bool; + fn id(&self) -> &str; + fn service_name(&self) -> &str; + fn ep_type(&self) -> EndpointType; +} + +#[derive(PartialEq, Eq, Clone, Debug)] +struct Endpoint { + service: String, + state: bool, + node_id: String, + local: bool, +} + +impl Endpoint { + fn new(node_id: &str, service: &ServiceItem) -> Self { + let local = service.local; + let service = service.unique_name(); + Self { + service, + state: true, + node_id: node_id.to_string(), + local, + } + } +} + +pub enum EndpointType { + Action, + Event, +} + +#[derive(PartialEq, Eq)] +pub struct Opts { + strategy: T, +} +pub struct ListOptions { + only_local: bool, + only_available: bool, + skip_internal: bool, + with_actions: bool, + with_events: bool, + grouping: bool, +} diff --git a/src/registry/node.rs b/src/registry/node.rs new file mode 100644 index 0000000..aa29004 --- /dev/null +++ b/src/registry/node.rs @@ -0,0 +1,118 @@ +use std::net::IpAddr; + +use chrono::Duration; +use serde_json::Value; + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Node { + pub id: String, + instance_id: Option, + pub available: bool, + pub local: bool, + last_heartbeat_time: Duration, + metadata : Value, + /* feields that need to be added later. + config + + */ + client: Option, + ip_list: Vec, + port: Option, + hostname: Option, + udp_address: Option, + pub raw_info: Option, + /* + cpu + cpuseq + */ + pub services: Vec, + pub seq: usize, + offline_since: Option, +} + +impl Node { + pub fn new(id: String) -> Self { + Self { + id, + instance_id: None, + available: true, + local: false, + client: None, + raw_info: None, + metadata : Value::Null, + //TODO: + /* + change this later with actual process uptime. + */ + last_heartbeat_time: Duration::seconds(1), + ip_list: Vec::new(), + port: None, + hostname: None, + udp_address: None, + services: Vec::new(), + seq: 0, + offline_since: None, + } + } + pub fn update(&mut self) { + todo!() + } + pub fn update_local_info(&mut self) { + todo!() + } + pub fn hearbeat(&mut self) { + if !self.available { + self.available = true; + self.offline_since = None; + } + todo!() + } + pub fn disconnect(&mut self) { + if self.available { + self.seq = self.seq.saturating_add(1); + /* update this with process uptime + self.offline_since = + */ + } + self.available = false; + } + + pub fn services_len(&self) -> usize { + self.services.len() + } + pub fn set_local(mut self, value: bool) -> Self { + self.local = value; + self + } + pub fn set_ip_list(mut self, ip_list: Vec) -> Self { + self.ip_list = ip_list; + self + } + pub fn set_instance_id(mut self, instance_id: String) -> Self { + self.instance_id = Some(instance_id); + self + } + pub fn set_hostname(mut self, hostname: String) -> Self { + self.hostname = Some(hostname); + self + } + pub fn set_client(mut self, client: Client) -> Self { + self.client = Some(client); + self + } + pub fn set_seq(mut self, seq: usize) -> Self { + self.seq = seq; + self + } + pub fn set_metadata(mut self , metadata:Value)->Self{ + self.metadata = metadata; + self + } +} +#[derive(PartialEq, Eq, Clone, Debug)] + +pub struct Client { + pub(crate) client_type: String, + pub(crate) version: String, + pub(crate) lang_version: String, +} diff --git a/src/registry/node_catalog.rs b/src/registry/node_catalog.rs new file mode 100644 index 0000000..14ab60e --- /dev/null +++ b/src/registry/node_catalog.rs @@ -0,0 +1,129 @@ +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + sync::Arc, +}; + +use anyhow::bail; +use serde_json::Value; + +use crate::{errors::RegistryError, utils}; + +use super::{Client, Node}; + +#[derive(Debug)] + +pub struct NodeCatalog { + nodes: HashMap, + local_node_id: String, +} +impl NodeCatalog { + pub fn new() -> Self { + //TODO: add the create local node logic here + Self { + nodes: HashMap::new(), + local_node_id: "".to_string(), + } + } + ///Create a local node + pub fn create_local_node(&mut self, client_version: String, node_id: String, instance_id: String , metadata:Value) { + let client = Client { + client_type: "rust".to_string(), + lang_version: "1.56.1".to_string(), + version:client_version, + }; + let node = Node::new(node_id) + .set_local(true) + .set_ip_list(utils::ip_list()) + .set_instance_id(instance_id) + .set_hostname(utils::hostname().into_owned()) + .set_seq(1) + .set_client(client). + set_metadata(metadata); + self.nodes.insert(node.id.to_string(), node.clone()); + self.local_node_id = node.id; + } + pub fn add(&mut self, id: &str, node: Node) { + self.nodes.insert(id.to_string(), node); + } + pub fn had_node(&self, id: &str) -> bool { + self.nodes.get(id).is_some() + } + pub fn get_node(&self, id: &str) -> Option<&Node> { + self.nodes.get(id) + } + pub fn get_node_mut(&mut self, id: &str) -> Option<&mut Node> { + self.nodes.get_mut(id) + } + pub fn local_node(&self) -> anyhow::Result<&Node> { + match self.get_node(&self.local_node_id) { + Some(node) => Ok(node), + None => bail!(RegistryError::NoLocalNodeFound), + } + } + pub fn local_node_mut(&mut self) -> anyhow::Result<&mut Node> { + let local_node_id = self.local_node_id.clone(); + match self.get_node_mut(&local_node_id) { + Some(node) => Ok(node), + None => bail!(RegistryError::NoLocalNodeFound), + } + } + + pub fn delete(&mut self, id: &str) -> Option { + self.nodes.remove(id) + } + pub fn count(&self) -> usize { + self.nodes.len() + } + pub fn online_count(&self) -> usize { + let mut count: usize = 0; + self.nodes.iter().for_each(|node_item| { + let (_, node) = node_item; + if node.available { + count = count.saturating_add(1); + } + }); + count + } + pub fn process_node_info(&self) { + todo!() + } + pub fn disconnect(&mut self) { + todo!() + } + + pub fn list(&self, only_available: bool, with_services: bool) -> Vec<&Node> { + self.nodes + .values() + .filter(|node| { + if only_available && !node.available { + return false; + } + if with_services && node.services_len() == 0 { + return false; + } + true + }) + .collect() + } + pub fn list_mut(&mut self, only_available: bool, with_services: bool) -> Vec<&mut Node> { + self.nodes + .values_mut() + .filter(|node| { + if only_available && !node.available { + return false; + } + if with_services && node.services_len() == 0 { + return false; + } + true + }) + .collect() + } + pub fn nodes_vec(&self) -> Vec<&Node> { + self.nodes.values().collect() + } + pub fn nodes_vec_mut(&mut self) -> Vec<&mut Node> { + self.nodes.values_mut().collect() + } +} diff --git a/src/registry/registry.rs b/src/registry/registry.rs new file mode 100644 index 0000000..70ab00b --- /dev/null +++ b/src/registry/registry.rs @@ -0,0 +1,219 @@ +use crate::{errors::RegistryError, service::ServiceSpec, ServiceBroker, ServiceBrokerMessage}; + +use super::*; +use anyhow::bail; +use serde_json::Value; +use tokio::sync::mpsc::UnboundedSender; +#[derive(Debug)] + +pub struct Registry { + broker_sender: UnboundedSender, + broker: Arc, + nodes: NodeCatalog, + services: ServiceCatalog, + pub(crate) actions: ActionCatalog, + /* + events + metrics + strategy factor + discoverer + opts + events + */ +} +impl Registry { + pub fn new( + broker: Arc, + broker_sender: UnboundedSender, + ) -> Self { + //TODO:get the library version for the client + + let mut nodes = NodeCatalog::new(); + nodes.create_local_node( + "1.0".to_string(), + broker.node_id.clone(), + broker.instance.clone(), + broker.options.metadata.clone(), + ); + let services = ServiceCatalog::default(); + let actions = ActionCatalog::default(); + Registry { + broker_sender, + broker, + nodes, + services, + actions, + } + } + + fn init() { + todo!("initialze discoverer") + } + fn stop() { + todo!("stop discoverre") + } + + fn register_moleculer_metrics(&self) { + todo!("register molecular metrics") + } + fn update_metrics(&self) { + todo!("update metrics") + } + pub fn register_local_service(&mut self, svc: ServiceSpec) -> anyhow::Result<()> { + if !self + .services + .has(&svc.full_name, Some(&self.broker.node_id)) + { + let node = self.nodes.local_node()?; + + let service_full_name = self.services.add(node, &svc, true); + if let Some(actions) = svc.actions { + self.register_actions(&service_full_name, actions)?; + } + if let Some(events) = svc.events { + self.register_events(); + } + + { + let local_node = self.nodes.local_node_mut()?; + + local_node.services.push(service_full_name); + } + } + Ok(()) + } + pub fn register_services() { + todo!("add remote serice support") + } + fn check_action_visibility(action: &Action, node: &Node) -> bool { + match action.visibility { + Visibility::Published => true, + Visibility::Public => true, + Visibility::Protected => node.local, + _ => false, + } + } + fn register_actions( + &mut self, + service_full_name: &str, + actions: Vec, + ) -> anyhow::Result<()> { + let node = self.nodes.local_node()?; + let service = self.services.get_mut(service_full_name, Some(&node.id))?; + + actions.iter().for_each(|action| { + if !Registry::check_action_visibility(action, node) { + return; + } + if node.local { + //TODO:stuff with middleware and handlers. + } else if self.broker.transit.is_some() { + //TODO: for remote services + return; + } + self.actions.add(node, service, action.to_owned()); + //TODO: + // service.add_action(action) + //add the action to the service. + }); + Ok(()) + } + fn create_private_action_endpoint(&self, action: Action) -> anyhow::Result { + let node = self.nodes.local_node()?; + + todo!("add service to action") + // let action_ep = ActionEndpoint::new(node, service, action); + // Ok(action_ep) + } + pub fn has_services(&self, full_name: &str, node_id: Option<&str>) -> bool { + self.services.has(full_name, node_id) + } + pub fn get_action_endpoints(&self, action_name: &str) -> Option<&EndpointList> { + self.actions.get(action_name) + } + pub fn get_action_endpoint_by_node_id( + &self, + action_name: &str, + node_id: &str, + ) -> Option<&ActionEndpoint> { + let list = self.actions.get(action_name); + if let Some(list) = list { + return list.get_endpoint_by_node_id(node_id); + } + None + } + pub fn unregister_service(&mut self, full_name: &str, node_id: Option<&str>) { + let id = match node_id { + Some(node_id) => node_id.to_string(), + None => self.broker.node_id.clone(), + }; + self.services.remove(full_name, &id); + match node_id { + Some(id) => { + if id == self.broker.node_id { + self.regenerate_local_raw_info(Some(true)); + } + } + None => { + self.regenerate_local_raw_info(Some(true)); + } + } + } + fn unregister_service_by_node_id(&mut self, node_id: &str) { + self.services.remove_all_by_node_id(node_id); + } + fn unregiste_action(&mut self, node_id: &str, action_name: &str) { + self.actions.remove(action_name, node_id); + } + + fn register_events(&mut self) { + todo!() + } + fn unregister_event(&mut self, node_id: &str, event_name: &str) { + todo!() + } + + fn regenerate_local_raw_info(&self, inc_seq: Option) -> Value { + todo!() + } + + pub fn get_local_node_info(&self, force: bool) -> anyhow::Result { + // if let None = self.nodes.local_node() { + // return Ok(self.regenerate_local_raw_info(None)); + // } + if force { + return Ok(self.regenerate_local_raw_info(None)); + } + + let value = self.nodes.local_node()?.raw_info.to_owned(); + match value { + Some(value) => Ok(value), + None => bail!(RegistryError::NoLocalNodeFound), + } + } + fn get_node_info(&self, node_id: &str) -> Option { + todo!() + } + fn process_node_info(&self) { + todo!() + } + pub fn get_node_list(&self, only_available: bool, with_services: bool) -> Vec<&Node> { + self.nodes.list(only_available, with_services) + } + pub fn get_services_list(&self, opts: ListOptions) -> Vec<&ServiceItem> { + self.services.list(opts) + } + fn get_action_list(&self, opts: ListOptions) -> Vec<&ActionEndpoint> { + //self.actions.list(opts) + todo!() + } + fn get_event_list(&self) -> Vec<&EventEndpoint> { + todo!() + } + fn get_node_raw_list(&self) { + todo!() + } +} +#[cfg(test)] + +mod tests {} diff --git a/src/registry/service_catalog.rs b/src/registry/service_catalog.rs new file mode 100644 index 0000000..79df9dd --- /dev/null +++ b/src/registry/service_catalog.rs @@ -0,0 +1,95 @@ +use anyhow::bail; + +use crate::{errors::RegistryError, service::ServiceSpec}; + +use super::*; +#[derive(PartialEq, Eq, Default, Debug)] +pub struct ServiceCatalog { + services: Vec, +} + +impl ServiceCatalog { + ///Add a new service + pub fn add(&mut self, node: &Node, service: &ServiceSpec, local: bool) -> String { + let service_item = ServiceItem::new(node, service, local); + let full_name = service_item.full_name.clone(); + self.services.push(service_item); + full_name + } + ///Check the service exsists + pub fn has(&self, full_name: &str, node_id: Option<&str>) -> bool { + let svc = self + .services + .iter() + .find(|svc| svc.equals(full_name, node_id)); + svc.is_some() + } + pub fn get(&self, full_name: &str, node_id: Option<&str>) -> Option<&ServiceItem> { + self.services + .iter() + .find(|svc| svc.equals(full_name, node_id)) + } + pub fn get_mut( + &mut self, + full_name: &str, + node_id: Option<&str>, + ) -> anyhow::Result<&mut ServiceItem> { + let result = self + .services + .iter_mut() + .find(|svc| svc.equals(full_name, node_id)); + match result { + Some(service_item) => Ok(service_item), + None => bail!(RegistryError::NoServiceItemFound), + } + } + pub fn list(&self, opts: ListOptions) -> Vec<&ServiceItem> { + self.services + .iter() + .filter(|svc| { + if opts.skip_internal && get_internal_service_regex_match(&svc.name) { + return false; + } + if opts.only_local && !svc.local { + return false; + } + //TODO: find a way to get node available + // if opts.only_available && !svc.node.available { + // return false; + // } + + true + }) + .collect() + // TODO:("implement grouping and all that stuff") + } + pub fn get_local_node_service(&self) { + todo!() + } + //remove all endpoints by node_id. + pub fn remove_all_by_node_id(&mut self, node_id: &str) { + let services: Vec<&ServiceItem> = self + .services + .iter() + .filter(|svc| { + if svc.node == node_id { + todo!("remove actions and events in registry"); + return false; + } + true + }) + .collect(); + todo!("updat the service") + } + + pub fn remove(&mut self, full_name: &str, node_id: &str) { + self.services.retain(|svc| { + if svc.equals(full_name, Some(node_id)) { + todo!("remove actions and events in registry"); + + return false; + } + true + }) + } +} diff --git a/src/registry/service_item.rs b/src/registry/service_item.rs new file mode 100644 index 0000000..80a9980 --- /dev/null +++ b/src/registry/service_item.rs @@ -0,0 +1,59 @@ +use crate::service::ServiceSpec; + +use super::*; + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ServiceItem { + pub name: String, + pub node: String, + pub local: bool, + pub full_name: String, + version: String, + actions: ActionsMap, + /* + eventsmap + metadata + settings + */ +} +impl ServiceItem { + pub fn new(node: &Node, service: &ServiceSpec, local: bool) -> Self { + Self { + node: node.id.clone(), + local, + actions: HashMap::new(), + full_name: service.full_name.to_string(), + version: service.version.to_string(), + name: service.name.to_string(), + } + } + pub fn equals(&self, full_name: &str, node_id: Option<&str>) -> bool { + match node_id { + Some(id) => self.node == id && self.full_name == full_name, + None => self.full_name == full_name, + } + } + + ///Update service properties + pub fn update(&mut self, service: &Service) { + self.full_name = service.full_name.to_string(); + self.version = service.version.to_string(); + /* + settings + metadata + */ + todo!() + } + ///Add action to service + pub fn add_action(&mut self, action: EndpointList) { + let name = action.name.clone(); + self.actions.insert(name, action); + todo!("Decide if we want an arc of action or make a copy of that actions") + } + pub fn add_event(&mut self, event: EndpointList) { + todo!("Implement the events map") + } + pub fn unique_name(&self) -> String { + format!("{}{}{}", self.full_name, self.version, self.node) + } +} diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..287c98f --- /dev/null +++ b/src/service.rs @@ -0,0 +1,480 @@ +use std::collections::HashMap; + +use crate::{ + registry::{Action, ActionHandler, Event}, + ServiceBrokerMessage, +}; +use log::info; +use tokio::sync::mpsc::UnboundedSender; + +#[derive(Clone, Debug)] +pub struct Service { + pub name: String, + pub full_name: String, + pub version: String, + pub(crate) settings: HashMap, + pub(crate) schema: Schema, + pub(crate) original_schema: Option, + pub(crate) metadata: HashMap, + pub actions: Option>, + pub events: Option>, + pub(crate) broker_sender: UnboundedSender, +} + +impl PartialEq for Service { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.full_name == other.full_name + && self.version == other.version + && self.settings == other.settings + && self.schema == other.schema + && self.original_schema == other.original_schema + && self.metadata == other.metadata + && self.actions == other.actions + && self.events == other.events + } +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Schema { + pub(crate) mixins: Option>, + pub(crate) actions: Option>, + pub(crate) events: Option>, + pub(crate) merged: SchemaMerged, + pub(crate) name: String, + pub(crate) version: Option, + pub(crate) settings: HashMap, + pub(crate) metadata: Option>, + pub(crate) created: Option, + pub(crate) started: Option, + pub(crate) stopped: Option, + pub(crate) dependencies: Option>, +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct SchemaMixins {} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct SchemaActions { + name: String, + handler: fn(), +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct SchemaEvents {} +#[derive(PartialEq, Eq, Clone, Debug)] +pub enum SchemaMerged { + MergedFn(fn()), + MergedFnVec(Vec), +} +#[derive(PartialEq, Debug)] +pub struct ServiceSpec { + pub(crate) name: String, + pub(crate) version: String, + pub(crate) full_name: String, + settings: HashMap, + /* + pub(crate)metadata + pub(crate)*/ + pub(crate) actions: Option>, + pub(crate) events: Option>, +} + +impl Service { + fn get_service_spec(&self) -> ServiceSpec { + //TODO: do something about actions and events + ServiceSpec { + name: self.name.clone(), + full_name: self.full_name.clone(), + settings: self.get_public_settings(), + version: self.version.clone(), + actions: self.actions.clone(), + events: None, + } + } + + fn parse_service_schema(&mut self, schema: Schema) { + self.original_schema = Some(schema.clone()); + + match schema.merged { + SchemaMerged::MergedFn(merged) => merged(), + SchemaMerged::MergedFnVec(func_vec) => { + for func in func_vec { + func() + } + } + } + + self.name = schema.name; + self.version = match schema.version { + Some(v) => v, + None => "0.0.1".to_string(), + }; + self.settings = schema.settings; + self.metadata = match schema.metadata { + Some(metadata) => metadata, + None => HashMap::new(), + }; + //TODO: + //self.schema = schema; + let version = self.settings.get("$noVersionPrefix"); + + self.full_name = Service::get_versioned_full_name(&self.name, version); + //TODO: get the logger from the broker. + //self.logger = + + //TODO:register methods. + + todo!("add service specification") + } + + fn get_public_settings(&self) -> HashMap { + self.settings.clone() + } + + pub async fn init(&self) { + info!("Service {} is createing....", self.full_name); + if let Some(created) = self.schema.created { + created(); + } + let _result = self + .broker_sender + .send(ServiceBrokerMessage::AddLocalService(self.clone())); + info!("Service {} created.", self.full_name); + + // todo!("call broker middlware") + } + + pub async fn start(&self) { + info!("Service {} is starting...", self.full_name); + + if let Some(dependencies) = &self.schema.dependencies { + let timeout: i64 = match self.settings.get("$dependencyTimeout") { + //TODO:raise an error rateher than default. + Some(val) => val.parse().unwrap_or(0), + None => 0, + }; + //TODO: get interal from broker options + let interval: i64 = match self.settings.get("$dependencyInterval") { + Some(val) => val.parse().unwrap_or(0), + None => 0, + }; + + self.wait_for_services(dependencies, timeout, interval) + .await; + } + + if let Some(started) = &self.schema.started { + started(); + } + + let _result = self + .broker_sender + .send(ServiceBrokerMessage::RegisterLocalService( + Service::get_service_spec(self), + )); + + info!("Service {} started.", self.full_name); + + // todo!("call service starting middleware"); + //todo!("call service started middleware") + } + + pub async fn stop(&self) { + info!("Service {} is stopping...", self.full_name); + + if let Some(stopped) = self.schema.stopped { + stopped(); + } + + info!("Service {} stopped.", self.full_name); + + todo!("call service stopping middlewares"); + todo!("call service stopped middleware"); + } + + fn create_action(&self, action_def: ActionHandler, name: &str) -> Action { + let mut action = Action::new(name.to_string(), action_def); + let name_prefix = self.settings.get("$noServiceNamePrefix"); + if let Some(name_prefix) = name_prefix { + let name_prefix: bool = name_prefix.parse().unwrap(); + if !name_prefix { + action.name = format!("{}.{}", self.full_name.to_string(), action.name); + } + } + //TODO add caching settings from settins + action + } + + /// create an interal service method. + fn create_method() { + todo!() + } + + ///create an event subscription for broker + fn create_event() { + todo!() + } + + async fn wait_for_services(&self, service_names: &[String], timeout: i64, interval: i64) { + let _result = self + .broker_sender + .send(ServiceBrokerMessage::WaitForServices { + dependencies: service_names.to_vec(), + interval, + timeout, + }); + } + + pub fn get_versioned_full_name(name: &str, version: Option<&String>) -> String { + let mut name = name.to_string(); + if let Some(v) = version { + name = format!("{}.{}", v, name); + } + name + } +} +#[cfg(test)] +mod tests { + use tokio::sync::mpsc::{self, UnboundedSender}; + + use crate::{ + context::Context, + registry::{ActionEndpoint, Payload}, + HandlerResult, + }; + + use super::*; + fn test_merge_func() { + println!("test merge function"); + } + fn test_started_func() { + println!("test start func"); + } + fn test_created_func() { + println!("test created func"); + } + fn test_stop_func() { + println!("test stop func"); + } + fn action_func(context: Context, payload: Option) -> HandlerResult { + println!("action_func"); + println!("context: {:?}", context); + println!("payload: {:?}", payload); + HandlerResult { data: Box::new(1) } + } + + fn get_test_schema(dependencies: Option>) -> Schema { + let merged = SchemaMerged::MergedFn(test_merge_func); + let schema = Schema { + mixins: None, + actions: None, + events: None, + merged, + name: "test_service".to_string(), + version: None, + settings: HashMap::new(), + metadata: None, + created: Some(test_created_func), + started: Some(test_started_func), + stopped: Some(test_stop_func), + dependencies: dependencies, + }; + + schema + } + + fn get_test_service( + schema: Option, + settings: Option>, + actions: Option>, + broker_sender: Option>, + ) -> Service { + let name = "test_service".to_string(); + let version = "1.0".to_string(); + let full_name = Service::get_versioned_full_name(&name, Some(&version)); + let settings = match settings { + Some(settings) => settings, + None => HashMap::new(), + }; + let schema = match schema { + Some(schema) => schema, + None => get_test_schema(None), + }; + + let original_schema = get_test_schema(None); + let broker_sender = match broker_sender { + Some(sender) => sender, + None => { + let (sender, recv) = mpsc::unbounded_channel::(); + sender + } + }; + let service = Service { + name, + full_name, + version, + settings, + schema, + original_schema: Some(original_schema), + metadata: HashMap::new(), + actions: actions, + events: None, + broker_sender, + }; + service + } + #[test] + fn service_get_service_spec() { + let service = get_test_service(None, None, None, None); + let service_spec = ServiceSpec { + name: service.name.clone(), + version: service.version.clone(), + full_name: service.full_name.clone(), + settings: service.settings.clone(), + actions: None, + events: None, + }; + + let service_spec_gen = service.get_service_spec(); + assert_eq!(service_spec_gen, service_spec) + } + #[test] + fn service_get_public_settings() { + let mut settings = HashMap::new(); + settings.insert("test".to_string(), "settings".to_string()); + let service = get_test_service(None, Some(settings.clone()), None, None); + assert_eq!(service.get_public_settings(), settings); + } + #[test] + fn service_init() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.init().await }); + rt.block_on(async { + let result = recv.recv().await; + + let expected_result = ServiceBrokerMessage::AddLocalService(service); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_start() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let service_spec = service.get_service_spec(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.start().await }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::RegisterLocalService(service_spec); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_start_with_dependencies() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service_names: Vec = + vec!["Service one".to_string(), " Service two".to_string()]; + + let schema = get_test_schema(Some(service_names.clone())); + let service = get_test_service(Some(schema), None, None, Some(sender)); + let service_spec = service.get_service_spec(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.start().await }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::WaitForServices { + dependencies: service_names, + timeout: 0, + interval: 0, + }; + assert_eq!(result, Some(expected_result)); + + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::RegisterLocalService(service_spec); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_create_action_name() { + let no_service_name_prefix = "$noServiceNamePrefix".to_string(); + let name = "action_func"; + let mut settings = HashMap::new(); + settings.insert(no_service_name_prefix.clone(), "true".to_string()); + let service = get_test_service(None, Some(settings.clone()), None, None); + let action = service.create_action(action_func, name); + let expected_action = Action::new(name.to_string(), action_func); + assert_eq!(action, expected_action); + settings.insert(no_service_name_prefix.clone(), "false".to_string()); + let service = get_test_service(None, Some(settings), None, None); + let action_2 = service.create_action(action_func, name); + let name = format!("{}.{}", service.full_name, name); + let expected_action_2 = Action::new(name, action_func); + assert_eq!(action_2, expected_action_2); + let service = get_test_service(None, None, None, None); + let name = "action_func"; + let expected_action_3 = Action::new(name.to_string(), action_func); + let action_3 = service.create_action(action_func, name); + assert_eq!(action_3, expected_action_3); + } + #[test] + #[should_panic] + fn service_create_action_name_panic() { + let no_service_name_prefix = "$noServiceNamePrefix".to_string(); + let mut settings = HashMap::new(); + let name = "action_func"; + settings.insert(no_service_name_prefix, "non_bool_value".to_string()); + let service = get_test_service(None, Some(settings), None, None); + let action = service.create_action(action_func, name); + } + #[test] + fn service_wait_for_service() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let service_names: Vec = + vec!["Service one".to_string(), " Service two".to_string()]; + let timeout = 10; + let interval = 20; + rt.block_on(async { + let _result = service + .wait_for_services(&service_names, timeout, interval) + .await; + }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::WaitForServices { + dependencies: service_names, + timeout, + interval, + }; + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_get_versioned_full_name() { + let version = "1.0"; + let name = "test_service"; + let expected_full_name = format!("{}.{}", version, name); + let version = "1.0".to_string(); + let version = Some(&version); + let full_name = Service::get_versioned_full_name(name, version); + assert_eq!(full_name, expected_full_name); + let full_name_2 = Service::get_versioned_full_name(name, None); + assert_eq!(full_name_2, name); + } +} diff --git a/src/strategies/mod.rs b/src/strategies/mod.rs new file mode 100644 index 0000000..81d677b --- /dev/null +++ b/src/strategies/mod.rs @@ -0,0 +1,14 @@ +mod round_robin; +use crate::context::Context; +use crate::registry::EndpointTrait; +pub use round_robin::RoundRobinStrategy; +pub trait Strategy { + // fn new(registry: Arc, broker: Arc, opts: StrategyOpts) -> Self; + fn select<'a, E: EndpointTrait>( + &mut self, + list: Vec<&'a E>, + ctx: Option<&Context>, + ) -> Option<&'a E>; +} + +pub struct StrategyOpts {} diff --git a/src/strategies/round_robin.rs b/src/strategies/round_robin.rs new file mode 100644 index 0000000..ceb3422 --- /dev/null +++ b/src/strategies/round_robin.rs @@ -0,0 +1,43 @@ +use super::*; +use std::sync::Arc; + +#[derive(Debug)] +pub struct RoundRobinStrategy { + counter: usize, +} + +impl RoundRobinStrategy { + pub fn new() -> Self { + RoundRobinStrategy { counter: 0 } + } +} +impl Default for RoundRobinStrategy { + fn default() -> Self { + Self::new() + } +} + +impl Strategy for RoundRobinStrategy { + // fn new(registry: Arc, broker: Arc, opts: StrategyOpts) -> Self { + // Self { + // broker, + // registry, + // opts, + // counter: 0, + // } + // } + fn select<'a, E: EndpointTrait>( + &mut self, + list: Vec<&'a E>, + ctx: Option<&Context>, + ) -> Option<&'a E> { + if self.counter >= list.len() { + self.counter = 0; + } + self.counter = self.counter.saturating_add(1); + if let Some(ep) = list.get(self.counter) { + return Some(*ep); + } + None + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..0d6f0cb --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,31 @@ +use std::borrow::Cow; + +use uuid::Uuid; + +pub(crate) fn generate_uuid() -> String { + Uuid::new_v4().to_string() +} + +fn remove_from_list(list: &mut Vec, value: &T) { + list.retain(|t| { + if t == value { + return false; + } + true + }); +} +pub(crate) fn hostname() -> Cow<'static, str> { + hostname::get() + .map(|s| Cow::Owned(s.to_string_lossy().to_string().to_lowercase())) + .unwrap_or_else(|_| Cow::Borrowed("unknown_host_name")) +} + +pub(crate) fn ip_list() -> Vec { + get_if_addrs::get_if_addrs() + .unwrap_or_default() + .iter() + .map(|interface| interface.addr.ip()) + .filter(|ip| ip.is_ipv4() && !ip.is_loopback()) + .map(|ip| ip.to_string()) + .collect() +}