Skip to main content

leodos_protocols/transport/srspp/machine/receiver/
mod.rs

1//! Receiver state machine for SRSPP.
2
3mod base;
4/// Shared data structures used by receiver backends.
5pub mod utils;
6
7/// Receiver backends with different performance and memory tradeoffs.
8pub mod reassembly;
9
10pub use reassembly::fast::FastReceiver;
11pub use reassembly::lite::LiteReceiver;
12pub use reassembly::packed::PackedReceiver;
13
14use crate::network::isl::address::Address;
15use crate::network::spp::Apid;
16use crate::network::spp::SequenceCount;
17use crate::network::spp::SequenceFlag;
18
19/// ACK information emitted by the receiver state machine.
20#[derive(Debug, Copy, Clone, Eq, PartialEq)]
21pub struct AckInfo {
22    /// Address to send the ACK to.
23    pub destination: Address,
24    /// Highest contiguously received sequence number.
25    pub cumulative_ack: SequenceCount,
26    /// Bitmap of selectively acknowledged packets.
27    pub selective_bitmap: u16,
28}
29
30/// Timer action from the receiver state machine.
31#[derive(Debug, Copy, Clone, Eq, PartialEq)]
32pub enum TimerAction {
33    /// Start (or restart) the timer with the given duration.
34    Start {
35        /// Timer duration in ticks.
36        ticks: u32,
37    },
38    /// Stop the timer.
39    Stop,
40}
41
42/// Result of a receiver state machine event.
43#[derive(Debug, Copy, Clone, Default)]
44pub struct HandleResult {
45    /// ACK to send, if any.
46    pub ack: Option<AckInfo>,
47    /// Action for the delayed ACK timer; `None` means no change.
48    pub ack_timer: Option<TimerAction>,
49    /// Action for the progress timer; `None` means no change.
50    pub progress_timer: Option<TimerAction>,
51}
52
53/// Outcome of processing a data packet in the backend.
54#[derive(Debug, Copy, Clone)]
55pub struct DataOutcome {
56    /// Whether the expected sequence advanced (gap filled or in-order delivery).
57    pub progressed: bool,
58    /// Whether out-of-order packets remain buffered.
59    pub has_gap: bool,
60}
61
62/// Outcome of skipping a gap in the backend.
63#[derive(Debug, Copy, Clone)]
64pub struct GapOutcome {
65    /// Whether out-of-order packets remain buffered after the skip.
66    pub has_gap: bool,
67}
68
69/// Error from receiver operations.
70#[derive(Debug, Copy, Clone, Eq, PartialEq, thiserror::Error)]
71pub enum ReceiverError {
72    /// Reorder buffer full.
73    #[error("Reorder buffer full")]
74    BufferFull,
75    /// Message too large for reassembly buffer.
76    #[error("Message too large for reassembly buffer")]
77    MessageTooLarge,
78    /// Reassembly error (e.g., continuation without first).
79    #[error("Reassembly error")]
80    ReassemblyError,
81}
82
83/// Configuration for the receiver.
84#[derive(Debug, Clone, bon::Builder)]
85pub struct ReceiverConfig {
86    /// Local address of this receiver.
87    pub local_address: Address,
88    /// APID filter for incoming packets.
89    pub apid: Apid,
90    /// cFE function code for outgoing ACK packets.
91    pub function_code: u8,
92    /// If true, send ACKs immediately; otherwise use delayed ACKs.
93    pub immediate_ack: bool,
94    /// Delayed ACK timer duration in ticks.
95    pub ack_delay_ticks: u32,
96    /// Progress timeout in ticks; `None` disables gap-skipping.
97    pub progress_timeout_ticks: Option<u32>,
98}
99
100/// Default receiver backend (alias for [`PackedReceiver`]).
101///
102/// * `WIN` — maximum number of out-of-order packets to buffer
103/// * `BUF` — reorder slab capacity in bytes
104/// * `REASM` — maximum reassembled message size
105pub type ReceiverMachine<const WIN: usize, const BUF: usize, const REASM: usize> =
106    PackedReceiver<WIN, BUF, REASM>;
107
108/// ACK and timer state, driven by `DataOutcome`/`GapOutcome` from a backend.
109pub struct AckState {
110    /// Address of the remote sender (ACK destination).
111    remote_address: Address,
112    /// If true, send ACKs immediately; otherwise use delayed ACKs.
113    immediate_ack: bool,
114    /// Delayed ACK timer duration in ticks.
115    ack_delay_ticks: u32,
116    /// Progress timeout in ticks; `None` disables gap-skipping.
117    progress_timeout_ticks: Option<u32>,
118    /// Whether an ACK needs to be sent.
119    ack_pending: bool,
120    /// Whether the delayed ACK timer is currently running.
121    ack_timer_running: bool,
122}
123
124impl AckState {
125    /// Create ACK state for a stream from receiver config and remote address.
126    pub fn new(config: &ReceiverConfig, remote_address: Address) -> Self {
127        Self {
128            remote_address,
129            immediate_ack: config.immediate_ack,
130            ack_delay_ticks: config.ack_delay_ticks,
131            progress_timeout_ticks: config.progress_timeout_ticks,
132            ack_pending: false,
133            ack_timer_running: false,
134        }
135    }
136
137    fn emit_ack(&mut self, seq: SequenceCount, bitmap: u16) -> HandleResult {
138        let cumulative = seq.value().wrapping_sub(1) & SequenceCount::MAX;
139        let ack_timer = self.ack_timer_running.then(|| TimerAction::Stop);
140        self.ack_timer_running = false;
141        self.ack_pending = false;
142        HandleResult {
143            ack: Some(AckInfo {
144                destination: self.remote_address,
145                cumulative_ack: SequenceCount::from(cumulative),
146                selective_bitmap: bitmap,
147            }),
148            ack_timer,
149            progress_timer: None,
150        }
151    }
152
153    /// Handle delayed ACK timer expiry.
154    pub fn on_ack_timeout(&mut self, seq: SequenceCount, bitmap: u16) -> HandleResult {
155        self.ack_timer_running = false;
156        if self.ack_pending {
157            self.emit_ack(seq, bitmap)
158        } else {
159            HandleResult::default()
160        }
161    }
162
163    /// Produce ACK/timer actions after the backend processed a data packet.
164    pub fn on_data(
165        &mut self,
166        outcome: DataOutcome,
167        seq: SequenceCount,
168        bitmap: u16,
169    ) -> HandleResult {
170        let progress_timer = self.progress_timeout_ticks.and_then(|ticks| {
171            outcome
172                .has_gap
173                .then(|| TimerAction::Start { ticks })
174                .or_else(|| outcome.progressed.then(|| TimerAction::Stop))
175        });
176
177        self.ack_pending = true;
178
179        let (ack, ack_timer) = if self.immediate_ack {
180            let r = self.emit_ack(seq, bitmap);
181            (r.ack, r.ack_timer)
182        } else if !self.ack_timer_running {
183            self.ack_timer_running = true;
184            let timer = TimerAction::Start {
185                ticks: self.ack_delay_ticks,
186            };
187            (None, Some(timer))
188        } else {
189            (None, None)
190        };
191
192        HandleResult {
193            ack,
194            ack_timer,
195            progress_timer,
196        }
197    }
198
199    /// Produce timer actions after the backend skipped a gap.
200    pub fn on_gap_skip(&mut self, outcome: GapOutcome) -> HandleResult {
201        let progress_timer = (!outcome.has_gap).then(|| TimerAction::Stop).or_else(|| {
202            self.progress_timeout_ticks
203                .map(|ticks| TimerAction::Start { ticks })
204        });
205        HandleResult {
206            ack: None,
207            ack_timer: None,
208            progress_timer,
209        }
210    }
211}
212
213/// Trait abstracting over receiver backends (buffering and delivery only).
214pub trait ReceiverBackend: Sized {
215    /// Create a new backend with empty buffers.
216    fn new() -> Self;
217    /// Process a received data packet.
218    fn handle_data(
219        &mut self,
220        seq: SequenceCount,
221        flags: SequenceFlag,
222        payload: &[u8],
223    ) -> Result<DataOutcome, ReceiverError>;
224    /// Skip the current gap (advance past missing packets).
225    fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError>;
226    /// Take the complete message.
227    fn take_message(&mut self) -> Option<&[u8]>;
228    /// Returns a slice of the reassembly buffer.
229    fn reassembly_data(&self, len: usize) -> &[u8];
230    /// Check if there's a complete message ready.
231    fn has_message(&self) -> bool;
232    /// Returns the length of the pending message, if any.
233    fn message_len(&self) -> Option<usize>;
234    /// Pass the pending message to `f` and mark it consumed.
235    fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret>;
236    /// Get the current expected sequence number.
237    fn expected_seq(&self) -> SequenceCount;
238    /// Get the selective ACK bitmap.
239    fn recv_bitmap(&self) -> u16;
240}
241
242#[cfg(test)]
243mod tests {
244    use super::*;
245
246    fn test_remote_address() -> Address {
247        Address::satellite(1, 2)
248    }
249
250    fn make_config() -> ReceiverConfig {
251        ReceiverConfig {
252            local_address: Address::satellite(1, 1),
253            apid: Apid::new(0x42).unwrap(),
254            function_code: 0,
255            immediate_ack: true,
256            ack_delay_ticks: 20,
257            progress_timeout_ticks: None,
258        }
259    }
260
261    fn make_delayed_config() -> ReceiverConfig {
262        ReceiverConfig {
263            local_address: Address::satellite(1, 1),
264            apid: Apid::new(0x42).unwrap(),
265            function_code: 0,
266            immediate_ack: false,
267            ack_delay_ticks: 20,
268            progress_timeout_ticks: None,
269        }
270    }
271
272    fn make_progress_config(ticks: u32) -> ReceiverConfig {
273        ReceiverConfig {
274            local_address: Address::satellite(1, 1),
275            apid: Apid::new(0x42).unwrap(),
276            function_code: 0,
277            immediate_ack: true,
278            ack_delay_ticks: 20,
279            progress_timeout_ticks: Some(ticks),
280        }
281    }
282
283    // ── Generic test functions (all backends) ──
284
285    fn test_immediate_ack<R: ReceiverBackend>() {
286        let mut rx = R::new();
287        let mut ack = AckState::new(&make_config(), test_remote_address());
288        let outcome = rx
289            .handle_data(
290                SequenceCount::from(0),
291                SequenceFlag::Unsegmented,
292                &[1, 2, 3],
293            )
294            .unwrap();
295        let r = ack.on_data(outcome, rx.expected_seq(), rx.recv_bitmap());
296        assert!(r.ack.is_some());
297    }
298
299    fn test_delayed_ack_starts_timer<R: ReceiverBackend>() {
300        let mut rx = R::new();
301        let mut ack = AckState::new(&make_delayed_config(), test_remote_address());
302        let outcome = rx
303            .handle_data(
304                SequenceCount::from(0),
305                SequenceFlag::Unsegmented,
306                &[1, 2, 3],
307            )
308            .unwrap();
309        let r = ack.on_data(outcome, rx.expected_seq(), rx.recv_bitmap());
310        assert!(matches!(
311            r.ack_timer,
312            Some(TimerAction::Start { ticks: 20 })
313        ));
314        assert!(r.ack.is_none());
315    }
316
317    fn test_ack_timeout_sends_ack<R: ReceiverBackend>() {
318        let mut rx = R::new();
319        let mut ack = AckState::new(&make_delayed_config(), test_remote_address());
320        let outcome = rx
321            .handle_data(
322                SequenceCount::from(0),
323                SequenceFlag::Unsegmented,
324                &[1, 2, 3],
325            )
326            .unwrap();
327        ack.on_data(outcome, rx.expected_seq(), rx.recv_bitmap());
328        let r = ack.on_ack_timeout(rx.expected_seq(), rx.recv_bitmap());
329        assert!(r.ack.is_some());
330    }
331
332    fn test_receive_single_packet<R: ReceiverBackend>() {
333        let mut rx = R::new();
334        rx.handle_data(
335            SequenceCount::from(0),
336            SequenceFlag::Unsegmented,
337            &[1, 2, 3, 4, 5],
338        )
339        .unwrap();
340        assert!(rx.has_message());
341        assert_eq!(rx.take_message().unwrap(), &[1, 2, 3, 4, 5]);
342    }
343
344    fn test_out_of_order_delivery<R: ReceiverBackend>() {
345        let mut rx = R::new();
346        let mut ack = AckState::new(&make_config(), test_remote_address());
347        let outcome = rx
348            .handle_data(SequenceCount::from(1), SequenceFlag::Unsegmented, &[2])
349            .unwrap();
350        let r1 = ack.on_data(outcome, rx.expected_seq(), rx.recv_bitmap());
351        assert!(!rx.has_message());
352        assert_eq!(r1.ack.map(|a| a.selective_bitmap), Some(0b0001));
353
354        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[1])
355            .unwrap();
356        assert!(rx.has_message());
357    }
358
359    fn test_duplicate_ignored<R: ReceiverBackend>() {
360        let mut rx = R::new();
361        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[1])
362            .unwrap();
363        rx.take_message();
364        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[99])
365            .unwrap();
366        assert!(!rx.has_message());
367    }
368
369    fn test_progress_timeout_skips_gap<R: ReceiverBackend>() {
370        let mut rx = R::new();
371        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[1])
372            .unwrap();
373        rx.take_message();
374        rx.handle_data(SequenceCount::from(2), SequenceFlag::Unsegmented, &[3])
375            .unwrap();
376        assert!(!rx.has_message());
377        rx.skip_gap().unwrap();
378        assert_eq!(rx.expected_seq().value(), 3);
379        assert!(rx.has_message());
380        assert_eq!(rx.take_message().unwrap(), &[3]);
381    }
382
383    fn test_no_progress_timeout_in_reliable_mode<R: ReceiverBackend>() {
384        let mut rx = R::new();
385        let mut ack = AckState::new(&make_config(), test_remote_address());
386        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[1])
387            .unwrap();
388        rx.take_message();
389        let outcome = rx
390            .handle_data(SequenceCount::from(2), SequenceFlag::Unsegmented, &[3])
391            .unwrap();
392        let r = ack.on_data(outcome, rx.expected_seq(), rx.recv_bitmap());
393        assert!(!matches!(r.progress_timer, Some(TimerAction::Start { .. })));
394    }
395
396    fn test_progress_timer_resets_on_progress<R: ReceiverBackend>() {
397        let mut rx = R::new();
398        let mut ack = AckState::new(&make_progress_config(50), test_remote_address());
399        let o1 = rx
400            .handle_data(SequenceCount::from(1), SequenceFlag::Unsegmented, &[2])
401            .unwrap();
402        let r1 = ack.on_data(o1, rx.expected_seq(), rx.recv_bitmap());
403        assert!(matches!(
404            r1.progress_timer,
405            Some(TimerAction::Start { ticks: 50 })
406        ));
407        let o2 = rx
408            .handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[1])
409            .unwrap();
410        let r2 = ack.on_data(o2, rx.expected_seq(), rx.recv_bitmap());
411        assert!(matches!(r2.progress_timer, Some(TimerAction::Stop)));
412    }
413
414    // ── Segmented tests (Packed + Fast only — Lite tiles at MTU boundaries) ──
415
416    fn test_segmented_message<R: ReceiverBackend>() {
417        let mut rx = R::new();
418        rx.handle_data(SequenceCount::from(0), SequenceFlag::First, &[1, 2, 3])
419            .unwrap();
420        assert!(!rx.has_message());
421        rx.handle_data(
422            SequenceCount::from(1),
423            SequenceFlag::Continuation,
424            &[4, 5, 6],
425        )
426        .unwrap();
427        assert!(!rx.has_message());
428        rx.handle_data(SequenceCount::from(2), SequenceFlag::Last, &[7, 8])
429            .unwrap();
430        assert!(rx.has_message());
431        assert_eq!(rx.take_message().unwrap(), &[1, 2, 3, 4, 5, 6, 7, 8]);
432    }
433
434    fn test_progress_timeout_discards_partial<R: ReceiverBackend>() {
435        let mut rx = R::new();
436        rx.handle_data(SequenceCount::from(0), SequenceFlag::First, &[1, 2, 3])
437            .unwrap();
438        rx.handle_data(SequenceCount::from(3), SequenceFlag::Unsegmented, &[10, 11])
439            .unwrap();
440        assert!(!rx.has_message());
441        rx.skip_gap().unwrap();
442        assert!(!rx.has_message());
443        rx.skip_gap().unwrap();
444        assert!(rx.has_message());
445        assert_eq!(rx.take_message().unwrap(), &[10, 11]);
446        rx.handle_data(SequenceCount::from(4), SequenceFlag::Unsegmented, &[20, 21])
447            .unwrap();
448        assert!(rx.has_message());
449        assert_eq!(rx.take_message().unwrap(), &[20, 21]);
450    }
451
452    // ── Test instantiation ──
453
454    macro_rules! backend_tests {
455        ($mod_name:ident, $ty:ty) => {
456            mod $mod_name {
457                use super::*;
458                #[test]
459                fn immediate_ack() {
460                    test_immediate_ack::<$ty>()
461                }
462                #[test]
463                fn delayed_ack_starts_timer() {
464                    test_delayed_ack_starts_timer::<$ty>()
465                }
466                #[test]
467                fn ack_timeout_sends_ack() {
468                    test_ack_timeout_sends_ack::<$ty>()
469                }
470                #[test]
471                fn receive_single_packet() {
472                    test_receive_single_packet::<$ty>()
473                }
474                #[test]
475                fn out_of_order_delivery() {
476                    test_out_of_order_delivery::<$ty>()
477                }
478                #[test]
479                fn duplicate_ignored() {
480                    test_duplicate_ignored::<$ty>()
481                }
482                #[test]
483                fn progress_timeout_skips_gap() {
484                    test_progress_timeout_skips_gap::<$ty>()
485                }
486                #[test]
487                fn no_progress_timeout_in_reliable_mode() {
488                    test_no_progress_timeout_in_reliable_mode::<$ty>()
489                }
490                #[test]
491                fn progress_timer_resets_on_progress() {
492                    test_progress_timer_resets_on_progress::<$ty>()
493                }
494            }
495        };
496    }
497
498    backend_tests!(packed, PackedReceiver<8, 4096, 8192>);
499    backend_tests!(fast, FastReceiver<8, 512, 8192, 4096>);
500    backend_tests!(lite, LiteReceiver<4096, 8, 512>);
501
502    // Segmented reassembly tests — not applicable to LiteReceiver
503    mod packed_segmented {
504        use super::*;
505        #[test]
506        fn segmented_message() {
507            test_segmented_message::<PackedReceiver<8, 4096, 8192>>()
508        }
509        #[test]
510        fn progress_timeout_discards_partial() {
511            test_progress_timeout_discards_partial::<PackedReceiver<8, 4096, 8192>>()
512        }
513    }
514    mod fast_segmented {
515        use super::*;
516        #[test]
517        fn segmented_message() {
518            test_segmented_message::<FastReceiver<8, 512, 8192, 4096>>()
519        }
520        #[test]
521        fn progress_timeout_discards_partial() {
522            test_progress_timeout_discards_partial::<FastReceiver<8, 512, 8192, 4096>>()
523        }
524    }
525
526    // ── Backend-specific standalone tests ──
527
528    #[test]
529    fn test_receiver_machine_alias() {
530        let mut rx: ReceiverMachine<8, 4096, 8192> = ReceiverMachine::new();
531        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[42])
532            .unwrap();
533        assert_eq!(rx.take_message().unwrap(), &[42]);
534    }
535
536    #[test]
537    fn fast_receiver_seq_wraparound() {
538        let mut rx: FastReceiver<8, 512, 8192, 4096> = FastReceiver::new();
539
540        for i in 0..SequenceCount::MAX {
541            rx.handle_data(
542                SequenceCount::from(i),
543                SequenceFlag::Unsegmented,
544                &[i as u8],
545            )
546            .unwrap();
547            rx.take_message();
548        }
549
550        rx.handle_data(
551            SequenceCount::from(SequenceCount::MAX),
552            SequenceFlag::Unsegmented,
553            &[0xFF],
554        )
555        .unwrap();
556        assert_eq!(rx.take_message().unwrap(), &[0xFF]);
557
558        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &[0x00])
559            .unwrap();
560        assert_eq!(rx.take_message().unwrap(), &[0x00]);
561        assert_eq!(rx.expected_seq().value(), 1);
562    }
563
564    #[test]
565    fn packed_receiver_slab_reset() {
566        let mut rx: PackedReceiver<8, 128, 8192> = PackedReceiver::new();
567
568        let big = [0xAA; 60];
569        rx.handle_data(SequenceCount::from(1), SequenceFlag::Unsegmented, &big)
570            .unwrap();
571        rx.handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &big)
572            .unwrap();
573        rx.take_message();
574        rx.take_message();
575
576        rx.handle_data(SequenceCount::from(3), SequenceFlag::Unsegmented, &big)
577            .unwrap();
578        rx.handle_data(SequenceCount::from(2), SequenceFlag::Unsegmented, &big)
579            .unwrap();
580        assert!(rx.has_message());
581    }
582}