leodos_protocols/transport/srspp/api/cfs/
node.rs1use futures::FutureExt;
2
3use leodos_libcfs::cfe::duration::Duration;
4use leodos_libcfs::runtime::time::sleep;
5
6use crate::network::NetworkRead;
7use crate::network::NetworkWrite;
8use crate::network::isl::address::Address;
9use crate::transport::srspp::api::cfs::TransportError;
10use crate::transport::srspp::dtn::AlwaysReachable;
11use crate::transport::srspp::dtn::MessageStore;
12use crate::transport::srspp::dtn::NoStore;
13use crate::transport::srspp::dtn::Reachable;
14use crate::transport::srspp::machine::receiver::ReceiverBackend;
15use crate::transport::srspp::machine::receiver::ReceiverConfig;
16use crate::transport::srspp::machine::receiver::ReceiverMachine;
17use crate::transport::srspp::machine::sender::SenderActions;
18use crate::transport::srspp::machine::sender::SenderConfig;
19use crate::transport::srspp::machine::sender::SenderMachine;
20use crate::transport::srspp::packet::SrsppPacket;
21use crate::transport::srspp::packet::SrsppType;
22use crate::transport::srspp::rto::RtoPolicy;
23use crate::utils::cell::SyncRefCell;
24
25use super::TimerSet;
26use super::receiver::MultiReceiverState;
27use super::receiver::SrsppReceiverDriver;
28use super::receiver::SrsppRxHandle;
29use super::sender::DtnContext;
30use super::sender::SenderState;
31use super::sender::SrsppSenderDriver;
32use super::sender::SrsppTxHandle;
33use super::sender::duration_until;
34
35pub struct SrsppNode<
37 E,
38 S: MessageStore = NoStore,
39 Re: Reachable = AlwaysReachable,
40 R: ReceiverBackend = ReceiverMachine<8, 4096, 8192>,
41 const WIN: usize = 8,
42 const BUF: usize = 4096,
43 const MTU: usize = 512,
44 const MAX_STREAMS: usize = 1,
45> {
46 pub(super) sender: SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
47 pub(super) receiver: SyncRefCell<MultiReceiverState<E, R, MAX_STREAMS>>,
48 dtn: SyncRefCell<DtnContext<S, Re>>,
49 origin: Address,
50}
51
52impl<
53 E: Clone,
54 S: MessageStore,
55 Re: Reachable,
56 R: ReceiverBackend,
57 const WIN: usize,
58 const BUF: usize,
59 const MTU: usize,
60 const MAX_STREAMS: usize,
61> SrsppNode<E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>
62{
63 pub fn new(sender_config: SenderConfig, receiver_config: ReceiverConfig, store: S, reachable: Re) -> Self {
65 let origin = sender_config.source_address;
66 let ack_delay = Duration::from_millis(receiver_config.ack_delay_ticks);
67 Self {
68 sender: SyncRefCell::new(SenderState {
69 machine: SenderMachine::new(sender_config),
70 actions: SenderActions::new(),
71 timers: TimerSet::new(),
72 closed: false,
73 error: None,
74 }),
75 receiver: SyncRefCell::new(MultiReceiverState {
76 config: receiver_config,
77 streams: heapless::LinearMap::new(),
78 ack_delay,
79 closed: false,
80 error: None,
81 }),
82 dtn: SyncRefCell::new(DtnContext { store, reachable }),
83 origin,
84 }
85 }
86
87 pub fn split<L: NetworkWrite<Error = E> + NetworkRead<Error = E>, P: RtoPolicy>(
89 &self,
90 link: L,
91 rto_policy: P,
92 ) -> (
93 SrsppRxHandle<'_, E, R, MAX_STREAMS>,
94 SrsppTxHandle<'_, E, S, Re, WIN, BUF, MTU>,
95 SrsppNodeDriver<'_, L, P, E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>,
96 ) {
97 (
98 SrsppRxHandle {
99 receiver: &self.receiver,
100 },
101 SrsppTxHandle {
102 sender: &self.sender,
103 dtn: &self.dtn,
104 origin: self.origin,
105 },
106 SrsppNodeDriver {
107 link,
108 sender: SrsppSenderDriver::new(
109 rto_policy,
110 &self.sender,
111 &self.dtn,
112 self.origin,
113 ),
114 receiver: SrsppReceiverDriver::new(&self.receiver),
115 recv_buffer: [0u8; MTU],
116 },
117 )
118 }
119}
120
121pub struct SrsppNodeDriver<
123 'a,
124 L,
125 P: RtoPolicy,
126 E,
127 S: MessageStore,
128 Re: Reachable,
129 R: ReceiverBackend,
130 const WIN: usize,
131 const BUF: usize,
132 const MTU: usize,
133 const MAX_STREAMS: usize,
134> {
135 link: L,
136 sender: SrsppSenderDriver<'a, P, E, S, Re, WIN, BUF, MTU>,
137 receiver: SrsppReceiverDriver<'a, E, R, MAX_STREAMS>,
138 recv_buffer: [u8; MTU],
139}
140
141impl<
142 'a,
143 L: NetworkWrite<Error = E> + NetworkRead<Error = E>,
144 P: RtoPolicy,
145 E: Clone,
146 S: MessageStore,
147 Re: Reachable,
148 R: ReceiverBackend,
149 const WIN: usize,
150 const BUF: usize,
151 const MTU: usize,
152 const MAX_STREAMS: usize,
153> SrsppNodeDriver<'a, L, P, E, S, Re, R, WIN, BUF, MTU, MAX_STREAMS>
154{
155 pub async fn run(&mut self) -> Result<(), TransportError<E>> {
157 loop {
158 self.sender.drain_stored(&mut self.link).await?;
159
160 if let Err(e) = self.sender.transmit(&mut self.link).await {
161 self.set_both_errors(e.clone());
162 return Err(e);
163 }
164
165 let timeout = self.next_timeout();
166
167 let event = {
168 let read_fut = self.link.read(&mut self.recv_buffer).fuse();
169 let sleep_fut = sleep(timeout).fuse();
170 pin_utils::pin_mut!(read_fut, sleep_fut);
171 futures::select_biased! {
172 r = read_fut => Some(r),
173 _ = sleep_fut => None,
174 }
175 };
176
177 match event {
178 Some(Ok(len)) => {
179 if let Err(e) = self.handle_incoming(len).await {
180 self.set_both_errors(e.clone());
181 return Err(e);
182 }
183 }
184 Some(Err(e)) => {
185 let err = TransportError::Network(e);
186 self.set_both_errors(err.clone());
187 return Err(err);
188 }
189 None => {
190 if let Err(e) = self.sender.handle_timeouts(&mut self.link).await {
191 self.set_both_errors(e.clone());
192 return Err(e);
193 }
194 if let Err(e) = self.receiver.handle_timeouts(&mut self.link).await {
195 self.set_both_errors(e.clone());
196 return Err(e);
197 }
198 }
199 }
200 }
201 }
202
203 async fn handle_incoming(&mut self, len: usize) -> Result<(), TransportError<E>> {
204 let packet = &self.recv_buffer[..len];
205 let Ok(parsed) = SrsppPacket::parse(packet) else {
206 return Ok(());
207 };
208 match parsed.srspp_type() {
209 Ok(SrsppType::Data) => self.receiver.process_data(packet, &mut self.link).await,
210 Ok(SrsppType::Eos) => self.receiver.process_data(packet, &mut self.link).await,
211 Ok(SrsppType::Ack) => self.sender.process_ack(packet),
212 Err(_) => Ok(()),
213 }
214 }
215
216 fn next_timeout(&self) -> Duration {
217 let s = self.sender.next_deadline();
218 let r = self.receiver.next_deadline();
219 let deadline = match (s, r) {
220 (Some(a), Some(b)) => Some(if a < b { a } else { b }),
221 (a, b) => a.or(b),
222 };
223 duration_until(deadline)
224 }
225
226 fn set_both_errors(&self, err: TransportError<E>) {
227 self.sender.sender.with_mut(|s| s.error = Some(err.clone()));
228 self.receiver.state.with_mut(|s| s.error = Some(err));
229 }
230}