diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index 18d4245..d6d2908 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -264,6 +264,11 @@ impl fmt::Debug for Receiver { unsafe impl Send for Receiver {} +// The Receiver can NOT be `Sync`! The current receive implementations that take `&self` +// assume no other receive operation runs in parallel. + +impl Unpin for Receiver {} + impl IntoFuture for Receiver { type Output = Result; @@ -568,8 +573,12 @@ impl Channel { } #[inline(always)] - unsafe fn message(&self) -> &MaybeUninit { - unsafe { &*self.message.get() } + unsafe fn message(&self) -> &T { + // SAFETY: The caller guarantees that no other thread will access the message field. + let message_container = unsafe { &*self.message.get() }; + + // SAFETY: The caller guarantees that the message has been initialized. + unsafe { message_container.assume_init_ref() } } #[inline(always)] @@ -676,13 +685,23 @@ pub struct SendError { channel_ptr: NonNull>, } +// SAFETY: The SendError only contains a pointer to the channel. The constructor (if used +// correctly) guarantees exclusive ownership and access to the underlying channel. Since +// the message is Send (`T: Send`) it is safe to extract it or drop it via the SendError +// on any thread. unsafe impl Send for SendError {} + +// SAFETY: Same basic safety as described in the Send impl above. Plus the fact that `T` +// is `Sync` allows the SendError to be shared between threads and hand out `&T` references +// as well. unsafe impl Sync for SendError {} impl SendError { /// Get a reference to the message that failed to be sent. pub fn as_inner(&self) -> &T { - unsafe { self.channel_ptr.as_ref().message().assume_init_ref() } + // SAFETY: we have exclusive ownership of the channel and require that the message has + // been initialized upon construction. + unsafe { self.channel_ptr.as_ref().message() } } /// Consumes the error and returns the message that failed to be sent. @@ -708,12 +727,11 @@ impl SendError { impl Drop for SendError { fn drop(&mut self) { - // SAFETY: we have ownership of the channel and require that the message is initialized - // upon construction - unsafe { - self.channel_ptr.as_ref().drop_message(); - dealloc(self.channel_ptr); - } + // SAFETY: there is a properly initialized message + unsafe { self.channel_ptr.as_ref().drop_message() }; + + // SAFETY: we own the channel + unsafe { dealloc(self.channel_ptr) }; } } @@ -725,7 +743,7 @@ impl fmt::Display for SendError { impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SendError<{}>(..)", stringify!(T)) + write!(f, "SendError<{}>(..)", core::any::type_name::()) } } diff --git a/mea/src/oneshot/tests.rs b/mea/src/oneshot/tests.rs index 8c8967a..af394c9 100644 --- a/mea/src/oneshot/tests.rs +++ b/mea/src/oneshot/tests.rs @@ -307,3 +307,20 @@ fn poll_then_drop_receiver_during_send() { // The send operation should also not have panicked t.join().unwrap(); } + +#[test] +fn dropping_sender_disconnects_async_receiver() { + let (sender, receiver) = oneshot::channel::<()>(); + assert!(!sender.is_closed()); + assert!(!receiver.is_closed()); + drop(sender); + assert!(receiver.is_closed()); +} + +#[test] +fn async_receiver_has_message() { + let (sender, receiver) = oneshot::channel(); + assert!(!receiver.has_message()); + assert!(sender.send(19i128).is_ok()); + assert!(receiver.has_message()); +}