leodos_libcfs/runtime/
reactor.rs1use crate::os::id::OsalId;
14use crate::os::net::select_multiple;
15use crate::os::net::FdSet;
16use core::cell::RefCell;
17use core::task::RawWaker;
18use core::task::RawWakerVTable;
19use core::task::Waker;
20
21struct State {
22 read_set: FdSet,
23 write_set: FdSet,
24 has_reads: bool,
25 has_writes: bool,
26 woken: bool,
27}
28
29impl State {
30 fn new() -> Self {
31 Self {
32 read_set: FdSet::new(),
33 write_set: FdSet::new(),
34 has_reads: false,
35 has_writes: false,
36 woken: false,
37 }
38 }
39}
40
41pub struct Reactor {
43 state: RefCell<State>,
44}
45
46impl Reactor {
47 pub(crate) fn new() -> Self {
48 Self { state: RefCell::new(State::new()) }
49 }
50
51 fn register_read(&self, id: OsalId) {
52 let mut s = self.state.borrow_mut();
53 s.read_set.add(id);
54 s.has_reads = true;
55 }
56
57 fn register_write(&self, id: OsalId) {
58 let mut s = self.state.borrow_mut();
59 s.write_set.add(id);
60 s.has_writes = true;
61 }
62
63 fn take_woken(&self) -> bool {
64 let mut s = self.state.borrow_mut();
65 let prev = s.woken;
66 s.woken = false;
67 prev
68 }
69
70 fn set_woken(&self) {
71 self.state.borrow_mut().woken = true;
72 }
73
74 pub(crate) fn block(&self, timeout_ms: i32) {
83 let Ok(mut s) = self.state.try_borrow_mut() else {
84 return;
85 };
86 if !s.has_reads && !s.has_writes {
87 drop(s);
88 let duration = core::time::Duration::from_millis(timeout_ms.max(0) as u64);
89 let _ = crate::os::task::delay(duration);
90 return;
91 }
92 let mut read_set = core::mem::replace(&mut s.read_set, FdSet::new());
96 let mut write_set = core::mem::replace(&mut s.write_set, FdSet::new());
97 s.has_reads = false;
98 s.has_writes = false;
99 drop(s);
100 let _ = select_multiple(Some(&mut read_set), Some(&mut write_set), timeout_ms);
101 }
102
103 pub(crate) fn was_woken(&self) -> bool {
104 self.take_woken()
105 }
106}
107
108pub(crate) unsafe fn waker_from_reactor(reactor: &Reactor) -> Waker {
113 unsafe {
114 Waker::from_raw(RawWaker::new(
115 reactor as *const Reactor as *const (),
116 &VTABLE,
117 ))
118 }
119}
120
121const VTABLE: RawWakerVTable = RawWakerVTable::new(w_clone, w_wake, w_wake_by_ref, w_drop);
122
123unsafe fn w_clone(data: *const ()) -> RawWaker {
124 RawWaker::new(data, &VTABLE)
125}
126
127unsafe fn w_wake(data: *const ()) {
128 unsafe { w_wake_by_ref(data) };
129}
130
131unsafe fn w_wake_by_ref(data: *const ()) {
132 if data.is_null() {
133 return;
134 }
135 let reactor = unsafe { &*(data as *const Reactor) };
136 reactor.set_woken();
137}
138
/// Vtable `drop`: intentionally does nothing; the pointer is never read or
/// freed here.
unsafe fn w_drop(_reactor_ptr: *const ()) {}
140
141pub fn register_read(waker: &Waker, id: OsalId) {
145 let data = waker.data();
146 if data.is_null() {
147 return;
148 }
149 let reactor = unsafe { &*(data as *const Reactor) };
150 reactor.register_read(id);
151}
152
153pub fn register_write(waker: &Waker, id: OsalId) {
157 let data = waker.data();
158 if data.is_null() {
159 return;
160 }
161 let reactor = unsafe { &*(data as *const Reactor) };
162 reactor.register_write(id);
163}