Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions mea/src/oneshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ impl<T> fmt::Debug for Receiver<T> {

unsafe impl<T: Send> Send for Receiver<T> {}

// The Receiver can NOT be `Sync`! The current receive implementations that take `&self`
// assume no other receive operation runs in parallel.

impl<T> Unpin for Receiver<T> {}

impl<T> IntoFuture for Receiver<T> {
type Output = Result<T, RecvError>;

Expand Down Expand Up @@ -568,8 +573,12 @@ impl<T> Channel<T> {
}

#[inline(always)]
unsafe fn message(&self) -> &MaybeUninit<T> {
unsafe { &*self.message.get() }
unsafe fn message(&self) -> &T {
// SAFETY: The caller guarantees that no other thread will access the message field.
let message_container = unsafe { &*self.message.get() };

// SAFETY: The caller guarantees that the message has been initialized.
unsafe { message_container.assume_init_ref() }
}

#[inline(always)]
Expand Down Expand Up @@ -676,13 +685,23 @@ pub struct SendError<T> {
channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: The SendError only contains a pointer to the channel. The constructor (if used
// correctly) guarantees exclusive ownership and access to the underlying channel. Since
// the message is Send (`T: Send`) it is safe to extract it or drop it via the SendError
// on any thread.
unsafe impl<T: Send> Send for SendError<T> {}

// SAFETY: Same basic safety as described in the Send impl above. Plus the fact that `T`
// is `Sync` allows the SendError to be shared between threads and hand out `&T` references
// as well.
unsafe impl<T: Sync> Sync for SendError<T> {}

impl<T> SendError<T> {
/// Get a reference to the message that failed to be sent.
pub fn as_inner(&self) -> &T {
unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
// SAFETY: we have exclusive ownership of the channel and require that the message has
// been initialized upon construction.
unsafe { self.channel_ptr.as_ref().message() }
}

/// Consumes the error and returns the message that failed to be sent.
Expand All @@ -708,12 +727,11 @@ impl<T> SendError<T> {

impl<T> Drop for SendError<T> {
fn drop(&mut self) {
// SAFETY: we have ownership of the channel and require that the message is initialized
// upon construction
unsafe {
self.channel_ptr.as_ref().drop_message();
dealloc(self.channel_ptr);
}
// SAFETY: there is a properly initialized message
unsafe { self.channel_ptr.as_ref().drop_message() };

// SAFETY: we own the channel
unsafe { dealloc(self.channel_ptr) };
}
}

Expand All @@ -725,7 +743,7 @@ impl<T> fmt::Display for SendError<T> {

impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SendError<{}>(..)", stringify!(T))
write!(f, "SendError<{}>(..)", core::any::type_name::<T>())
}
}

Expand Down
17 changes: 17 additions & 0 deletions mea/src/oneshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,20 @@ fn poll_then_drop_receiver_during_send() {
// The send operation should also not have panicked
t.join().unwrap();
}

#[test]
fn dropping_sender_disconnects_async_receiver() {
let (sender, receiver) = oneshot::channel::<()>();
assert!(!sender.is_closed());
assert!(!receiver.is_closed());
drop(sender);
assert!(receiver.is_closed());
}

#[test]
fn async_receiver_has_message() {
let (sender, receiver) = oneshot::channel();
assert!(!receiver.has_message());
assert!(sender.send(19i128).is_ok());
assert!(receiver.has_message());
}