// futures_core/task/__internal/atomic_waker.rs

1use core::cell::UnsafeCell;
2use core::fmt;
3use core::sync::atomic::AtomicUsize;
4use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5use core::task::Waker;
6
/// A cell that lets one task register interest in an event and lets other
/// threads wake that task once the event has happened.
///
/// The task that should be woken can change over time, for instance when a
/// computation finishes on another thread while the consumer is being moved
/// to a different logical task. `AtomicWaker` coordinates such "updates" of
/// the registered task with notifications that arrive concurrently.
///
/// The consumer calls `register` *before* it inspects the result of the
/// computation, and the producer calls `wake` *after* it has published that
/// result. Note that this order differs from the usual `thread::park` pattern.
/// Calling `wake` while nothing has been registered yet is permitted and does
/// nothing.
///
/// One `AtomicWaker` can serve any number of `register` and `wake` calls.
///
/// # Memory ordering
///
/// A call to `register` "acquires" all memory "released" by the calls to
/// `wake` that happened before it. Any later call to `wake` wakes the
/// registered waker (under contention, that wake-up may be performed from
/// inside `register`).
///
/// If several threads call `register` concurrently (which should be avoided),
/// the ordering guarantee only holds for the call that wins.
///
/// # Examples
///
/// A `Flag` future that becomes ready once it has been signalled manually:
///
/// ```
/// use futures::future::Future;
/// use futures::task::{Context, Poll, AtomicWaker};
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::Relaxed;
/// use std::pin::Pin;
///
/// struct Inner {
///     waker: AtomicWaker,
///     set: AtomicBool,
/// }
///
/// #[derive(Clone)]
/// struct Flag(Arc<Inner>);
///
/// impl Flag {
///     pub fn new() -> Self {
///         Self(Arc::new(Inner {
///             waker: AtomicWaker::new(),
///             set: AtomicBool::new(false),
///         }))
///     }
///
///     pub fn signal(&self) {
///         self.0.set.store(true, Relaxed);
///         self.0.waker.wake();
///     }
/// }
///
/// impl Future for Flag {
///     type Output = ();
///
///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
///         // Fast path: skip registration if the flag is already set.
///         if self.0.set.load(Relaxed) {
///             return Poll::Ready(());
///         }
///
///         self.0.waker.register(cx.waker());
///
///         // The flag must be checked again **after** `register`, otherwise a
///         // `signal` racing with this poll could be lost.
///         if self.0.set.load(Relaxed) {
///             Poll::Ready(())
///         } else {
///             Poll::Pending
///         }
///     }
/// }
/// ```
pub struct AtomicWaker {
    /// Lock bits (`WAITING`, `REGISTERING`, `WAKING`) that guard access to
    /// `waker`.
    state: AtomicUsize,
    /// The most recently registered waker, or `None` if nothing is registered
    /// or it has already been taken.
    waker: UnsafeCell<Option<Waker>>,
}
93
94// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
95// stores a `Waker` value produced by calls to `register` and many threads can
96// race to take the waker (to wake it) by calling `wake`.
97//
98// If a new `Waker` instance is produced by calling `register` before an
99// existing one is consumed, then the existing one is overwritten.
100//
// While `AtomicWaker` is single-producer, the implementation ensures memory
// safety. In the event of concurrent calls to `register`, there will be a
// single winner whose waker will get stored in the cell. The losers will not
// have their tasks woken. As such, callers should synchronize their calls to
// `register` themselves.
106//
107// The implementation uses a single `AtomicUsize` value to coordinate access to
108// the `Waker` cell. There are two bits that are operated on independently.
109// These are represented by `REGISTERING` and `WAKING`.
110//
111// The `REGISTERING` bit is set when a producer enters the critical section. The
112// `WAKING` bit is set when a consumer enters the critical section. Neither bit
113// being set is represented by `WAITING`.
114//
115// A thread obtains an exclusive lock on the waker cell by transitioning the
116// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation
117// the thread wishes to perform. When this transition is made, it is guaranteed
118// that no other thread will access the waker cell.
119//
120// # Registering
121//
122// On a call to `register`, an attempt to transition the state from WAITING to
123// REGISTERING is made. On success, the caller obtains a lock on the waker cell.
124//
125// If the lock is obtained, then the thread sets the waker cell to the waker
126// provided as an argument. Then it attempts to transition the state back from
127// `REGISTERING` -> `WAITING`.
128//
129// If this transition is successful, then the registering process is complete
130// and the next call to `wake` will observe the waker.
131//
132// If the transition fails, then there was a concurrent call to `wake` that was
133// unable to access the waker cell (due to the registering thread holding the
134// lock). To handle this, the registering thread removes the waker it just set
135// from the cell and calls `wake` on it. This call to wake represents the
136// attempt to wake by the other thread (that set the `WAKING` bit). The state is
137// then transitioned from `REGISTERING | WAKING` back to `WAITING`.  This
138// transition must succeed because, at this point, the state cannot be
139// transitioned by another thread.
140//
141// # Waking
142//
143// On a call to `wake`, an attempt to transition the state from `WAITING` to
144// `WAKING` is made. On success, the caller obtains a lock on the waker cell.
145//
146// If the lock is obtained, then the thread takes ownership of the current value
147// in the waker cell, and calls `wake` on it. The state is then transitioned
148// back to `WAITING`. This transition must succeed as, at this point, the state
149// cannot be transitioned by another thread.
150//
// If the thread is unable to obtain the lock, the `WAKING` bit is still set.
// This is because it has either been set by the current thread but the
// previous value included the `REGISTERING` bit **or** a concurrent thread is
// in the `WAKING` critical section. Either way, no further action is needed.
155//
156// If the current thread is the only concurrent call to `wake` and another
157// thread is in the `register` critical section, when the other thread **exits**
158// the `register` critical section, it will observe the `WAKING` bit and handle
159// the wake itself.
160//
161// If another thread is in the `wake` critical section, then it will handle
162// waking the task.
163//
164// # A potential race (is safely handled).
165//
166// Imagine the following situation:
167//
168// * Thread A obtains the `wake` lock and wakes a task.
169//
170// * Before thread A releases the `wake` lock, the woken task is scheduled.
171//
172// * Thread B attempts to wake the task. In theory this should result in the
173//   task being woken, but it cannot because thread A still holds the wake lock.
174//
// This case is handled by requiring users of `AtomicWaker` to call `register`
// **before** attempting to observe the application state change that resulted
// in the task being awoken. Callers of `wake` are likewise required to change
// the application state before calling `wake`.
179//
// Because of this, the woken task will do one of two things.
//
// 1) Observe the application state change that Thread B is waking it for. In
//    this case, it is OK for Thread B's wake to be lost.
//
// 2) Call `register` before attempting to observe the application state.
//    Since Thread A still holds the `wake` lock, the call to `register` will
//    result in the task waking itself and being scheduled again.
188
/// No bit set: nobody holds the lock on the waker cell.
const WAITING: usize = 0b00;

