// wasm_bindgen_futures/task/singlethread.rs

1use alloc::boxed::Box;
2use alloc::rc::Rc;
3use core::cell::{Cell, RefCell};
4use core::future::Future;
5use core::mem::ManuallyDrop;
6use core::pin::Pin;
7use core::task::{Context, RawWaker, RawWakerVTable, Waker};
8
/// The live state of a task: the future being driven together with the waker
/// that is handed to it on every poll.
struct Inner {
    /// Boxed, pinned future that produces `()` when it completes.
    future: Pin<Box<dyn 'static + Future<Output = ()>>>,
    /// Waker passed to `future` through the polling `Context`.
    waker: Waker,
}
13
14pub(crate) struct Task {
15    // The actual Future that we're executing as part of this task.
16    //
17    // This is an Option so that the Future can be immediately dropped when it's
18    // finished
19    inner: RefCell<Option<Inner>>,
20
21    // This is used to ensure that the Task will only be queued once
22    is_queued: Cell<bool>,
23}
24
25impl Task {
26    pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
27        let this = Rc::new(Self {
28            inner: RefCell::new(None),
29            is_queued: Cell::new(true),
30        });
31
32        let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };
33
34        *this.inner.borrow_mut() = Some(Inner { future, waker });
35
36        crate::queue::Queue::with(|queue| queue.schedule_task(this));
37    }
38
39    fn force_wake(this: Rc<Self>) {
40        crate::queue::Queue::with(|queue| {
41            queue.push_task(this);
42        });
43    }
44
45    fn wake(this: Rc<Self>) {
46        // If we've already been placed on the run queue then there's no need to
47        // requeue ourselves since we're going to run at some point in the
48        // future anyway.
49        if this.is_queued.replace(true) {
50            return;
51        }
52
53        Self::force_wake(this);
54    }
55
56    fn wake_by_ref(this: &Rc<Self>) {
57        // If we've already been placed on the run queue then there's no need to
58        // requeue ourselves since we're going to run at some point in the
59        // future anyway.
60        if this.is_queued.replace(true) {
61            return;
62        }
63
64        Self::force_wake(Rc::clone(this));
65    }
66
67    /// Creates a standard library `RawWaker` from an `Rc` of ourselves.
68    ///
69    /// Note that in general this is wildly unsafe because everything with
70    /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm,
71    /// however, everything is guaranteed to be singlethreaded (since we're
72    /// compiled without the `atomics` feature) so we "safely lie" and say our
73    /// `Rc` pointer is good enough.
74    ///
75    /// The implementation is based off of futures::task::ArcWake
76    unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
77        unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
78            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
79            Task::into_raw_waker(Rc::clone(&ptr))
80        }
81
82        unsafe fn raw_wake(ptr: *const ()) {
83            let ptr = Rc::from_raw(ptr as *const Task);
84            Task::wake(ptr);
85        }
86
87        unsafe fn raw_wake_by_ref(ptr: *const ()) {
88            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
89            Task::wake_by_ref(&ptr);
90        }
91
92        unsafe fn raw_drop(ptr: *const ()) {
93            drop(Rc::from_raw(ptr as *const Task));
94        }
95
96        static VTABLE: RawWakerVTable =
97            RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
98
99        RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
100    }
101
102    pub(crate) fn run(&self) {
103        let mut borrow = self.inner.borrow_mut();
104
105        // Wakeups can come in after a Future has finished and been destroyed,
106        // so handle this gracefully by just ignoring the request to run.
107        let inner = match borrow.as_mut() {
108            Some(inner) => inner,
109            None => return,
110        };
111
112        // Ensure that if poll calls `waker.wake()` we can get enqueued back on
113        // the run queue.
114        self.is_queued.set(false);
115
116        let poll = {
117            let mut cx = Context::from_waker(&inner.waker);
118            inner.future.as_mut().poll(&mut cx)
119        };
120
121        // If a future has finished (`Ready`) then clean up resources associated
122        // with the future ASAP. This ensures that we don't keep anything extra
123        // alive in-memory by accident. Our own struct, `Rc<Task>` won't
124        // actually go away until all wakers referencing us go away, which may
125        // take quite some time, so ensure that the heaviest of resources are
126        // released early.
127        if poll.is_ready() {
128            *borrow = None;
129        }
130    }
131}