Skip to main content

leodos_protocols/network/isl/gossip/
mod.rs

1//! ISL Gossip protocol — epidemic flood with epoch dedup.
2//!
3//! [`Gossip`] owns four directional datalinks and implements
4//! [`NetworkRead`] + [`NetworkWrite`]. Every packet is flooded
5//! to all neighbors (minus the predecessor) and delivered
6//! locally. Duplicate epochs are silently dropped.
7
8/// Sliding-window duplicate filter for epoch-based deduplication.
9pub mod bitmap;
10/// Gossip packet structure and builder.
11pub mod packet;
12
13use futures::FutureExt as _;
14use futures::future::Either;
15use zerocopy::FromBytes as _;
16use zerocopy::IntoBytes as _;
17use zerocopy::network_endian::U16;
18
19use bitmap::DuplicateFilter;
20
21use crate::datalink::Datalink;
22use crate::datalink::DatalinkRead;
23use crate::datalink::DatalinkWrite;
24use crate::network::NetworkRead;
25use crate::network::NetworkWrite;
26use crate::network::isl::address::Address;
27use crate::network::isl::gossip::packet::Epoch;
28use crate::network::isl::gossip::packet::IslGossipTelecommand;
29use crate::network::isl::torus::Direction;
30use crate::network::isl::torus::Point;
31use crate::network::isl::torus::Torus;
32use crate::network::spp::Apid;
33use crate::utils::ringbuf::RingBuffer;
34
35/// Per-direction link state: the link itself plus its input
36/// buffer, output queue, and staging buffer.
37struct Port<L, const MTU: usize, const OUT: usize> {
38    link: L,
39    buf: [u8; MTU],
40    out: RingBuffer<OUT>,
41    stage: [u8; MTU],
42}
43
44impl<L, const MTU: usize, const OUT: usize> Port<L, MTU, OUT> {
45    fn new(link: L) -> Self {
46        Self {
47            link,
48            buf: [0u8; MTU],
49            out: RingBuffer::new(),
50            stage: [0u8; MTU],
51        }
52    }
53}
54
55/// Error from a directional link or from gossip parsing.
56#[derive(Debug, Clone, thiserror::Error)]
57pub enum GossipError<E> {
58    /// Error on the north link.
59    #[error("North link error: {0}")]
60    North(E),
61    /// Error on the south link.
62    #[error("South link error: {0}")]
63    South(E),
64    /// Error on the east link.
65    #[error("East link error: {0}")]
66    East(E),
67    /// Error on the west link.
68    #[error("West link error: {0}")]
69    West(E),
70    /// The caller's buffer is too small for the payload.
71    #[error("buffer too small: need {needed} bytes, got {provided}")]
72    BufferTooSmall {
73        /// Payload size in bytes.
74        needed: usize,
75        /// Caller buffer size in bytes.
76        provided: usize,
77    },
78}
79
80/// Epidemic gossip flood over a 4-connected torus mesh.
81///
82/// Implements [`NetworkWrite`] (wrap + flood) and
83/// [`NetworkRead`] (receive, dedup, forward, deliver).
84pub struct Gossip<N, const MTU: usize = 256, const OUT: usize = 2048> {
85    north: Port<N, MTU, OUT>,
86    south: Port<N, MTU, OUT>,
87    east: Port<N, MTU, OUT>,
88    west: Port<N, MTU, OUT>,
89    address: Address,
90    torus: Torus,
91    apid: Apid,
92    function_code: u8,
93    dedup: DuplicateFilter,
94    next_epoch: u16,
95    buf: [u8; MTU],
96}
97
98#[bon::bon]
99impl<N, const MTU: usize, const OUT: usize> Gossip<N, MTU, OUT>
100where
101    N: Datalink,
102{
103    #[builder]
104    /// Creates a new gossip node with directional links.
105    pub fn new(
106        north: N,
107        south: N,
108        east: N,
109        west: N,
110        address: Address,
111        torus: Torus,
112        apid: Apid,
113        function_code: u8,
114    ) -> Self {
115        Self {
116            north: Port::new(north),
117            south: Port::new(south),
118            east: Port::new(east),
119            west: Port::new(west),
120            address,
121            torus,
122            apid,
123            function_code,
124            dedup: DuplicateFilter::new(),
125            next_epoch: 0,
126            buf: [0u8; MTU],
127        }
128    }
129
130    /// Returns this node's own address.
131    pub fn address(&self) -> Address {
132        self.address
133    }
134}
135
136impl<N, const MTU: usize, const OUT: usize> Gossip<N, MTU, OUT>
137where
138    N: Datalink,
139{
140    /// Compute which directions to flood a locally-originated
141    /// packet (all neighbors in service area).
142    fn flood_directions(
143        &self,
144        service_area_min: u8,
145        service_area_max: u8,
146    ) -> [bool; 4] {
147        let my_point = Point::from(self.address);
148        let all = [
149            Direction::North,
150            Direction::South,
151            Direction::East,
152            Direction::West,
153        ];
154        let mut dirs = [false; 4];
155        for (i, direction) in all.iter().enumerate() {
156            let neighbor = self.torus.neighbor(my_point, *direction);
157            let addr = Address::from(neighbor);
158            dirs[i] = addr.is_in_service_area(
159                service_area_min,
160                service_area_max,
161            );
162        }
163        dirs
164    }
165
166}
167
168impl<N, const MTU: usize, const OUT: usize> NetworkWrite
169    for Gossip<N, MTU, OUT>
170where
171    N: Datalink,
172{
173    type Error = GossipError<N::WriteError>;
174
175    async fn write(
176        &mut self,
177        data: &[u8],
178    ) -> Result<(), Self::Error> {
179        let epoch_val = self.next_epoch;
180        self.next_epoch = self.next_epoch.wrapping_add(1);
181        let epoch = Epoch(U16::new(epoch_val));
182        self.dedup.is_duplicate(epoch.0.get());
183
184        let pkt = IslGossipTelecommand::builder()
185            .buffer(&mut self.buf)
186            .apid(self.apid)
187            .function_code(self.function_code)
188            .origin(self.address)
189            .predecessor(self.address)
190            .service_area_min(0)
191            .service_area_max(255)
192            .epoch(epoch)
193            .payload_len(data.len())
194            .build()
195            .ok();
196
197        let Some(pkt) = pkt else {
198            return Ok(());
199        };
200        pkt.payload.copy_from_slice(data);
201        pkt.set_cfe_checksum();
202        let len = pkt.as_bytes().len();
203
204        let dirs = self.flood_directions(0, 255);
205
206        if dirs[0] {
207            let (_, mut w) = self.north.link.split();
208            w.write(&self.buf[..len])
209                .await
210                .map_err(GossipError::North)?;
211        }
212        if dirs[1] {
213            let (_, mut w) = self.south.link.split();
214            w.write(&self.buf[..len])
215                .await
216                .map_err(GossipError::South)?;
217        }
218        if dirs[2] {
219            let (_, mut w) = self.east.link.split();
220            w.write(&self.buf[..len])
221                .await
222                .map_err(GossipError::East)?;
223        }
224        if dirs[3] {
225            let (_, mut w) = self.west.link.split();
226            w.write(&self.buf[..len])
227                .await
228                .map_err(GossipError::West)?;
229        }
230
231        Ok(())
232    }
233}
234
235impl<N, const MTU: usize, const OUT: usize> NetworkRead
236    for Gossip<N, MTU, OUT>
237where
238    N: Datalink,
239{
240    type Error = GossipError<N::ReadError>;
241
242    async fn read(
243        &mut self,
244        buffer: &mut [u8],
245    ) -> Result<usize, Self::Error> {
246        loop {
247            let Self {
248                north,
249                south,
250                east,
251                west,
252                address,
253                torus,
254                dedup,
255                ..
256            } = self;
257
258            let (mut nr, mut nw) = north.link.split();
259            let (mut sr, mut sw) = south.link.split();
260            let (mut er, mut ew) = east.link.split();
261            let (mut wr, mut ww) = west.link.split();
262
263            macro_rules! stage {
264                ($w:expr, $port:expr) => {
265                    match $port.out.front() {
266                        Some(data) => {
267                            let len = data.len();
268                            $port.stage[..len]
269                                .copy_from_slice(data);
270                            Either::Left(
271                                $w.write(&$port.stage[..len])
272                                    .fuse(),
273                            )
274                        }
275                        None => {
276                            Either::Right(
277                                futures::future::pending(),
278                            )
279                        }
280                    }
281                };
282            }
283
284            enum Event<RE> {
285                Read(Result<usize, RE>, Direction),
286                Write(Direction),
287            }
288
289            let event = {
290                let nw_f = stage!(nw, north);
291                let sw_f = stage!(sw, south);
292                let ew_f = stage!(ew, east);
293                let ww_f = stage!(ww, west);
294
295                let nr_f = nr.read(&mut north.buf).fuse();
296                let sr_f = sr.read(&mut south.buf).fuse();
297                let er_f = er.read(&mut east.buf).fuse();
298                let wr_f = wr.read(&mut west.buf).fuse();
299
300                pin_utils::pin_mut!(
301                    nr_f, sr_f, er_f, wr_f, nw_f, sw_f,
302                    ew_f, ww_f
303                );
304
305                futures::select_biased! {
306                    _ = nw_f => Event::Write(Direction::North),
307                    _ = sw_f => Event::Write(Direction::South),
308                    _ = ew_f => Event::Write(Direction::East),
309                    _ = ww_f => Event::Write(Direction::West),
310                    r = nr_f => Event::Read(r, Direction::North),
311                    r = sr_f => Event::Read(r, Direction::South),
312                    r = er_f => Event::Read(r, Direction::East),
313                    r = wr_f => Event::Read(r, Direction::West),
314                }
315            };
316
317            match event {
318                Event::Write(dir) => match dir {
319                    Direction::North => north.out.pop(),
320                    Direction::South => south.out.pop(),
321                    Direction::East => east.out.pop(),
322                    Direction::West => west.out.pop(),
323                },
324                Event::Read(result, dir) => {
325                    let (buf, len) = match dir {
326                        Direction::North => (
327                            &north.buf[..],
328                            result.map_err(GossipError::North)?,
329                        ),
330                        Direction::South => (
331                            &south.buf[..],
332                            result.map_err(GossipError::South)?,
333                        ),
334                        Direction::East => (
335                            &east.buf[..],
336                            result.map_err(GossipError::East)?,
337                        ),
338                        Direction::West => (
339                            &west.buf[..],
340                            result.map_err(GossipError::West)?,
341                        ),
342                    };
343
344                    let raw = &buf[..len];
345                    let Ok(pkt) =
346                        IslGossipTelecommand::ref_from_bytes(raw)
347                    else {
348                        continue;
349                    };
350
351                    let header = &pkt.gossip_header;
352                    let epoch = header.epoch();
353
354                    if dedup.is_duplicate(epoch.0.get()) {
355                        continue;
356                    }
357
358                    let predecessor = header.predecessor();
359                    let my_point = Point::from(*address);
360                    let all = [
361                        Direction::North,
362                        Direction::South,
363                        Direction::East,
364                        Direction::West,
365                    ];
366                    let fwd = {
367                        let mut dirs = [false; 4];
368                        for (i, d) in
369                            all.iter().enumerate()
370                        {
371                            let n =
372                                torus.neighbor(my_point, *d);
373                            let a = Address::from(n);
374                            dirs[i] = a != predecessor
375                                && a.is_in_service_area(
376                                    header.service_area_min,
377                                    header.service_area_max,
378                                );
379                        }
380                        dirs
381                    };
382
383                    if fwd[0] {
384                        north.out.push(raw);
385                    }
386                    if fwd[1] {
387                        south.out.push(raw);
388                    }
389                    if fwd[2] {
390                        east.out.push(raw);
391                    }
392                    if fwd[3] {
393                        west.out.push(raw);
394                    }
395
396                    let payload = &pkt.payload;
397                    let payload_len = payload.len();
398                    if buffer.len() < payload_len {
399                        return Err(GossipError::BufferTooSmall {
400                            needed: payload_len,
401                            provided: buffer.len(),
402                        });
403                    }
404                    buffer[..payload_len]
405                        .copy_from_slice(payload);
406                    return Ok(payload_len);
407                }
408            }
409        }
410    }
411}