Skip to main content

leodos_protocols/datalink/link/
local.rs

1use core::future::poll_fn;
2use core::task::Poll;
3
4use heapless::Deque;
5
6use crate::datalink::{DatalinkRead, DatalinkWrite};
7use crate::network::{NetworkRead, NetworkWrite};
8use crate::utils::cell::SyncRefCell;
9
10/// Error from a local in-process channel.
11#[derive(Debug, Clone)]
12pub enum LocalLinkError {
13    /// The internal queue has no remaining capacity.
14    QueueFull,
15    /// The channel has been closed.
16    Closed,
17}
18
19impl core::fmt::Display for LocalLinkError {
20    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
21        match self {
22            Self::QueueFull => write!(f, "queue full"),
23            Self::Closed => write!(f, "channel closed"),
24        }
25    }
26}
27
28impl core::error::Error for LocalLinkError {}
29
30struct Packet<const MTU: usize> {
31    data: [u8; MTU],
32    len: usize,
33}
34
35struct LocalChannelState<const QUEUE: usize, const MTU: usize> {
36    to_router: Deque<Packet<MTU>, QUEUE>,
37    from_router: Deque<Packet<MTU>, QUEUE>,
38    closed: bool,
39}
40
41/// A single-threaded bidirectional channel between an app and a router.
42pub struct LocalChannel<const QUEUE: usize, const MTU: usize> {
43    state: SyncRefCell<LocalChannelState<QUEUE, MTU>>,
44}
45
46impl<const QUEUE: usize, const MTU: usize> LocalChannel<QUEUE, MTU> {
47    /// Creates a new local channel with empty queues.
48    pub fn new() -> Self {
49        Self {
50            state: SyncRefCell::new(LocalChannelState {
51                to_router: Deque::new(),
52                from_router: Deque::new(),
53                closed: false,
54            }),
55        }
56    }
57
58    /// Splits the channel into application-side and router-side handles.
59    pub fn split(&self) -> (LocalAppHandle<'_, QUEUE, MTU>, LocalRouterHandle<'_, QUEUE, MTU>) {
60        (
61            LocalAppHandle { channel: self },
62            LocalRouterHandle { channel: self },
63        )
64    }
65
66    /// Closes the channel, causing future operations to return `Closed`.
67    pub fn close(&self) {
68        self.state.with_mut(|s| s.closed = true);
69    }
70}
71
72impl<const QUEUE: usize, const MTU: usize> Default for LocalChannel<QUEUE, MTU> {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78/// Application-side handle for sending to and receiving from the router.
79pub struct LocalAppHandle<'a, const QUEUE: usize, const MTU: usize> {
80    channel: &'a LocalChannel<QUEUE, MTU>,
81}
82
83impl<'a, const QUEUE: usize, const MTU: usize> NetworkWrite for LocalAppHandle<'a, QUEUE, MTU> {
84    type Error = LocalLinkError;
85
86    async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
87        poll_fn(|_cx| {
88            self.channel.state.with_mut(|state| {
89                if state.closed {
90                    return Poll::Ready(Err(LocalLinkError::Closed));
91                }
92
93                if state.to_router.is_full() {
94                    return Poll::Pending;
95                }
96
97                let mut packet = Packet {
98                    data: [0u8; MTU],
99                    len: data.len().min(MTU),
100                };
101                packet.data[..packet.len].copy_from_slice(&data[..packet.len]);
102                state.to_router.push_back(packet).ok();
103
104                Poll::Ready(Ok(()))
105            })
106        })
107        .await
108    }
109}
110
111impl<'a, const QUEUE: usize, const MTU: usize> NetworkRead for LocalAppHandle<'a, QUEUE, MTU> {
112    type Error = LocalLinkError;
113
114    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
115        poll_fn(|_cx| {
116            self.channel.state.with_mut(|state| {
117                if let Some(packet) = state.from_router.pop_front() {
118                    let len = packet.len.min(buffer.len());
119                    buffer[..len].copy_from_slice(&packet.data[..len]);
120                    return Poll::Ready(Ok(len));
121                }
122
123                if state.closed {
124                    return Poll::Ready(Err(LocalLinkError::Closed));
125                }
126
127                Poll::Pending
128            })
129        })
130        .await
131    }
132}
133
134/// Router-side handle for sending to and receiving from the application.
135pub struct LocalRouterHandle<'a, const QUEUE: usize, const MTU: usize> {
136    channel: &'a LocalChannel<QUEUE, MTU>,
137}
138
139impl<'a, const QUEUE: usize, const MTU: usize> DatalinkWrite for LocalRouterHandle<'a, QUEUE, MTU> {
140    type Error = LocalLinkError;
141
142    async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
143        poll_fn(|_cx| {
144            self.channel.state.with_mut(|state| {
145                if state.closed {
146                    return Poll::Ready(Err(LocalLinkError::Closed));
147                }
148
149                if state.from_router.is_full() {
150                    return Poll::Pending;
151                }
152
153                let mut packet = Packet {
154                    data: [0u8; MTU],
155                    len: data.len().min(MTU),
156                };
157                packet.data[..packet.len].copy_from_slice(&data[..packet.len]);
158                state.from_router.push_back(packet).ok();
159
160                Poll::Ready(Ok(()))
161            })
162        })
163        .await
164    }
165}
166
167impl<'a, const QUEUE: usize, const MTU: usize> DatalinkRead for LocalRouterHandle<'a, QUEUE, MTU> {
168    type Error = LocalLinkError;
169
170    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
171        poll_fn(|_cx| {
172            self.channel.state.with_mut(|state| {
173                if let Some(packet) = state.to_router.pop_front() {
174                    let len = packet.len.min(buffer.len());
175                    buffer[..len].copy_from_slice(&packet.data[..len]);
176                    return Poll::Ready(Ok(len));
177                }
178
179                if state.closed {
180                    return Poll::Ready(Err(LocalLinkError::Closed));
181                }
182
183                Poll::Pending
184            })
185        })
186        .await
187    }
188}