1use core::future::poll_fn;
2use core::task::Poll;
3
4use zerocopy::Immutable;
5use zerocopy::IntoBytes;
6
7use futures::FutureExt;
8
9use leodos_libcfs::cfe::duration::Duration;
10use leodos_libcfs::cfe::time::SysTime;
11use leodos_libcfs::runtime::time::sleep;
12
13use crate::network::NetworkRead;
14use crate::network::NetworkWrite;
15use crate::network::isl::address::Address;
16use crate::network::spp::Apid;
17use crate::network::spp::SequenceCount;
18use crate::transport::srspp::api::cfs::TransportError;
19use crate::transport::srspp::dtn::AlwaysReachable;
20use crate::transport::srspp::dtn::MessageStore;
21use crate::transport::srspp::dtn::NoStore;
22use crate::transport::srspp::dtn::Reachable;
23use crate::transport::srspp::machine::sender::SenderAction;
24use crate::transport::srspp::machine::sender::SenderActions;
25use crate::transport::srspp::machine::sender::SenderConfig;
26use crate::transport::srspp::machine::sender::SenderEvent;
27use crate::transport::srspp::machine::sender::SenderMachine;
28use crate::transport::srspp::packet::SrsppAckPacket;
29use crate::transport::srspp::packet::SrsppDataPacket;
30use crate::transport::srspp::packet::SrsppPacket;
31use crate::transport::srspp::packet::SrsppType;
32use crate::transport::srspp::rto::RtoPolicy;
33use crate::utils::cell::SyncRefCell;
34
35use super::TimerSet;
36
37pub(super) struct SenderState<E, const WIN: usize, const BUF: usize, const MTU: usize> {
39 pub(crate) machine: SenderMachine<WIN, BUF, MTU>,
41 pub(crate) actions: SenderActions,
43 pub(crate) timers: TimerSet<WIN>,
45 pub(crate) closed: bool,
47 pub(crate) error: Option<TransportError<E>>,
49}
50
51pub struct SrsppSender<
55 E,
56 S: MessageStore,
57 R: Reachable,
58 const WIN: usize,
59 const BUF: usize,
60 const MTU: usize,
61> {
62 pub(super) state: SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
64 dtn: SyncRefCell<DtnContext<S, R>>,
66 origin: Address,
68}
69
70pub(super) struct DtnContext<S, R> {
71 pub(super) store: S,
72 pub(super) reachable: R,
73}
74
75pub type SimpleSender<E, const WIN: usize = 8, const BUF: usize = 4096, const MTU: usize = 512> =
77 SrsppSender<E, NoStore, AlwaysReachable, WIN, BUF, MTU>;
78
79#[bon::bon]
80impl<E: Clone, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
81 SrsppSender<E, S, R, WIN, BUF, MTU>
82{
83 #[builder]
85 pub fn new(
86 source_address: Address,
87 apid: Apid,
88 #[builder(default)] function_code: u8,
89 rto_ticks: u32,
90 #[builder(default = 3)] max_retransmits: u8,
91 #[builder(default = SrsppDataPacket::HEADER_SIZE)] header_overhead: usize,
92 store: S,
93 reachable: R,
94 ) -> Self {
95 let config = SenderConfig::builder()
96 .source_address(source_address)
97 .apid(apid)
98 .function_code(function_code)
99 .rto_ticks(rto_ticks)
100 .max_retransmits(max_retransmits)
101 .header_overhead(header_overhead)
102 .build();
103 Self {
104 state: SyncRefCell::new(SenderState {
105 machine: SenderMachine::new(config),
106 actions: SenderActions::new(),
107 timers: TimerSet::new(),
108 closed: false,
109 error: None,
110 }),
111 dtn: SyncRefCell::new(DtnContext { store, reachable }),
112 origin: source_address,
113 }
114 }
115
116 pub fn split<P: RtoPolicy>(
118 &self,
119 rto_policy: P,
120 ) -> (
121 SrsppTxHandle<'_, E, S, R, WIN, BUF, MTU>,
122 SrsppSenderDriver<'_, P, E, S, R, WIN, BUF, MTU>,
123 ) {
124 (
125 SrsppTxHandle {
126 sender: &self.state,
127 dtn: &self.dtn,
128 origin: self.origin,
129 },
130 SrsppSenderDriver::new(rto_policy, &self.state, &self.dtn, self.origin),
131 )
132 }
133}
134
135pub struct SrsppSenderDriver<
137 'a,
138 P: RtoPolicy,
139 E,
140 S: MessageStore,
141 R: Reachable,
142 const WIN: usize,
143 const BUF: usize,
144 const MTU: usize,
145> {
146 rto_policy: P,
147 pub(super) sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
148 dtn: &'a SyncRefCell<DtnContext<S, R>>,
149 origin: Address,
150 tx_buffer: [u8; MTU],
151}
152
153impl<
154 'a,
155 P: RtoPolicy,
156 E,
157 S: MessageStore,
158 R: Reachable,
159 const WIN: usize,
160 const BUF: usize,
161 const MTU: usize,
162> SrsppSenderDriver<'a, P, E, S, R, WIN, BUF, MTU>
163{
164 pub(super) fn new(
165 rto_policy: P,
166 sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
167 dtn: &'a SyncRefCell<DtnContext<S, R>>,
168 origin: Address,
169 ) -> Self {
170 Self {
171 rto_policy,
172 sender,
173 dtn,
174 origin,
175 tx_buffer: [0u8; MTU],
176 }
177 }
178}
179
180impl<
181 'a,
182 P: RtoPolicy,
183 E: Clone,
184 S: MessageStore,
185 R: Reachable,
186 const WIN: usize,
187 const BUF: usize,
188 const MTU: usize,
189> SrsppSenderDriver<'a, P, E, S, R, WIN, BUF, MTU>
190{
191 pub(super) async fn transmit(
193 &mut self,
194 link: &mut impl NetworkWrite<Error = E>,
195 ) -> Result<(), TransportError<E>> {
196 let now = SysTime::now();
197
198 let (transmits, cfg_clone) = self.sender.with(|s| {
199 let t = s
200 .actions
201 .iter()
202 .filter_map(|a| match a {
203 SenderAction::Transmit { seq, .. } => Some(*seq),
204 _ => None,
205 })
206 .collect::<heapless::Vec<_, WIN>>();
207 (t, s.machine.config().clone())
208 });
209
210 for seq in transmits {
211 let packet_len = self.sender.with(|s| {
212 if let Some(info) = s.machine.get_payload(seq) {
213 let pkt = SrsppDataPacket::builder()
214 .buffer(&mut self.tx_buffer)
215 .source_address(cfg_clone.source_address)
216 .target(info.target)
217 .apid(cfg_clone.apid)
218 .function_code(cfg_clone.function_code)
219 .sequence_count(seq)
220 .sequence_flag(info.flags)
221 .payload_len(info.payload.len())
222 .build()
223 .map_err(TransportError::Packet)?;
224 pkt.payload.copy_from_slice(info.payload);
225 Ok::<_, TransportError<E>>(Some(
226 SrsppDataPacket::HEADER_SIZE + info.payload.len(),
227 ))
228 } else {
229 Ok::<_, TransportError<E>>(None)
230 }
231 })?;
232
233 if let Some(packet_len) = packet_len {
234 link.write(&self.tx_buffer[..packet_len])
235 .await
236 .map_err(TransportError::Network)?;
237
238 let rto_dur = Duration::from_millis(self.rto_policy.rto_ticks(now.seconds()));
239
240 self.sender.with_mut(|s| {
241 s.machine.mark_transmitted(seq);
242 s.timers.start(seq, now + SysTime::from(rto_dur));
243 });
244 }
245 }
246
247 self.sender.with_mut(|s| {
248 for action in s.actions.iter() {
249 let &SenderAction::StopTimer { seq } = action else {
250 continue;
251 };
252 s.timers.stop(seq);
253 }
254 });
255
256 Ok(())
257 }
258
259 pub(super) fn process_ack(&mut self, packet: &[u8]) -> Result<(), TransportError<E>> {
261 if let Ok(SrsppType::Ack) = SrsppPacket::parse(packet).and_then(|p| p.srspp_type()) {
262 if let Ok(ack) = SrsppAckPacket::parse(packet) {
263 self.sender.with_mut(|s| {
264 s.machine.handle(
265 SenderEvent::AckReceived {
266 cumulative_ack: ack.ack_payload.cumulative_ack(),
267 selective_bitmap: ack.ack_payload.selective_ack_bitmap(),
268 },
269 &mut s.actions,
270 )?;
271
272 for action in s.actions.iter() {
273 let &SenderAction::StopTimer { seq } = action else {
274 continue;
275 };
276 s.timers.stop(seq);
277 }
278 Ok::<(), TransportError<E>>(())
279 })?;
280 }
281 }
282 Ok(())
283 }
284
285 pub(super) async fn handle_timeouts(
287 &mut self,
288 link: &mut impl NetworkWrite<Error = E>,
289 ) -> Result<(), TransportError<E>> {
290 let now = SysTime::now();
291
292 for seq in self
293 .sender
294 .with_mut(|s| s.timers.expired(now).collect::<heapless::Vec<_, WIN>>())
295 {
296 self.sender.with_mut(|s| {
297 s.machine.handle(
298 SenderEvent::RetransmitTimeout {
299 seq: SequenceCount::from(seq),
300 },
301 &mut s.actions,
302 )
303 })?;
304
305 self.transmit(link).await?;
306 }
307
308 Ok(())
309 }
310
311 pub(super) fn next_deadline(&self) -> Option<SysTime> {
313 self.sender.with(|s| s.timers.next_deadline())
314 }
315
316 pub(super) async fn drain_stored(
318 &mut self,
319 link: &mut impl NetworkWrite<Error = E>,
320 ) -> Result<(), TransportError<E>> {
321 self.dtn
322 .with_mut(|d| d.store.expire(SysTime::now().seconds()));
323
324 let pending = self.dtn.with(|d| d.store.pending_targets());
325 if pending == 0 {
326 return Ok(());
327 }
328
329 for station in 0..16u8 {
330 if pending & (1 << station) == 0 {
331 continue;
332 }
333 let target = Address::Ground { station };
334 if !self
335 .dtn
336 .with(|d| d.reachable.is_reachable(self.origin, target))
337 {
338 continue;
339 }
340
341 loop {
342 let (bytes, window) = self
343 .sender
344 .with(|s| (s.machine.available_bytes(), s.machine.available_window()));
345 if window == 0 {
346 break;
347 }
348 let Some(size) = self.dtn.with(|d| d.store.peek_size(target)) else {
349 break;
350 };
351 if size > bytes {
352 break;
353 }
354 let Some(len) = self
355 .dtn
356 .with_mut(|d| d.store.read(target, &mut self.tx_buffer))
357 else {
358 break;
359 };
360 self.sender.with_mut(|s| {
361 s.machine.handle(
362 SenderEvent::SendRequest {
363 target,
364 data: &self.tx_buffer[..len],
365 },
366 &mut s.actions,
367 )
368 })?;
369 }
370
371 self.transmit(link).await?;
372 }
373
374 Ok(())
375 }
376
377 pub async fn run(
379 &mut self,
380 link: &mut (impl NetworkWrite<Error = E> + NetworkRead<Error = E>),
381 ) -> Result<(), TransportError<E>> {
382 let mut recv_buffer = [0u8; MTU];
383 loop {
384 let pending = self.dtn.with(|d| d.store.pending_targets() != 0);
385 if self.sender.with(|s| s.closed && s.machine.is_idle()) && !pending {
386 return Ok(());
387 }
388
389 self.drain_stored(link).await?;
390
391 if let Err(e) = self.transmit(link).await {
392 self.sender.with_mut(|s| s.error = Some(e.clone()));
393 return Err(e);
394 }
395
396 let timeout = duration_until(self.next_deadline());
397
398 let event = {
399 let read_fut = link.read(&mut recv_buffer).fuse();
400 let sleep_fut = sleep(timeout).fuse();
401 pin_utils::pin_mut!(read_fut, sleep_fut);
402 futures::select_biased! {
403 r = read_fut => Some(r),
404 _ = sleep_fut => None,
405 }
406 };
407
408 match event {
409 Some(Ok(len)) => {
410 if let Err(e) = self.process_ack(&recv_buffer[..len]) {
411 self.sender.with_mut(|s| s.error = Some(e.clone()));
412 return Err(e);
413 }
414 }
415 Some(Err(e)) => {
416 let err = TransportError::Network(e);
417 self.sender.with_mut(|s| s.error = Some(err.clone()));
418 return Err(err);
419 }
420 None => {
421 if let Err(e) = self.handle_timeouts(link).await {
422 self.sender.with_mut(|s| s.error = Some(e.clone()));
423 return Err(e);
424 }
425 }
426 }
427 }
428 }
429}
430
431pub(super) fn duration_until(deadline: Option<SysTime>) -> Duration {
433 let now = SysTime::now();
434 deadline
435 .map(|d| {
436 if d > now {
437 Duration::from(d - now)
438 } else {
439 Duration::zero()
440 }
441 })
442 .unwrap_or(Duration::from_secs(60))
443}
444
445pub struct SrsppTxHandle<
447 'a,
448 E,
449 S: MessageStore,
450 R: Reachable,
451 const WIN: usize,
452 const BUF: usize,
453 const MTU: usize,
454> {
455 pub(super) sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
456 pub(super) dtn: &'a SyncRefCell<DtnContext<S, R>>,
457 pub(super) origin: Address,
458}
459
460impl<'a, E, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
461 Clone for SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
462{
463 fn clone(&self) -> Self {
464 *self
465 }
466}
467
468impl<'a, E, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
469 Copy for SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
470{
471}
472
473impl<
474 'a,
475 E: Clone,
476 S: MessageStore,
477 R: Reachable,
478 const WIN: usize,
479 const BUF: usize,
480 const MTU: usize,
481> SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
482{
483 pub async fn send(
489 &mut self,
490 target: impl Into<Address>,
491 data: &(impl IntoBytes + Immutable + ?Sized),
492 ) -> Result<(), TransportError<E>> {
493 let target = target.into();
494 let data = data.as_bytes();
495
496 if !self
497 .dtn
498 .with(|d| d.reachable.is_reachable(self.origin, target))
499 {
500 self.dtn
501 .with_mut(|d| d.store.write(target, data, 0, SysTime::now().seconds()));
502 return Ok(());
503 }
504
505 poll_fn(|_cx| {
507 self.sender.with(|s| {
508 if let Some(ref e) = s.error {
509 return Poll::Ready(Err(e.clone()));
510 }
511 if s.machine.available_bytes() >= data.len() && s.machine.available_window() > 0 {
512 Poll::Ready(Ok(()))
513 } else {
514 Poll::Pending
515 }
516 })
517 })
518 .await?;
519
520 self.sender.with_mut(|s| {
521 s.machine
522 .handle(SenderEvent::SendRequest { target, data }, &mut s.actions)
523 })?;
524 Ok(())
525 }
526
527 pub fn close(&mut self) {
531 self.sender.with_mut(|s| s.closed = true);
532 }
533
534 pub fn available_bytes(&self) -> usize {
536 self.sender.with(|s| s.machine.available_bytes())
537 }
538
539 pub fn available_window(&self) -> usize {
541 self.sender.with(|s| s.machine.available_window())
542 }
543
544 pub fn is_idle(&self) -> bool {
546 self.sender.with(|s| s.machine.is_idle())
547 }
548}