Skip to main content

leodos_libcfs/runtime/
reactor.rs

1//! Reactor: a per-task registry of fds waiting for IO.
2//!
3//! Leaf futures that wrap an OSAL selectable handle (UDP sockets,
4//! TCP sockets, files opened O_NONBLOCK) call
5//! [`register_read`] when they return `Poll::Pending`. The
6//! runtime then blocks the task in `OS_SelectMultiple` until any
7//! registered fd becomes readable or a timeout elapses.
8//!
9//! The reactor is installed into the `Waker` passed to polls by
10//! [`crate::runtime::Runtime`]. Leaf futures find it by reading
11//! the waker's data pointer.
12
13use crate::os::id::OsalId;
14use crate::os::net::select_multiple;
15use crate::os::net::FdSet;
16use core::cell::RefCell;
17use core::task::RawWaker;
18use core::task::RawWakerVTable;
19use core::task::Waker;
20
21struct State {
22    read_set: FdSet,
23    write_set: FdSet,
24    has_reads: bool,
25    has_writes: bool,
26    woken: bool,
27}
28
29impl State {
30    fn new() -> Self {
31        Self {
32            read_set: FdSet::new(),
33            write_set: FdSet::new(),
34            has_reads: false,
35            has_writes: false,
36            woken: false,
37        }
38    }
39}
40
41/// Per-task reactor state.
42pub struct Reactor {
43    state: RefCell<State>,
44}
45
46impl Reactor {
47    pub(crate) fn new() -> Self {
48        Self { state: RefCell::new(State::new()) }
49    }
50
51    fn register_read(&self, id: OsalId) {
52        let mut s = self.state.borrow_mut();
53        s.read_set.add(id);
54        s.has_reads = true;
55    }
56
57    fn register_write(&self, id: OsalId) {
58        let mut s = self.state.borrow_mut();
59        s.write_set.add(id);
60        s.has_writes = true;
61    }
62
63    fn take_woken(&self) -> bool {
64        let mut s = self.state.borrow_mut();
65        let prev = s.woken;
66        s.woken = false;
67        prev
68    }
69
70    fn set_woken(&self) {
71        self.state.borrow_mut().woken = true;
72    }
73
74    /// Blocks the task until any registered fd is readable /
75    /// writable or the timeout elapses. Clears the registration
76    /// sets. If no fds are registered, sleeps for `timeout_ms`
77    /// milliseconds so apps that only use non-fd primitives
78    /// (e.g. SB pipes) still yield the CPU.
79    ///
80    /// TODO: per-leaf wakers + persistent FdSet so unrelated
81    /// leaves aren't re-polled on every wake. See CLAUDE.md.
82    pub(crate) fn block(&self, timeout_ms: i32) {
83        let Ok(mut s) = self.state.try_borrow_mut() else {
84            return;
85        };
86        if !s.has_reads && !s.has_writes {
87            drop(s);
88            let duration = core::time::Duration::from_millis(timeout_ms.max(0) as u64);
89            let _ = crate::os::task::delay(duration);
90            return;
91        }
92        // Move the sets out so `select_multiple` can receive
93        // `&mut FdSet` while we drop the RefCell borrow. The
94        // ownership transfer is effectively a fd-set reset.
95        let mut read_set = core::mem::replace(&mut s.read_set, FdSet::new());
96        let mut write_set = core::mem::replace(&mut s.write_set, FdSet::new());
97        s.has_reads = false;
98        s.has_writes = false;
99        drop(s);
100        let _ = select_multiple(Some(&mut read_set), Some(&mut write_set), timeout_ms);
101    }
102
103    pub(crate) fn was_woken(&self) -> bool {
104        self.take_woken()
105    }
106}
107
108/// Build a `Waker` whose data pointer is a `&Reactor`.
109///
110/// SAFETY: the caller must ensure the `Reactor` outlives every
111/// clone of the returned waker.
112pub(crate) unsafe fn waker_from_reactor(reactor: &Reactor) -> Waker {
113    unsafe {
114        Waker::from_raw(RawWaker::new(
115            reactor as *const Reactor as *const (),
116            &VTABLE,
117        ))
118    }
119}
120
121const VTABLE: RawWakerVTable = RawWakerVTable::new(w_clone, w_wake, w_wake_by_ref, w_drop);
122
123unsafe fn w_clone(data: *const ()) -> RawWaker {
124    RawWaker::new(data, &VTABLE)
125}
126
127unsafe fn w_wake(data: *const ()) {
128    unsafe { w_wake_by_ref(data) };
129}
130
131unsafe fn w_wake_by_ref(data: *const ()) {
132    if data.is_null() {
133        return;
134    }
135    let reactor = unsafe { &*(data as *const Reactor) };
136    reactor.set_woken();
137}
138
139unsafe fn w_drop(_data: *const ()) {}
140
141/// Registers a read interest for `id` using the reactor carried
142/// in `waker`. No-op if the waker was not produced by a
143/// reactor-backed runtime.
144pub fn register_read(waker: &Waker, id: OsalId) {
145    let data = waker.data();
146    if data.is_null() {
147        return;
148    }
149    let reactor = unsafe { &*(data as *const Reactor) };
150    reactor.register_read(id);
151}
152
153/// Registers a write interest for `id` using the reactor carried
154/// in `waker`. No-op if the waker was not produced by a
155/// reactor-backed runtime.
156pub fn register_write(waker: &Waker, id: OsalId) {
157    let data = waker.data();
158    if data.is_null() {
159        return;
160    }
161    let reactor = unsafe { &*(data as *const Reactor) };
162    reactor.register_write(id);
163}