Skip to main content

leodos_protocols/transport/srspp/api/cfs/
node.rs

1use futures::FutureExt;
2
3use leodos_libcfs::cfe::duration::Duration;
4use leodos_libcfs::runtime::time::sleep;
5
6use crate::network::NetworkRead;
7use crate::network::NetworkWrite;
8use crate::network::isl::address::Address;
9use crate::transport::srspp::api::cfs::TransportError;
10use crate::transport::srspp::dtn::AlwaysReachable;
11use crate::transport::srspp::dtn::MessageStore;
12use crate::transport::srspp::dtn::NoStore;
13use crate::transport::srspp::dtn::Reachable;
14use crate::transport::srspp::machine::receiver::ReceiverBackend;
15use crate::transport::srspp::machine::receiver::ReceiverConfig;
16use crate::transport::srspp::machine::receiver::ReceiverMachine;
17use crate::transport::srspp::machine::sender::SenderActions;
18use crate::transport::srspp::machine::sender::SenderConfig;
19use crate::transport::srspp::machine::sender::SenderMachine;
20use crate::transport::srspp::packet::SrsppPacket;
21use crate::transport::srspp::packet::SrsppType;
22use crate::transport::srspp::rto::RtoPolicy;
23use crate::utils::cell::SyncRefCell;
24
25use super::TimerSet;
26use super::receiver::MultiReceiverState;
27use super::receiver::SrsppReceiverDriver;
28use super::receiver::SrsppRxHandle;
29use super::sender::DtnContext;
30use super::sender::SenderState;
31use super::sender::SrsppSenderDriver;
32use super::sender::SrsppTxHandle;
33use super::sender::duration_until;
34
35/// Combined SRSPP sender and receiver over a single link.
36pub struct SrsppNode<
37    E,
38    S: MessageStore = NoStore,
39    Re: Reachable = AlwaysReachable,
40    R: ReceiverBackend = ReceiverMachine<8, 4096, 8192>,
41    const WIN: usize = 8,
42    const BUF: usize = 4096,
43    const MTU: usize = 512,
44    const MAX_STREAMS: usize = 1,
45> {
46    pub(super) sender: SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
47    pub(super) receiver: SyncRefCell<MultiReceiverState<E, R, MAX_STREAMS>>,
48    dtn: SyncRefCell<DtnContext<S, Re>>,
49    origin: Address,
50}
51
52impl<
53    E: Clone,
54    S: MessageStore,
55    Re: Reachable,
56    R: ReceiverBackend,
57    const WIN: usize,
58    const BUF: usize,
59    const MTU: usize,
60    const MAX_STREAMS: usize,
61> SrsppNode<E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>
62{
63    /// Creates a new node with sender and receiver configurations.
64    pub fn new(sender_config: SenderConfig, receiver_config: ReceiverConfig, store: S, reachable: Re) -> Self {
65        let origin = sender_config.source_address;
66        let ack_delay = Duration::from_millis(receiver_config.ack_delay_ticks);
67        Self {
68            sender: SyncRefCell::new(SenderState {
69                machine: SenderMachine::new(sender_config),
70                actions: SenderActions::new(),
71                timers: TimerSet::new(),
72                closed: false,
73                error: None,
74            }),
75            receiver: SyncRefCell::new(MultiReceiverState {
76                config: receiver_config,
77                streams: heapless::LinearMap::new(),
78                ack_delay,
79                closed: false,
80                error: None,
81            }),
82            dtn: SyncRefCell::new(DtnContext { store, reachable }),
83            origin,
84        }
85    }
86
87    /// Splits into separate tx/rx handles and a driver for I/O.
88    pub fn split<L: NetworkWrite<Error = E> + NetworkRead<Error = E>, P: RtoPolicy>(
89        &self,
90        link: L,
91        rto_policy: P,
92    ) -> (
93        SrsppRxHandle<'_, E, R, MAX_STREAMS>,
94        SrsppTxHandle<'_, E, S, Re, WIN, BUF, MTU>,
95        SrsppNodeDriver<'_, L, P, E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>,
96    ) {
97        (
98            SrsppRxHandle {
99                receiver: &self.receiver,
100            },
101            SrsppTxHandle {
102                sender: &self.sender,
103                dtn: &self.dtn,
104                origin: self.origin,
105            },
106            SrsppNodeDriver {
107                link,
108                sender: SrsppSenderDriver::new(
109                    rto_policy,
110                    &self.sender,
111                    &self.dtn,
112                    self.origin,
113                ),
114                receiver: SrsppReceiverDriver::new(&self.receiver),
115                recv_buffer: [0u8; MTU],
116            },
117        )
118    }
119}
120
121/// I/O driver for a combined SRSPP sender/receiver node.
122pub struct SrsppNodeDriver<
123    'a,
124    L,
125    P: RtoPolicy,
126    E,
127    S: MessageStore,
128    Re: Reachable,
129    R: ReceiverBackend,
130    const WIN: usize,
131    const BUF: usize,
132    const MTU: usize,
133    const MAX_STREAMS: usize,
134> {
135    link: L,
136    sender: SrsppSenderDriver<'a, P, E, S, Re, WIN, BUF, MTU>,
137    receiver: SrsppReceiverDriver<'a, E, R, MAX_STREAMS>,
138    recv_buffer: [u8; MTU],
139}
140
141impl<
142    'a,
143    L: NetworkWrite<Error = E> + NetworkRead<Error = E>,
144    P: RtoPolicy,
145    E: Clone,
146    S: MessageStore,
147    Re: Reachable,
148    R: ReceiverBackend,
149    const WIN: usize,
150    const BUF: usize,
151    const MTU: usize,
152    const MAX_STREAMS: usize,
153> SrsppNodeDriver<'a, L, P, E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>
154{
155    /// Runs the combined send/receive I/O loop.
156    pub async fn run(&mut self) -> Result<(), TransportError<E>> {
157        loop {
158            self.sender.drain_stored(&mut self.link).await?;
159
160            if let Err(e) = self.sender.transmit(&mut self.link).await {
161                self.set_both_errors(e.clone());
162                return Err(e);
163            }
164
165            let timeout = self.next_timeout();
166
167            let event = {
168                let read_fut = self.link.read(&mut self.recv_buffer).fuse();
169                let sleep_fut = sleep(timeout).fuse();
170                pin_utils::pin_mut!(read_fut, sleep_fut);
171                futures::select_biased! {
172                    r = read_fut => Some(r),
173                    _ = sleep_fut => None,
174                }
175            };
176
177            match event {
178                Some(Ok(len)) => {
179                    if let Err(e) = self.handle_incoming(len).await {
180                        self.set_both_errors(e.clone());
181                        return Err(e);
182                    }
183                }
184                Some(Err(e)) => {
185                    let err = TransportError::Network(e);
186                    self.set_both_errors(err.clone());
187                    return Err(err);
188                }
189                None => {
190                    if let Err(e) = self.sender.handle_timeouts(&mut self.link).await {
191                        self.set_both_errors(e.clone());
192                        return Err(e);
193                    }
194                    if let Err(e) = self.receiver.handle_timeouts(&mut self.link).await {
195                        self.set_both_errors(e.clone());
196                        return Err(e);
197                    }
198                }
199            }
200        }
201    }
202
203    async fn handle_incoming(&mut self, len: usize) -> Result<(), TransportError<E>> {
204        let packet = &self.recv_buffer[..len];
205        let Ok(parsed) = SrsppPacket::parse(packet) else {
206            return Ok(());
207        };
208        match parsed.srspp_type() {
209            Ok(SrsppType::Data) => self.receiver.process_data(packet, &mut self.link).await,
210            Ok(SrsppType::Eos) => self.receiver.process_data(packet, &mut self.link).await,
211            Ok(SrsppType::Ack) => self.sender.process_ack(packet),
212            Err(_) => Ok(()),
213        }
214    }
215
216    fn next_timeout(&self) -> Duration {
217        let s = self.sender.next_deadline();
218        let r = self.receiver.next_deadline();
219        let deadline = match (s, r) {
220            (Some(a), Some(b)) => Some(if a < b { a } else { b }),
221            (a, b) => a.or(b),
222        };
223        duration_until(deadline)
224    }
225
226    fn set_both_errors(&self, err: TransportError<E>) {
227        self.sender.sender.with_mut(|s| s.error = Some(err.clone()));
228        self.receiver.state.with_mut(|s| s.error = Some(err));
229    }
230}