leodos_protocols/transport/srspp/machine/receiver/reassembly/
packed.rs1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3
4use super::super::DataOutcome;
5use super::super::GapOutcome;
6use super::super::ReceiverBackend;
7use super::super::ReceiverError;
8use super::super::base::ReceiverBase;
9use super::super::utils::Bitset;
10use super::super::utils::BumpSlab;
11
12#[derive(Clone, Copy, Default)]
14struct SlotMeta {
15 offset: usize,
17 len: usize,
19 flags: SequenceFlag,
21}
22
23pub struct PackedReceiver<const WIN: usize, const BUF: usize, const REASM: usize> {
38 base: ReceiverBase,
40 occupied: Bitset<WIN>,
42 slot_meta: [SlotMeta; WIN],
44 slab: BumpSlab<BUF>,
46 reassembly: [u8; REASM],
48 reassembly_len: usize,
50 reassembly_in_progress: bool,
52 complete_message_len: Option<usize>,
54}
55
56impl<const WIN: usize, const BUF: usize, const REASM: usize> ReceiverBackend
57 for PackedReceiver<WIN, BUF, REASM>
58{
59 fn new() -> Self {
60 Self {
61 base: ReceiverBase::new(),
62 occupied: Bitset::new(),
63 slot_meta: [SlotMeta::default(); WIN],
64 slab: BumpSlab::new(),
65 reassembly: [0u8; REASM],
66 reassembly_len: 0,
67 reassembly_in_progress: false,
68 complete_message_len: None,
69 }
70 }
71
72 fn handle_data(
73 &mut self,
74 seq: SequenceCount,
75 flags: SequenceFlag,
76 payload: &[u8],
77 ) -> Result<DataOutcome, ReceiverError> {
78 let distance = self.base.distance(seq);
79 let seq_before = self.base.expected_seq_raw();
80
81 if distance == 0 {
82 self.deliver_packet(flags, payload)?;
83 self.base.advance();
84 self.deliver_buffered()?;
85 } else if distance < Self::MAX_AHEAD {
86 if !self.base.is_ooo_duplicate(distance) {
87 self.store_ooo(seq.value(), flags, payload)?;
88 self.base.record_ooo(distance);
89 }
90 }
91
92 let progressed = self.base.expected_seq_raw() != seq_before;
93 let has_gap = self.occupied.any();
94 Ok(DataOutcome { progressed, has_gap })
95 }
96
97 fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
98 if self.reassembly_in_progress {
99 self.reassembly_len = 0;
100 self.reassembly_in_progress = false;
101 }
102
103 self.base.advance();
104 self.deliver_buffered()?;
105
106 let has_gap = self.occupied.any();
107 Ok(GapOutcome { has_gap })
108 }
109
110 fn take_message(&mut self) -> Option<&[u8]> {
111 self.complete_message_len
112 .take()
113 .map(|len| &self.reassembly[..len])
114 }
115
116 fn reassembly_data(&self, len: usize) -> &[u8] {
117 &self.reassembly[..len]
118 }
119
120 fn has_message(&self) -> bool {
121 self.complete_message_len.is_some()
122 }
123
124 fn message_len(&self) -> Option<usize> {
125 self.complete_message_len
126 }
127
128 fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
129 let len = self.complete_message_len.take()?;
130 Some(f(&self.reassembly[..len]))
131 }
132
133 fn expected_seq(&self) -> SequenceCount {
134 self.base.expected_seq()
135 }
136
137 fn recv_bitmap(&self) -> u16 {
138 self.base.recv_bitmap()
139 }
140}
141
142impl<const WIN: usize, const BUF: usize, const REASM: usize> PackedReceiver<WIN, BUF, REASM> {
143 const MAX_AHEAD: u16 = WIN as u16;
145
146 fn slot_idx(seq: u16) -> usize {
148 seq as usize % WIN
149 }
150
151 fn store_ooo(
153 &mut self,
154 seq: u16,
155 flags: SequenceFlag,
156 payload: &[u8],
157 ) -> Result<(), ReceiverError> {
158 let idx = Self::slot_idx(seq);
159 if self.occupied.is_set(idx) {
160 return Ok(());
161 }
162 let (offset, len) = self.slab.alloc(payload).ok_or(ReceiverError::BufferFull)?;
163 self.slot_meta[idx] = SlotMeta { offset, len, flags };
164 self.occupied.set(idx);
165 Ok(())
166 }
167
168 fn deliver_buffered(&mut self) -> Result<(), ReceiverError> {
170 loop {
171 let seq = self.base.expected_seq_raw();
172 let idx = Self::slot_idx(seq);
173 if !self.occupied.is_set(idx) {
174 break;
175 }
176
177 let meta = self.slot_meta[idx];
178 self.occupied.clear(idx);
179
180 let mut temp = [0u8; REASM];
181 let len = meta.len.min(REASM);
182 temp[..len].copy_from_slice(self.slab.get(meta.offset, meta.len));
183 self.deliver_packet(meta.flags, &temp[..len])?;
184 self.base.advance();
185 }
186 self.slab.clear();
187 Ok(())
188 }
189
190 fn deliver_packet(&mut self, flags: SequenceFlag, payload: &[u8]) -> Result<(), ReceiverError> {
192 match flags {
193 SequenceFlag::Unsegmented => {
194 if payload.len() > REASM {
195 return Err(ReceiverError::MessageTooLarge);
196 }
197 self.reassembly[..payload.len()].copy_from_slice(payload);
198 self.complete_message_len = Some(payload.len());
199 self.reassembly_in_progress = false;
200 }
201 SequenceFlag::First => {
202 if payload.len() > REASM {
203 return Err(ReceiverError::MessageTooLarge);
204 }
205 self.reassembly[..payload.len()].copy_from_slice(payload);
206 self.reassembly_len = payload.len();
207 self.reassembly_in_progress = true;
208 self.complete_message_len = None;
209 }
210 SequenceFlag::Continuation | SequenceFlag::Last => {
211 if !self.reassembly_in_progress {
212 return Err(ReceiverError::ReassemblyError);
213 }
214 let new_len = self.reassembly_len + payload.len();
215 if new_len > REASM {
216 return Err(ReceiverError::MessageTooLarge);
217 }
218 self.reassembly[self.reassembly_len..new_len].copy_from_slice(payload);
219 self.reassembly_len = new_len;
220
221 if flags == SequenceFlag::Last {
222 self.complete_message_len = Some(self.reassembly_len);
223 self.reassembly_in_progress = false;
224 }
225 }
226 }
227 Ok(())
228 }
229}