leodos_protocols/transport/srspp/machine/receiver/reassembly/
fast.rs1use crate::network::spp::SequenceCount;
2use crate::network::spp::SequenceFlag;
3
4use super::super::DataOutcome;
5use super::super::GapOutcome;
6use super::super::ReceiverBackend;
7use super::super::ReceiverError;
8use super::super::base::ReceiverBase;
9use super::super::utils::Bitset;
10use super::super::utils::SlotMap;
11
12pub struct FastReceiver<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize>
29{
30 base: ReceiverBase,
32 occupied: Bitset<WIN>,
34 slots: SlotMap<TOTAL, WIN, MTU>,
36 flags: [SequenceFlag; WIN],
38 reassembly: [u8; REASM],
40 reassembly_len: usize,
42 reassembly_in_progress: bool,
44 complete_message_len: Option<usize>,
46}
47
48impl<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize> ReceiverBackend
49 for FastReceiver<WIN, MTU, REASM, TOTAL>
50{
51 fn new() -> Self {
52 Self {
53 base: ReceiverBase::new(),
54 occupied: Bitset::new(),
55 slots: SlotMap::new(),
56 flags: [SequenceFlag::default(); WIN],
57 reassembly: [0u8; REASM],
58 reassembly_len: 0,
59 reassembly_in_progress: false,
60 complete_message_len: None,
61 }
62 }
63
64 fn handle_data(
65 &mut self,
66 seq: SequenceCount,
67 flags: SequenceFlag,
68 payload: &[u8],
69 ) -> Result<DataOutcome, ReceiverError> {
70 let distance = self.base.distance(seq);
71 let seq_before = self.base.expected_seq_raw();
72
73 if distance == 0 {
74 self.deliver_packet(flags, payload)?;
75 self.base.advance();
76 self.deliver_buffered()?;
77 } else if distance < Self::MAX_AHEAD {
78 if !self.base.is_ooo_duplicate(distance) {
79 self.store_ooo(seq.value(), flags, payload);
80 self.base.record_ooo(distance);
81 }
82 }
83
84 let progressed = self.base.expected_seq_raw() != seq_before;
85 let has_gap = self.occupied.any();
86 Ok(DataOutcome { progressed, has_gap })
87 }
88
89 fn skip_gap(&mut self) -> Result<GapOutcome, ReceiverError> {
90 if self.reassembly_in_progress {
91 self.reassembly_len = 0;
92 self.reassembly_in_progress = false;
93 }
94
95 self.base.advance();
96 self.deliver_buffered()?;
97
98 let has_gap = self.occupied.any();
99 Ok(GapOutcome { has_gap })
100 }
101
102 fn take_message(&mut self) -> Option<&[u8]> {
103 self.complete_message_len
104 .take()
105 .map(|len| &self.reassembly[..len])
106 }
107
108 fn reassembly_data(&self, len: usize) -> &[u8] {
109 &self.reassembly[..len]
110 }
111
112 fn has_message(&self) -> bool {
113 self.complete_message_len.is_some()
114 }
115
116 fn message_len(&self) -> Option<usize> {
117 self.complete_message_len
118 }
119
120 fn consume_message<Ret>(&mut self, f: impl FnOnce(&[u8]) -> Ret) -> Option<Ret> {
121 let len = self.complete_message_len.take()?;
122 Some(f(&self.reassembly[..len]))
123 }
124
125 fn expected_seq(&self) -> SequenceCount {
126 self.base.expected_seq()
127 }
128
129 fn recv_bitmap(&self) -> u16 {
130 self.base.recv_bitmap()
131 }
132}
133
134impl<const WIN: usize, const MTU: usize, const REASM: usize, const TOTAL: usize>
135 FastReceiver<WIN, MTU, REASM, TOTAL>
136{
137 const MAX_AHEAD: u16 = WIN as u16;
139
140 fn slot_idx(seq: u16) -> usize {
142 seq as usize % WIN
143 }
144
145 fn store_ooo(&mut self, seq: u16, flags: SequenceFlag, payload: &[u8]) {
147 let idx = Self::slot_idx(seq);
148 if self.occupied.is_set(idx) {
149 return;
150 }
151 self.slots.write(idx, payload);
152 self.flags[idx] = flags;
153 self.occupied.set(idx);
154 }
155
156 fn deliver_buffered(&mut self) -> Result<(), ReceiverError> {
158 let mut temp = [0u8; MTU];
159 loop {
160 let seq = self.base.expected_seq_raw();
161 let idx = Self::slot_idx(seq);
162 if !self.occupied.is_set(idx) {
163 break;
164 }
165
166 let flags = self.flags[idx];
167 self.occupied.clear(idx);
168
169 let len = self.slots.read(idx, &mut temp);
170 self.deliver_packet(flags, &temp[..len])?;
171 self.base.advance();
172 }
173 Ok(())
174 }
175
176 fn deliver_packet(&mut self, flags: SequenceFlag, payload: &[u8]) -> Result<(), ReceiverError> {
178 match flags {
179 SequenceFlag::Unsegmented => {
180 if payload.len() > REASM {
181 return Err(ReceiverError::MessageTooLarge);
182 }
183 self.reassembly[..payload.len()].copy_from_slice(payload);
184 self.complete_message_len = Some(payload.len());
185 self.reassembly_in_progress = false;
186 }
187 SequenceFlag::First => {
188 if payload.len() > REASM {
189 return Err(ReceiverError::MessageTooLarge);
190 }
191 self.reassembly[..payload.len()].copy_from_slice(payload);
192 self.reassembly_len = payload.len();
193 self.reassembly_in_progress = true;
194 self.complete_message_len = None;
195 }
196 SequenceFlag::Continuation | SequenceFlag::Last => {
197 if !self.reassembly_in_progress {
198 return Err(ReceiverError::ReassemblyError);
199 }
200 let new_len = self.reassembly_len + payload.len();
201 if new_len > REASM {
202 return Err(ReceiverError::MessageTooLarge);
203 }
204 self.reassembly[self.reassembly_len..new_len].copy_from_slice(payload);
205 self.reassembly_len = new_len;
206
207 if flags == SequenceFlag::Last {
208 self.complete_message_len = Some(self.reassembly_len);
209 self.reassembly_in_progress = false;
210 }
211 }
212 }
213 Ok(())
214 }
215}