leodos_protocols/datalink/link/
local.rs1use core::future::poll_fn;
2use core::task::Poll;
3
4use heapless::Deque;
5
6use crate::datalink::{DatalinkRead, DatalinkWrite};
7use crate::network::{NetworkRead, NetworkWrite};
8use crate::utils::cell::SyncRefCell;
9
10#[derive(Debug, Clone)]
12pub enum LocalLinkError {
13 QueueFull,
15 Closed,
17}
18
19impl core::fmt::Display for LocalLinkError {
20 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
21 match self {
22 Self::QueueFull => write!(f, "queue full"),
23 Self::Closed => write!(f, "channel closed"),
24 }
25 }
26}
27
28impl core::error::Error for LocalLinkError {}
29
30struct Packet<const MTU: usize> {
31 data: [u8; MTU],
32 len: usize,
33}
34
35struct LocalChannelState<const QUEUE: usize, const MTU: usize> {
36 to_router: Deque<Packet<MTU>, QUEUE>,
37 from_router: Deque<Packet<MTU>, QUEUE>,
38 closed: bool,
39}
40
41pub struct LocalChannel<const QUEUE: usize, const MTU: usize> {
43 state: SyncRefCell<LocalChannelState<QUEUE, MTU>>,
44}
45
46impl<const QUEUE: usize, const MTU: usize> LocalChannel<QUEUE, MTU> {
47 pub fn new() -> Self {
49 Self {
50 state: SyncRefCell::new(LocalChannelState {
51 to_router: Deque::new(),
52 from_router: Deque::new(),
53 closed: false,
54 }),
55 }
56 }
57
58 pub fn split(&self) -> (LocalAppHandle<'_, QUEUE, MTU>, LocalRouterHandle<'_, QUEUE, MTU>) {
60 (
61 LocalAppHandle { channel: self },
62 LocalRouterHandle { channel: self },
63 )
64 }
65
66 pub fn close(&self) {
68 self.state.with_mut(|s| s.closed = true);
69 }
70}
71
72impl<const QUEUE: usize, const MTU: usize> Default for LocalChannel<QUEUE, MTU> {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78pub struct LocalAppHandle<'a, const QUEUE: usize, const MTU: usize> {
80 channel: &'a LocalChannel<QUEUE, MTU>,
81}
82
83impl<'a, const QUEUE: usize, const MTU: usize> NetworkWrite for LocalAppHandle<'a, QUEUE, MTU> {
84 type Error = LocalLinkError;
85
86 async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
87 poll_fn(|_cx| {
88 self.channel.state.with_mut(|state| {
89 if state.closed {
90 return Poll::Ready(Err(LocalLinkError::Closed));
91 }
92
93 if state.to_router.is_full() {
94 return Poll::Pending;
95 }
96
97 let mut packet = Packet {
98 data: [0u8; MTU],
99 len: data.len().min(MTU),
100 };
101 packet.data[..packet.len].copy_from_slice(&data[..packet.len]);
102 state.to_router.push_back(packet).ok();
103
104 Poll::Ready(Ok(()))
105 })
106 })
107 .await
108 }
109}
110
111impl<'a, const QUEUE: usize, const MTU: usize> NetworkRead for LocalAppHandle<'a, QUEUE, MTU> {
112 type Error = LocalLinkError;
113
114 async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
115 poll_fn(|_cx| {
116 self.channel.state.with_mut(|state| {
117 if let Some(packet) = state.from_router.pop_front() {
118 let len = packet.len.min(buffer.len());
119 buffer[..len].copy_from_slice(&packet.data[..len]);
120 return Poll::Ready(Ok(len));
121 }
122
123 if state.closed {
124 return Poll::Ready(Err(LocalLinkError::Closed));
125 }
126
127 Poll::Pending
128 })
129 })
130 .await
131 }
132}
133
134pub struct LocalRouterHandle<'a, const QUEUE: usize, const MTU: usize> {
136 channel: &'a LocalChannel<QUEUE, MTU>,
137}
138
139impl<'a, const QUEUE: usize, const MTU: usize> DatalinkWrite for LocalRouterHandle<'a, QUEUE, MTU> {
140 type Error = LocalLinkError;
141
142 async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
143 poll_fn(|_cx| {
144 self.channel.state.with_mut(|state| {
145 if state.closed {
146 return Poll::Ready(Err(LocalLinkError::Closed));
147 }
148
149 if state.from_router.is_full() {
150 return Poll::Pending;
151 }
152
153 let mut packet = Packet {
154 data: [0u8; MTU],
155 len: data.len().min(MTU),
156 };
157 packet.data[..packet.len].copy_from_slice(&data[..packet.len]);
158 state.from_router.push_back(packet).ok();
159
160 Poll::Ready(Ok(()))
161 })
162 })
163 .await
164 }
165}
166
167impl<'a, const QUEUE: usize, const MTU: usize> DatalinkRead for LocalRouterHandle<'a, QUEUE, MTU> {
168 type Error = LocalLinkError;
169
170 async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
171 poll_fn(|_cx| {
172 self.channel.state.with_mut(|state| {
173 if let Some(packet) = state.to_router.pop_front() {
174 let len = packet.len.min(buffer.len());
175 buffer[..len].copy_from_slice(&packet.data[..len]);
176 return Poll::Ready(Ok(len));
177 }
178
179 if state.closed {
180 return Poll::Ready(Err(LocalLinkError::Closed));
181 }
182
183 Poll::Pending
184 })
185 })
186 .await
187 }
188}