Skip to main content

leodos_protocols/transport/srspp/machine/receiver/reassembly/
fast.rs

1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3
4use super::super::DataOutcome;
5use super::super::GapOutcome;
6use super::super::ReceiverBackend;
7use super::super::ReceiverError;
8use super::super::base::ReceiverBase;
9use super::super::utils::Bitset;
10use super::super::utils::SlotMap;
11
12/// Fastest backend — O(1) insert and O(1) delivery.
13///
14/// Use when CPU budget is tight and you can afford the extra
15/// memory: both buffering an out-of-order segment and delivering
16/// a run of consecutive segments are constant-time per packet.
17///
18/// Stores out-of-order segments in `WIN` fixed MTU-sized slots
19/// indexed by `seq % WIN`. Each slot reserves a full MTU even
20/// for shorter payloads.
21///
22/// Static memory: `WIN × MTU` (reorder) + `REASM` (reassembly).
23///
24/// * `WIN` — receive window (number of slots)
25/// * `MTU` — maximum segment payload size
26/// * `REASM` — reassembly buffer size
27/// * `TOTAL` — total slot storage (`WIN * MTU`)
28pub struct FastReceiver<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize>
29{
30    /// Sequence tracking state.
31    base: ReceiverBase,
32    /// Bitset tracking which window slots hold buffered segments.
33    occupied: Bitset<WIN>,
34    /// Fixed-size slot storage for out-of-order payloads.
35    slots: SlotMap<TOTAL, WIN, MTU>,
36    /// Per-slot sequence flags for buffered segments.
37    flags: [SequenceFlag; WIN],
38    /// Buffer for reassembling segmented messages.
39    reassembly: [u8; REASM],
40    /// Current write position in the reassembly buffer.
41    reassembly_len: usize,
42    /// Whether a multi-segment reassembly is in progress.
43    reassembly_in_progress: bool,
44    /// Length of a fully reassembled message, if one is ready.
45    complete_message_len: Option<usize>,
46}
47
48impl<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize> ReceiverBackend
49    for FastReceiver<WIN, MTU, REASM, TOTAL>
50{
51    fn new() -> Self {
52        Self {
53            base: ReceiverBase::new(),
54            occupied: Bitset::new(),
55            slots: SlotMap::new(),
56            flags: [SequenceFlag::default(); WIN],
57            reassembly: [0u8; REASM],
58            reassembly_len: 0,
59            reassembly_in_progress: false,
60            complete_message_len: None,
61        }
62    }
63
64    fn handle_data(
65        &mut self,
66        seq: SequenceCount,
67        flags: SequenceFlag,
68        payload: &[u8],
69    ) -> Result<DataOutcome, ReceiverError> {
70        let distance = self.base.distance(seq);
71        let seq_before = self.base.expected_seq_raw();
72
73        if distance == 0 {
74            self.deliver_packet(flags, payload)?;
75            self.base.advance();
76            self.deliver_buffered()?;
77        } else if distance < Self::MAX_AHEAD {
78            if !self.base.is_ooo_duplicate(distance) {
79                self.store_ooo(seq.value(), flags, payload);
80                self.base.record_ooo(distance);
81            }
82        }
83
84        let progressed = self.base.expected_seq_raw() != seq_before;
85        let has_gap = self.occupied.any();
86        Ok(DataOutcome { progressed, has_gap })
87    }
88
89    fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
90        if self.reassembly_in_progress {
91            self.reassembly_len = 0;
92            self.reassembly_in_progress = false;
93        }
94
95        self.base.advance();
96        self.deliver_buffered()?;
97
98        let has_gap = self.occupied.any();
99        Ok(GapOutcome { has_gap })
100    }
101
102    fn take_message(&mut self) -> Option<&[u8]> {
103        self.complete_message_len
104            .take()
105            .map(|len| &self.reassembly[..len])
106    }
107
108    fn reassembly_data(&self, len: usize) -> &[u8] {
109        &self.reassembly[..len]
110    }
111
112    fn has_message(&self) -> bool {
113        self.complete_message_len.is_some()
114    }
115
116    fn message_len(&self) -> Option<usize> {
117        self.complete_message_len
118    }
119
120    fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
121        let len = self.complete_message_len.take()?;
122        Some(f(&self.reassembly[..len]))
123    }
124
125    fn expected_seq(&self) -> SequenceCount {
126        self.base.expected_seq()
127    }
128
129    fn recv_bitmap(&self) -> u16 {
130        self.base.recv_bitmap()
131    }
132}
133
134impl<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize>
135    FastReceiver<WIN, MTU, REASM, TOTAL>
136{
137    /// Maximum forward distance accepted for out-of-order packets.
138    const MAX_AHEAD: u16 = WIN as u16;
139
140    /// Map a raw sequence number to a window slot index.
141    fn slot_idx(seq: u16) -> usize {
142        seq as usize % WIN
143    }
144
145    /// Store an out-of-order segment into a fixed slot.
146    fn store_ooo(&mut self, seq: u16, flags: SequenceFlag, payload: &[u8]) {
147        let idx = Self::slot_idx(seq);
148        if self.occupied.is_set(idx) {
149            return;
150        }
151        self.slots.write(idx, payload);
152        self.flags[idx] = flags;
153        self.occupied.set(idx);
154    }
155
156    /// Deliver consecutive buffered segments starting from the expected sequence.
157    fn deliver_buffered(&mut self) -> Result<(), ReceiverError> {
158        let mut temp = [0u8; MTU];
159        loop {
160            let seq = self.base.expected_seq_raw();
161            let idx = Self::slot_idx(seq);
162            if !self.occupied.is_set(idx) {
163                break;
164            }
165
166            let flags = self.flags[idx];
167            self.occupied.clear(idx);
168
169            let len = self.slots.read(idx, &mut temp);
170            self.deliver_packet(flags, &temp[..len])?;
171            self.base.advance();
172        }
173        Ok(())
174    }
175
176    /// Append or complete a packet into the reassembly buffer based on its flags.
177    fn deliver_packet(&mut self, flags: SequenceFlag, payload: &[u8]) -> Result<(), ReceiverError> {
178        match flags {
179            SequenceFlag::Unsegmented => {
180                if payload.len() > REASM {
181                    return Err(ReceiverError::MessageTooLarge);
182                }
183                self.reassembly[..payload.len()].copy_from_slice(payload);
184                self.complete_message_len = Some(payload.len());
185                self.reassembly_in_progress = false;
186            }
187            SequenceFlag::First => {
188                if payload.len() > REASM {
189                    return Err(ReceiverError::MessageTooLarge);
190                }
191                self.reassembly[..payload.len()].copy_from_slice(payload);
192                self.reassembly_len = payload.len();
193                self.reassembly_in_progress = true;
194                self.complete_message_len = None;
195            }
196            SequenceFlag::Continuation | SequenceFlag::Last => {
197                if !self.reassembly_in_progress {
198                    return Err(ReceiverError::ReassemblyError);
199                }
200                let new_len = self.reassembly_len + payload.len();
201                if new_len > REASM {
202                    return Err(ReceiverError::MessageTooLarge);
203                }
204                self.reassembly[self.reassembly_len..new_len].copy_from_slice(payload);
205                self.reassembly_len = new_len;
206
207                if flags == SequenceFlag::Last {
208                    self.complete_message_len = Some(self.reassembly_len);
209                    self.reassembly_in_progress = false;
210                }
211            }
212        }
213        Ok(())
214    }
215}