// leodos_libcfs/runtime/sync/oneshot.rs

//! An asynchronous, single-message channel for `no_std` environments.
2
use core::cell::RefCell;
use core::fmt;
use core::future::Future;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
8
/// Lifecycle of the single value slot shared by the two channel halves.
enum State<T> {
    /// Nothing has been stored yet; a value may still arrive.
    Empty,
    /// A value has been stored and has not been taken out yet.
    Full(T),
    /// Terminal state: no value will be delivered any more (the value was
    /// already taken, or the sender went away without sending one).
    Closed,
}
18
19/// The shared core of the oneshot channel.
20struct Core<T> {
21    state: RefCell<State<T>>,
22}
23
24/// A handle to a oneshot channel that owns the shared state.
25pub struct Channel<T> {
26    core: Core<T>,
27}
28
29/// The sending half of a oneshot channel.
30pub struct Sender<'a, T> {
31    core: &'a Core<T>,
32}
33
34/// The receiving half of a oneshot channel.
35pub struct Receiver<'a, T> {
36    core: &'a Core<T>,
37}
38
/// An error returned by `Sender::send` when the channel did not accept the
/// value.
///
/// The rejected value is handed back in field `0` so the caller can recover it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);

/// Human-readable description; deliberately does not require `T: Display`
/// and never prints the rejected value.
impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending on a closed or already-filled oneshot channel")
    }
}
42
/// An error returned by `Receiver::recv` when the channel is closed and has
/// no value to deliver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

/// Human-readable description of the failure.
impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving on a closed oneshot channel")
    }
}
46
47/// Creates a new oneshot channel, returning the owning `Channel` handle.
48pub fn channel<T>() -> Channel<T> {
49    Channel {
50        core: Core {
51            state: RefCell::new(State::Empty),
52        },
53    }
54}
55
56impl<T> Channel<T> {
57    /// Splits the channel into its sending and receiving halves.
58    pub fn split(&mut self) -> (Sender<'_, T>, Receiver<'_, T>) {
59        (Sender { core: &self.core }, Receiver { core: &self.core })
60    }
61}
62
63impl<'a, T> Sender<'a, T> {
64    /// Attempts to send a value to the receiver.
65    ///
66    /// This is a synchronous operation; it succeeds or fails immediately.
67    ///
68    /// # Errors
69    ///
70    /// Returns `Err(SendError(value))` if the receiver has already been dropped.
71    pub fn send(self, value: T) -> Result<(), SendError<T>> {
72        // We take ownership of `self` to ensure `send` can only be called once.
73        let mut state = self.core.state.borrow_mut();
74
75        match *state {
76            State::Empty => {
77                *state = State::Full(value);
78                Ok(())
79            }
80            // If it's already full or closed, the receiver must have been dropped.
81            _ => Err(SendError(value)),
82        }
83    }
84}
85
86impl<T> Drop for Core<T> {
87    fn drop(&mut self) {
88        // Ensure state is marked as closed when the channel is finally dropped.
89        *self.state.borrow_mut() = State::Closed;
90    }
91}
92
93impl<'a, T> Drop for Sender<'a, T> {
94    fn drop(&mut self) {
95        // If the sender is dropped before sending, mark the channel as closed
96        // to notify the receiver.
97        let mut state = self.core.state.borrow_mut();
98        if matches!(*state, State::Empty) {
99            *state = State::Closed;
100        }
101    }
102}
103
104impl<'a, T> Receiver<'a, T> {
105    /// Waits for a value to be sent on the channel.
106    ///
107    /// This returns a future that resolves to the value, or an error if the
108    /// sender is dropped before sending a value.
109    pub async fn recv(self) -> Result<T, RecvError> {
110        // A simple future that polls the channel state.
111        struct RecvFuture<'a, T> {
112            core: &'a Core<T>,
113        }
114
115        impl<'a, T> Future for RecvFuture<'a, T> {
116            type Output = Result<T, RecvError>;
117
118            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
119                let mut state = self.core.state.borrow_mut();
120                match mem::replace(&mut *state, State::Closed) {
121                    State::Full(value) => Poll::Ready(Ok(value)),
122                    State::Closed => Poll::Ready(Err(RecvError)),
123                    State::Empty => {
124                        // Not ready yet, put the state back and pend.
125                        *state = State::Empty;
126                        Poll::Pending
127                    }
128                }
129            }
130        }
131
132        RecvFuture { core: self.core }.await
133    }
134}