// futures_core/task/__internal/atomic_waker.rs
1use core::cell::UnsafeCell;
2use core::fmt;
3use core::sync::atomic::AtomicUsize;
4use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5use core::task::Waker;
6
/// A synchronization primitive for task wakeup.
///
/// The task interested in a given event may change over time, and
/// `AtomicWaker` coordinates concurrent notifications with a consumer that is
/// potentially "updating" the underlying task to wake up. This is useful when
/// a computation completes in another thread and wants to notify the
/// consumer, but the consumer is in the process of being migrated to a new
/// logical task.
///
/// Consumers should call `register` before checking the result of a
/// computation and producers should call `wake` after producing the
/// computation (note that this differs from the usual `thread::park`
/// pattern). It is also permitted for `wake` to be called **before**
/// `register`; doing so is a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register`
/// or `wake`.
///
/// # Memory ordering
///
/// Calling `register` "acquires" all memory "released" by calls to `wake`
/// before the call to `register`. Later calls to `wake` will wake the
/// registered waker (on contention this wake might be triggered in
/// `register`).
///
/// For concurrent calls to `register` (which should be avoided) the ordering
/// is only guaranteed for the winning call.
///
/// # Examples
///
/// Here is a simple example providing a `Flag` that can be signalled manually
/// when it is ready.
///
/// ```
/// use futures::future::Future;
/// use futures::task::{Context, Poll, AtomicWaker};
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::Relaxed;
/// use std::pin::Pin;
///
/// struct Inner {
///     waker: AtomicWaker,
///     set: AtomicBool,
/// }
///
/// #[derive(Clone)]
/// struct Flag(Arc<Inner>);
///
/// impl Flag {
///     pub fn new() -> Self {
///         Self(Arc::new(Inner {
///             waker: AtomicWaker::new(),
///             set: AtomicBool::new(false),
///         }))
///     }
///
///     pub fn signal(&self) {
///         self.0.set.store(true, Relaxed);
///         self.0.waker.wake();
///     }
/// }
///
/// impl Future for Flag {
///     type Output = ();
///
///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
///         // quick check to avoid registration if already done.
///         if self.0.set.load(Relaxed) {
///             return Poll::Ready(());
///         }
///
///         self.0.waker.register(cx.waker());
///
///         // Need to check condition **after** `register` to avoid a race
///         // condition that would result in lost notifications.
///         if self.0.set.load(Relaxed) {
///             Poll::Ready(())
///         } else {
///             Poll::Pending
///         }
///     }
/// }
/// ```
pub struct AtomicWaker {
    // Most recent `Waker` passed to `register`. Access is serialized through
    // `state`, which acts as a lock over this cell — a thread only touches it
    // while holding the `REGISTERING` or `WAKING` bit.
    waker: UnsafeCell<Option<Waker>>,
    // Coordination bits: `WAITING` (idle), `REGISTERING`, and/or `WAKING`.
    state: AtomicUsize,
}
93
94// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
95// stores a `Waker` value produced by calls to `register` and many threads can
96// race to take the waker (to wake it) by calling `wake`.
97//
98// If a new `Waker` instance is produced by calling `register` before an
99// existing one is consumed, then the existing one is overwritten.
100//
101// While `AtomicWaker` is single-producer, the implementation ensures memory
102// safety. In the event of concurrent calls to `register`, there will be a
103// single winner whose waker will get stored in the cell. The losers will not
104// have their tasks woken. As such, callers should ensure to add synchronization
105// to calls to `register`.
106//
107// The implementation uses a single `AtomicUsize` value to coordinate access to
108// the `Waker` cell. There are two bits that are operated on independently.
109// These are represented by `REGISTERING` and `WAKING`.
110//
111// The `REGISTERING` bit is set when a producer enters the critical section. The
112// `WAKING` bit is set when a consumer enters the critical section. Neither bit
113// being set is represented by `WAITING`.
114//
115// A thread obtains an exclusive lock on the waker cell by transitioning the
116// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation
117// the thread wishes to perform. When this transition is made, it is guaranteed
118// that no other thread will access the waker cell.
119//
120// # Registering
121//
122// On a call to `register`, an attempt to transition the state from WAITING to
123// REGISTERING is made. On success, the caller obtains a lock on the waker cell.
124//
125// If the lock is obtained, then the thread sets the waker cell to the waker
126// provided as an argument. Then it attempts to transition the state back from
127// `REGISTERING` -> `WAITING`.
128//
129// If this transition is successful, then the registering process is complete
130// and the next call to `wake` will observe the waker.
131//
132// If the transition fails, then there was a concurrent call to `wake` that was
133// unable to access the waker cell (due to the registering thread holding the
134// lock). To handle this, the registering thread removes the waker it just set
135// from the cell and calls `wake` on it. This call to wake represents the
136// attempt to wake by the other thread (that set the `WAKING` bit). The state is
137// then transitioned from `REGISTERING | WAKING` back to `WAITING`. This
138// transition must succeed because, at this point, the state cannot be
139// transitioned by another thread.
140//
141// # Waking
142//
143// On a call to `wake`, an attempt to transition the state from `WAITING` to
144// `WAKING` is made. On success, the caller obtains a lock on the waker cell.
145//
146// If the lock is obtained, then the thread takes ownership of the current value
147// in the waker cell, and calls `wake` on it. The state is then transitioned
148// back to `WAITING`. This transition must succeed as, at this point, the state
149// cannot be transitioned by another thread.
150//
// If the thread is unable to obtain the lock, the `WAKING` bit is still set. This
152// is because it has either been set by the current thread but the previous
153// value included the `REGISTERING` bit **or** a concurrent thread is in the
154// `WAKING` critical section. Either way, no action must be taken.
155//
156// If the current thread is the only concurrent call to `wake` and another
157// thread is in the `register` critical section, when the other thread **exits**
158// the `register` critical section, it will observe the `WAKING` bit and handle
159// the wake itself.
160//
161// If another thread is in the `wake` critical section, then it will handle
162// waking the task.
163//
164// # A potential race (is safely handled).
165//
166// Imagine the following situation:
167//
168// * Thread A obtains the `wake` lock and wakes a task.
169//
170// * Before thread A releases the `wake` lock, the woken task is scheduled.
171//
172// * Thread B attempts to wake the task. In theory this should result in the
173// task being woken, but it cannot because thread A still holds the wake lock.
174//
175// This case is handled by requiring users of `AtomicWaker` to call `register`
176// **before** attempting to observe the application state change that resulted
177// in the task being awoken. The wakers also change the application state before
178// calling wake.
179//
180// Because of this, the waker will do one of two things.
181//
182// 1) Observe the application state change that Thread B is woken for. In this
183// case, it is OK for Thread B's wake to be lost.
184//
185// 2) Call register before attempting to observe the application state. Since
186// Thread A still holds the `wake` lock, the call to `register` will result
187// in the task waking itself and get scheduled again.
188
/// Idle state: no thread is in either critical section and no wake is
/// pending. Both lock bits are clear.
const WAITING: usize = 0;

