leodos_protocols/transport/srspp/machine/receiver/reassembly/
lite.rs1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3use heapless::Vec;
4
5use super::super::DataOutcome;
6use super::super::GapOutcome;
7use super::super::ReceiverBackend;
8use super::super::ReceiverError;
9use super::super::base::ReceiverBase;
10use super::super::utils::GapTracker;
11
12pub struct LiteReceiver<const REASM: usize, const WIN: usize, const MTU: usize> {
27 base: ReceiverBase,
29 message_buf: [u8; REASM],
31 gaps: GapTracker<WIN>,
33 message_ends: Vec<usize, WIN>,
35 base_seq: u16,
37 reassembly_in_progress: bool,
39 complete_message_len: Option<usize>,
41 pending_shift: usize,
43 pending_segs: u16,
45}
46
47impl<const REASM: usize, const WIN: usize, const MTU: usize> ReceiverBackend
48 for LiteReceiver<REASM, WIN, MTU>
49{
50 fn new() -> Self {
51 Self {
52 base: ReceiverBase::new(),
53 message_buf: [0u8; REASM],
54 gaps: GapTracker::new(),
55 message_ends: Vec::new(),
56 base_seq: 0,
57 reassembly_in_progress: false,
58 complete_message_len: None,
59 pending_shift: 0,
60 pending_segs: 0,
61 }
62 }
63
64 fn handle_data(
65 &mut self,
66 seq: SequenceCount,
67 flags: SequenceFlag,
68 payload: &[u8],
69 ) -> Result<DataOutcome, ReceiverError> {
70 if self.complete_message_len.is_none() {
71 self.apply_pending_shift();
72 }
73
74 let distance = self.base.distance(seq);
75 let seq_before = self.base.expected_seq_raw();
76
77 let max_dist = Self::MAX_SEGS as u16;
78 if distance == 0 || distance < max_dist && !self.base.is_ooo_duplicate(distance) {
79 if distance > 0 {
80 self.base.record_ooo(distance);
81 }
82 self.place_segment(seq, flags, payload)?;
83 self.try_deliver()?;
84 }
85
86 let progressed = self.base.expected_seq_raw() != seq_before;
87 let has_gap = self.gaps.has_gaps() || !self.message_ends.is_empty();
88 Ok(DataOutcome { progressed, has_gap })
89 }
90
91 fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
92 if self.complete_message_len.is_none() {
93 self.apply_pending_shift();
94 }
95
96 if self.reassembly_in_progress {
97 self.gaps.reset();
98 self.message_ends.clear();
99 self.reassembly_in_progress = false;
100 }
101
102 let shift = MTU;
103 if shift < REASM {
104 self.message_buf.copy_within(shift.., 0);
105 }
106 self.gaps.shift(shift);
107 self.message_ends.retain(|&e| e > shift);
108 for e in self.message_ends.iter_mut() {
109 *e -= shift;
110 }
111
112 self.base.advance();
113 self.base_seq = self.base_seq.wrapping_add(1) & SequenceCount::MAX as u16;
114
115 self.try_deliver()?;
116
117 let has_gap = self.gaps.has_gaps() || !self.message_ends.is_empty();
118 Ok(GapOutcome { has_gap })
119 }
120
121 fn take_message(&mut self) -> Option<&[u8]> {
122 self.complete_message_len
123 .take()
124 .map(|len| &self.message_buf[..len])
125 }
126
127 fn reassembly_data(&self, len: usize) -> &[u8] {
128 &self.message_buf[..len]
129 }
130
131 fn has_message(&self) -> bool {
132 self.complete_message_len.is_some()
133 }
134
135 fn message_len(&self) -> Option<usize> {
136 self.complete_message_len
137 }
138
139 fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
140 let len = self.complete_message_len.take()?;
141 Some(f(&self.message_buf[..len]))
142 }
143
144 fn expected_seq(&self) -> SequenceCount {
145 self.base.expected_seq()
146 }
147
148 fn recv_bitmap(&self) -> u16 {
149 self.base.recv_bitmap()
150 }
151}
152
153impl<const REASM: usize, const WIN: usize, const MTU: usize> LiteReceiver<REASM, WIN, MTU> {
154 const MAX_SEGS: usize = REASM / MTU;
156
157 fn apply_pending_shift(&mut self) {
159 let shift = self.pending_shift;
160 if shift == 0 {
161 return;
162 }
163 if shift < REASM {
164 self.message_buf.copy_within(shift.., 0);
165 }
166 self.gaps.shift(shift);
167 self.message_ends.retain(|&e| e > shift);
168 for e in self.message_ends.iter_mut() {
169 *e -= shift;
170 }
171 self.base_seq = self.base_seq.wrapping_add(self.pending_segs) & SequenceCount::MAX as u16;
172 self.pending_shift = 0;
173 self.pending_segs = 0;
174 }
175
176 fn seg_distance(&self, seq: SequenceCount) -> u16 {
178 seq.value().wrapping_sub(self.base_seq) & SequenceCount::MAX
179 }
180
181 fn place_segment(
183 &mut self,
184 seq: SequenceCount,
185 flags: SequenceFlag,
186 payload: &[u8],
187 ) -> Result<(), ReceiverError> {
188 let d = self.seg_distance(seq) as usize;
189 if d >= Self::MAX_SEGS {
190 return Ok(());
191 }
192 let start = d * MTU;
193 let end = start + payload.len();
194 if end > REASM {
195 return Err(ReceiverError::MessageTooLarge);
196 }
197 self.message_buf[start..end].copy_from_slice(payload);
198
199 let slot_end = (start + MTU).min(REASM);
200 self.gaps.fill(start, slot_end);
201
202 match flags {
203 SequenceFlag::First => {
204 self.reassembly_in_progress = true;
205 }
206 SequenceFlag::Last | SequenceFlag::Unsegmented => {
207 let pos = self
208 .message_ends
209 .iter()
210 .position(|&e| e > end)
211 .unwrap_or(self.message_ends.len());
212 let _ = self.message_ends.insert(pos, end);
213 }
214 SequenceFlag::Continuation => {}
215 }
216
217 Ok(())
218 }
219
220 fn try_deliver(&mut self) -> Result<(), ReceiverError> {
222 loop {
223 let msg_end;
224
225 if self.pending_shift > 0 {
226 let Some(&pre) = self.message_ends.first() else {
227 break;
228 };
229 if !self.gaps.is_complete_to(pre) {
230 break;
231 }
232 self.apply_pending_shift();
233 msg_end = *self.message_ends.first().unwrap();
234 } else {
235 let Some(&end) = self.message_ends.first() else {
236 break;
237 };
238 if !self.gaps.is_complete_to(end) {
239 break;
240 }
241 msg_end = end;
242 }
243
244 self.message_ends.remove(0);
245 self.complete_message_len = Some(msg_end);
246 self.reassembly_in_progress = false;
247
248 let segs = (msg_end + MTU - 1) / MTU;
249 for _ in 0..segs {
250 self.base.advance();
251 }
252 self.pending_shift = segs * MTU;
253 self.pending_segs = segs as u16;
254 }
255 Ok(())
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot size used by every receiver in these tests.
    const MTU: usize = 64;

    /// Receiver with an eight-slot buffer and an eight-entry window.
    type Rx = LiteReceiver<{ 8 * MTU }, 8, MTU>;

    /// Builds a fresh receiver for a test case.
    fn make() -> Rx {
        Rx::new()
    }

    /// Receiver configuration with immediate acknowledgements.
    /// Not referenced by the tests in this module.
    #[allow(unused)]
    fn cfg_immediate() -> super::super::super::ReceiverConfig {
        super::super::super::ReceiverConfig {
            local_address: crate::network::isl::address::Address::satellite(1, 1),
            apid: crate::network::spp::Apid::new(0x42).unwrap(),
            function_code: 0,
            immediate_ack: true,
            ack_delay_ticks: 20,
            progress_timeout_ticks: None,
        }
    }

    /// A three-segment message whose first two segments fill their slots is
    /// delivered only once the last segment arrives, with all bytes in place.
    #[test]
    fn segmented_mtu_aligned() {
        let head = [1u8; MTU];
        let middle = [2u8; MTU];
        let tail = [3u8; 10];

        let mut receiver = make();

        receiver
            .handle_data(SequenceCount::from(0), SequenceFlag::First, &head)
            .unwrap();
        assert!(!receiver.has_message());

        receiver
            .handle_data(SequenceCount::from(1), SequenceFlag::Continuation, &middle)
            .unwrap();
        assert!(!receiver.has_message());

        receiver
            .handle_data(SequenceCount::from(2), SequenceFlag::Last, &tail)
            .unwrap();
        assert!(receiver.has_message());

        let delivered = receiver.take_message().unwrap();
        assert_eq!(&delivered[..MTU], &head);
        assert_eq!(&delivered[MTU..2 * MTU], &middle);
        assert_eq!(&delivered[2 * MTU..2 * MTU + 10], &tail);
    }

    /// Unsegmented messages arriving in the order 0, 3, 1, 2 each become
    /// available as soon as every earlier slot has been filled.
    #[test]
    fn gap_merge_split() {
        let payload = [0u8; 10];
        let mut receiver = make();

        // In-order message: delivered immediately, then drained.
        receiver
            .handle_data(SequenceCount::from(0), SequenceFlag::Unsegmented, &payload)
            .unwrap();
        receiver.take_message();

        // Two slots ahead of the expected sequence number.
        receiver
            .handle_data(SequenceCount::from(3), SequenceFlag::Unsegmented, &payload)
            .unwrap();

        // The expected segment arrives.
        receiver
            .handle_data(SequenceCount::from(1), SequenceFlag::Unsegmented, &payload)
            .unwrap();
        assert!(receiver.has_message());
        receiver.take_message();

        // The remaining hole is filled.
        receiver
            .handle_data(SequenceCount::from(2), SequenceFlag::Unsegmented, &payload)
            .unwrap();
        assert!(receiver.has_message());
    }
}