1mod base;
4pub mod utils;
6
7pub mod reassembly;
9
10pub use reassembly::fast::FastReceiver;
11pub use reassembly::lite::LiteReceiver;
12pub use reassembly::packed::PackedReceiver;
13
14use crate::network::isl::address::Address;
15use crate::network::spp::Apid;
16use crate::network::spp::SequenceCount;
17use crate::network::spp::SequenceFlag;
18
19#[derive(Debug, Copy, Clone, Eq, PartialEq)]
21pub struct AckInfo {
22 pub destination: Address,
24 pub cumulative_ack: SequenceCount,
26 pub selective_bitmap: u16,
28}
29
/// Request for the caller to (re)arm or cancel one of its timers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimerAction {
    /// Arm the timer so that it fires after `ticks` ticks.
    Start {
        /// Timer duration, in ticks.
        ticks: u32,
    },
    /// Cancel the timer.
    Stop,
}
41
42#[derive(Debug, Copy, Clone, Default)]
44pub struct HandleResult {
45 pub ack: Option<AckInfo>,
47 pub ack_timer: Option<TimerAction>,
49 pub progress_timer: Option<TimerAction>,
51}
52
/// What a receiver backend reports after accepting a data packet.
///
/// `PartialEq`/`Eq` are derived so outcomes can be compared directly, in line
/// with the other plain-data types in this module.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DataOutcome {
    /// Set by the backend when the packet made progress; [`AckState::on_data`]
    /// uses it to stop the progress timer when no gap remains.
    pub progressed: bool,
    /// Set by the backend while a gap is outstanding; [`AckState::on_data`]
    /// uses it to (re)start the progress timer.
    pub has_gap: bool,
}
61
/// What a receiver backend reports after skipping a gap.
///
/// `PartialEq`/`Eq` are derived so outcomes can be compared directly, in line
/// with the other plain-data types in this module.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct GapOutcome {
    /// Set by the backend when another gap is still outstanding after the
    /// skip; [`AckState::on_gap_skip`] uses it to choose between restarting
    /// and stopping the progress timer.
    pub has_gap: bool,
}
68
69#[derive(Debug, Copy, Clone, Eq, PartialEq, thiserror::Error)]
71pub enum ReceiverError {
72 #[error("Reorder buffer full")]
74 BufferFull,
75 #[error("Message too large for reassembly buffer")]
77 MessageTooLarge,
78 #[error("Reassembly error")]
80 ReassemblyError,
81}
82
83#[derive(Debug, Clone, bon::Builder)]
85pub struct ReceiverConfig {
86 pub local_address: Address,
88 pub apid: Apid,
90 pub function_code: u8,
92 pub immediate_ack: bool,
94 pub ack_delay_ticks: u32,
96 pub progress_timeout_ticks: Option<u32>,
98}
99
100pub type ReceiverMachine<const WIN: usize, const BUF: usize, const REASM: usize> =
106 PackedReceiver<WIN, BUF, REASM>;
107
108pub struct AckState {
110 remote_address: Address,
112 immediate_ack: bool,
114 ack_delay_ticks: u32,
116 progress_timeout_ticks: Option<u32>,
118 ack_pending: bool,
120 ack_timer_running: bool,
122}
123
124impl AckState {
125 pub fn new(config: &ReceiverConfig, remote_address: Address) -> Self {
127 Self {
128 remote_address,
129 immediate_ack: config.immediate_ack,
130 ack_delay_ticks: config.ack_delay_ticks,
131 progress_timeout_ticks: config.progress_timeout_ticks,
132 ack_pending: false,
133 ack_timer_running: false,
134 }
135 }
136
137 fn emit_ack(&mut self, seq: SequenceCount, bitmap: u16) -> HandleResult {
138 let cumulative = seq.value().wrapping_sub(1) & SequenceCount::MAX;
139 let ack_timer = self.ack_timer_running.then(|| TimerAction::Stop);
140 self.ack_timer_running = false;
141 self.ack_pending = false;
142 HandleResult {
143 ack: Some(AckInfo {
144 destination: self.remote_address,
145 cumulative_ack: SequenceCount::from(cumulative),
146 selective_bitmap: bitmap,
147 }),
148 ack_timer,
149 progress_timer: None,
150 }
151 }
152
153 pub fn on_ack_timeout(&mut self, seq: SequenceCount, bitmap: u16) -> HandleResult {
155 self.ack_timer_running = false;
156 if self.ack_pending {
157 self.emit_ack(seq, bitmap)
158 } else {
159 HandleResult::default()
160 }
161 }
162
163 pub fn on_data(
165 &mut self,
166 outcome: DataOutcome,
167 seq: SequenceCount,
168 bitmap: u16,
169 ) -> HandleResult {
170 let progress_timer = self.progress_timeout_ticks.and_then(|ticks| {
171 outcome
172 .has_gap
173 .then(|| TimerAction::Start { ticks })
174 .or_else(|| outcome.progressed.then(|| TimerAction::Stop))
175 });
176
177 self.ack_pending = true;
178
179 let (ack, ack_timer) = if self.immediate_ack {
180 let r = self.emit_ack(seq, bitmap);
181 (r.ack, r.ack_timer)
182 } else if !self.ack_timer_running {
183 self.ack_timer_running = true;
184 let timer = TimerAction::Start {
185 ticks: self.ack_delay_ticks,
186 };
187 (None, Some(timer))
188 } else {
189 (None, None)
190 };
191
192 HandleResult {
193 ack,
194 ack_timer,
195 progress_timer,
196 }
197 }
198
199 pub fn on_gap_skip(&mut self, outcome: GapOutcome) -> HandleResult {
201 let progress_timer = (!outcome.has_gap).then(|| TimerAction::Stop).or_else(|| {
202 self.progress_timeout_ticks
203 .map(|ticks| TimerAction::Start { ticks })
204 });
205 HandleResult {
206 ack: None,
207 ack_timer: None,
208 progress_timer,
209 }
210 }
211}
212
213pub trait ReceiverBackend: Sized {
215 fn new() -> Self;
217 fn handle_data(
219 &mut self,
220 seq: SequenceCount,
221 flags: SequenceFlag,
222 payload: &[u8],
223 ) -> Result<DataOutcome, ReceiverError>;
224 fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError>;
226 fn take_message(&mut self) -> Option<&[u8]>;
228 fn reassembly_data(&self, len: usize) -> &[u8];
230 fn has_message(&self) -> bool;
232 fn message_len(&self) -> Option<usize>;
234 fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret>;
236 fn expected_seq(&self) -> SequenceCount;
238 fn recv_bitmap(&self) -> u16;
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Remote peer address used by every `AckState` in these tests.
    fn test_remote_address() -> Address {
        Address::satellite(1, 2)
    }

    /// Shared base for the config flavours below; only the ack mode and the
    /// progress timeout vary between tests.
    fn config_with(immediate_ack: bool, progress_timeout_ticks: Option<u32>) -> ReceiverConfig {
        ReceiverConfig {
            local_address: Address::satellite(1, 1),
            apid: Apid::new(0x42).unwrap(),
            function_code: 0,
            immediate_ack,
            ack_delay_ticks: 20,
            progress_timeout_ticks,
        }
    }

    /// Immediate acks, no progress timeout.
    fn make_config() -> ReceiverConfig {
        config_with(true, None)
    }

    /// Delayed acks (20 ticks), no progress timeout.
    fn make_delayed_config() -> ReceiverConfig {
        config_with(false, None)
    }

    /// Immediate acks with a progress timeout of `ticks`.
    fn make_progress_config(ticks: u32) -> ReceiverConfig {
        config_with(true, Some(ticks))
    }

    /// Feeds one packet into `rx`, panicking if the backend rejects it.
    fn push<R: ReceiverBackend>(
        rx: &mut R,
        seq: SequenceCount,
        flags: SequenceFlag,
        payload: &[u8],
    ) -> DataOutcome {
        rx.handle_data(seq, flags, payload).unwrap()
    }

    /// Feeds one unsegmented packet into `rx`.
    fn push_unsegmented<R: ReceiverBackend>(
        rx: &mut R,
        seq: SequenceCount,
        payload: &[u8],
    ) -> DataOutcome {
        push(rx, seq, SequenceFlag::Unsegmented, payload)
    }

    /// Immediate-ack mode acknowledges the very first packet.
    fn test_immediate_ack<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_config(), test_remote_address());

        let outcome = push_unsegmented(&mut backend, SequenceCount::from(0), &[1, 2, 3]);
        let result = state.on_data(outcome, backend.expected_seq(), backend.recv_bitmap());

        assert!(result.ack.is_some());
    }

    /// Delayed-ack mode starts the ack timer instead of acknowledging.
    fn test_delayed_ack_starts_timer<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_delayed_config(), test_remote_address());

        let outcome = push_unsegmented(&mut backend, SequenceCount::from(0), &[1, 2, 3]);
        let result = state.on_data(outcome, backend.expected_seq(), backend.recv_bitmap());

        assert_eq!(result.ack_timer, Some(TimerAction::Start { ticks: 20 }));
        assert!(result.ack.is_none());
    }

    /// A pending delayed ack is sent when the ack timer fires.
    fn test_ack_timeout_sends_ack<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_delayed_config(), test_remote_address());

        let outcome = push_unsegmented(&mut backend, SequenceCount::from(0), &[1, 2, 3]);
        state.on_data(outcome, backend.expected_seq(), backend.recv_bitmap());

        let fired = state.on_ack_timeout(backend.expected_seq(), backend.recv_bitmap());
        assert!(fired.ack.is_some());
    }

    /// An in-order unsegmented packet is delivered as a message right away.
    fn test_receive_single_packet<R: ReceiverBackend>() {
        let mut backend = R::new();

        push_unsegmented(&mut backend, SequenceCount::from(0), &[1, 2, 3, 4, 5]);

        assert!(backend.has_message());
        assert_eq!(backend.take_message().unwrap(), &[1, 2, 3, 4, 5]);
    }

    /// A packet arriving ahead of the expected one is held back and reported
    /// in the selective bitmap; delivery resumes once the gap is filled.
    fn test_out_of_order_delivery<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_config(), test_remote_address());

        let early = push_unsegmented(&mut backend, SequenceCount::from(1), &[2]);
        let first = state.on_data(early, backend.expected_seq(), backend.recv_bitmap());
        assert!(!backend.has_message());
        let reported_bitmap = first.ack.map(|info| info.selective_bitmap);
        assert_eq!(reported_bitmap, Some(0b0001));

        push_unsegmented(&mut backend, SequenceCount::from(0), &[1]);
        assert!(backend.has_message());
    }

    /// Re-sending an already delivered sequence count yields no new message.
    fn test_duplicate_ignored<R: ReceiverBackend>() {
        let mut backend = R::new();

        push_unsegmented(&mut backend, SequenceCount::from(0), &[1]);
        backend.take_message();
        push_unsegmented(&mut backend, SequenceCount::from(0), &[99]);

        assert!(!backend.has_message());
    }

    /// Skipping a gap advances past the missing packet and releases the
    /// buffered one behind it.
    fn test_progress_timeout_skips_gap<R: ReceiverBackend>() {
        let mut backend = R::new();

        push_unsegmented(&mut backend, SequenceCount::from(0), &[1]);
        backend.take_message();
        push_unsegmented(&mut backend, SequenceCount::from(2), &[3]);
        assert!(!backend.has_message());

        backend.skip_gap().unwrap();
        assert_eq!(backend.expected_seq().value(), 3);
        assert!(backend.has_message());
        assert_eq!(backend.take_message().unwrap(), &[3]);
    }

    /// Without a configured progress timeout a gap never starts the timer.
    fn test_no_progress_timeout_in_reliable_mode<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_config(), test_remote_address());

        push_unsegmented(&mut backend, SequenceCount::from(0), &[1]);
        backend.take_message();
        let outcome = push_unsegmented(&mut backend, SequenceCount::from(2), &[3]);
        let result = state.on_data(outcome, backend.expected_seq(), backend.recv_bitmap());

        let timer_started = matches!(result.progress_timer, Some(TimerAction::Start { .. }));
        assert!(!timer_started);
    }

    /// A gap starts the progress timer; filling the gap stops it again.
    fn test_progress_timer_resets_on_progress<R: ReceiverBackend>() {
        let mut backend = R::new();
        let mut state = AckState::new(&make_progress_config(50), test_remote_address());

        let with_gap = push_unsegmented(&mut backend, SequenceCount::from(1), &[2]);
        let first = state.on_data(with_gap, backend.expected_seq(), backend.recv_bitmap());
        assert_eq!(first.progress_timer, Some(TimerAction::Start { ticks: 50 }));

        let gap_filled = push_unsegmented(&mut backend, SequenceCount::from(0), &[1]);
        let second = state.on_data(gap_filled, backend.expected_seq(), backend.recv_bitmap());
        assert_eq!(second.progress_timer, Some(TimerAction::Stop));
    }

    /// First / Continuation / Last segments are reassembled into one message.
    fn test_segmented_message<R: ReceiverBackend>() {
        let mut backend = R::new();

        push(&mut backend, SequenceCount::from(0), SequenceFlag::First, &[1, 2, 3]);
        assert!(!backend.has_message());

        push(
            &mut backend,
            SequenceCount::from(1),
            SequenceFlag::Continuation,
            &[4, 5, 6],
        );
        assert!(!backend.has_message());

        push(&mut backend, SequenceCount::from(2), SequenceFlag::Last, &[7, 8]);
        assert!(backend.has_message());
        assert_eq!(backend.take_message().unwrap(), &[1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Skipping over the missing segments drops the partial message and then
    /// delivers the complete packets that follow.
    fn test_progress_timeout_discards_partial<R: ReceiverBackend>() {
        let mut backend = R::new();

        push(&mut backend, SequenceCount::from(0), SequenceFlag::First, &[1, 2, 3]);
        push_unsegmented(&mut backend, SequenceCount::from(3), &[10, 11]);
        assert!(!backend.has_message());

        // Two packets (1 and 2) are missing, so two skips are needed.
        backend.skip_gap().unwrap();
        assert!(!backend.has_message());
        backend.skip_gap().unwrap();
        assert!(backend.has_message());
        assert_eq!(backend.take_message().unwrap(), &[10, 11]);

        push_unsegmented(&mut backend, SequenceCount::from(4), &[20, 21]);
        assert!(backend.has_message());
        assert_eq!(backend.take_message().unwrap(), &[20, 21]);
    }

    /// Instantiates the generic tests for one backend type inside a module
    /// named `$mod_name`. The `@case` rule emits a single `#[test]` wrapper.
    macro_rules! backend_tests {
        (@case $ty:ty, $name:ident => $generic:ident) => {
            #[test]
            fn $name() {
                $generic::<$ty>()
            }
        };
        ($mod_name:ident, $ty:ty) => {
            mod $mod_name {
                use super::*;

                backend_tests!(@case $ty, immediate_ack => test_immediate_ack);
                backend_tests!(@case $ty, delayed_ack_starts_timer => test_delayed_ack_starts_timer);
                backend_tests!(@case $ty, ack_timeout_sends_ack => test_ack_timeout_sends_ack);
                backend_tests!(@case $ty, receive_single_packet => test_receive_single_packet);
                backend_tests!(@case $ty, out_of_order_delivery => test_out_of_order_delivery);
                backend_tests!(@case $ty, duplicate_ignored => test_duplicate_ignored);
                backend_tests!(
                    @case $ty,
                    progress_timeout_skips_gap => test_progress_timeout_skips_gap
                );
                backend_tests!(
                    @case $ty,
                    no_progress_timeout_in_reliable_mode => test_no_progress_timeout_in_reliable_mode
                );
                backend_tests!(
                    @case $ty,
                    progress_timer_resets_on_progress => test_progress_timer_resets_on_progress
                );
            }
        };
    }

    backend_tests!(packed, PackedReceiver<8, 4096, 8192>);
    backend_tests!(fast, FastReceiver<8, 512, 8192, 4096>);
    backend_tests!(lite, LiteReceiver<4096, 8, 512>);

    /// Instantiates the segmentation tests for one backend type.
    ///
    /// NOTE(review): `LiteReceiver` is not covered here — presumably it does
    /// not support segmented messages; confirm.
    macro_rules! segmented_tests {
        ($mod_name:ident, $ty:ty) => {
            mod $mod_name {
                use super::*;

                backend_tests!(@case $ty, segmented_message => test_segmented_message);
                backend_tests!(
                    @case $ty,
                    progress_timeout_discards_partial => test_progress_timeout_discards_partial
                );
            }
        };
    }

    segmented_tests!(packed_segmented, PackedReceiver<8, 4096, 8192>);
    segmented_tests!(fast_segmented, FastReceiver<8, 512, 8192, 4096>);

    /// The `ReceiverMachine` alias is usable as a receiver on its own.
    #[test]
    fn test_receiver_machine_alias() {
        let mut machine: ReceiverMachine<8, 4096, 8192> = ReceiverMachine::new();

        machine
            .handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[42])
            .unwrap();

        assert_eq!(machine.take_message().unwrap(), &[42]);
    }

    /// The fast receiver keeps delivering across the sequence-count wrap.
    #[test]
    fn fast_receiver_seq_wraparound() {
        let mut receiver: FastReceiver<8, 512, 8192, 4096> = FastReceiver::new();

        // Walk every sequence count below MAX, draining each message.
        for count in 0..SequenceCount::MAX {
            // Payload is just the low byte of the counter.
            let payload = [count as u8];
            receiver
                .handle_data(
                    SequenceCount::from(count),
                    SequenceFlag::Unsegmented,
                    &payload,
                )
                .unwrap();
            receiver.take_message();
        }

        receiver
            .handle_data(
                SequenceCount::from(SequenceCount::MAX),
                SequenceFlag::Unsegmented,
                &[0xFF],
            )
            .unwrap();
        assert_eq!(receiver.take_message().unwrap(), &[0xFF]);

        // After MAX the counter wraps back to zero.
        receiver
            .handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[0x00])
            .unwrap();
        assert_eq!(receiver.take_message().unwrap(), &[0x00]);
        assert_eq!(receiver.expected_seq().value(), 1);
    }

    /// With a 128-byte buffer, two rounds of two out-of-order 60-byte packets
    /// still succeed once the first round has been drained.
    #[test]
    fn packed_receiver_slab_reset() {
        let mut receiver: PackedReceiver<8, 128, 8192> = PackedReceiver::new();
        let big = [0xAA; 60];

        // First round: sequence 1 arrives before sequence 0.
        for seq in [1, 0] {
            receiver
                .handle_data(SequenceCount::from(seq), SequenceFlag::Unsegmented, &big)
                .unwrap();
        }
        receiver.take_message();
        receiver.take_message();

        // Second round: sequence 3 arrives before sequence 2.
        for seq in [3, 2] {
            receiver
                .handle_data(SequenceCount::from(seq), SequenceFlag::Unsegmented, &big)
                .unwrap();
        }
        assert!(receiver.has_message());
    }
}