/// A new waker value is being registered with the `AtomicWaker` cell.
const REGISTERING: usize = 0b01;

/// The waker currently registered with the `AtomicWaker` cell is being woken.
const WAKING: usize = 0b10;
197
impl AtomicWaker {
    /// Create an `AtomicWaker`.
    pub const fn new() -> Self {
        // Make sure that task is Sync. A compile-time assertion: `Waker` must
        // be `Sync` for the `unsafe impl Sync for AtomicWaker` to be sound.
        trait AssertSync: Sync {}
        impl AssertSync for Waker {}

        Self { state: AtomicUsize::new(WAITING), waker: UnsafeCell::new(None) }
    }

    /// Registers the waker to be notified on calls to `wake`.
    ///
    /// The new task will take place of any previous tasks that were registered
    /// by previous calls to `register`. Any calls to `wake` that happen after
    /// a call to `register` (as defined by the memory ordering rules), will
    /// notify the `register` caller's task and deregister the waker from future
    /// notifications. Because of this, callers should ensure `register` gets
    /// invoked with a new `Waker` **each** time they require a wakeup.
    ///
    /// It is safe to call `register` with multiple other threads concurrently
    /// calling `wake`. This will result in the `register` caller's current
    /// task being notified once.
    ///
    /// This function is safe to call concurrently, but this is generally a bad
    /// idea. Concurrent calls to `register` will attempt to register different
    /// tasks to be notified. One of the callers will win and have its task set,
    /// but there is no guarantee as to which caller will succeed.
    ///
    /// # Examples
    ///
    /// Here is how `register` is used when implementing a flag.
    ///
    /// ```
    /// use futures::future::Future;
    /// use futures::task::{Context, Poll, AtomicWaker};
    /// use std::sync::atomic::AtomicBool;
    /// use std::sync::atomic::Ordering::Relaxed;
    /// use std::pin::Pin;
    ///
    /// struct Flag {
    ///     waker: AtomicWaker,
    ///     set: AtomicBool,
    /// }
    ///
    /// impl Future for Flag {
    ///     type Output = ();
    ///
    ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    ///         // Register **before** checking `set` to avoid a race condition
    ///         // that would result in lost notifications.
    ///         self.waker.register(cx.waker());
    ///
    ///         if self.set.load(Relaxed) {
    ///             Poll::Ready(())
    ///         } else {
    ///             Poll::Pending
    ///         }
    ///     }
    /// }
    /// ```
    pub fn register(&self, waker: &Waker) {
        // Try to take the REGISTERING lock; on failure, `unwrap_or_else`
        // extracts the actual observed state so the match below can dispatch
        // on it either way.
        match self
            .state
            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
            .unwrap_or_else(|x| x)
        {
            WAITING => {
                unsafe {
                    // Lock acquired, update the waker cell
                    *self.waker.get() = Some(waker.clone());

                    // Release the lock. If the state transitioned to include
                    // the `WAKING` bit, this means that at least one wake has
                    // been called concurrently.
                    //
                    // Start by assuming that the state is `REGISTERING` as this
                    // is what we just set it to. If this holds, we know that no
                    // other writes were performed in the meantime, so there is
                    // nothing to acquire, only release. In case of concurrent
                    // wakers, we need to acquire their releases, so success needs
                    // to do both.
                    let res = self.state.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);

                    match res {
                        Ok(_) => {
                            // memory ordering: acquired self.state during CAS
                            // - if previous wakes went through it syncs with
                            //   their final release (`fetch_and`)
                            // - if there was no previous wake the next wake
                            //   will wake us, no sync needed.
                        }
                        Err(actual) => {
                            // This branch can only be reached if at least one
                            // concurrent thread called `wake`. In this
                            // case, `actual` **must** be `REGISTERING |
                            // WAKING`.
                            debug_assert_eq!(actual, REGISTERING | WAKING);

                            // Take the waker to wake once the atomic operation has
                            // completed.
                            let waker = (*self.waker.get()).take().unwrap();

                            // We need to return to WAITING state (clear our lock and
                            // concurrent WAKING flag). This needs to acquire all
                            // WAKING fetch_or releases and it needs to release our
                            // update to self.waker, so we need a `swap` operation.
                            self.state.swap(WAITING, AcqRel);

                            // memory ordering: we acquired the state for all
                            // concurrent wakes, but future wakes might still
                            // need to wake us in case we can't make progress
                            // from the pending wakes.
                            //
                            // So we simply schedule to come back later (we could
                            // also simply leave the registration in place above).
                            waker.wake();
                        }
                    }
                }
            }
            WAKING => {
                // Currently in the process of waking the task, i.e.,
                // `wake` is currently being called on the old task handle.
                //
                // memory ordering: we acquired the state for all
                // concurrent wakes, but future wakes might still
                // need to wake us in case we can't make progress
                // from the pending wakes.
                //
                // So we simply schedule to come back later (we
                // could also spin here trying to acquire the lock
                // to register).
                waker.wake_by_ref();
            }
            state => {
                // In this case, a concurrent thread is holding the
                // "registering" lock. This probably indicates a bug in the
                // caller's code as racing to call `register` doesn't make much
                // sense.
                //
                // memory ordering: don't care. a concurrent register() is going
                // to succeed and provide proper memory ordering.
                //
                // We just want to maintain memory safety. It is ok to drop the
                // call to `register`.
                debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
            }
        }
    }

    /// Calls `wake` on the last `Waker` passed to `register`.
    ///
    /// If `register` has not been called yet, then this does nothing.
    pub fn wake(&self) {
        if let Some(waker) = self.take() {
            waker.wake();
        }
    }

    /// Returns the last `Waker` passed to `register`, so that the user can
    /// wake it.
    ///
    /// Sometimes, just waking the AtomicWaker is not fine grained enough. This
    /// allows the user to take the waker and then wake it separately, rather
    /// than performing both steps in one atomic action.
    ///
    /// If a waker has not been registered, this returns `None`.
    pub fn take(&self) -> Option<Waker> {
        // AcqRel ordering is used in order to acquire the value of the `task`
        // cell as well as to establish a `release` ordering with whatever
        // memory the `AtomicWaker` is associated with.
        match self.state.fetch_or(WAKING, AcqRel) {
            WAITING => {
                // The waking lock has been acquired.
                let waker = unsafe { (*self.waker.get()).take() };

                // Release the lock
                self.state.fetch_and(!WAKING, Release);

                waker
            }
            state => {
                // There is a concurrent thread currently updating the
                // associated task.
                //
                // Nothing more to do as the `WAKING` bit has been set. It
                // doesn't matter if there are concurrent registering threads or
                // not.
                debug_assert!(
                    state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
                );
                None
            }
        }
    }
}
395
396impl Default for AtomicWaker {
397 fn default() -> Self {
398 Self::new()
399 }
400}
401
402impl fmt::Debug for AtomicWaker {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 write!(f, "AtomicWaker")
405 }
406}
407
// SAFETY: access to the `waker` cell is serialized by the `state` atomic —
// a thread only reads or writes the cell while it holds the `REGISTERING` or
// `WAKING` "lock" bit, so the `UnsafeCell` is never accessed concurrently.
// `Waker` itself is `Send + Sync` (asserted in `new`), so sharing and sending
// `AtomicWaker` across threads is sound.
unsafe impl Send for AtomicWaker {}
unsafe impl Sync for AtomicWaker {}