Skip to main content

leodos_protocols/transport/srspp/api/cfs/
sender.rs

1use core::future::poll_fn;
2use core::task::Poll;
3
4use zerocopy::Immutable;
5use zerocopy::IntoBytes;
6
7use futures::FutureExt;
8
9use leodos_libcfs::cfe::duration::Duration;
10use leodos_libcfs::cfe::time::SysTime;
11use leodos_libcfs::runtime::time::sleep;
12
13use crate::network::NetworkRead;
14use crate::network::NetworkWrite;
15use crate::network::isl::address::Address;
16use crate::network::spp::Apid;
17use crate::network::spp::SequenceCount;
18use crate::transport::srspp::api::cfs::TransportError;
19use crate::transport::srspp::dtn::AlwaysReachable;
20use crate::transport::srspp::dtn::MessageStore;
21use crate::transport::srspp::dtn::NoStore;
22use crate::transport::srspp::dtn::Reachable;
23use crate::transport::srspp::machine::sender::SenderAction;
24use crate::transport::srspp::machine::sender::SenderActions;
25use crate::transport::srspp::machine::sender::SenderConfig;
26use crate::transport::srspp::machine::sender::SenderEvent;
27use crate::transport::srspp::machine::sender::SenderMachine;
28use crate::transport::srspp::packet::SrsppAckPacket;
29use crate::transport::srspp::packet::SrsppDataPacket;
30use crate::transport::srspp::packet::SrsppPacket;
31use crate::transport::srspp::packet::SrsppType;
32use crate::transport::srspp::rto::RtoPolicy;
33use crate::utils::cell::SyncRefCell;
34
35use super::TimerSet;
36
/// Shared mutable state for the sender channel.
///
/// A single instance is owned by [`SrsppSender`] behind a `SyncRefCell` and
/// is accessed by both the sending handle and the I/O driver.
pub(super) struct SenderState<E, const WIN: usize, const BUF: usize, const MTU: usize> {
    /// Sender state machine.
    pub(crate) machine: SenderMachine<WIN, BUF, MTU>,
    /// Pending actions produced by the state machine.
    pub(crate) actions: SenderActions,
    /// Retransmission timers for in-flight packets.
    pub(crate) timers: TimerSet<WIN>,
    /// Whether the handle has signaled no more data.
    ///
    /// Set by `SrsppTxHandle::close`; the driver loop checks it to decide
    /// when it may exit.
    pub(crate) closed: bool,
    /// First error encountered, propagated to the handle.
    ///
    /// Written by the driver just before it returns an error; `send` on the
    /// handle returns a clone of it.
    pub(crate) error: Option<TransportError<E>>,
}
50
51// ── Channel and driver ──
52
53/// Channel that owns the sender state. Split into handle + driver.
54pub struct SrsppSender<
55    E,
56    S: MessageStore,
57    R: Reachable,
58    const WIN: usize,
59    const BUF: usize,
60    const MTU: usize,
61> {
62    /// Interior-mutable sender state shared between handle and driver.
63    pub(super) state: SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
64    /// DTN store and reachability oracle.
65    dtn: SyncRefCell<DtnContext<S, R>>,
66    /// This sender's own address (for reachability checks).
67    origin: Address,
68}
69
/// DTN collaborators of a sender, bundled so they sit behind one `SyncRefCell`.
pub(super) struct DtnContext<S, R> {
    /// Message store: the handle writes messages for unreachable targets
    /// into it and the driver drains it later.
    pub(super) store: S,
    /// Reachability oracle, queried with an (origin, target) address pair.
    pub(super) reachable: R,
}
74
/// Alias for a sender without DTN support.
///
/// Plugs `NoStore` and `AlwaysReachable` into [`SrsppSender`] and defaults
/// the const parameters to `WIN = 8`, `BUF = 4096` and `MTU = 512`.
pub type SimpleSender<E, const WIN: usize = 8, const BUF: usize = 4096, const MTU: usize = 512> =
    SrsppSender<E, NoStore, AlwaysReachable, WIN, BUF, MTU>;
