Skip to main content

leodos_protocols/transport/srspp/api/cfs/
mod.rs

1/// Concurrent per-connection receive.
2mod listener;
3/// Combined sender/receiver SRSPP node.
4mod node;
5/// Multi-stream SRSPP receiver.
6mod receiver;
7/// SRSPP sender with retransmission support.
8mod sender;
9
10pub use listener::SrsppStream;
11pub use node::{SrsppNode, SrsppNodeDriver};
12pub use receiver::{DeliveryToken, SrsppReceiver, SrsppReceiverDriver, SrsppRxHandle};
13pub use sender::{
14    SimpleSender, SrsppSender, SrsppSenderDriver, SrsppTxHandle,
15};
16
17use leodos_libcfs::cfe::time::SysTime;
18
19use crate::network::spp::SequenceCount;
20use crate::transport::srspp::machine::receiver::ReceiverError;
21use crate::transport::srspp::machine::sender::SenderError;
22use crate::transport::srspp::packet;
23
24/// Errors from the SRSPP CFS transport layer.
25#[derive(Debug, Clone, thiserror::Error)]
26pub enum TransportError<E> {
27    /// The sender state machine reported an error.
28    #[error(transparent)]
29    Sender(#[from] SenderError),
30    /// The receiver state machine reported an error.
31    #[error(transparent)]
32    Receiver(#[from] ReceiverError),
33    /// The underlying network failed.
34    #[error("network error: {0}")]
35    Network(E),
36    /// A packet could not be built or parsed.
37    #[error(transparent)]
38    Packet(#[from] packet::SrsppPacketError),
39}
40
41/// Fixed-capacity set of retransmission timers keyed by sequence number.
42struct TimerSet<const N: usize> {
43    /// Array of (sequence number, optional deadline) slots.
44    timers: [(SequenceCount, Option<SysTime>); N],
45}
46
47impl<const N: usize> TimerSet<N> {
48    /// Creates an empty timer set.
49    fn new() -> Self {
50        Self {
51            timers: [(SequenceCount::new(), None); N],
52        }
53    }
54
55    /// Starts a timer for the given sequence number with the specified deadline.
56    fn start(&mut self, seq: SequenceCount, deadline: SysTime) {
57        for slot in &mut self.timers {
58            if slot.1.is_none() {
59                *slot = (seq, Some(deadline));
60                return;
61            }
62        }
63    }
64
65    /// Cancels the timer for the given sequence number.
66    fn stop(&mut self, seq: SequenceCount) {
67        for slot in &mut self.timers {
68            if slot.0 == seq && slot.1.is_some() {
69                slot.1 = None;
70            }
71        }
72    }
73
74    /// Returns an iterator of sequence numbers whose timers have expired.
75    fn expired(&mut self, now: SysTime) -> impl Iterator<Item = SequenceCount> + '_ {
76        self.timers.iter_mut().filter_map(move |slot| {
77            if let Some(deadline) = slot.1 {
78                if now >= deadline {
79                    slot.1 = None;
80                    return Some(slot.0);
81                }
82            }
83            None
84        })
85    }
86
87    /// Returns the earliest active deadline, if any.
88    fn next_deadline(&self) -> Option<SysTime> {
89        self.timers
90            .iter()
91            .filter_map(|(_, deadline)| *deadline)
92            .min()
93    }
94}