1use crate::network::isl::address::{Address, RawAddress};
7use crate::network::spp::{Apid, SequenceCount, SequenceFlag};
8use heapless::Vec;
9
10const MAX_ACTIONS: usize = 32;
12
13#[derive(Debug)]
15pub enum SenderEvent<'a> {
16 SendRequest {
18 target: Address,
20 data: &'a [u8],
22 },
23
24 AckReceived {
26 cumulative_ack: SequenceCount,
28 selective_bitmap: u16,
30 },
31
32 RetransmitTimeout {
34 seq: SequenceCount,
36 },
37}
38
39#[derive(Debug, Copy, Clone, Eq, PartialEq)]
41pub enum SenderAction {
42 Transmit {
45 seq: SequenceCount,
47 rto_ticks: u32,
49 },
50
51 StopTimer {
53 seq: SequenceCount,
55 },
56
57 PacketLost {
59 seq: SequenceCount,
61 },
62
63 MessageLost,
65
66 SpaceAvailable {
68 bytes: usize,
70 },
71}
72
73#[derive(Debug)]
75pub struct SenderActions {
76 inner: Vec<SenderAction, MAX_ACTIONS>,
77}
78
79impl SenderActions {
80 pub fn new() -> Self {
82 Self { inner: Vec::new() }
83 }
84
85 pub fn push(&mut self, action: SenderAction) {
87 let _ = self.inner.push(action);
88 }
89
90 pub fn clear(&mut self) {
92 self.inner.clear();
93 }
94
95 pub fn iter(&self) -> impl Iterator<Item = &SenderAction> {
97 self.inner.iter()
98 }
99
100 pub fn is_empty(&self) -> bool {
102 self.inner.is_empty()
103 }
104
105 pub fn len(&self) -> usize {
107 self.inner.len()
108 }
109}
110
111impl Default for SenderActions {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117impl<'a> IntoIterator for &'a SenderActions {
118 type Item = &'a SenderAction;
119 type IntoIter = core::slice::Iter<'a, SenderAction>;
120
121 fn into_iter(self) -> Self::IntoIter {
122 self.inner.iter()
123 }
124}
125
126#[derive(Debug, Copy, Clone, Eq, PartialEq, thiserror::Error)]
128pub enum SenderError {
129 #[error("send buffer full")]
131 BufferFull,
132 #[error("send window full")]
134 WindowFull,
135}
136
137pub struct PayloadInfo<'a> {
139 pub seq: SequenceCount,
141 pub flags: SequenceFlag,
143 pub target: Address,
145 pub payload: &'a [u8],
147}
148
149#[derive(Debug, Clone, bon::Builder)]
151pub struct SenderConfig {
152 pub source_address: Address,
154 pub apid: Apid,
156 pub function_code: u8,
158 pub rto_ticks: u32,
160 pub max_retransmits: u8,
162 pub header_overhead: usize,
164}
165
166#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
168enum SlotState {
169 #[default]
171 Empty,
172 PendingTransmit,
174 AwaitingAck,
176}
177
178#[derive(Clone, Copy)]
180struct PacketMeta {
181 state: SlotState,
183 seq: u16,
185 flags: SequenceFlag,
187 target: RawAddress,
189 retransmit_count: u8,
191 offset: usize,
193 len: usize,
195 is_segmented: bool,
197}
198
199impl Default for PacketMeta {
200 fn default() -> Self {
201 Self {
202 state: SlotState::Empty,
203 seq: 0,
204 flags: SequenceFlag::Unsegmented,
205 target: RawAddress::from(Address::ground(0)),
206 retransmit_count: 0,
207 offset: 0,
208 len: 0,
209 is_segmented: false,
210 }
211 }
212}
213
214pub struct SenderMachine<const WIN: usize, const BUF: usize, const MTU: usize> {
225 pub(crate) config: SenderConfig,
227 meta: [PacketMeta; WIN],
229 data: [u8; BUF],
231 write_pos: usize,
233 next_seq: u16,
235 send_base: u16,
237}
238
239impl<const WIN: usize, const BUF: usize, const MTU: usize> SenderMachine<WIN, BUF, MTU> {
240 pub fn new(config: SenderConfig) -> Self {
242 Self {
243 config,
244 meta: [PacketMeta::default(); WIN],
245 data: [0u8; BUF],
246 write_pos: 0,
247 next_seq: 0,
248 send_base: 0,
249 }
250 }
251
252 pub fn config(&self) -> &SenderConfig {
254 &self.config
255 }
256
257 pub fn max_payload_per_packet(&self) -> usize {
259 MTU.saturating_sub(self.config.header_overhead)
260 }
261
262 pub fn available_bytes(&self) -> usize {
264 BUF - self.write_pos
265 }
266
267 pub fn available_window(&self) -> usize {
269 WIN.saturating_sub(self.unacked_count())
270 }
271
272 pub fn unacked_count(&self) -> usize {
274 self.meta
275 .iter()
276 .filter(|m| m.state != SlotState::Empty)
277 .count()
278 }
279
280 pub fn is_idle(&self) -> bool {
282 self.meta.iter().all(|m| m.state == SlotState::Empty)
283 }
284
285 pub fn get_payload(&self, seq: SequenceCount) -> Option<PayloadInfo<'_>> {
287 let seq_val = seq.value();
288 self.meta
289 .iter()
290 .find(|m| m.state != SlotState::Empty && m.seq == seq_val)
291 .map(|m| PayloadInfo {
292 seq: SequenceCount::from(m.seq),
293 flags: m.flags,
294 target: m.target.parse(),
295 payload: &self.data[m.offset..m.offset + m.len],
296 })
297 }
298
299 pub fn handle(
301 &mut self,
302 event: SenderEvent<'_>,
303 actions: &mut SenderActions,
304 ) -> Result<(), SenderError> {
305 actions.clear();
306
307 match event {
308 SenderEvent::SendRequest { target, data } => {
309 self.handle_send_request(target, data, actions)?;
310 }
311 SenderEvent::AckReceived {
312 cumulative_ack,
313 selective_bitmap,
314 } => {
315 self.handle_ack(cumulative_ack, selective_bitmap, actions);
316 }
317 SenderEvent::RetransmitTimeout { seq } => {
318 self.handle_timeout(seq, actions);
319 }
320 }
321 Ok(())
322 }
323
324 fn handle_send_request(
326 &mut self,
327 target: Address,
328 data: &[u8],
329 actions: &mut SenderActions,
330 ) -> Result<(), SenderError> {
331 let max_payload = self.max_payload_per_packet();
332
333 if data.len() <= max_payload {
334 self.queue_packet(target, data, SequenceFlag::Unsegmented, false, actions)?;
335 } else {
336 let mut offset = 0;
337 let mut is_first = true;
338
339 while offset < data.len() {
340 let remaining = data.len() - offset;
341 let chunk_size = remaining.min(max_payload);
342 let is_last = offset + chunk_size >= data.len();
343
344 let flags = if is_first {
345 is_first = false;
346 SequenceFlag::First
347 } else if is_last {
348 SequenceFlag::Last
349 } else {
350 SequenceFlag::Continuation
351 };
352
353 self.queue_packet(
354 target,
355 &data[offset..offset + chunk_size],
356 flags,
357 true,
358 actions,
359 )?;
360 offset += chunk_size;
361 }
362 }
363
364 Ok(())
365 }
366
367 fn queue_packet(
369 &mut self,
370 target: Address,
371 payload: &[u8],
372 flags: SequenceFlag,
373 is_segmented: bool,
374 actions: &mut SenderActions,
375 ) -> Result<(), SenderError> {
376 let slot_idx = self
377 .meta
378 .iter()
379 .position(|m| m.state == SlotState::Empty)
380 .ok_or(SenderError::WindowFull)?;
381
382 if payload.len() > self.available_bytes() {
383 self.compact();
384 if payload.len() > self.available_bytes() {
385 return Err(SenderError::BufferFull);
386 }
387 }
388
389 let offset = self.write_pos;
390 self.data[offset..offset + payload.len()].copy_from_slice(payload);
391 self.write_pos += payload.len();
392
393 let seq = SequenceCount::from(self.next_seq);
394
395 self.meta[slot_idx] = PacketMeta {
396 state: SlotState::PendingTransmit,
397 seq: self.next_seq,
398 flags,
399 target: RawAddress::from(target),
400 retransmit_count: 0,
401 offset,
402 len: payload.len(),
403 is_segmented,
404 };
405
406 self.next_seq = (self.next_seq + 1) & SequenceCount::MAX;
407
408 actions.push(SenderAction::Transmit {
409 seq,
410 rto_ticks: self.config.rto_ticks,
411 });
412
413 Ok(())
414 }
415
416 fn handle_ack(
418 &mut self,
419 cumulative_ack: SequenceCount,
420 selective_bitmap: u16,
421 actions: &mut SenderActions,
422 ) {
423 let ack_val = cumulative_ack.value();
424 let mut freed_bytes = 0usize;
425
426 for meta in &mut self.meta {
428 if meta.state != SlotState::Empty {
429 let diff = ack_val.wrapping_sub(meta.seq) & SequenceCount::MAX;
430 if diff < (SequenceCount::MAX / 2) {
431 freed_bytes += meta.len;
432 let seq = SequenceCount::from(meta.seq);
433 actions.push(SenderAction::StopTimer { seq });
434 meta.state = SlotState::Empty;
435 }
436 }
437 }
438
439 for bit_pos in 0..16u16 {
441 if selective_bitmap & (1 << bit_pos) != 0 {
442 let acked_seq = (ack_val + 1 + bit_pos) & SequenceCount::MAX;
443 for meta in &mut self.meta {
444 if meta.state != SlotState::Empty && meta.seq == acked_seq {
445 freed_bytes += meta.len;
446 let seq = SequenceCount::from(meta.seq);
447 actions.push(SenderAction::StopTimer { seq });
448 meta.state = SlotState::Empty;
449 break;
450 }
451 }
452 }
453 }
454
455 self.update_send_base();
456
457 if freed_bytes > 0 {
458 actions.push(SenderAction::SpaceAvailable {
459 bytes: self.available_bytes(),
460 });
461 }
462 }
463
464 fn handle_timeout(&mut self, seq: SequenceCount, actions: &mut SenderActions) {
466 let seq_val = seq.value();
467 let mut lost_segmented = false;
468
469 for i in 0..self.meta.len() {
470 if self.meta[i].state == SlotState::AwaitingAck && self.meta[i].seq == seq_val {
471 if self.meta[i].retransmit_count >= self.config.max_retransmits {
472 actions.push(SenderAction::PacketLost { seq });
473 lost_segmented = self.meta[i].is_segmented;
474 self.meta[i].state = SlotState::Empty;
475 } else {
476 self.meta[i].state = SlotState::PendingTransmit;
477 self.meta[i].retransmit_count += 1;
478 actions.push(SenderAction::Transmit {
479 seq,
480 rto_ticks: self.config.rto_ticks,
481 });
482 }
483 break;
484 }
485 }
486
487 if lost_segmented {
488 actions.push(SenderAction::MessageLost);
489 }
490
491 self.update_send_base();
492 }
493
494 pub fn mark_transmitted(&mut self, seq: SequenceCount) {
498 for meta in &mut self.meta {
499 if meta.state == SlotState::PendingTransmit && meta.seq == seq.value() {
500 meta.state = SlotState::AwaitingAck;
501 break;
502 }
503 }
504 }
505
506 fn update_send_base(&mut self) {
508 let mut min_unacked = self.next_seq;
509 let mut found = false;
510
511 for meta in &self.meta {
512 if meta.state != SlotState::Empty {
513 found = true;
514 let diff = min_unacked.wrapping_sub(meta.seq) & SequenceCount::MAX;
515 if diff < (SequenceCount::MAX / 2) && diff > 0 {
516 min_unacked = meta.seq;
517 }
518 }
519 }
520
521 self.send_base = if found { min_unacked } else { self.next_seq };
522 }
523
524 fn compact(&mut self) {
526 let mut indices: [Option<usize>; WIN] = [None; WIN];
527 let mut count = 0;
528
529 for (i, m) in self.meta.iter().enumerate() {
530 if m.state != SlotState::Empty {
531 let offset = m.offset;
532 let mut insert_pos = count;
533 for j in 0..count {
534 if let Some(idx) = indices[j] {
535 if self.meta[idx].offset > offset {
536 insert_pos = j;
537 break;
538 }
539 }
540 }
541 for j in (insert_pos..count).rev() {
542 indices[j + 1] = indices[j];
543 }
544 indices[insert_pos] = Some(i);
545 count += 1;
546 }
547 }
548
549 if count == 0 {
550 self.write_pos = 0;
551 return;
552 }
553
554 let mut new_pos = 0usize;
555 for idx_opt in indices.iter().take(count) {
556 if let Some(idx) = *idx_opt {
557 let old_offset = self.meta[idx].offset;
558 let len = self.meta[idx].len;
559
560 if new_pos != old_offset {
561 self.data.copy_within(old_offset..old_offset + len, new_pos);
562 }
563
564 self.meta[idx].offset = new_pos;
565 new_pos += len;
566 }
567 }
568
569 self.write_pos = new_pos;
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576 use crate::transport::srspp::packet::SrsppDataPacket;
577
578 fn make_config() -> SenderConfig {
579 SenderConfig {
580 source_address: Address::satellite(1, 5),
581 apid: Apid::new(0x42).unwrap(),
582 function_code: 0,
583 rto_ticks: 100,
584 max_retransmits: 3,
585 header_overhead: SrsppDataPacket::HEADER_SIZE,
586 }
587 }
588
589 fn target() -> Address {
590 Address::satellite(2, 3)
591 }
592
593 #[test]
594 fn test_send_emits_transmit_action() {
595 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
596 let mut actions = SenderActions::new();
597
598 sender
599 .handle(
600 SenderEvent::SendRequest {
601 target: target(),
602 data: &[1, 2, 3],
603 },
604 &mut actions,
605 )
606 .unwrap();
607
608 assert_eq!(actions.len(), 1);
610 let action = actions.iter().next().unwrap();
611 assert!(matches!(
612 action,
613 SenderAction::Transmit { seq, rto_ticks: 100 } if seq.value() == 0
614 ));
615
616 let info = sender.get_payload(SequenceCount::from(0)).unwrap();
617 assert!(!info.payload.is_empty());
618 }
619
620 #[test]
621 fn test_mark_transmitted() {
622 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
623 let mut actions = SenderActions::new();
624
625 sender
626 .handle(
627 SenderEvent::SendRequest {
628 target: target(),
629 data: &[1, 2, 3],
630 },
631 &mut actions,
632 )
633 .unwrap();
634
635 assert_eq!(sender.unacked_count(), 1);
637
638 sender.mark_transmitted(SequenceCount::from(0));
640
641 assert_eq!(sender.unacked_count(), 1);
643 }
644
645 #[test]
646 fn test_ack_stops_timer() {
647 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
648 let mut actions = SenderActions::new();
649
650 sender
651 .handle(
652 SenderEvent::SendRequest {
653 target: target(),
654 data: &[1, 2, 3],
655 },
656 &mut actions,
657 )
658 .unwrap();
659 sender.mark_transmitted(SequenceCount::from(0));
660
661 sender
662 .handle(
663 SenderEvent::AckReceived {
664 cumulative_ack: SequenceCount::from(0),
665 selective_bitmap: 0,
666 },
667 &mut actions,
668 )
669 .unwrap();
670
671 assert!(
672 actions
673 .iter()
674 .any(|a| matches!(a, SenderAction::StopTimer { seq } if seq.value() == 0))
675 );
676 assert!(
677 actions
678 .iter()
679 .any(|a| matches!(a, SenderAction::SpaceAvailable { .. }))
680 );
681 assert!(sender.is_idle());
682 }
683
684 #[test]
685 fn test_timeout_emits_retransmit() {
686 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
687 let mut actions = SenderActions::new();
688
689 sender
690 .handle(
691 SenderEvent::SendRequest {
692 target: target(),
693 data: &[1, 2, 3],
694 },
695 &mut actions,
696 )
697 .unwrap();
698 sender.mark_transmitted(SequenceCount::from(0));
699
700 sender
702 .handle(
703 SenderEvent::RetransmitTimeout {
704 seq: SequenceCount::from(0),
705 },
706 &mut actions,
707 )
708 .unwrap();
709
710 assert!(
712 actions
713 .iter()
714 .any(|a| matches!(a, SenderAction::Transmit { seq, .. } if seq.value() == 0))
715 );
716 }
717
718 #[test]
719 fn test_max_retransmits_emits_packet_lost() {
720 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
721 let mut actions = SenderActions::new();
722
723 sender
724 .handle(
725 SenderEvent::SendRequest {
726 target: target(),
727 data: &[1, 2, 3],
728 },
729 &mut actions,
730 )
731 .unwrap();
732 sender.mark_transmitted(SequenceCount::from(0));
733
734 for _ in 0..3 {
736 sender
737 .handle(
738 SenderEvent::RetransmitTimeout {
739 seq: SequenceCount::from(0),
740 },
741 &mut actions,
742 )
743 .unwrap();
744 sender.mark_transmitted(SequenceCount::from(0));
745 }
746
747 sender
749 .handle(
750 SenderEvent::RetransmitTimeout {
751 seq: SequenceCount::from(0),
752 },
753 &mut actions,
754 )
755 .unwrap();
756
757 assert!(
758 actions
759 .iter()
760 .any(|a| matches!(a, SenderAction::PacketLost { seq } if seq.value() == 0))
761 );
762 assert!(sender.is_idle());
763 }
764
765 #[test]
766 fn test_segmentation() {
767 let mut sender: SenderMachine<8, 4096, 64> = SenderMachine::new(make_config());
768 let mut actions = SenderActions::new();
769
770 let data = [0u8; 150];
771 sender
772 .handle(
773 SenderEvent::SendRequest {
774 target: target(),
775 data: &data,
776 },
777 &mut actions,
778 )
779 .unwrap();
780
781 let max_payload = 64 - SrsppDataPacket::HEADER_SIZE;
782 let expected = (150 + max_payload - 1) / max_payload;
783 let transmit_count = actions
784 .iter()
785 .filter(|a| matches!(a, SenderAction::Transmit { .. }))
786 .count();
787 assert_eq!(transmit_count, expected);
788 }
789
790 #[test]
791 fn test_selective_ack() {
792 let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
793 let mut actions = SenderActions::new();
794
795 for i in 0..3 {
797 sender
798 .handle(
799 SenderEvent::SendRequest {
800 target: target(),
801 data: &[i as u8; 10],
802 },
803 &mut actions,
804 )
805 .unwrap();
806 sender.mark_transmitted(SequenceCount::from(i));
807 }
808
809 sender
811 .handle(
812 SenderEvent::AckReceived {
813 cumulative_ack: SequenceCount::from(0),
814 selective_bitmap: 0b0010, },
816 &mut actions,
817 )
818 .unwrap();
819
820 let stopped: heapless::Vec<u16, 8> = actions
822 .iter()
823 .filter_map(|a| {
824 if let SenderAction::StopTimer { seq } = a {
825 Some(seq.value())
826 } else {
827 None
828 }
829 })
830 .collect();
831
832 assert!(stopped.contains(&0));
833 assert!(stopped.contains(&2));
834 assert!(!stopped.contains(&1));
835
836 assert!(!sender.is_idle());
838 assert!(sender.get_payload(SequenceCount::from(1)).is_some());
839 }
840
841 #[test]
842 fn test_message_lost_on_segmented_packet_loss() {
843 let mut sender: SenderMachine<8, 4096, 64> = SenderMachine::new(make_config());
844 let mut actions = SenderActions::new();
845
846 let data = [0u8; 150];
847 sender
848 .handle(
849 SenderEvent::SendRequest {
850 target: target(),
851 data: &data,
852 },
853 &mut actions,
854 )
855 .unwrap();
856
857 let transmit_count = actions
858 .iter()
859 .filter(|a| matches!(a, SenderAction::Transmit { .. }))
860 .count();
861 assert!(transmit_count >= 3);
862
863 for i in 0..transmit_count as u16 {
864 sender.mark_transmitted(SequenceCount::from(i));
865 }
866
867 for _ in 0..3 {
868 sender
869 .handle(
870 SenderEvent::RetransmitTimeout {
871 seq: SequenceCount::from(1),
872 },
873 &mut actions,
874 )
875 .unwrap();
876 sender.mark_transmitted(SequenceCount::from(1));
877 }
878
879 sender
880 .handle(
881 SenderEvent::RetransmitTimeout {
882 seq: SequenceCount::from(1),
883 },
884 &mut actions,
885 )
886 .unwrap();
887
888 assert!(
889 actions
890 .iter()
891 .any(|a| matches!(a, SenderAction::PacketLost { seq } if seq.value() == 1))
892 );
893 assert!(
894 actions
895 .iter()
896 .any(|a| matches!(a, SenderAction::MessageLost))
897 );
898 }
899}