Skip to main content

leodos_protocols/network/isl/routing/
mod.rs

1/// Routing algorithm trait and implementations.
2pub mod algorithm;
3/// ISL routable packet definitions and builders.
4pub mod packet;
5
6use futures::FutureExt as _;
7use futures::future::Either;
8use zerocopy::IntoBytes as _;
9
10use crate::datalink::Datalink;
11use crate::datalink::DatalinkRead;
12use crate::datalink::DatalinkWrite;
13use crate::network::NetworkRead;
14use crate::network::NetworkWrite;
15use crate::network::isl;
16use crate::network::isl::address::Address;
17use crate::network::isl::routing::algorithm::RoutingAlgorithm;
18use crate::network::isl::routing::packet::IslRoutingTelecommand;
19use crate::network::isl::torus::Point;
20use crate::network::isl::torus::{Direction, Hop};
21use crate::utils::clock::Clock;
22use crate::utils::ringbuf::RingBuffer;
23
24/// Per-direction link state: the link itself plus its
25/// input buffer, output queue, and staging buffer.
26struct Port<L, const MTU: usize, const OUT: usize> {
27    /// Bidirectional data link, split into read/write
28    /// halves on each poll cycle.
29    link: L,
30    /// Input buffer for packets read from this link.
31    buf: [u8; MTU],
32    /// Output queue for packets waiting to be forwarded
33    /// through this link.
34    out: RingBuffer<OUT>,
35    /// Staging buffer: the front of `out` is copied here
36    /// before starting a write future, so the ring stays
37    /// free for new enqueues while the write is in flight.
38    stage: [u8; MTU],
39}
40
41impl<L, const MTU: usize, const OUT: usize> Port<L, MTU, OUT> {
42    fn new(link: L) -> Self {
43        Self {
44            link,
45            buf: [0u8; MTU],
46            out: RingBuffer::new(),
47            stage: [0u8; MTU],
48        }
49    }
50}
51
52/// A SpacePacket router with per-direction output queues.
53///
54/// ISL directions (N/S/E/W) share link type `N`. The ground
55/// link has an independent type `G` since it may use a
56/// different physical layer. Both must implement [`Datalink`]
57/// so the router can split them into read/write halves for
58/// concurrent I/O.
59///
60/// The `read()` loop uses `select_biased!` to poll all 5
61/// readers and all 5 writers concurrently, eliminating
62/// head-of-line blocking and deadlock.
63pub struct Router<N, G, A, C, const MTU: usize = 1024, const OUT: usize = 2048> {
64    north: Port<N, MTU, OUT>,
65    south: Port<N, MTU, OUT>,
66    east: Port<N, MTU, OUT>,
67    west: Port<N, MTU, OUT>,
68    ground: Port<G, MTU, OUT>,
69    address: Address,
70    algorithm: A,
71    clock: C,
72}
73
74/// Error from a directional link or from ISL parsing.
75#[derive(Debug, Clone, thiserror::Error)]
76pub enum RouterError<E, GE = E> {
77    /// Error on the north link.
78    #[error("North link error: {0}")]
79    North(E),
80    /// Error on the south link.
81    #[error("South link error: {0}")]
82    South(E),
83    /// Error on the east link.
84    #[error("East link error: {0}")]
85    East(E),
86    /// Error on the west link.
87    #[error("West link error: {0}")]
88    West(E),
89    /// Error on the ground link.
90    #[error("Ground link error: {0}")]
91    Ground(GE),
92    /// The caller's buffer is too small for the received packet.
93    #[error("buffer too small: need {needed} bytes, got {provided}")]
94    BufferTooSmall {
95        /// Packet size in bytes.
96        needed: usize,
97        /// Caller buffer size in bytes.
98        provided: usize,
99    },
100    /// Error parsing the ISL message.
101    #[error("ISL message error: {0}")]
102    IslMessageError(#[from] isl::routing::packet::IslMessageError),
103}
104
105#[bon::bon]
106impl<N, G, A, C, const MTU: usize, const OUT: usize> Router<N, G, A, C, MTU, OUT>
107where
108    N: Datalink,
109    G: Datalink,
110    A: RoutingAlgorithm,
111    C: Clock,
112{
113    #[builder]
114    /// Creates a new router with directional links.
115    pub fn new(
116        north: N,
117        south: N,
118        east: N,
119        west: N,
120        ground: G,
121        address: Address,
122        algorithm: A,
123        clock: C,
124    ) -> Self {
125        Self {
126            north: Port::new(north),
127            south: Port::new(south),
128            east: Port::new(east),
129            west: Port::new(west),
130            ground: Port::new(ground),
131            address,
132            algorithm,
133            clock,
134        }
135    }
136
137    /// Returns this router's own address.
138    pub fn address(&self) -> Address {
139        self.address
140    }
141}
142
143impl<N, G, A, C, const MTU: usize, const OUT: usize> NetworkWrite for Router<N, G, A, C, MTU, OUT>
144where
145    N: Datalink,
146    G: Datalink,
147    A: RoutingAlgorithm,
148    C: Clock,
149{
150    type Error = RouterError<N::WriteError, G::WriteError>;
151
152    async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
153        let packet = IslRoutingTelecommand::parse(data).map_err(RouterError::IslMessageError)?;
154        let target = packet.isl_header.target();
155        let bytes = packet.as_bytes();
156        let next = self
157            .algorithm
158            .route(Point::from(self.address), target, self.clock.now());
159        match next {
160            Hop::Isl(Direction::North) => {
161                let (_, mut w) = self.north.link.split();
162                w.write(bytes).await.map_err(RouterError::North)
163            }
164            Hop::Isl(Direction::South) => {
165                let (_, mut w) = self.south.link.split();
166                w.write(bytes).await.map_err(RouterError::South)
167            }
168            Hop::Isl(Direction::East) => {
169                let (_, mut w) = self.east.link.split();
170                w.write(bytes).await.map_err(RouterError::East)
171            }
172            Hop::Isl(Direction::West) => {
173                let (_, mut w) = self.west.link.split();
174                w.write(bytes).await.map_err(RouterError::West)
175            }
176            Hop::Ground => {
177                let (_, mut w) = self.ground.link.split();
178                w.write(bytes).await.map_err(RouterError::Ground)
179            }
180            Hop::Local => Ok(()),
181        }
182    }
183}
184
185impl<N, G, A, C, const MTU: usize, const OUT: usize> NetworkRead for Router<N, G, A, C, MTU, OUT>
186where
187    N: Datalink,
188    G: Datalink,
189    A: RoutingAlgorithm,
190    C: Clock,
191{
192    type Error = RouterError<N::ReadError, G::ReadError>;
193
194    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
195        loop {
196            let Self {
197                north,
198                south,
199                east,
200                west,
201                ground,
202                address,
203                algorithm,
204                clock,
205            } = self;
206
207            // Split each link into read/write halves.
208            let (mut nr, mut nw) = north.link.split();
209            let (mut sr, mut sw) = south.link.split();
210            let (mut er, mut ew) = east.link.split();
211            let (mut wr, mut ww) = west.link.split();
212            let (mut gr, mut gw) = ground.link.split();
213
214            // Stage output: copy queue front into staging
215            // buffer, then start a write future. If the
216            // queue is empty, return pending().
217            macro_rules! stage {
218                ($w:expr, $port:expr) => {
219                    match $port.out.front() {
220                        Some(data) => {
221                            let len = data.len();
222                            $port.stage[..len].copy_from_slice(data);
223                            Either::Left($w.write(&$port.stage[..len]).fuse())
224                        }
225                        None => Either::Right(futures::future::pending()),
226                    }
227                };
228            }
229
230            enum Event<RE, GRE> {
231                IslRead(Result<usize, RE>, Direction),
232                GroundRead(Result<usize, GRE>),
233                IslWrite(Direction),
234                GroundWrite,
235            }
236
237            // Route a received packet: return if local,
238            // enqueue for forwarding otherwise.
239            macro_rules! route_packet {
240                ($buf:expr, $len:expr, $err_variant:ident) => {{
241                    let buf = &$buf[..$len];
242                    let Ok(packet) = IslRoutingTelecommand::parse(buf) else {
243                        continue;
244                    };
245                    let next = algorithm.route(
246                        Point::from(*address),
247                        packet.isl_header.target(),
248                        clock.now(),
249                    );
250
251                    if next == Hop::Local {
252                        if buffer.len() < $len {
253                            return Err(RouterError::BufferTooSmall {
254                                needed: $len,
255                                provided: buffer.len(),
256                            });
257                        }
258                        buffer[..$len].copy_from_slice(buf);
259                        return Ok($len);
260                    }
261
262                    match next {
263                        Hop::Isl(Direction::North) => {
264                            north.out.push(buf);
265                        }
266                        Hop::Isl(Direction::South) => {
267                            south.out.push(buf);
268                        }
269                        Hop::Isl(Direction::East) => {
270                            east.out.push(buf);
271                        }
272                        Hop::Isl(Direction::West) => {
273                            west.out.push(buf);
274                        }
275                        Hop::Ground => {
276                            ground.out.push(buf);
277                        }
278                        Hop::Local => {}
279                    }
280                }};
281            }
282
283            let event = {
284                let nw = stage!(nw, north);
285                let sw = stage!(sw, south);
286                let ew = stage!(ew, east);
287                let ww = stage!(ww, west);
288                let gw = stage!(gw, ground);
289
290                let nr = nr.read(&mut north.buf).fuse();
291                let sr = sr.read(&mut south.buf).fuse();
292                let er = er.read(&mut east.buf).fuse();
293                let wr = wr.read(&mut west.buf).fuse();
294                let gr = gr.read(&mut ground.buf).fuse();
295
296                pin_utils::pin_mut!(nr, sr, er, wr, gr, nw, sw, ew, ww, gw);
297
298                // Writes before reads: drain output queues
299                // before accepting new packets. Reads can
300                // wait in the link's own buffer; this prevents
301                // write starvation under heavy load.
302                futures::select_biased! {
303                    _ = nw => Event::IslWrite(Direction::North),
304                    _ = sw => Event::IslWrite(Direction::South),
305                    _ = ew => Event::IslWrite(Direction::East),
306                    _ = ww => Event::IslWrite(Direction::West),
307                    _ = gw => Event::GroundWrite,
308                    r = nr => Event::IslRead(r, Direction::North),
309                    r = sr => Event::IslRead(r, Direction::South),
310                    r = er => Event::IslRead(r, Direction::East),
311                    r = wr => Event::IslRead(r, Direction::West),
312                    r = gr => Event::GroundRead(r),
313                }
314            };
315
316            match event {
317                Event::IslWrite(dir) => match dir {
318                    Direction::North => north.out.pop(),
319                    Direction::South => south.out.pop(),
320                    Direction::East => east.out.pop(),
321                    Direction::West => west.out.pop(),
322                },
323                Event::GroundWrite => ground.out.pop(),
324                Event::GroundRead(result) => {
325                    let len = result.map_err(RouterError::Ground)?;
326                    route_packet!(ground.buf, len, Ground);
327                }
328                Event::IslRead(result, dir) => {
329                    let (buf, len) = match dir {
330                        Direction::North => (&north.buf, result.map_err(RouterError::North)?),
331                        Direction::South => (&south.buf, result.map_err(RouterError::South)?),
332                        Direction::East => (&east.buf, result.map_err(RouterError::East)?),
333                        Direction::West => (&west.buf, result.map_err(RouterError::West)?),
334                    };
335                    route_packet!(buf, len, Isl);
336                }
337            }
338        }
339    }
340}