78
79#[bon::bon]
80impl<E: Clone, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
81    SrsppSender<E, S, R, WIN, BUF, MTU>
82{
83    /// Creates a new sender.
84    #[builder]
85    pub fn new(
86        source_address: Address,
87        apid: Apid,
88        #[builder(default)] function_code: u8,
89        rto_ticks: u32,
90        #[builder(default = 3)] max_retransmits: u8,
91        #[builder(default = SrsppDataPacket::HEADER_SIZE)] header_overhead: usize,
92        store: S,
93        reachable: R,
94    ) -> Self {
95        let config = SenderConfig::builder()
96            .source_address(source_address)
97            .apid(apid)
98            .function_code(function_code)
99            .rto_ticks(rto_ticks)
100            .max_retransmits(max_retransmits)
101            .header_overhead(header_overhead)
102            .build();
103        Self {
104            state: SyncRefCell::new(SenderState {
105                machine: SenderMachine::new(config),
106                actions: SenderActions::new(),
107                timers: TimerSet::new(),
108                closed: false,
109                error: None,
110            }),
111            dtn: SyncRefCell::new(DtnContext { store, reachable }),
112            origin: source_address,
113        }
114    }
115
116    /// Splits into a handle for sending and a driver for I/O.
117    pub fn split<P: RtoPolicy>(
118        &self,
119        rto_policy: P,
120    ) -> (
121        SrsppTxHandle<'_, E, S, R, WIN, BUF, MTU>,
122        SrsppSenderDriver<'_, P, E, S, R, WIN, BUF, MTU>,
123    ) {
124        (
125            SrsppTxHandle {
126                sender: &self.state,
127                dtn: &self.dtn,
128                origin: self.origin,
129            },
130            SrsppSenderDriver::new(rto_policy, &self.state, &self.dtn, self.origin),
131        )
132    }
133}
134
135/// Driver that handles I/O and DTN drain. Runs as a concurrent task.
136pub struct SrsppSenderDriver<
137    'a,
138    P: RtoPolicy,
139    E,
140    S: MessageStore,
141    R: Reachable,
142    const WIN: usize,
143    const BUF: usize,
144    const MTU: usize,
145> {
146    rto_policy: P,
147    pub(super) sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
148    dtn: &'a SyncRefCell<DtnContext<S, R>>,
149    origin: Address,
150    tx_buffer: [u8; MTU],
151}
152
153impl<
154    'a,
155    P: RtoPolicy,
156    E,
157    S: MessageStore,
158    R: Reachable,
159    const WIN: usize,
160    const BUF: usize,
161    const MTU: usize,
162> SrsppSenderDriver<'a, P, E, S, R, WIN, BUF, MTU>
163{
164    pub(super) fn new(
165        rto_policy: P,
166        sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
167        dtn: &'a SyncRefCell<DtnContext<S, R>>,
168        origin: Address,
169    ) -> Self {
170        Self {
171            rto_policy,
172            sender,
173            dtn,
174            origin,
175            tx_buffer: [0u8; MTU],
176        }
177    }
178}
179
180impl<
181    'a,
182    P: RtoPolicy,
183    E: Clone,
184    S: MessageStore,
185    R: Reachable,
186    const WIN: usize,
187    const BUF: usize,
188    const MTU: usize,
189> SrsppSenderDriver<'a, P, E, S, R, WIN, BUF, MTU>
190{
191    /// Sends all pending transmit actions over the link.
192    pub(super) async fn transmit(
193        &mut self,
194        link: &mut impl NetworkWrite<Error = E>,
195    ) -> Result<(), TransportError<E>> {
196        let now = SysTime::now();
197
198        let (transmits, cfg_clone) = self.sender.with(|s| {
199            let t = s
200                .actions
201                .iter()
202                .filter_map(|a| match a {
203                    SenderAction::Transmit { seq, .. } => Some(*seq),
204                    _ => None,
205                })
206                .collect::<heapless::Vec<_, WIN>>();
207            (t, s.machine.config().clone())
208        });
209
210        for seq in transmits {
211            let packet_len = self.sender.with(|s| {
212                if let Some(info) = s.machine.get_payload(seq) {
213                    let pkt = SrsppDataPacket::builder()
214                        .buffer(&mut self.tx_buffer)
215                        .source_address(cfg_clone.source_address)
216                        .target(info.target)
217                        .apid(cfg_clone.apid)
218                        .function_code(cfg_clone.function_code)
219                        .sequence_count(seq)
220                        .sequence_flag(info.flags)
221                        .payload_len(info.payload.len())
222                        .build()
223                        .map_err(TransportError::Packet)?;
224                    pkt.payload.copy_from_slice(info.payload);
225                    Ok::<_, TransportError<E>>(Some(
226                        SrsppDataPacket::HEADER_SIZE + info.payload.len(),
227                    ))
228                } else {
229                    Ok::<_, TransportError<E>>(None)
230                }
231            })?;
232
233            if let Some(packet_len) = packet_len {
234                link.write(&self.tx_buffer[..packet_len])
235                    .await
236                    .map_err(TransportError::Network)?;
237
238                let rto_dur = Duration::from_millis(self.rto_policy.rto_ticks(now.seconds()));
239
240                self.sender.with_mut(|s| {
241                    s.machine.mark_transmitted(seq);
242                    s.timers.start(seq, now + SysTime::from(rto_dur));
243                });
244            }
245        }
246
247        self.sender.with_mut(|s| {
248            for action in s.actions.iter() {
249                let &SenderAction::StopTimer { seq } = action else {
250                    continue;
251                };
252                s.timers.stop(seq);
253            }
254        });
255
256        Ok(())
257    }
258
259    /// Processes a received ACK packet and updates sender state.
260    pub(super) fn process_ack(&mut self, packet: &[u8]) -> Result<(), TransportError<E>> {
261        if let Ok(SrsppType::Ack) = SrsppPacket::parse(packet).and_then(|p| p.srspp_type()) {
262            if let Ok(ack) = SrsppAckPacket::parse(packet) {
263                self.sender.with_mut(|s| {
264                    s.machine.handle(
265                        SenderEvent::AckReceived {
266                            cumulative_ack: ack.ack_payload.cumulative_ack(),
267                            selective_bitmap: ack.ack_payload.selective_ack_bitmap(),
268                        },
269                        &mut s.actions,
270                    )?;
271
272                    for action in s.actions.iter() {
273                        let &SenderAction::StopTimer { seq } = action else {
274                            continue;
275                        };
276                        s.timers.stop(seq);
277                    }
278                    Ok::<(), TransportError<E>>(())
279                })?;
280            }
281        }
282        Ok(())
283    }
284
285    /// Processes expired retransmission timers and retransmits.
286    pub(super) async fn handle_timeouts(
287        &mut self,
288        link: &mut impl NetworkWrite<Error = E>,
289    ) -> Result<(), TransportError<E>> {
290        let now = SysTime::now();
291
292        for seq in self
293            .sender
294            .with_mut(|s| s.timers.expired(now).collect::<heapless::Vec<_, WIN>>())
295        {
296            self.sender.with_mut(|s| {
297                s.machine.handle(
298                    SenderEvent::RetransmitTimeout {
299                        seq: SequenceCount::from(seq),
300                    },
301                    &mut s.actions,
302                )
303            })?;
304
305            self.transmit(link).await?;
306        }
307
308        Ok(())
309    }
310
311    /// Returns the earliest sender retransmission deadline.
312    pub(super) fn next_deadline(&self) -> Option<SysTime> {
313        self.sender.with(|s| s.timers.next_deadline())
314    }
315
316    /// Drain stored messages into the SRSPP state machine.
317    pub(super) async fn drain_stored(
318        &mut self,
319        link: &mut impl NetworkWrite<Error = E>,
320    ) -> Result<(), TransportError<E>> {
321        self.dtn
322            .with_mut(|d| d.store.expire(SysTime::now().seconds()));
323
324        let pending = self.dtn.with(|d| d.store.pending_targets());
325        if pending == 0 {
326            return Ok(());
327        }
328
329        for station in 0..16u8 {
330            if pending & (1 << station) == 0 {
331                continue;
332            }
333            let target = Address::Ground { station };
334            if !self
335                .dtn
336                .with(|d| d.reachable.is_reachable(self.origin, target))
337            {
338                continue;
339            }
340
341            loop {
342                let (bytes, window) = self
343                    .sender
344                    .with(|s| (s.machine.available_bytes(), s.machine.available_window()));
345                if window == 0 {
346                    break;
347                }
348                let Some(size) = self.dtn.with(|d| d.store.peek_size(target)) else {
349                    break;
350                };
351                if size > bytes {
352                    break;
353                }
354                let Some(len) = self
355                    .dtn
356                    .with_mut(|d| d.store.read(target, &mut self.tx_buffer))
357                else {
358                    break;
359                };
360                self.sender.with_mut(|s| {
361                    s.machine.handle(
362                        SenderEvent::SendRequest {
363                            target,
364                            data: &self.tx_buffer[..len],
365                        },
366                        &mut s.actions,
367                    )
368                })?;
369            }
370
371            self.transmit(link).await?;
372        }
373
374        Ok(())
375    }
376
377    /// Run the driver loop.
378    pub async fn run(
379        &mut self,
380        link: &mut (impl NetworkWrite<Error = E> + NetworkRead<Error = E>),
381    ) -> Result<(), TransportError<E>> {
382        let mut recv_buffer = [0u8; MTU];
383        loop {
384            let pending = self.dtn.with(|d| d.store.pending_targets() != 0);
385            if self.sender.with(|s| s.closed && s.machine.is_idle()) && !pending {
386                return Ok(());
387            }
388
389            self.drain_stored(link).await?;
390
391            if let Err(e) = self.transmit(link).await {
392                self.sender.with_mut(|s| s.error = Some(e.clone()));
393                return Err(e);
394            }
395
396            let timeout = duration_until(self.next_deadline());
397
398            let event = {
399                let read_fut = link.read(&mut recv_buffer).fuse();
400                let sleep_fut = sleep(timeout).fuse();
401                pin_utils::pin_mut!(read_fut, sleep_fut);
402                futures::select_biased! {
403                    r = read_fut => Some(r),
404                    _ = sleep_fut => None,
405                }
406            };
407
408            match event {
409                Some(Ok(len)) => {
410                    if let Err(e) = self.process_ack(&recv_buffer[..len]) {
411                        self.sender.with_mut(|s| s.error = Some(e.clone()));
412                        return Err(e);
413                    }
414                }
415                Some(Err(e)) => {
416                    let err = TransportError::Network(e);
417                    self.sender.with_mut(|s| s.error = Some(err.clone()));
418                    return Err(err);
419                }
420                None => {
421                    if let Err(e) = self.handle_timeouts(link).await {
422                        self.sender.with_mut(|s| s.error = Some(e.clone()));
423                        return Err(e);
424                    }
425                }
426            }
427        }
428    }
429}
430
431/// Converts an optional deadline into a duration from now.
432pub(super) fn duration_until(deadline: Option<SysTime>) -> Duration {
433    let now = SysTime::now();
434    deadline
435        .map(|d| {
436            if d > now {
437                Duration::from(d - now)
438            } else {
439                Duration::zero()
440            }
441        })
442        .unwrap_or(Duration::from_secs(60))
443}
444
445/// Handle for sending data over an SRSPP node.
446pub struct SrsppTxHandle<
447    'a,
448    E,
449    S: MessageStore,
450    R: Reachable,
451    const WIN: usize,
452    const BUF: usize,
453    const MTU: usize,
454> {
455    pub(super) sender: &'a SyncRefCell<SenderState<E, WIN, BUF, MTU>>,
456    pub(super) dtn: &'a SyncRefCell<DtnContext<S, R>>,
457    pub(super) origin: Address,
458}
459
// Cloning a handle duplicates its two shared references and its address.
// This hand-written impl places no `Clone` bound on `E`, `S` or `R`.
impl<'a, E, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
    Clone for SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
{
    /// Returns a bitwise copy of the handle.
    fn clone(&self) -> Self {
        // The handle is `Copy`, so a dereference is all that is needed.
        *self
    }
}
467
// Marker impl: the handle consists of two shared references and an `Address`,
// so it can be copied freely. No `Copy` bound is placed on `E`, `S` or `R`.
impl<'a, E, S: MessageStore, R: Reachable, const WIN: usize, const BUF: usize, const MTU: usize>
    Copy for SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
{
}
472
473impl<
474    'a,
475    E: Clone,
476    S: MessageStore,
477    R: Reachable,
478    const WIN: usize,
479    const BUF: usize,
480    const MTU: usize,
481> SrsppTxHandle<'a, E, S, R, WIN, BUF, MTU>
482{
483    /// Sends data to the given target.
484    ///
485    /// If the destination is unreachable, the message is
486    /// stored for later delivery by the driver. If reachable,
487    /// it enters SRSPP normally.
488    pub async fn send(
489        &mut self,
490        target: impl Into<Address>,
491        data: &(impl IntoBytes + Immutable + ?Sized),
492    ) -> Result<(), TransportError<E>> {
493        let target = target.into();
494        let data = data.as_bytes();
495
496        if !self
497            .dtn
498            .with(|d| d.reachable.is_reachable(self.origin, target))
499        {
500            self.dtn
501                .with_mut(|d| d.store.write(target, data, 0, SysTime::now().seconds()));
502            return Ok(());
503        }
504
505        // Normal SRSPP path
506        poll_fn(|_cx| {
507            self.sender.with(|s| {
508                if let Some(ref e) = s.error {
509                    return Poll::Ready(Err(e.clone()));
510                }
511                if s.machine.available_bytes() >= data.len() && s.machine.available_window() > 0 {
512                    Poll::Ready(Ok(()))
513                } else {
514                    Poll::Pending
515                }
516            })
517        })
518        .await?;
519
520        self.sender.with_mut(|s| {
521            s.machine
522                .handle(SenderEvent::SendRequest { target, data }, &mut s.actions)
523        })?;
524        Ok(())
525    }
526
527    /// Signal that no more data will be sent.
528    /// Driver will exit once all pending data is acknowledged
529    /// and the store is drained.
530    pub fn close(&mut self) {
531        self.sender.with_mut(|s| s.closed = true);
532    }
533
534    /// Check available buffer space in bytes.
535    pub fn available_bytes(&self) -> usize {
536        self.sender.with(|s| s.machine.available_bytes())
537    }
538
539    /// Check available window slots.
540    pub fn available_window(&self) -> usize {
541        self.sender.with(|s| s.machine.available_window())
542    }
543
544    /// Check if all data has been acknowledged.
545    pub fn is_idle(&self) -> bool {
546        self.sender.with(|s| s.machine.is_idle())
547    }
548}