Skip to main content

leodos_protocols/transport/srspp/machine/receiver/reassembly/
lite.rs

1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3use heapless::Vec;
4
5use super::super::DataOutcome;
6use super::super::GapOutcome;
7use super::super::ReceiverBackend;
8use super::super::ReceiverError;
9use super::super::base::ReceiverBase;
10use super::super::utils::GapTracker;
11
12/// Half-memory backend — reorder and reassembly share one buffer.
13///
14/// Use when memory is scarce: segments are placed directly at
15/// their final byte offset in a single flat buffer, so there is
16/// no separate reorder stage. Static memory is just `REASM`.
17///
18/// Trade-off: each delivery requires an O(REASM) byte shift to
19/// reclaim the consumed prefix, and OOO insert is O(WIN) for
20/// gap bookkeeping. Segments tile at MTU boundaries, so each
21/// slot reserves a full MTU regardless of payload size.
22///
23/// * `REASM` — reassembly buffer size (the only buffer)
24/// * `WIN` — maximum gap tracker intervals
25/// * `MTU` — maximum segment payload size
26pub struct LiteReceiver<const REASM: usize, const WIN: usize, const MTU: usize> {
27    /// Sequence tracking state.
28    base: ReceiverBase,
29    /// Contiguous buffer where segments are placed at computed offsets.
30    message_buf: [u8; REASM],
31    /// Tracks unfilled byte ranges in the reassembly buffer.
32    gaps: GapTracker<WIN>,
33    /// Sorted list of byte offsets where complete messages end.
34    message_ends: Vec<usize, WIN>,
35    /// Sequence number corresponding to byte offset 0 in the buffer.
36    base_seq: u16,
37    /// Whether a multi-segment reassembly is in progress.
38    reassembly_in_progress: bool,
39    /// Length of a fully reassembled message, if one is ready.
40    complete_message_len: Option<usize>,
41    /// Deferred byte shift to apply before the next operation.
42    pending_shift: usize,
43    /// Number of segments consumed by the pending shift.
44    pending_segs: u16,
45}
46
47impl<const REASM: usize, const WIN: usize, const MTU: usize> ReceiverBackend
48    for LiteReceiver<REASM, WIN, MTU>
49{
50    fn new() -> Self {
51        Self {
52            base: ReceiverBase::new(),
53            message_buf: [0u8; REASM],
54            gaps: GapTracker::new(),
55            message_ends: Vec::new(),
56            base_seq: 0,
57            reassembly_in_progress: false,
58            complete_message_len: None,
59            pending_shift: 0,
60            pending_segs: 0,
61        }
62    }
63
64    fn handle_data(
65        &mut self,
66        seq: SequenceCount,
67        flags: SequenceFlag,
68        payload: &[u8],
69    ) -> Result<DataOutcome, ReceiverError> {
70        if self.complete_message_len.is_none() {
71            self.apply_pending_shift();
72        }
73
74        let distance = self.base.distance(seq);
75        let seq_before = self.base.expected_seq_raw();
76
77        let max_dist = Self::MAX_SEGS as u16;
78        if distance == 0 || distance < max_dist && !self.base.is_ooo_duplicate(distance) {
79            if distance > 0 {
80                self.base.record_ooo(distance);
81            }
82            self.place_segment(seq, flags, payload)?;
83            self.try_deliver()?;
84        }
85
86        let progressed = self.base.expected_seq_raw() != seq_before;
87        let has_gap = self.gaps.has_gaps() || !self.message_ends.is_empty();
88        Ok(DataOutcome { progressed, has_gap })
89    }
90
91    fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
92        if self.complete_message_len.is_none() {
93            self.apply_pending_shift();
94        }
95
96        if self.reassembly_in_progress {
97            self.gaps.reset();
98            self.message_ends.clear();
99            self.reassembly_in_progress = false;
100        }
101
102        let shift = MTU;
103        if shift < REASM {
104            self.message_buf.copy_within(shift.., 0);
105        }
106        self.gaps.shift(shift);
107        self.message_ends.retain(|&e| e > shift);
108        for e in self.message_ends.iter_mut() {
109            *e -= shift;
110        }
111
112        self.base.advance();
113        self.base_seq = self.base_seq.wrapping_add(1) & SequenceCount::MAX as u16;
114
115        self.try_deliver()?;
116
117        let has_gap = self.gaps.has_gaps() || !self.message_ends.is_empty();
118        Ok(GapOutcome { has_gap })
119    }
120
121    fn take_message(&mut self) -> Option<&[u8]> {
122        self.complete_message_len
123            .take()
124            .map(|len| &self.message_buf[..len])
125    }
126
127    fn reassembly_data(&self, len: usize) -> &[u8] {
128        &self.message_buf[..len]
129    }
130
131    fn has_message(&self) -> bool {
132        self.complete_message_len.is_some()
133    }
134
135    fn message_len(&self) -> Option<usize> {
136        self.complete_message_len
137    }
138
139    fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
140        let len = self.complete_message_len.take()?;
141        Some(f(&self.message_buf[..len]))
142    }
143
144    fn expected_seq(&self) -> SequenceCount {
145        self.base.expected_seq()
146    }
147
148    fn recv_bitmap(&self) -> u16 {
149        self.base.recv_bitmap()
150    }
151}
152
153impl<const REASM: usize, const WIN: usize, const MTU: usize> LiteReceiver<REASM, WIN, MTU> {
154    /// Maximum number of segments that fit in the reassembly buffer.
155    const MAX_SEGS: usize = REASM / MTU;
156
157    /// Shift the buffer and metadata to consume delivered messages.
158    fn apply_pending_shift(&mut self) {
159        let shift = self.pending_shift;
160        if shift == 0 {
161            return;
162        }
163        if shift < REASM {
164            self.message_buf.copy_within(shift.., 0);
165        }
166        self.gaps.shift(shift);
167        self.message_ends.retain(|&e| e > shift);
168        for e in self.message_ends.iter_mut() {
169            *e -= shift;
170        }
171        self.base_seq = self.base_seq.wrapping_add(self.pending_segs) & SequenceCount::MAX as u16;
172        self.pending_shift = 0;
173        self.pending_segs = 0;
174    }
175
176    /// Compute the segment distance from `base_seq` to `seq` with wrapping.
177    fn seg_distance(&self, seq: SequenceCount) -> u16 {
178        seq.value().wrapping_sub(self.base_seq) & SequenceCount::MAX
179    }
180
181    /// Copy a segment's payload into the reassembly buffer at its computed offset.
182    fn place_segment(
183        &mut self,
184        seq: SequenceCount,
185        flags: SequenceFlag,
186        payload: &[u8],
187    ) -> Result<(), ReceiverError> {
188        let d = self.seg_distance(seq) as usize;
189        if d >= Self::MAX_SEGS {
190            return Ok(());
191        }
192        let start = d * MTU;
193        let end = start + payload.len();
194        if end > REASM {
195            return Err(ReceiverError::MessageTooLarge);
196        }
197        self.message_buf[start..end].copy_from_slice(payload);
198
199        let slot_end = (start + MTU).min(REASM);
200        self.gaps.fill(start, slot_end);
201
202        match flags {
203            SequenceFlag::First => {
204                self.reassembly_in_progress = true;
205            }
206            SequenceFlag::Last | SequenceFlag::Unsegmented => {
207                let pos = self
208                    .message_ends
209                    .iter()
210                    .position(|&e| e > end)
211                    .unwrap_or(self.message_ends.len());
212                let _ = self.message_ends.insert(pos, end);
213            }
214            SequenceFlag::Continuation => {}
215        }
216
217        Ok(())
218    }
219
220    /// Deliver complete messages whose gaps have all been filled.
221    fn try_deliver(&mut self) -> Result<(), ReceiverError> {
222        loop {
223            let msg_end;
224
225            if self.pending_shift > 0 {
226                let Some(&pre) = self.message_ends.first() else {
227                    break;
228                };
229                if !self.gaps.is_complete_to(pre) {
230                    break;
231                }
232                self.apply_pending_shift();
233                msg_end = *self.message_ends.first().unwrap();
234            } else {
235                let Some(&end) = self.message_ends.first() else {
236                    break;
237                };
238                if !self.gaps.is_complete_to(end) {
239                    break;
240                }
241                msg_end = end;
242            }
243
244            self.message_ends.remove(0);
245            self.complete_message_len = Some(msg_end);
246            self.reassembly_in_progress = false;
247
248            let segs = (msg_end + MTU - 1) / MTU;
249            for _ in 0..segs {
250                self.base.advance();
251            }
252            self.pending_shift = segs * MTU;
253            self.pending_segs = segs as u16;
254        }
255        Ok(())
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot size used by the receiver under test.
    const MTU: usize = 64;

    /// Receiver with eight MTU-sized slots and eight tracker intervals.
    type Rx = LiteReceiver<{ 8 * MTU }, 8, MTU>;

    /// Receiver configuration with immediate acknowledgements; kept for
    /// tests that need a full configuration, not referenced at present.
    #[allow(unused)]
    fn cfg_immediate() -> super::super::super::ReceiverConfig {
        super::super::super::ReceiverConfig {
            local_address: crate::network::isl::address::Address::satellite(1, 1),
            apid: crate::network::spp::Apid::new(0x42).unwrap(),
            function_code: 0,
            immediate_ack: true,
            ack_delay_ticks: 20,
            progress_timeout_ticks: None,
        }
    }

    /// Fresh receiver for a test case.
    fn make() -> Rx {
        Rx::new()
    }

    /// Hand one segment to the receiver and require that it is accepted
    /// without an error.
    fn feed(rx: &mut Rx, seq: SequenceCount, flags: SequenceFlag, payload: &[u8]) {
        rx.handle_data(seq, flags, payload).unwrap();
    }

    /// Three in-order segments (two full, one short) form one message.
    #[test]
    fn segmented_mtu_aligned() {
        let first = [1u8; MTU];
        let cont = [2u8; MTU];
        let last = [3u8; 10];
        let mut rx = make();

        feed(&mut rx, SequenceCount::from(0), SequenceFlag::First, &first);
        assert!(!rx.has_message());

        feed(&mut rx, SequenceCount::from(1), SequenceFlag::Continuation, &cont);
        assert!(!rx.has_message());

        feed(&mut rx, SequenceCount::from(2), SequenceFlag::Last, &last);
        assert!(rx.has_message());

        let msg = rx.take_message().unwrap();
        assert_eq!(&msg[..MTU], &first);
        assert_eq!(&msg[MTU..2 * MTU], &cont);
        assert_eq!(&msg[2 * MTU..2 * MTU + 10], &last);
    }

    /// Unsegmented messages arriving in the order 0, 3, 1, 2.
    #[test]
    fn gap_merge_split() {
        let p = [0u8; 10];
        let mut rx = make();

        feed(&mut rx, SequenceCount::from(0), SequenceFlag::Unsegmented, &p);
        rx.take_message();

        // Arrives ahead of sequence numbers 1 and 2.
        feed(&mut rx, SequenceCount::from(3), SequenceFlag::Unsegmented, &p);

        feed(&mut rx, SequenceCount::from(1), SequenceFlag::Unsegmented, &p);
        assert!(rx.has_message());
        rx.take_message();

        feed(&mut rx, SequenceCount::from(2), SequenceFlag::Unsegmented, &p);
        // seq=2 closes the gap; seq=2 and seq=3 both go through try_deliver,
        // and seq=3 is the one left in complete_message_len.
        assert!(rx.has_message());
    }
}