1pub mod bitmap;
10pub mod packet;
12
13use futures::FutureExt as _;
14use futures::future::Either;
15use zerocopy::FromBytes as _;
16use zerocopy::IntoBytes as _;
17use zerocopy::network_endian::U16;
18
19use bitmap::DuplicateFilter;
20
21use crate::datalink::Datalink;
22use crate::datalink::DatalinkRead;
23use crate::datalink::DatalinkWrite;
24use crate::network::NetworkRead;
25use crate::network::NetworkWrite;
26use crate::network::isl::address::Address;
27use crate::network::isl::gossip::packet::Epoch;
28use crate::network::isl::gossip::packet::IslGossipTelecommand;
29use crate::network::isl::torus::Direction;
30use crate::network::isl::torus::Point;
31use crate::network::isl::torus::Torus;
32use crate::network::spp::Apid;
33use crate::utils::ringbuf::RingBuffer;
34
35struct Port<L, const MTU: usize, const OUT: usize> {
38 link: L,
39 buf: [u8; MTU],
40 out: RingBuffer<OUT>,
41 stage: [u8; MTU],
42}
43
44impl<L, const MTU: usize, const OUT: usize> Port<L, MTU, OUT> {
45 fn new(link: L) -> Self {
46 Self {
47 link,
48 buf: [0u8; MTU],
49 out: RingBuffer::new(),
50 stage: [0u8; MTU],
51 }
52 }
53}
54
55#[derive(Debug, Clone, thiserror::Error)]
57pub enum GossipError<E> {
58 #[error("North link error: {0}")]
60 North(E),
61 #[error("South link error: {0}")]
63 South(E),
64 #[error("East link error: {0}")]
66 East(E),
67 #[error("West link error: {0}")]
69 West(E),
70 #[error("buffer too small: need {needed} bytes, got {provided}")]
72 BufferTooSmall {
73 needed: usize,
75 provided: usize,
77 },
78}
79
80pub struct Gossip<N, const MTU: usize = 256, const OUT: usize = 2048> {
85 north: Port<N, MTU, OUT>,
86 south: Port<N, MTU, OUT>,
87 east: Port<N, MTU, OUT>,
88 west: Port<N, MTU, OUT>,
89 address: Address,
90 torus: Torus,
91 apid: Apid,
92 function_code: u8,
93 dedup: DuplicateFilter,
94 next_epoch: u16,
95 buf: [u8; MTU],
96}
97
98#[bon::bon]
99impl<N, const MTU: usize, const OUT: usize> Gossip<N, MTU, OUT>
100where
101 N: Datalink,
102{
103 #[builder]
104 pub fn new(
106 north: N,
107 south: N,
108 east: N,
109 west: N,
110 address: Address,
111 torus: Torus,
112 apid: Apid,
113 function_code: u8,
114 ) -> Self {
115 Self {
116 north: Port::new(north),
117 south: Port::new(south),
118 east: Port::new(east),
119 west: Port::new(west),
120 address,
121 torus,
122 apid,
123 function_code,
124 dedup: DuplicateFilter::new(),
125 next_epoch: 0,
126 buf: [0u8; MTU],
127 }
128 }
129
130 pub fn address(&self) -> Address {
132 self.address
133 }
134}
135
136impl<N, const MTU: usize, const OUT: usize> Gossip<N, MTU, OUT>
137where
138 N: Datalink,
139{
140 fn flood_directions(
143 &self,
144 service_area_min: u8,
145 service_area_max: u8,
146 ) -> [bool; 4] {
147 let my_point = Point::from(self.address);
148 let all = [
149 Direction::North,
150 Direction::South,
151 Direction::East,
152 Direction::West,
153 ];
154 let mut dirs = [false; 4];
155 for (i, direction) in all.iter().enumerate() {
156 let neighbor = self.torus.neighbor(my_point, *direction);
157 let addr = Address::from(neighbor);
158 dirs[i] = addr.is_in_service_area(
159 service_area_min,
160 service_area_max,
161 );
162 }
163 dirs
164 }
165
166}
167
168impl<N, const MTU: usize, const OUT: usize> NetworkWrite
169 for Gossip<N, MTU, OUT>
170where
171 N: Datalink,
172{
173 type Error = GossipError<N::WriteError>;
174
175 async fn write(
176 &mut self,
177 data: &[u8],
178 ) -> Result<(), Self::Error> {
179 let epoch_val = self.next_epoch;
180 self.next_epoch = self.next_epoch.wrapping_add(1);
181 let epoch = Epoch(U16::new(epoch_val));
182 self.dedup.is_duplicate(epoch.0.get());
183
184 let pkt = IslGossipTelecommand::builder()
185 .buffer(&mut self.buf)
186 .apid(self.apid)
187 .function_code(self.function_code)
188 .origin(self.address)
189 .predecessor(self.address)
190 .service_area_min(0)
191 .service_area_max(255)
192 .epoch(epoch)
193 .payload_len(data.len())
194 .build()
195 .ok();
196
197 let Some(pkt) = pkt else {
198 return Ok(());
199 };
200 pkt.payload.copy_from_slice(data);
201 pkt.set_cfe_checksum();
202 let len = pkt.as_bytes().len();
203
204 let dirs = self.flood_directions(0, 255);
205
206 if dirs[0] {
207 let (_, mut w) = self.north.link.split();
208 w.write(&self.buf[..len])
209 .await
210 .map_err(GossipError::North)?;
211 }
212 if dirs[1] {
213 let (_, mut w) = self.south.link.split();
214 w.write(&self.buf[..len])
215 .await
216 .map_err(GossipError::South)?;
217 }
218 if dirs[2] {
219 let (_, mut w) = self.east.link.split();
220 w.write(&self.buf[..len])
221 .await
222 .map_err(GossipError::East)?;
223 }
224 if dirs[3] {
225 let (_, mut w) = self.west.link.split();
226 w.write(&self.buf[..len])
227 .await
228 .map_err(GossipError::West)?;
229 }
230
231 Ok(())
232 }
233}
234
235impl<N, const MTU: usize, const OUT: usize> NetworkRead
236 for Gossip<N, MTU, OUT>
237where
238 N: Datalink,
239{
240 type Error = GossipError<N::ReadError>;
241
242 async fn read(
243 &mut self,
244 buffer: &mut [u8],
245 ) -> Result<usize, Self::Error> {
246 loop {
247 let Self {
248 north,
249 south,
250 east,
251 west,
252 address,
253 torus,
254 dedup,
255 ..
256 } = self;
257
258 let (mut nr, mut nw) = north.link.split();
259 let (mut sr, mut sw) = south.link.split();
260 let (mut er, mut ew) = east.link.split();
261 let (mut wr, mut ww) = west.link.split();
262
263 macro_rules! stage {
264 ($w:expr, $port:expr) => {
265 match $port.out.front() {
266 Some(data) => {
267 let len = data.len();
268 $port.stage[..len]
269 .copy_from_slice(data);
270 Either::Left(
271 $w.write(&$port.stage[..len])
272 .fuse(),
273 )
274 }
275 None => {
276 Either::Right(
277 futures::future::pending(),
278 )
279 }
280 }
281 };
282 }
283
284 enum Event<RE> {
285 Read(Result<usize, RE>, Direction),
286 Write(Direction),
287 }
288
289 let event = {
290 let nw_f = stage!(nw, north);
291 let sw_f = stage!(sw, south);
292 let ew_f = stage!(ew, east);
293 let ww_f = stage!(ww, west);
294
295 let nr_f = nr.read(&mut north.buf).fuse();
296 let sr_f = sr.read(&mut south.buf).fuse();
297 let er_f = er.read(&mut east.buf).fuse();
298 let wr_f = wr.read(&mut west.buf).fuse();
299
300 pin_utils::pin_mut!(
301 nr_f, sr_f, er_f, wr_f, nw_f, sw_f,
302 ew_f, ww_f
303 );
304
305 futures::select_biased! {
306 _ = nw_f => Event::Write(Direction::North),
307 _ = sw_f => Event::Write(Direction::South),
308 _ = ew_f => Event::Write(Direction::East),
309 _ = ww_f => Event::Write(Direction::West),
310 r = nr_f => Event::Read(r, Direction::North),
311 r = sr_f => Event::Read(r, Direction::South),
312 r = er_f => Event::Read(r, Direction::East),
313 r = wr_f => Event::Read(r, Direction::West),
314 }
315 };
316
317 match event {
318 Event::Write(dir) => match dir {
319 Direction::North => north.out.pop(),
320 Direction::South => south.out.pop(),
321 Direction::East => east.out.pop(),
322 Direction::West => west.out.pop(),
323 },
324 Event::Read(result, dir) => {
325 let (buf, len) = match dir {
326 Direction::North => (
327 &north.buf[..],
328 result.map_err(GossipError::North)?,
329 ),
330 Direction::South => (
331 &south.buf[..],
332 result.map_err(GossipError::South)?,
333 ),
334 Direction::East => (
335 &east.buf[..],
336 result.map_err(GossipError::East)?,
337 ),
338 Direction::West => (
339 &west.buf[..],
340 result.map_err(GossipError::West)?,
341 ),
342 };
343
344 let raw = &buf[..len];
345 let Ok(pkt) =
346 IslGossipTelecommand::ref_from_bytes(raw)
347 else {
348 continue;
349 };
350
351 let header = &pkt.gossip_header;
352 let epoch = header.epoch();
353
354 if dedup.is_duplicate(epoch.0.get()) {
355 continue;
356 }
357
358 let predecessor = header.predecessor();
359 let my_point = Point::from(*address);
360 let all = [
361 Direction::North,
362 Direction::South,
363 Direction::East,
364 Direction::West,
365 ];
366 let fwd = {
367 let mut dirs = [false; 4];
368 for (i, d) in
369 all.iter().enumerate()
370 {
371 let n =
372 torus.neighbor(my_point, *d);
373 let a = Address::from(n);
374 dirs[i] = a != predecessor
375 && a.is_in_service_area(
376 header.service_area_min,
377 header.service_area_max,
378 );
379 }
380 dirs
381 };
382
383 if fwd[0] {
384 north.out.push(raw);
385 }
386 if fwd[1] {
387 south.out.push(raw);
388 }
389 if fwd[2] {
390 east.out.push(raw);
391 }
392 if fwd[3] {
393 west.out.push(raw);
394 }
395
396 let payload = &pkt.payload;
397 let payload_len = payload.len();
398 if buffer.len() < payload_len {
399 return Err(GossipError::BufferTooSmall {
400 needed: payload_len,
401 provided: buffer.len(),
402 });
403 }
404 buffer[..payload_len]
405 .copy_from_slice(payload);
406 return Ok(payload_len);
407 }
408 }
409 }
410 }
411}