Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.4.17"
tokio = { version = "1.25.0", optional = true }
bincode = { version = "2.0.0-rc.3", optional = true }
async-stream = "0.3.5"

dashmap = "6.0.1"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this dependency should be optional.
Only included for "broadcast" feature.

Check for example:

remote = ["tokio/net", "bincode"]

[package.metadata.docs.rs]
features = ["remote", "broadcast", "sink-stream"]

Expand Down
85 changes: 33 additions & 52 deletions lib/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::future::Future;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

igumn@lenovo MINGW64 ~/gabriel2 (shitcodebykaushik-broacdcast)
$ cargo test
warning: virtual workspace defaulting to `resolver = "1"` despite one or more workspace members being on edition 2021 which implies `resolver = "2"`
note: to keep the current resolver, specify `workspace.resolver = "1"` in the workspace root's manifest
note: to use the edition 2021 resolver, specify `workspace.resolver = "2"` in the workspace root's manifest
note: for more details see https://doc.rust-lang.org/cargo/reference/resolver.html#resolver-versions
   Compiling gabriel2 v1.4.3 (C:\Users\igumn\gabriel2\lib)
error[E0252]: the name `DashMap` is defined multiple times
  --> lib\src\broadcast.rs:18:5
   |
3  | use dashmap::DashMap;
   |     ---------------- previous import of the type `DashMap` here
...
18 | use dashmap::DashMap;
   |     ^^^^^^^^^^^^^^^^ `DashMap` reimported here
   |
   = note: `DashMap` must be defined only once in the type namespace of this module

warning: unused import: `dashmap::DashMap`
  --> lib\src\broadcast.rs:18:5
   |
18 | use dashmap::DashMap;
   |     ^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_imports)]` on by default

error[E0308]: mismatched types
   --> lib\src\broadcast.rs:107:25
    |
107 |                     for (_id, callback) in subscribers {
    |                         ^^^^^^^^^^^^^^^    ----------- this is an iterator with items of type `dashmap::mapref::multiple::RefMulti<'_, usize, Pin<Box<(dyn EventCallback<E> + 'static)>>>`
    |                         |
    |                         expected `RefMulti<'_, usize, Pin<Box<dyn EventCallback<E>>>>`, found `(_, _)`
    |
    = note: expected struct `dashmap::mapref::multiple::RefMulti<'_, usize, Pin<Box<(dyn EventCallback<E> + 'static)>>>`
                found tuple `(_, _)`

Some errors have detailed explanations: E0252, E0308.
For more information about an error, try `rustc --explain E0252`.
warning: `gabriel2` (lib) generated 1 warning
error: could not compile `gabriel2` (lib) due to 2 previous errors; 1 warning emitted
warning: build failed, waiting for other jobs to finish...
warning: `gabriel2` (lib test) generated 1 warning (1 duplicate)
error: could not compile `gabriel2` (lib test) due to 2 previous errors; 1 warning emitted

use std::pin::Pin;
use dashmap::DashMap;

use crate::{SSSD};

use std::collections::HashMap;
use crate::{SSSD}; // Replace with your actual crate items

use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand All @@ -12,27 +11,23 @@ use std::sync::{

use tokio::{
sync::{
mpsc::{
self,
error::{SendError},
Sender
},
Mutex
}
mpsc::{self, error::SendError, Sender},
},
};


use dashmap::DashMap;

/// Marker-trait for events
pub trait Event: Copy + Clone + SSSD {}
impl <S> Event for S where S: Copy + Clone + SSSD {}
pub trait Event: Copy + Clone + SSSD {} // Replace SSSD with your actual trait

impl<S> Event for S where S: Copy + Clone + SSSD {}

/// `EventCallback` is a marker-trait for callback which will be stored in subscribers
/// Auto-implemented
pub trait EventCallback<E>: Send + Sync + 'static {
fn call<'a>(&'a self, event: E) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
where
Self: 'a;
where
Self: 'a;
}

