futures_channel/mpsc/
mod.rs

1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
//! Similarly to the channels in `std::sync::mpsc`, channel creation provides
//! [`Receiver`] and [`Sender`] handles. [`Receiver`] implements [`Stream`] and
//! allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver first calls `close`, which prevents any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use futures_core::stream::{FusedStream, Stream};
82use futures_core::task::__internal::AtomicWaker;
83use futures_core::task::{Context, Poll, Waker};
84use std::fmt;
85use std::pin::Pin;
86use std::sync::atomic::AtomicUsize;
87use std::sync::atomic::Ordering::SeqCst;
88use std::sync::{Arc, Mutex};
89use std::thread;
90
91use crate::mpsc::queue::Queue;
92
93mod queue;
94#[cfg(feature = "sink")]
95mod sink_impl;
96
// Sender-side handle state for an unbounded channel. It holds nothing but a
// reference to the channel state it shares with the receiver.
#[derive(Debug)]
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
102
// Sender-side handle state for a bounded channel: the shared channel state
// plus the bookkeeping needed to park this sender once the channel is full.
#[derive(Debug)]
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
117
// Neither sender handle ever projects `Pin<&mut Self>` to `Pin<&mut T>`, so
// both are declared `Unpin` regardless of whether `T` is.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
121
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
// The inner value is `None` once `disconnect` has been called; sending through
// such a handle reports a disconnected error.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
127
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
// The inner value is `None` once `disconnect` has been called; sending through
// such a handle reports a disconnected error.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
133
// Compile-time assertion: this impl only type-checks if `UnboundedSender<u32>`
// is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
136
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    // Channel state shared with every `Sender`. `channel` always creates it as
    // `Some`.
    // NOTE(review): presumably reset to `None` once the stream has terminated;
    // confirm against the `Receiver` impl, which is outside this section.
    inner: Option<Arc<BoundedInner<T>>>,
}
144
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
    // Channel state shared with every `UnboundedSender`. `unbounded` always
    // creates it as `Some`.
    // NOTE(review): presumably reset to `None` once the stream has terminated;
    // confirm against the `UnboundedReceiver` impl, which is outside this section.
    inner: Option<Arc<UnboundedInner<T>>>,
}
152
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so the
// receiver is declared `Unpin` regardless of whether `T` is.
impl<T> Unpin for UnboundedReceiver<T> {}
155
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Why the send failed; exposed through `is_full` and `is_disconnected`.
    kind: SendErrorKind,
}
161
/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // The reason the send failed.
    err: SendError,
    // The message that could not be sent; handed back by `into_inner`.
    val: T,
}
168
// Internal classification of a failed send.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The sender is still parked, so the message was rejected.
    Full,
    // The channel is closed or this sender handle has been disconnected.
    Disconnected,
}
174
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the type from being constructed outside this
    // module.
    _priv: (),
}
179
180impl fmt::Display for SendError {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        if self.is_full() {
183            write!(f, "send failed because channel is full")
184        } else {
185            write!(f, "send failed because receiver is gone")
186        }
187    }
188}
189
// Uses the default `Error` methods; no underlying source error is reported.
impl std::error::Error for SendError {}
191
192impl SendError {
193    /// Returns `true` if this error is a result of the channel being full.
194    pub fn is_full(&self) -> bool {
195        match self.kind {
196            SendErrorKind::Full => true,
197            _ => false,
198        }
199    }
200
201    /// Returns `true` if this error is a result of the receiver being dropped.
202    pub fn is_disconnected(&self) -> bool {
203        match self.kind {
204            SendErrorKind::Disconnected => true,
205            _ => false,
206        }
207    }
208}
209
210impl<T> fmt::Debug for TrySendError<T> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
213    }
214}
215
216impl<T> fmt::Display for TrySendError<T> {
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        if self.is_full() {
219            write!(f, "send failed because channel is full")
220        } else {
221            write!(f, "send failed because receiver is gone")
222        }
223    }
224}
225
// The `Any` bound limits this impl to `'static` message types. Only the
// default `Error` methods are used.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
227
228impl<T> TrySendError<T> {
229    /// Returns `true` if this error is a result of the channel being full.
230    pub fn is_full(&self) -> bool {
231        self.err.is_full()
232    }
233
234    /// Returns `true` if this error is a result of the receiver being dropped.
235    pub fn is_disconnected(&self) -> bool {
236        self.err.is_disconnected()
237    }
238
239    /// Returns the message that was attempted to be sent but failed.
240    pub fn into_inner(self) -> T {
241        self.val
242    }
243
244    /// Drops the message and converts into a `SendError`.
245    pub fn into_send_error(self) -> SendError {
246        self.err
247    }
248}
249
250impl fmt::Debug for TryRecvError {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        f.debug_tuple("TryRecvError").finish()
253    }
254}
255
256impl fmt::Display for TryRecvError {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        write!(f, "receiver channel is empty")
259    }
260}
261
// Uses the default `Error` methods; no underlying source error is reported.
impl std::error::Error for TryRecvError {}
263
// State shared (behind an `Arc`) between all `UnboundedSender`s and the
// `UnboundedReceiver` of one unbounded channel.
#[derive(Debug)]
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
279
// State shared (behind an `Arc`) between all `Sender`s and the `Receiver` of
// one bounded channel.
#[derive(Debug)]
struct BoundedInner<T> {
    // Max buffer size of the channel, as passed to `channel`. A sender parks
    // itself when the message count after its send exceeds this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
301
// Decoded form of the packed `state` word kept in the shared channel state
// (`UnboundedInner::state` / `BoundedInner::state`).
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
311
// The `is_open` flag lives in the most significant bit of the packed state
// word; every lower bit belongs to the message count.
const OPEN_MASK: usize = !(usize::max_value() >> 1);

// A freshly created channel is open and holds no messages, so its state word
// is just the open bit.
const INIT_STATE: usize = OPEN_MASK;

// Largest message count the state word can represent: all bits below the open
// flag, i.e. `usize::max_value() >> 1`.
const MAX_CAPACITY: usize = usize::max_value() >> 1;

// The requested buffer size must stay below this limit, which is half of the
// trackable capacity, because each sender additionally gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
325
// Handle describing a (possibly) parked sender. It is passed to the consumer
// so that the consumer can wake up blocked producers.
#[derive(Debug)]
struct SenderTask {
    // Waker of the task waiting on the sender, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender is considered parked.
    is_parked: bool,
}

impl SenderTask {
    // Creates a handle that is not parked and has no waker registered.
    fn new() -> Self {
        SenderTask { is_parked: false, task: None }
    }

