leodos_protocols/transport/srspp/api/tokio/
receiver.rs1use crate::network::NetworkRead;
2use crate::network::NetworkWrite;
3use crate::network::isl::address::Address;
4use crate::network::spp::Apid;
5use crate::network::spp::SequenceCount;
6use crate::transport::srspp::machine::receiver::AckState;
7use crate::transport::srspp::machine::receiver::HandleResult;
8use crate::transport::srspp::machine::receiver::ReceiverBackend;
9use crate::transport::srspp::machine::receiver::ReceiverConfig;
10use crate::transport::srspp::machine::receiver::TimerAction;
11use crate::transport::srspp::packet::SrsppAckPacket;
12use crate::transport::srspp::packet::SrsppDataPacket;
13use crate::transport::srspp::packet::SrsppPacket;
14use crate::transport::srspp::packet::SrsppType;
15use tokio::time::Instant;
16
17use super::SrsppError;
18use super::sleep_until;
19use super::ticks_to_duration;
20
21pub struct SrsppReceiver<
26 L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
27 R: ReceiverBackend,
28 const MTU: usize,
29> {
30 link: L,
32 local_address: Address,
34 apid: Apid,
36 function_code: u8,
38 machine: R,
40 ack_state: AckState,
42 ack_timer: Option<Instant>,
44 progress_timer: Option<Instant>,
46 ticks_per_sec: u32,
48 recv_buffer: [u8; MTU],
50 ack_buffer: [u8; 32],
52}
53
54impl<
55 L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
56 R: ReceiverBackend,
57 const MTU: usize,
58> SrsppReceiver<L, R, MTU>
59{
60 pub fn new(
62 config: ReceiverConfig,
63 remote_address: Address,
64 link: L,
65 ticks_per_sec: u32,
66 ) -> Self {
67 let local_address = config.local_address;
68 let apid = config.apid;
69 let function_code = config.function_code;
70 let ack_state = AckState::new(&config, remote_address);
71 Self {
72 link,
73 local_address,
74 apid,
75 function_code,
76 machine: R::new(),
77 ack_state,
78 ack_timer: None,
79 progress_timer: None,
80 ticks_per_sec,
81 recv_buffer: [0u8; MTU],
82 ack_buffer: [0u8; 32],
83 }
84 }
85
86 pub async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, SrsppError> {
90 loop {
91 if let Some(msg) = self.machine.take_message() {
92 let len = msg.len().min(buf.len());
93 buf[..len].copy_from_slice(&msg[..len]);
94 return Ok(len);
95 }
96
97 self.poll().await?;
98 }
99 }
100
101 pub fn try_recv(&mut self, buf: &mut [u8]) -> Option<usize> {
105 self.machine.take_message().map(|m| {
106 let len = m.len().min(buf.len());
107 buf[..len].copy_from_slice(&m[..len]);
108 len
109 })
110 }
111
112 pub async fn wait_for_message(&mut self) -> Result<DeliveryToken<'_, L, R, MTU>, SrsppError> {
119 loop {
120 if let Some(len) = self.machine.message_len() {
121 return Ok(DeliveryToken {
122 rx: self,
123 msg_len: len,
124 });
125 }
126 self.poll().await?;
127 }
128 }
129
130 pub async fn recv_with<F, Ret>(&mut self, f: F) -> Result<Ret, SrsppError>
136 where
137 F: FnOnce(&[u8]) -> Ret,
138 {
139 let token = self.wait_for_message().await?;
140 Ok(token.consume(f))
141 }
142
143 pub async fn poll(&mut self) -> Result<(), SrsppError> {
145 tokio::select! {
146 biased;
147
148 result = self.link.read(&mut self.recv_buffer) => {
149 let len = result.map_err(|e| SrsppError::Network(e.to_string()))?;
150 self.handle_incoming(&self.recv_buffer[..len].to_vec()).await?;
151 }
152
153 _ = sleep_until(self.ack_timer) => {
154 self.handle_ack_timeout().await?;
155 }
156
157 _ = sleep_until(self.progress_timer) => {
158 self.handle_progress_timeout().await?;
159 }
160 }
161
162 Ok(())
163 }
164
165 async fn handle_incoming(&mut self, packet: &[u8]) -> Result<(), SrsppError> {
167 let parsed =
168 SrsppPacket::parse(packet).map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
169 let srspp_type = parsed
170 .srspp_type()
171 .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
172
173 if srspp_type == SrsppType::Data {
174 let data = SrsppDataPacket::parse(packet)
175 .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
176
177 let outcome = self.machine.handle_data(
178 data.primary.sequence_count(),
179 data.primary.sequence_flag(),
180 &data.payload,
181 )?;
182
183 let result = self.ack_state.on_data(
184 outcome,
185 self.machine.expected_seq(),
186 self.machine.recv_bitmap(),
187 );
188 self.apply_result(result).await?;
189 }
190
191 Ok(())
192 }
193
194 async fn handle_ack_timeout(&mut self) -> Result<(), SrsppError> {
196 self.ack_timer = None;
197 let result = self.ack_state.on_ack_timeout(
198 self.machine.expected_seq(),
199 self.machine.recv_bitmap(),
200 );
201 self.apply_result(result).await
202 }
203
204 async fn handle_progress_timeout(&mut self) -> Result<(), SrsppError> {
206 self.progress_timer = None;
207 let outcome = self.machine.skip_gap()?;
208 let result = self.ack_state.on_gap_skip(outcome);
209 self.apply_result(result).await
210 }
211
212 async fn apply_result(&mut self, result: HandleResult) -> Result<(), SrsppError> {
214 if let Some(ack) = result.ack {
215 let ack = SrsppAckPacket::builder()
216 .buffer(&mut self.ack_buffer)
217 .source_address(self.local_address)
218 .target(ack.destination)
219 .apid(self.apid)
220 .function_code(self.function_code)
221 .cumulative_ack(ack.cumulative_ack)
222 .sequence_count(SequenceCount::new())
223 .selective_bitmap(ack.selective_bitmap)
224 .build()
225 .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
226
227 self.link
228 .write(zerocopy::IntoBytes::as_bytes(ack))
229 .await
230 .map_err(|e| SrsppError::Network(e.to_string()))?;
231 }
232
233 if let Some(action) = result.ack_timer {
234 self.ack_timer = match action {
235 TimerAction::Start { ticks } => {
236 Some(Instant::now() + ticks_to_duration(ticks, self.ticks_per_sec))
237 }
238 TimerAction::Stop => None,
239 };
240 }
241
242 if let Some(action) = result.progress_timer {
243 self.progress_timer = match action {
244 TimerAction::Start { ticks } => {
245 Some(Instant::now() + ticks_to_duration(ticks, self.ticks_per_sec))
246 }
247 TimerAction::Stop => None,
248 };
249 }
250
251 Ok(())
252 }
253}
254
255pub struct DeliveryToken<
263 'a,
264 L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
265 R: ReceiverBackend,
266 const MTU: usize,
267> {
268 rx: &'a mut SrsppReceiver<L, R, MTU>,
269 msg_len: usize,
270}
271
272impl<
273 'a,
274 L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
275 R: ReceiverBackend,
276 const MTU: usize,
277> DeliveryToken<'a, L, R, MTU>
278{
279 pub fn len(&self) -> usize {
281 self.msg_len
282 }
283
284 pub fn consume<F, Ret>(self, f: F) -> Ret
287 where
288 F: FnOnce(&[u8]) -> Ret,
289 {
290 self.rx.machine.consume_message(f).unwrap()
291 }
292}