impl<F, E, Fut> EventCallback<E> for F
Expand All @@ -42,47 +37,47 @@ where
Fut: Future<Output = ()> + Send + 'static,
{
fn call<'a>(&'a self, event: E) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
where
Self: 'a,
where
Self: 'a,
{
Box::pin(async move {
self(event).await;
})
}
}

/// # Event Bus
// # Event Bus
///
/// `EventBus<E,R>`, where `E` implements `Event` marker trait `R` `implements EventCallback<E>`
/// `EventBus<E>`, where `E` implements `Event`
///
/// `EventBus` accessible from all actors
///
/// on `new()` Event Bus spawnin background task with loop
/// on `new()` Event Bus spawning background task with loop
/// which is checking for events
///
/// ## Fields
///
/// * `subscribers` - HashMap that contains id of subsriber and tuple of (Event, Callback) which is executes on event (Variant of `E`)
/// * `subscribers` - DashMap that contains id of subscriber and tuple of (Event, Callback) which is executed on event (Variant of `E`)
///
/// * `events` - Vector of all events
/// * `tx` - Sender channel for publishing events
///
/// * `counter` - Atomic counter for generating subscriber IDs
///
/// ## Methods
///
/// * `new` - Creating new instance of `EventBus`
///
/// * `publish` - Sends an event to a
/// * `publish` - Sends an event to all subscribers
///
/// * `subscribe` - Creates record in `subscribers` of enum variant and callback to that variant, returns subscriber's id
/// and unwinding event stack, it's promissing that subscribers will react on events which are published
/// after subscription only
/// * `subscribe` - Registers a callback for a specific event type
///
/// * `unsubscribe` - Remove record from `subscribers`
/// * `unsubscribe` - Removes a subscriber based on subscriber ID
///
pub struct EventBus<E>
where
E: Event,
{
subscribers: Arc<Mutex<HashMap<usize, Pin<Box<dyn EventCallback<E>>>>>>,
subscribers: Arc<DashMap<usize, Pin<Box<dyn EventCallback<E>>>>>,
tx: Sender<E>,
counter: AtomicUsize,
}
Expand All @@ -91,12 +86,11 @@ impl<E> EventBus<E>
where
E: Event,
{
// FIXME: Calls function only once for some reason...
pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel(1000);

let event_bus = Self {
subscribers: Arc::new(Mutex::new(HashMap::new())),
subscribers: Arc::new(DashMap::new()),
tx: tx,
counter: AtomicUsize::new(0),
};
Expand All @@ -107,13 +101,11 @@ where
let handle = tokio::runtime::Handle::current();
{
let _join_handler = handle.spawn(async move {
loop {
while let Some(event) = rx.recv().await {
let subscribers = subs.lock().await;
log::trace!("processing event: {:?}", event);
for (_id, callback) in &*subscribers {
callback.call(event).await;
}
while let Some(event) = rx.recv().await {
let subscribers = subs.iter();
log::trace!("processing event: {:?}", event);
for (_id, callback) in subscribers {
callback.call(event).await;
}
}
});
Expand All @@ -132,29 +124,18 @@ where
where
F: EventCallback<E>,
{

let id = self.counter.fetch_add(1, Ordering::SeqCst);
log::trace!("New subscriber in event bus. id: {}", id);

let mut subscribers = self.subscribers.lock().await;
subscribers.insert(id, Box::pin(callback));
self.subscribers.insert(id, Box::pin(callback));

id
}

pub async fn unsubscribe(&self, subscriber_id: usize) {

log::trace!("Removed subscriber in event bus. id: {}", subscriber_id);

let mut subscribers = self.subscribers.lock().await;
subscribers.remove(&subscriber_id);
self.subscribers.remove(&subscriber_id);
}

}

// impl<E> Default for EventBus<E>
// where E: Event,
// {
// fn default() -> Self {
// Self::new()
// }
// }
// Revie this code