    // Marks the sender as unparked and wakes the registered task, if any. The
    // stored waker is consumed, so a second call wakes nothing.
    fn notify(&mut self) {
        self.is_parked = false;

        let registered = self.task.take();
        if let Some(waker) = registered {
            waker.wake();
        }
    }
}
346
347/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
348///
349/// Being bounded, this channel provides backpressure to ensure that the sender
350/// outpaces the receiver by only a limited amount. The channel's capacity is
351/// equal to `buffer + num-senders`. In other words, each sender gets a
352/// guaranteed slot in the channel capacity, and on top of that there are
353/// `buffer` "first come, first serve" slots available to all senders.
354///
355/// The [`Receiver`](Receiver) returned implements the
356/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
357/// `Sink`.
358pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
359    // Check that the requested buffer size does not exceed the maximum buffer
360    // size permitted by the system.
361    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
362
363    let inner = Arc::new(BoundedInner {
364        buffer,
365        state: AtomicUsize::new(INIT_STATE),
366        message_queue: Queue::new(),
367        parked_queue: Queue::new(),
368        num_senders: AtomicUsize::new(1),
369        recv_task: AtomicWaker::new(),
370    });
371
372    let tx = BoundedSenderInner {
373        inner: inner.clone(),
374        sender_task: Arc::new(Mutex::new(SenderTask::new())),
375        maybe_parked: false,
376    };
377
378    let rx = Receiver { inner: Some(inner) };
379
380    (Sender(Some(tx)), rx)
381}
382
383/// Creates an unbounded mpsc channel for communicating between asynchronous
384/// tasks.
385///
386/// A `send` on this channel will always succeed as long as the receive half has
387/// not been closed. If the receiver falls behind, messages will be arbitrarily
388/// buffered.
389///
390/// **Note** that the amount of available system memory is an implicit bound to
391/// the channel. Using an `unbounded` channel has the ability of causing the
392/// process to run out of memory. In this case, the process will be aborted.
393pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
394    let inner = Arc::new(UnboundedInner {
395        state: AtomicUsize::new(INIT_STATE),
396        message_queue: Queue::new(),
397        num_senders: AtomicUsize::new(1),
398        recv_task: AtomicWaker::new(),
399    });
400
401    let tx = UnboundedSenderInner { inner: inner.clone() };
402
403    let rx = UnboundedReceiver { inner: Some(inner) };
404
405    (UnboundedSender(Some(tx)), rx)
406}
407
408/*
409 *
410 * ===== impl Sender =====
411 *
412 */
413
414impl<T> UnboundedSenderInner<T> {
415    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
416        let state = decode_state(self.inner.state.load(SeqCst));
417        if state.is_open {
418            Poll::Ready(Ok(()))
419        } else {
420            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
421        }
422    }
423
424    // Push message to the queue and signal to the receiver
425    fn queue_push_and_signal(&self, msg: T) {
426        // Push the message onto the message queue
427        self.inner.message_queue.push(msg);
428
429        // Signal to the receiver that a message has been enqueued. If the
430        // receiver is parked, this will unpark the task.
431        self.inner.recv_task.wake();
432    }
433
434    // Increment the number of queued messages. Returns the resulting number.
435    fn inc_num_messages(&self) -> Option<usize> {
436        let mut curr = self.inner.state.load(SeqCst);
437
438        loop {
439            let mut state = decode_state(curr);
440
441            // The receiver end closed the channel.
442            if !state.is_open {
443                return None;
444            }
445
446            // This probably is never hit? Odds are the process will run out of
447            // memory first. It may be worth to return something else in this
448            // case?
449            assert!(
450                state.num_messages < MAX_CAPACITY,
451                "buffer space \
452                    exhausted; sending this messages would overflow the state"
453            );
454
455            state.num_messages += 1;
456
457            let next = encode_state(&state);
458            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
459                Ok(_) => return Some(state.num_messages),
460                Err(actual) => curr = actual,
461            }
462        }
463    }
464
465    /// Returns whether the senders send to the same receiver.
466    fn same_receiver(&self, other: &Self) -> bool {
467        Arc::ptr_eq(&self.inner, &other.inner)
468    }
469
470    /// Returns whether the sender send to this receiver.
471    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
472        Arc::ptr_eq(&self.inner, inner)
473    }
474
475    /// Returns pointer to the Arc containing sender
476    ///
477    /// The returned pointer is not referenced and should be only used for hashing!
478    fn ptr(&self) -> *const UnboundedInner<T> {
479        &*self.inner
480    }
481
482    /// Returns whether this channel is closed without needing a context.
483    fn is_closed(&self) -> bool {
484        !decode_state(self.inner.state.load(SeqCst)).is_open
485    }
486
487    /// Closes this channel from the sender side, preventing any new messages.
488    fn close_channel(&self) {
489        // There's no need to park this sender, its dropping,
490        // and we don't want to check for capacity, so skip
491        // that stuff from `do_send`.
492
493        self.inner.set_closed();
494        self.inner.recv_task.wake();
495    }
496}
497
498impl<T> BoundedSenderInner<T> {
499    /// Attempts to send a message on this `Sender`, returning the message
500    /// if there was an error.
501    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
502        // If the sender is currently blocked, reject the message
503        if !self.poll_unparked(None).is_ready() {
504            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
505        }
506
507        // The channel has capacity to accept the message, so send it
508        self.do_send_b(msg)
509    }
510
511    // Do the send without failing.
512    // Can be called only by bounded sender.
513    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
514        // Anyone calling do_send *should* make sure there is room first,
515        // but assert here for tests as a sanity check.
516        debug_assert!(self.poll_unparked(None).is_ready());
517
518        // First, increment the number of messages contained by the channel.
519        // This operation will also atomically determine if the sender task
520        // should be parked.
521        //
522        // `None` is returned in the case that the channel has been closed by the
523        // receiver. This happens when `Receiver::close` is called or the
524        // receiver is dropped.
525        let park_self = match self.inc_num_messages() {
526            Some(num_messages) => {
527                // Block if the current number of pending messages has exceeded
528                // the configured buffer size
529                num_messages > self.inner.buffer
530            }
531            None => {
532                return Err(TrySendError {
533                    err: SendError { kind: SendErrorKind::Disconnected },
534                    val: msg,
535                })
536            }
537        };
538
539        // If the channel has reached capacity, then the sender task needs to
540        // be parked. This will send the task handle on the parked task queue.
541        //
542        // However, when `do_send` is called while dropping the `Sender`,
543        // `task::current()` can't be called safely. In this case, in order to
544        // maintain internal consistency, a blank message is pushed onto the
545        // parked task queue.
546        if park_self {
547            self.park();
548        }
549
550        self.queue_push_and_signal(msg);
551
552        Ok(())
553    }
554
555    // Push message to the queue and signal to the receiver
556    fn queue_push_and_signal(&self, msg: T) {
557        // Push the message onto the message queue
558        self.inner.message_queue.push(msg);
559
560        // Signal to the receiver that a message has been enqueued. If the
561        // receiver is parked, this will unpark the task.
562        self.inner.recv_task.wake();
563    }
564
565    // Increment the number of queued messages. Returns the resulting number.
566    fn inc_num_messages(&self) -> Option<usize> {
567        let mut curr = self.inner.state.load(SeqCst);
568
569        loop {
570            let mut state = decode_state(curr);
571
572            // The receiver end closed the channel.
573            if !state.is_open {
574                return None;
575            }
576
577            // This probably is never hit? Odds are the process will run out of
578            // memory first. It may be worth to return something else in this
579            // case?
580            assert!(
581                state.num_messages < MAX_CAPACITY,
582                "buffer space \
583                    exhausted; sending this messages would overflow the state"
584            );
585
586            state.num_messages += 1;
587
588            let next = encode_state(&state);
589            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
590                Ok(_) => return Some(state.num_messages),
591                Err(actual) => curr = actual,
592            }
593        }
594    }
595
596    fn park(&mut self) {
597        {
598            let mut sender = self.sender_task.lock().unwrap();
599            sender.task = None;
600            sender.is_parked = true;
601        }
602
603        // Send handle over queue
604        let t = self.sender_task.clone();
605        self.inner.parked_queue.push(t);
606
607        // Check to make sure we weren't closed after we sent our task on the
608        // queue
609        let state = decode_state(self.inner.state.load(SeqCst));
610        self.maybe_parked = state.is_open;
611    }
612
613    /// Polls the channel to determine if there is guaranteed capacity to send
614    /// at least one item without waiting.
615    ///
616    /// # Return value
617    ///
618    /// This method returns:
619    ///
620    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
621    /// - `Poll::Pending` if the channel may not have
622    ///   capacity, in which case the current task is queued to be notified once
623    ///   capacity is available;
624    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
625    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
626        let state = decode_state(self.inner.state.load(SeqCst));
627        if !state.is_open {
628            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
629        }
630
631        self.poll_unparked(Some(cx)).map(Ok)
632    }
633
634    /// Returns whether the senders send to the same receiver.
635    fn same_receiver(&self, other: &Self) -> bool {
636        Arc::ptr_eq(&self.inner, &other.inner)
637    }
638
639    /// Returns whether the sender send to this receiver.
640    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
641        Arc::ptr_eq(&self.inner, receiver)
642    }
643
644    /// Returns pointer to the Arc containing sender
645    ///
646    /// The returned pointer is not referenced and should be only used for hashing!
647    fn ptr(&self) -> *const BoundedInner<T> {
648        &*self.inner
649    }
650
651    /// Returns whether this channel is closed without needing a context.
652    fn is_closed(&self) -> bool {
653        !decode_state(self.inner.state.load(SeqCst)).is_open
654    }
655
656    /// Closes this channel from the sender side, preventing any new messages.
657    fn close_channel(&self) {
658        // There's no need to park this sender, its dropping,
659        // and we don't want to check for capacity, so skip
660        // that stuff from `do_send`.
661
662        self.inner.set_closed();
663        self.inner.recv_task.wake();
664    }
665
666    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
667        // First check the `maybe_parked` variable. This avoids acquiring the
668        // lock in most cases
669        if self.maybe_parked {
670            // Get a lock on the task handle
671            let mut task = self.sender_task.lock().unwrap();
672
673            if !task.is_parked {
674                self.maybe_parked = false;
675                return Poll::Ready(());
676            }
677
678            // At this point, an unpark request is pending, so there will be an
679            // unpark sometime in the future. We just need to make sure that
680            // the correct task will be notified.
681            //
682            // Update the task in case the `Sender` has been moved to another
683            // task
684            task.task = cx.map(|cx| cx.waker().clone());
685
686            Poll::Pending
687        } else {
688            Poll::Ready(())
689        }
690    }
691}
692
693impl<T> Sender<T> {
694    /// Attempts to send a message on this `Sender`, returning the message
695    /// if there was an error.
696    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
697        if let Some(inner) = &mut self.0 {
698            inner.try_send(msg)
699        } else {
700            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
701        }
702    }
703
704    /// Send a message on the channel.
705    ///
706    /// This function should only be called after
707    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
708    /// ready to receive a message.
709    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
710        self.try_send(msg).map_err(|e| e.err)
711    }
712
713    /// Polls the channel to determine if there is guaranteed capacity to send
714    /// at least one item without waiting.
715    ///
716    /// # Return value
717    ///
718    /// This method returns:
719    ///
720    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
721    /// - `Poll::Pending` if the channel may not have
722    ///   capacity, in which case the current task is queued to be notified once
723    ///   capacity is available;
724    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
725    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
726        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
727        inner.poll_ready(cx)
728    }
729
730    /// Returns whether this channel is closed without needing a context.
731    pub fn is_closed(&self) -> bool {
732        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
733    }
734
735    /// Closes this channel from the sender side, preventing any new messages.
736    pub fn close_channel(&mut self) {
737        if let Some(inner) = &mut self.0 {
738            inner.close_channel();
739        }
740    }
741
742    /// Disconnects this sender from the channel, closing it if there are no more senders left.
743    pub fn disconnect(&mut self) {
744        self.0 = None;
745    }
746
747    /// Returns whether the senders send to the same receiver.
748    pub fn same_receiver(&self, other: &Self) -> bool {
749        match (&self.0, &other.0) {
750            (Some(inner), Some(other)) => inner.same_receiver(other),
751            _ => false,
752        }
753    }
754
755    /// Returns whether the sender send to this receiver.
756    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
757        match (&self.0, &receiver.inner) {
758            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
759            _ => false,
760        }
761    }
762
763    /// Hashes the receiver into the provided hasher
764    pub fn hash_receiver<H>(&self, hasher: &mut H)
765    where
766        H: std::hash::Hasher,
767    {
768        use std::hash::Hash;
769
770        let ptr = self.0.as_ref().map(|inner| inner.ptr());
771        ptr.hash(hasher);
772    }
773}
774
775impl<T> UnboundedSender<T> {
776    /// Check if the channel is ready to receive a message.
777    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
778        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
779        inner.poll_ready_nb()
780    }
781
782    /// Returns whether this channel is closed without needing a context.
783    pub fn is_closed(&self) -> bool {
784        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
785    }
786
787    /// Closes this channel from the sender side, preventing any new messages.
788    pub fn close_channel(&self) {
789        if let Some(inner) = &self.0 {
790            inner.close_channel();
791        }
792    }
793
794    /// Disconnects this sender from the channel, closing it if there are no more senders left.
795    pub fn disconnect(&mut self) {
796        self.0 = None;
797    }
798
799    // Do the send without parking current task.
800    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
801        if let Some(inner) = &self.0 {
802            if inner.inc_num_messages().is_some() {
803                inner.queue_push_and_signal(msg);
804                return Ok(());
805            }
806        }
807
808        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
809    }
810
811    /// Send a message on the channel.
812    ///
813    /// This method should only be called after `poll_ready` has been used to
814    /// verify that the channel is ready to receive a message.
815    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
816        self.do_send_nb(msg).map_err(|e| e.err)
817    }
818
819    /// Sends a message along this channel.
820    ///
821    /// This is an unbounded sender, so this function differs from `Sink::send`
822    /// by ensuring the return type reflects that the channel is always ready to
823    /// receive messages.
824    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
825        self.do_send_nb(msg)
826    }
827
828    /// Returns whether the senders send to the same receiver.
829    pub fn same_receiver(&self, other: &Self) -> bool {
830        match (&self.0, &other.0) {
831            (Some(inner), Some(other)) => inner.same_receiver(other),
832            _ => false,
833        }
834    }
835
836    /// Returns whether the sender send to this receiver.
837    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
838        match (&self.0, &receiver.inner) {
839            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
840            _ => false,
841        }
842    }
843
844    /// Hashes the receiver into the provided hasher
845    pub fn hash_receiver<H>(&self, hasher: &mut H)
846    where
847        H: std::hash::Hasher,
848    {
849        use std::hash::Hash;
850
851        let ptr = self.0.as_ref().map(|inner| inner.ptr());
852        ptr.hash(hasher);
853    }
854}
855
856impl<T> Clone for Sender<T> {
857    fn clone(&self) -> Self {
858        Self(self.0.clone())
859    }
860}
861
862impl<T> Clone for UnboundedSender<T> {
863    fn clone(&self) -> Self {
864        Self(self.0.clone())
865    }
866}
867
868impl<T> Clone for UnboundedSenderInner<T> {
869    fn clone(&self) -> Self {
870        // Since this atomic op isn't actually guarding any memory and we don't
871        // care about any orderings besides the ordering on the single atomic
872        // variable, a relaxed ordering is acceptable.
873        let mut curr = self.inner.num_senders.load(SeqCst);
874
875        loop {
876            // If the maximum number of senders has been reached, then fail
877            if curr == MAX_BUFFER {
878                panic!("cannot clone `Sender` -- too many outstanding senders");
879            }
880
881            debug_assert!(curr < MAX_BUFFER);
882
883            let next = curr + 1;
884            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
885                Ok(_) => {
886                    // The ABA problem doesn't matter here. We only care that the
887                    // number of senders never exceeds the maximum.
888                    return Self { inner: self.inner.clone() };
889                }
890                Err(actual) => curr = actual,
891            }
892        }
893    }
894}
895
896impl<T> Clone for BoundedSenderInner<T> {
897    fn clone(&self) -> Self {
898        // Since this atomic op isn't actually guarding any memory and we don't
899        // care about any orderings besides the ordering on the single atomic
900        // variable, a relaxed ordering is acceptable.
901        let mut curr = self.inner.num_senders.load(SeqCst);
902
903        loop {
904            // If the maximum number of senders has been reached, then fail
905            if curr == self.inner.max_senders() {
906                panic!("cannot clone `Sender` -- too many outstanding senders");
907            }
908
909            debug_assert!(curr < self.inner.max_senders());
910
911            let next = curr + 1;
912            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
913                Ok(_) => {
914                    // The ABA problem doesn't matter here. We only care that the
915                    // number of senders never exceeds the maximum.
916                    return Self {
917                        inner: self.inner.clone(),
918                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
919                        maybe_parked: false,
920                    };
921                }
922                Err(actual) => curr = actual,
923            }
924        }
925    }
926}
927
928impl<T> Drop for UnboundedSenderInner<T> {
929    fn drop(&mut self) {
930        // Ordering between variables don't matter here
931        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
932
933        if prev == 1 {
934            self.close_channel();
935        }
936    }
937}
938
939impl<T> Drop for BoundedSenderInner<T> {
940    fn drop(&mut self) {
941        // Ordering between variables don't matter here
942        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
943
944        if prev == 1 {
945            self.close_channel();
946        }
947    }
948}
949
950/*
951 *
952 * ===== impl Receiver =====
953 *
954 */
955
956impl<T> Receiver<T> {
957    /// Closes the receiving half of a channel, without dropping it.
958    ///
959    /// This prevents any further messages from being sent on the channel while
960    /// still enabling the receiver to drain messages that are buffered.
961    pub fn close(&mut self) {
962        if let Some(inner) = &mut self.inner {
963            inner.set_closed();
964
965            // Wake up any threads waiting as they'll see that we've closed the
966            // channel and will continue on their merry way.
967            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
968                task.lock().unwrap().notify();
969            }
970        }
971    }
972
973    /// Tries to receive the next message without notifying a context if empty.
974    ///
975    /// It is not recommended to call this function from inside of a future,
976    /// only when you've otherwise arranged to be notified when the channel is
977    /// no longer empty.
978    ///
979    /// This function returns:
980    /// * `Ok(Some(t))` when message is fetched
981    /// * `Ok(None)` when channel is closed and no messages left in the queue
982    /// * `Err(e)` when there are no messages available, but channel is not yet closed
983    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
984        match self.next_message() {
985            Poll::Ready(msg) => Ok(msg),
986            Poll::Pending => Err(TryRecvError { _priv: () }),
987        }
988    }
989
990    fn next_message(&mut self) -> Poll<Option<T>> {
991        let inner = match self.inner.as_mut() {
992            None => return Poll::Ready(None),
993            Some(inner) => inner,
994        };
995        // Pop off a message
996        match unsafe { inner.message_queue.pop_spin() } {
997            Some(msg) => {
998                // If there are any parked task handles in the parked queue,
999                // pop one and unpark it.
1000                self.unpark_one();
1001
1002                // Decrement number of messages
1003                self.dec_num_messages();
1004
1005                Poll::Ready(Some(msg))
1006            }
1007            None => {
1008                let state = decode_state(inner.state.load(SeqCst));
1009                if state.is_closed() {
1010                    // If closed flag is set AND there are no pending messages
1011                    // it means end of stream
1012                    self.inner = None;
1013                    Poll::Ready(None)
1014                } else {
1015                    // If queue is open, we need to return Pending
1016                    // to be woken up when new messages arrive.
1017                    // If queue is closed but num_messages is non-zero,
1018                    // it means that senders updated the state,
1019                    // but didn't put message to queue yet,
1020                    // so we need to park until sender unparks the task
1021                    // after queueing the message.
1022                    Poll::Pending
1023                }
1024            }
1025        }
1026    }
1027
1028    // Unpark a single task handle if there is one pending in the parked queue
1029    fn unpark_one(&mut self) {
1030        if let Some(inner) = &mut self.inner {
1031            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1032                task.lock().unwrap().notify();
1033            }
1034        }
1035    }
1036
1037    fn dec_num_messages(&self) {
1038        if let Some(inner) = &self.inner {
1039            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1040            // unless there's underflow, and we know there's no underflow
1041            // because number of messages at this point is always > 0.
1042            inner.state.fetch_sub(1, SeqCst);
1043        }
1044    }
1045}
1046
// `Receiver` is `Unpin` for every `T`: the receiver does not ever take a
// `Pin` to the inner `T`.
impl<T> Unpin for Receiver<T> {}
1049
1050impl<T> FusedStream for Receiver<T> {
1051    fn is_terminated(&self) -> bool {
1052        self.inner.is_none()
1053    }
1054}
1055
1056impl<T> Stream for Receiver<T> {
1057    type Item = T;
1058
1059    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1060        // Try to read a message off of the message queue.
1061        match self.next_message() {
1062            Poll::Ready(msg) => {
1063                if msg.is_none() {
1064                    self.inner = None;
1065                }
1066                Poll::Ready(msg)
1067            }
1068            Poll::Pending => {
1069                // There are no messages to read, in this case, park.
1070                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1071                // Check queue again after parking to prevent race condition:
1072                // a message could be added to the queue after previous `next_message`
1073                // before `register` call.
1074                self.next_message()
1075            }
1076        }
1077    }
1078}
1079
1080impl<T> Drop for Receiver<T> {
1081    fn drop(&mut self) {
1082        // Drain the channel of all pending messages
1083        self.close();
1084        if self.inner.is_some() {
1085            loop {
1086                match self.next_message() {
1087                    Poll::Ready(Some(_)) => {}
1088                    Poll::Ready(None) => break,
1089                    Poll::Pending => {
1090                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1091
1092                        // If the channel is closed, then there is no need to park.
1093                        if state.is_closed() {
1094                            break;
1095                        }
1096
1097                        // TODO: Spinning isn't ideal, it might be worth
1098                        // investigating using a condvar or some other strategy
1099                        // here. That said, if this case is hit, then another thread
1100                        // is about to push the value into the queue and this isn't
1101                        // the only spinlock in the impl right now.
1102                        thread::yield_now();
1103                    }
1104                }
1105            }
1106        }
1107    }
1108}
1109
1110impl<T> UnboundedReceiver<T> {
1111    /// Closes the receiving half of a channel, without dropping it.
1112    ///
1113    /// This prevents any further messages from being sent on the channel while
1114    /// still enabling the receiver to drain messages that are buffered.
1115    pub fn close(&mut self) {
1116        if let Some(inner) = &mut self.inner {
1117            inner.set_closed();
1118        }
1119    }
1120
1121    /// Tries to receive the next message without notifying a context if empty.
1122    ///
1123    /// It is not recommended to call this function from inside of a future,
1124    /// only when you've otherwise arranged to be notified when the channel is
1125    /// no longer empty.
1126    ///
1127    /// This function returns:
1128    /// * `Ok(Some(t))` when message is fetched
1129    /// * `Ok(None)` when channel is closed and no messages left in the queue
1130    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1131    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1132        match self.next_message() {
1133            Poll::Ready(msg) => Ok(msg),
1134            Poll::Pending => Err(TryRecvError { _priv: () }),
1135        }
1136    }
1137
1138    fn next_message(&mut self) -> Poll<Option<T>> {
1139        let inner = match self.inner.as_mut() {
1140            None => return Poll::Ready(None),
1141            Some(inner) => inner,
1142        };
1143        // Pop off a message
1144        match unsafe { inner.message_queue.pop_spin() } {
1145            Some(msg) => {
1146                // Decrement number of messages
1147                self.dec_num_messages();
1148
1149                Poll::Ready(Some(msg))
1150            }
1151            None => {
1152                let state = decode_state(inner.state.load(SeqCst));
1153                if state.is_closed() {
1154                    // If closed flag is set AND there are no pending messages
1155                    // it means end of stream
1156                    self.inner = None;
1157                    Poll::Ready(None)
1158                } else {
1159                    // If queue is open, we need to return Pending
1160                    // to be woken up when new messages arrive.
1161                    // If queue is closed but num_messages is non-zero,
1162                    // it means that senders updated the state,
1163                    // but didn't put message to queue yet,
1164                    // so we need to park until sender unparks the task
1165                    // after queueing the message.
1166                    Poll::Pending
1167                }
1168            }
1169        }
1170    }
1171
1172    fn dec_num_messages(&self) {
1173        if let Some(inner) = &self.inner {
1174            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1175            // unless there's underflow, and we know there's no underflow
1176            // because number of messages at this point is always > 0.
1177            inner.state.fetch_sub(1, SeqCst);
1178        }
1179    }
1180}
1181
1182impl<T> FusedStream for UnboundedReceiver<T> {
1183    fn is_terminated(&self) -> bool {
1184        self.inner.is_none()
1185    }
1186}
1187
1188impl<T> Stream for UnboundedReceiver<T> {
1189    type Item = T;
1190
1191    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1192        // Try to read a message off of the message queue.
1193        match self.next_message() {
1194            Poll::Ready(msg) => {
1195                if msg.is_none() {
1196                    self.inner = None;
1197                }
1198                Poll::Ready(msg)
1199            }
1200            Poll::Pending => {
1201                // There are no messages to read, in this case, park.
1202                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1203                // Check queue again after parking to prevent race condition:
1204                // a message could be added to the queue after previous `next_message`
1205                // before `register` call.
1206                self.next_message()
1207            }
1208        }
1209    }
1210}
1211
1212impl<T> Drop for UnboundedReceiver<T> {
1213    fn drop(&mut self) {
1214        // Drain the channel of all pending messages
1215        self.close();
1216        if self.inner.is_some() {
1217            loop {
1218                match self.next_message() {
1219                    Poll::Ready(Some(_)) => {}
1220                    Poll::Ready(None) => break,
1221                    Poll::Pending => {
1222                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1223
1224                        // If the channel is closed, then there is no need to park.
1225                        if state.is_closed() {
1226                            break;
1227                        }
1228
1229                        // TODO: Spinning isn't ideal, it might be worth
1230                        // investigating using a condvar or some other strategy
1231                        // here. That said, if this case is hit, then another thread
1232                        // is about to push the value into the queue and this isn't
1233                        // the only spinlock in the impl right now.
1234                        thread::yield_now();
1235                    }
1236                }
1237            }
1238        }
1239    }
1240}
1241
1242/*
1243 *
1244 * ===== impl Inner =====
1245 *
1246 */
1247
1248impl<T> UnboundedInner<T> {
1249    // Clear `open` flag in the state, keep `num_messages` intact.
1250    fn set_closed(&self) {
1251        let curr = self.state.load(SeqCst);
1252        if !decode_state(curr).is_open {
1253            return;
1254        }
1255
1256        self.state.fetch_and(!OPEN_MASK, SeqCst);
1257    }
1258}
1259
1260impl<T> BoundedInner<T> {
1261    // The return value is such that the total number of messages that can be
1262    // enqueued into the channel will never exceed MAX_CAPACITY
1263    fn max_senders(&self) -> usize {
1264        MAX_CAPACITY - self.buffer
1265    }
1266
1267    // Clear `open` flag in the state, keep `num_messages` intact.
1268    fn set_closed(&self) {
1269        let curr = self.state.load(SeqCst);
1270        if !decode_state(curr).is_open {
1271            return;
1272        }
1273
1274        self.state.fetch_and(!OPEN_MASK, SeqCst);
1275    }
1276}
1277
1278unsafe impl<T: Send> Send for UnboundedInner<T> {}
1279unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1280
1281unsafe impl<T: Send> Send for BoundedInner<T> {}
1282unsafe impl<T: Send> Sync for BoundedInner<T> {}
1283
1284impl State {
1285    fn is_closed(&self) -> bool {
1286        !self.is_open && self.num_messages == 0
1287    }
1288}
1289
1290/*
1291 *
1292 * ===== Helpers =====
1293 *
1294 */
1295
1296fn decode_state(num: usize) -> State {
1297    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1298}
1299
1300fn encode_state(state: &State) -> usize {
1301    let mut num = state.num_messages;
1302
1303    if state.is_open {
1304        num |= OPEN_MASK;
1305    }
1306
1307    num
1308}