/// Set by `register` while it holds the lock and stores a new waker.
const REGISTERING: usize = 0b01;

/// Set by `take` (and therefore `wake`). When it is the only bit set, the
/// setting call owns the waker cell. When it appears together with
/// `REGISTERING`, it records a wake that the registering thread must deliver.
const WAKING: usize = 0b10;
197
198impl AtomicWaker {
199    /// Create an `AtomicWaker`.
200    pub const fn new() -> Self {
201        // Make sure that task is Sync
202        trait AssertSync: Sync {}
203        impl AssertSync for Waker {}
204
205        Self { state: AtomicUsize::new(WAITING), waker: UnsafeCell::new(None) }
206    }
207
208    /// Registers the waker to be notified on calls to `wake`.
209    ///
210    /// The new task will take place of any previous tasks that were registered
211    /// by previous calls to `register`. Any calls to `wake` that happen after
212    /// a call to `register` (as defined by the memory ordering rules), will
213    /// notify the `register` caller's task and deregister the waker from future
214    /// notifications. Because of this, callers should ensure `register` gets
215    /// invoked with a new `Waker` **each** time they require a wakeup.
216    ///
217    /// It is safe to call `register` with multiple other threads concurrently
218    /// calling `wake`. This will result in the `register` caller's current
219    /// task being notified once.
220    ///
221    /// This function is safe to call concurrently, but this is generally a bad
222    /// idea. Concurrent calls to `register` will attempt to register different
223    /// tasks to be notified. One of the callers will win and have its task set,
224    /// but there is no guarantee as to which caller will succeed.
225    ///
226    /// # Examples
227    ///
228    /// Here is how `register` is used when implementing a flag.
229    ///
230    /// ```
231    /// use futures::future::Future;
232    /// use futures::task::{Context, Poll, AtomicWaker};
233    /// use std::sync::atomic::AtomicBool;
234    /// use std::sync::atomic::Ordering::Relaxed;
235    /// use std::pin::Pin;
236    ///
237    /// struct Flag {
238    ///     waker: AtomicWaker,
239    ///     set: AtomicBool,
240    /// }
241    ///
242    /// impl Future for Flag {
243    ///     type Output = ();
244    ///
245    ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
246    ///         // Register **before** checking `set` to avoid a race condition
247    ///         // that would result in lost notifications.
248    ///         self.waker.register(cx.waker());
249    ///
250    ///         if self.set.load(Relaxed) {
251    ///             Poll::Ready(())
252    ///         } else {
253    ///             Poll::Pending
254    ///         }
255    ///     }
256    /// }
257    /// ```
258    pub fn register(&self, waker: &Waker) {
259        match self
260            .state
261            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
262            .unwrap_or_else(|x| x)
263        {
264            WAITING => {
265                unsafe {
266                    // Locked acquired, update the waker cell
267                    *self.waker.get() = Some(waker.clone());
268
269                    // Release the lock. If the state transitioned to include
270                    // the `WAKING` bit, this means that at least one wake has
271                    // been called concurrently.
272                    //
273                    // Start by assuming that the state is `REGISTERING` as this
274                    // is what we just set it to. If this holds, we know that no
275                    // other writes were performed in the meantime, so there is
276                    // nothing to acquire, only release. In case of concurrent
277                    // wakers, we need to acquire their releases, so success needs
278                    // to do both.
279                    let res = self.state.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
280
281                    match res {
282                        Ok(_) => {
283                            // memory ordering: acquired self.state during CAS
284                            // - if previous wakes went through it syncs with
285                            //   their final release (`fetch_and`)
286                            // - if there was no previous wake the next wake
287                            //   will wake us, no sync needed.
288                        }
289                        Err(actual) => {
290                            // This branch can only be reached if at least one
291                            // concurrent thread called `wake`. In this
292                            // case, `actual` **must** be `REGISTERING |
293                            // `WAKING`.
294                            debug_assert_eq!(actual, REGISTERING | WAKING);
295
296                            // Take the waker to wake once the atomic operation has
297                            // completed.
298                            let waker = (*self.waker.get()).take().unwrap();
299
300                            // We need to return to WAITING state (clear our lock and
301                            // concurrent WAKING flag). This needs to acquire all
302                            // WAKING fetch_or releases and it needs to release our
303                            // update to self.waker, so we need a `swap` operation.
304                            self.state.swap(WAITING, AcqRel);
305
306                            // memory ordering: we acquired the state for all
307                            // concurrent wakes, but future wakes might still
308                            // need to wake us in case we can't make progress
309                            // from the pending wakes.
310                            //
311                            // So we simply schedule to come back later (we could
312                            // also simply leave the registration in place above).
313                            waker.wake();
314                        }
315                    }
316                }
317            }
318            WAKING => {
319                // Currently in the process of waking the task, i.e.,
320                // `wake` is currently being called on the old task handle.
321                //
322                // memory ordering: we acquired the state for all
323                // concurrent wakes, but future wakes might still
324                // need to wake us in case we can't make progress
325                // from the pending wakes.
326                //
327                // So we simply schedule to come back later (we
328                // could also spin here trying to acquire the lock
329                // to register).
330                waker.wake_by_ref();
331            }
332            state => {
333                // In this case, a concurrent thread is holding the
334                // "registering" lock. This probably indicates a bug in the
335                // caller's code as racing to call `register` doesn't make much
336                // sense.
337                //
338                // memory ordering: don't care. a concurrent register() is going
339                // to succeed and provide proper memory ordering.
340                //
341                // We just want to maintain memory safety. It is ok to drop the
342                // call to `register`.
343                debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
344            }
345        }
346    }
347
348    /// Calls `wake` on the last `Waker` passed to `register`.
349    ///
350    /// If `register` has not been called yet, then this does nothing.
351    pub fn wake(&self) {
352        if let Some(waker) = self.take() {
353            waker.wake();
354        }
355    }
356
357    /// Returns the last `Waker` passed to `register`, so that the user can wake it.
358    ///
359    ///
360    /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user
361    /// to take the waker and then wake it separately, rather than performing both steps in one
362    /// atomic action.
363    ///
364    /// If a waker has not been registered, this returns `None`.
365    pub fn take(&self) -> Option<Waker> {
366        // AcqRel ordering is used in order to acquire the value of the `task`
367        // cell as well as to establish a `release` ordering with whatever
368        // memory the `AtomicWaker` is associated with.
369        match self.state.fetch_or(WAKING, AcqRel) {
370            WAITING => {
371                // The waking lock has been acquired.
372                let waker = unsafe { (*self.waker.get()).take() };
373
374                // Release the lock
375                self.state.fetch_and(!WAKING, Release);
376
377                waker
378            }
379            state => {
380                // There is a concurrent thread currently updating the
381                // associated task.
382                //
383                // Nothing more to do as the `WAKING` bit has been set. It
384                // doesn't matter if there are concurrent registering threads or
385                // not.
386                //
387                debug_assert!(
388                    state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
389                );
390                None
391            }
392        }
393    }
394}
395
396impl Default for AtomicWaker {
397    fn default() -> Self {
398        Self::new()
399    }
400}
401
402impl fmt::Debug for AtomicWaker {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        write!(f, "AtomicWaker")
405    }
406}
407
408unsafe impl Send for AtomicWaker {}
409unsafe impl Sync for AtomicWaker {}