Skip to main content

leodos_protocols/transport/srspp/machine/receiver/reassembly/
packed.rs

1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3
4use super::super::DataOutcome;
5use super::super::GapOutcome;
6use super::super::ReceiverBackend;
7use super::super::ReceiverError;
8use super::super::base::ReceiverBase;
9use super::super::utils::Bitset;
10use super::super::utils::BumpSlab;
11
12/// Metadata for a single buffered out-of-order segment.
13#[derive(Clone, Copy, Default)]
14struct SlotMeta {
15    /// Byte offset into the bump slab.
16    offset: usize,
17    /// Length of the stored payload in bytes.
18    len: usize,
19    /// SPP sequence flag for this segment.
20    flags: SequenceFlag,
21}
22
23/// Packed backend — efficient when payloads are small.
24///
25/// Use when segments are typically much smaller than the MTU:
26/// out-of-order payloads are bump-allocated at their actual
27/// size, so `BUF` can be much smaller than `WIN × MTU`.
28///
29/// OOO insert is O(1) (bump append). Delivery copies each
30/// buffered segment into the reassembly buffer — O(MSG) total.
31///
32/// Static memory: `BUF` (reorder slab) + `REASM` (reassembly).
33///
34/// * `WIN` — receive window (number of indexed slots)
35/// * `BUF` — bump slab capacity in bytes
36/// * `REASM` — reassembly buffer size
37pub struct PackedReceiver<const WIN: usize, const BUF: usize, const REASM: usize> {
38    /// Sequence tracking state.
39    base: ReceiverBase,
40    /// Bitset tracking which window slots hold buffered segments.
41    occupied: Bitset<WIN>,
42    /// Per-slot metadata (offset, length, flags) for buffered segments.
43    slot_meta: [SlotMeta; WIN],
44    /// Append-only bump allocator storing out-of-order payloads.
45    slab: BumpSlab<BUF>,
46    /// Buffer for reassembling segmented messages.
47    reassembly: [u8; REASM],
48    /// Current write position in the reassembly buffer.
49    reassembly_len: usize,
50    /// Whether a multi-segment reassembly is in progress.
51    reassembly_in_progress: bool,
52    /// Length of a fully reassembled message, if one is ready.
53    complete_message_len: Option<usize>,
54}
55
56impl<const WIN: usize, const BUF: usize, const REASM: usize> ReceiverBackend
57    for PackedReceiver<WIN, BUF, REASM>
58{
59    fn new() -> Self {
60        Self {
61            base: ReceiverBase::new(),
62            occupied: Bitset::new(),
63            slot_meta: [SlotMeta::default(); WIN],
64            slab: BumpSlab::new(),
65            reassembly: [0u8; REASM],
66            reassembly_len: 0,
67            reassembly_in_progress: false,
68            complete_message_len: None,
69        }
70    }
71
72    fn handle_data(
73        &mut self,
74        seq: SequenceCount,
75        flags: SequenceFlag,
76        payload: &[u8],
77    ) -> Result<DataOutcome, ReceiverError> {
78        let distance = self.base.distance(seq);
79        let seq_before = self.base.expected_seq_raw();
80
81        if distance == 0 {
82            self.deliver_packet(flags, payload)?;
83            self.base.advance();
84            self.deliver_buffered()?;
85        } else if distance < Self::MAX_AHEAD {
86            if !self.base.is_ooo_duplicate(distance) {
87                self.store_ooo(seq.value(), flags, payload)?;
88                self.base.record_ooo(distance);
89            }
90        }
91
92        let progressed = self.base.expected_seq_raw() != seq_before;
93        let has_gap = self.occupied.any();
94        Ok(DataOutcome { progressed, has_gap })
95    }
96
97    fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
98        if self.reassembly_in_progress {
99            self.reassembly_len = 0;
100            self.reassembly_in_progress = false;
101        }
102
103        self.base.advance();
104        self.deliver_buffered()?;
105
106        let has_gap = self.occupied.any();
107        Ok(GapOutcome { has_gap })
108    }
109
110    fn take_message(&mut self) -> Option<&[u8]> {
111        self.complete_message_len
112            .take()
113            .map(|len| &self.reassembly[..len])
114    }
115
116    fn reassembly_data(&self, len: usize) -> &[u8] {
117        &self.reassembly[..len]
118    }
119
120    fn has_message(&self) -> bool {
121        self.complete_message_len.is_some()
122    }
123
124    fn message_len(&self) -> Option<usize> {
125        self.complete_message_len
126    }
127
128    fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
129        let len = self.complete_message_len.take()?;
130        Some(f(&self.reassembly[..len]))
131    }
132
133    fn expected_seq(&self) -> SequenceCount {
134        self.base.expected_seq()
135    }
136
137    fn recv_bitmap(&self) -> u16 {
138        self.base.recv_bitmap()
139    }
140}
141
142impl<const WIN: usize, const BUF: usize, const REASM: usize> PackedReceiver<WIN, BUF, REASM> {
143    /// Maximum forward distance accepted for out-of-order packets.
144    const MAX_AHEAD: u16 = WIN as u16;
145
146    /// Map a raw sequence number to a window slot index.
147    fn slot_idx(seq: u16) -> usize {
148        seq as usize % WIN
149    }
150
151    /// Store an out-of-order segment in the slab and record its metadata.
152    fn store_ooo(
153        &mut self,
154        seq: u16,
155        flags: SequenceFlag,
156        payload: &[u8],
157    ) -> Result<(), ReceiverError> {
158        let idx = Self::slot_idx(seq);
159        if self.occupied.is_set(idx) {
160            return Ok(());
161        }
162        let (offset, len) = self.slab.alloc(payload).ok_or(ReceiverError::BufferFull)?;
163        self.slot_meta[idx] = SlotMeta { offset, len, flags };
164        self.occupied.set(idx);
165        Ok(())
166    }
167
168    /// Deliver consecutive buffered segments starting from the expected sequence.
169    fn deliver_buffered(&mut self) -> Result<(), ReceiverError> {
170        loop {
171            let seq = self.base.expected_seq_raw();
172            let idx = Self::slot_idx(seq);
173            if !self.occupied.is_set(idx) {
174                break;
175            }
176
177            let meta = self.slot_meta[idx];
178            self.occupied.clear(idx);
179
180            let mut temp = [0u8; REASM];
181            let len = meta.len.min(REASM);
182            temp[..len].copy_from_slice(self.slab.get(meta.offset, meta.len));
183            self.deliver_packet(meta.flags, &temp[..len])?;
184            self.base.advance();
185        }
186        self.slab.clear();
187        Ok(())
188    }
189
190    /// Append or complete a packet into the reassembly buffer based on its flags.
191    fn deliver_packet(&mut self, flags: SequenceFlag, payload: &[u8]) -> Result<(), ReceiverError> {
192        match flags {
193            SequenceFlag::Unsegmented => {
194                if payload.len() > REASM {
195                    return Err(ReceiverError::MessageTooLarge);
196                }
197                self.reassembly[..payload.len()].copy_from_slice(payload);
198                self.complete_message_len = Some(payload.len());
199                self.reassembly_in_progress = false;
200            }
201            SequenceFlag::First => {
202                if payload.len() > REASM {
203                    return Err(ReceiverError::MessageTooLarge);
204                }
205                self.reassembly[..payload.len()].copy_from_slice(payload);
206                self.reassembly_len = payload.len();
207                self.reassembly_in_progress = true;
208                self.complete_message_len = None;
209            }
210            SequenceFlag::Continuation | SequenceFlag::Last => {
211                if !self.reassembly_in_progress {
212                    return Err(ReceiverError::ReassemblyError);
213                }
214                let new_len = self.reassembly_len + payload.len();
215                if new_len > REASM {
216                    return Err(ReceiverError::MessageTooLarge);
217                }
218                self.reassembly[self.reassembly_len..new_len].copy_from_slice(payload);
219                self.reassembly_len = new_len;
220
221                if flags == SequenceFlag::Last {
222                    self.complete_message_len = Some(self.reassembly_len);
223                    self.reassembly_in_progress = false;
224                }
225            }
226        }
227        Ok(())
228    }
229}