leodos_libcfs/runtime/sync/
spsc.rs1use core::task::{Context, Poll};
6use heapless::spsc::{Consumer, Producer, Queue};
7
8pub struct Sender<'a, T> {
10 producer: Producer<'a, T>,
11}
12
13pub struct Receiver<'a, T, const N: usize> {
15 consumer: Consumer<'a, T>,
16}
17
18pub struct Channel<T, const N: usize> {
20 queue: Queue<T, N>,
21}
22
23pub fn channel<'a, T, const N: usize>() -> Channel<T, N> {
25 Channel {
26 queue: Queue::new(),
27 }
28}
29
30impl<T, const N: usize> Channel<T, N> {
31 pub fn split(&mut self) -> (Sender<'_, T>, Receiver<'_, T, N>) {
33 let (producer, consumer) = self.queue.split();
34 (Sender { producer }, Receiver { consumer })
35 }
36}
37
38impl<'a, T> Sender<'a, T> {
39 pub async fn send(&mut self, value: T) {
41 let mut value = Some(value);
42 core::future::poll_fn(|_cx: &mut Context<'_>| {
43 let val = value.take().unwrap();
44 match self.producer.enqueue(val) {
45 Ok(()) => Poll::Ready(()),
46 Err(val) => {
47 value = Some(val);
48 Poll::Pending
49 }
50 }
51 })
52 .await
53 }
54}
55
56impl<'a, T, const N: usize> Receiver<'a, T, N> {
57 pub async fn recv(&mut self) -> T {
59 core::future::poll_fn(|_cx: &mut Context<'_>| {
60 if let Some(value) = self.consumer.dequeue() {
61 Poll::Ready(value)
62 } else {
63 Poll::Pending
64 }
65 })
66 .await
67 }
68}