Skip to main content

leodos_libcfs/runtime/sync/
spsc.rs

1//! An asynchronous, single-producer, single-consumer channel for `no_std` environments.
2//!
3//! This channel is a thin async wrapper around `heapless::spsc::Queue`.
4
5use core::task::{Context, Poll};
6use heapless::spsc::{Consumer, Producer, Queue};
7
8/// The sending half of a channel.
9pub struct Sender<'a, T> {
10    producer: Producer<'a, T>,
11}
12
13/// The receiving half of a channel.
14pub struct Receiver<'a, T, const N: usize> {
15    consumer: Consumer<'a, T>,
16}
17
18/// An asynchronous single-producer, single-consumer channel.
19pub struct Channel<T, const N: usize> {
20    queue: Queue<T, N>,
21}
22
23/// Creates a new asynchronous channel.
24pub fn channel<'a, T, const N: usize>() -> Channel<T, N> {
25    Channel {
26        queue: Queue::new(),
27    }
28}
29
30impl<T, const N: usize> Channel<T, N> {
31    /// Splits the channel into its sending and receiving halves.
32    pub fn split(&mut self) -> (Sender<'_, T>, Receiver<'_, T, N>) {
33        let (producer, consumer) = self.queue.split();
34        (Sender { producer }, Receiver { consumer })
35    }
36}
37
38impl<'a, T> Sender<'a, T> {
39    /// Sends a value, waiting until there is capacity.
40    pub async fn send(&mut self, value: T) {
41        let mut value = Some(value);
42        core::future::poll_fn(|_cx: &mut Context<'_>| {
43            let val = value.take().unwrap();
44            match self.producer.enqueue(val) {
45                Ok(()) => Poll::Ready(()),
46                Err(val) => {
47                    value = Some(val);
48                    Poll::Pending
49                }
50            }
51        })
52        .await
53    }
54}
55
56impl<'a, T, const N: usize> Receiver<'a, T, N> {
57    /// Receives a value, waiting until one is available.
58    pub async fn recv(&mut self) -> T {
59        core::future::poll_fn(|_cx: &mut Context<'_>| {
60            if let Some(value) = self.consumer.dequeue() {
61                Poll::Ready(value)
62            } else {
63                Poll::Pending
64            }
65        })
66        .await
67    }
68}