Skip to main content

leodos_protocols/transport/srspp/machine/
sender.rs

1//! Sender state machine for SRSPP.
2//!
3//! Handles segmentation, buffering, and retransmission of messages.
4//! Completely synchronous - no I/O, no async.
5
6use crate::network::isl::address::{Address, RawAddress};
7use crate::network::spp::{Apid, SequenceCount, SequenceFlag};
8use heapless::Vec;
9
10/// Maximum number of actions that can be emitted per event.
11const MAX_ACTIONS: usize = 32;
12
13/// Events that drive the sender state machine.
14#[derive(Debug)]
15pub enum SenderEvent<'a> {
16    /// Application wants to send data.
17    SendRequest {
18        /// Destination address.
19        target: Address,
20        /// Data to send.
21        data: &'a [u8],
22    },
23
24    /// An ACK packet was received from the remote.
25    AckReceived {
26        /// Highest contiguously acknowledged sequence number.
27        cumulative_ack: SequenceCount,
28        /// Bitmap of selectively acknowledged packets.
29        selective_bitmap: u16,
30    },
31
32    /// A retransmission timer expired for a specific packet.
33    RetransmitTimeout {
34        /// Sequence number of the timed-out packet.
35        seq: SequenceCount,
36    },
37}
38
39/// Actions the sender machine wants the driver to perform.
40#[derive(Debug, Copy, Clone, Eq, PartialEq)]
41pub enum SenderAction {
42    /// Transmit a packet and start its retransmission timer.
43    /// Call `get_payload(seq)` to get the payload data.
44    Transmit {
45        /// Sequence number to transmit.
46        seq: SequenceCount,
47        /// Retransmission timeout in ticks.
48        rto_ticks: u32,
49    },
50
51    /// Stop a retransmission timer.
52    StopTimer {
53        /// Sequence number whose timer should be stopped.
54        seq: SequenceCount,
55    },
56
57    /// A packet was permanently lost (max retransmits exceeded).
58    PacketLost {
59        /// Sequence number of the lost packet.
60        seq: SequenceCount,
61    },
62
63    /// A segmented message was lost (a packet from it was permanently lost).
64    MessageLost,
65
66    /// Send buffer has space available (for backpressure signaling).
67    SpaceAvailable {
68        /// Number of bytes available.
69        bytes: usize,
70    },
71}
72
73/// Collection of actions emitted by the sender.
74#[derive(Debug)]
75pub struct SenderActions {
76    inner: Vec<SenderAction, MAX_ACTIONS>,
77}
78
79impl SenderActions {
80    /// Create a new empty actions collection.
81    pub fn new() -> Self {
82        Self { inner: Vec::new() }
83    }
84
85    /// Add an action to the collection.
86    pub fn push(&mut self, action: SenderAction) {
87        let _ = self.inner.push(action);
88    }
89
90    /// Clear all actions.
91    pub fn clear(&mut self) {
92        self.inner.clear();
93    }
94
95    /// Iterate over the actions.
96    pub fn iter(&self) -> impl Iterator<Item = &SenderAction> {
97        self.inner.iter()
98    }
99
100    /// Check if empty.
101    pub fn is_empty(&self) -> bool {
102        self.inner.is_empty()
103    }
104
105    /// Number of actions.
106    pub fn len(&self) -> usize {
107        self.inner.len()
108    }
109}
110
111impl Default for SenderActions {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117impl<'a> IntoIterator for &'a SenderActions {
118    type Item = &'a SenderAction;
119    type IntoIter = core::slice::Iter<'a, SenderAction>;
120
121    fn into_iter(self) -> Self::IntoIter {
122        self.inner.iter()
123    }
124}
125
126/// Error from sender operations.
127#[derive(Debug, Copy, Clone, Eq, PartialEq, thiserror::Error)]
128pub enum SenderError {
129    /// Send buffer has no space for the payload.
130    #[error("send buffer full")]
131    BufferFull,
132    /// All send window slots are occupied.
133    #[error("send window full")]
134    WindowFull,
135}
136
137/// Information about a buffered packet's payload and metadata.
138pub struct PayloadInfo<'a> {
139    /// Sequence number of the packet.
140    pub seq: SequenceCount,
141    /// Segmentation flags for this packet.
142    pub flags: SequenceFlag,
143    /// Destination address for this packet.
144    pub target: Address,
145    /// Payload data bytes.
146    pub payload: &'a [u8],
147}
148
149/// Configuration for the sender.
150#[derive(Debug, Clone, bon::Builder)]
151pub struct SenderConfig {
152    /// Local source address for outgoing packets.
153    pub source_address: Address,
154    /// APID used for all packets from this sender.
155    pub apid: Apid,
156    /// cFE function code for outgoing packets.
157    pub function_code: u8,
158    /// Retransmission timeout in ticks.
159    pub rto_ticks: u32,
160    /// Maximum number of retransmission attempts per packet.
161    pub max_retransmits: u8,
162    /// Total header overhead per packet in bytes.
163    pub header_overhead: usize,
164}
165
166/// State of a packet slot.
167#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
168enum SlotState {
169    /// Slot is unused and available.
170    #[default]
171    Empty,
172    /// Packet is queued but not yet transmitted.
173    PendingTransmit,
174    /// Packet has been transmitted and is awaiting acknowledgement.
175    AwaitingAck,
176}
177
178/// Metadata for a packet in the send buffer.
179#[derive(Clone, Copy)]
180struct PacketMeta {
181    /// Current state of this slot.
182    state: SlotState,
183    /// Sequence number assigned to this packet.
184    seq: u16,
185    /// Segmentation flags for this packet.
186    flags: SequenceFlag,
187    /// Destination address in raw form.
188    target: RawAddress,
189    /// Number of retransmission attempts so far.
190    retransmit_count: u8,
191    /// Byte offset into the send buffer.
192    offset: usize,
193    /// Length of the payload in bytes.
194    len: usize,
195    /// Whether this packet belongs to a segmented message.
196    is_segmented: bool,
197}
198
199impl Default for PacketMeta {
200    fn default() -> Self {
201        Self {
202            state: SlotState::Empty,
203            seq: 0,
204            flags: SequenceFlag::Unsegmented,
205            target: RawAddress::from(Address::ground(0)),
206            retransmit_count: 0,
207            offset: 0,
208            len: 0,
209            is_segmented: false,
210        }
211    }
212}
213
214/// Sender state machine.
215///
216/// Handles segmentation, buffering, and retransmission.
217/// Completely synchronous - no I/O, no async.
218///
219/// # Type Parameters
220///
221/// * `WIN` - Maximum number of in-flight packets (window size)
222/// * `BUF` - Total send buffer size in bytes
223/// * `MTU` - Maximum transmission unit (packet size)
224pub struct SenderMachine<const WIN: usize, const BUF: usize, const MTU: usize> {
225    /// Sender configuration.
226    pub(crate) config: SenderConfig,
227    /// Per-slot metadata for each window entry.
228    meta: [PacketMeta; WIN],
229    /// Contiguous send buffer holding all packet payloads.
230    data: [u8; BUF],
231    /// Current write position in the send buffer.
232    write_pos: usize,
233    /// Next sequence number to assign.
234    next_seq: u16,
235    /// Lowest unacknowledged sequence number.
236    send_base: u16,
237}
238
239impl<const WIN: usize, const BUF: usize, const MTU: usize> SenderMachine<WIN, BUF, MTU> {
240    /// Create a new sender state machine with the given configuration.
241    pub fn new(config: SenderConfig) -> Self {
242        Self {
243            config,
244            meta: [PacketMeta::default(); WIN],
245            data: [0u8; BUF],
246            write_pos: 0,
247            next_seq: 0,
248            send_base: 0,
249        }
250    }
251
252    /// Returns a reference to the sender configuration.
253    pub fn config(&self) -> &SenderConfig {
254        &self.config
255    }
256
257    /// Maximum payload bytes per packet given the MTU and header overhead.
258    pub fn max_payload_per_packet(&self) -> usize {
259        MTU.saturating_sub(self.config.header_overhead)
260    }
261
262    /// Available space in the send buffer (bytes).
263    pub fn available_bytes(&self) -> usize {
264        BUF - self.write_pos
265    }
266
267    /// Available slots in send window.
268    pub fn available_window(&self) -> usize {
269        WIN.saturating_sub(self.unacked_count())
270    }
271
272    /// Number of unacked packets.
273    pub fn unacked_count(&self) -> usize {
274        self.meta
275            .iter()
276            .filter(|m| m.state != SlotState::Empty)
277            .count()
278    }
279
280    /// Check if all data has been acknowledged.
281    pub fn is_idle(&self) -> bool {
282        self.meta.iter().all(|m| m.state == SlotState::Empty)
283    }
284
285    /// Retrieve payload info for a buffered packet by sequence number.
286    pub fn get_payload(&self, seq: SequenceCount) -> Option<PayloadInfo<'_>> {
287        let seq_val = seq.value();
288        self.meta
289            .iter()
290            .find(|m| m.state != SlotState::Empty && m.seq == seq_val)
291            .map(|m| PayloadInfo {
292                seq: SequenceCount::from(m.seq),
293                flags: m.flags,
294                target: m.target.parse(),
295                payload: &self.data[m.offset..m.offset + m.len],
296            })
297    }
298
299    /// Process an event and produce actions.
300    pub fn handle(
301        &mut self,
302        event: SenderEvent<'_>,
303        actions: &mut SenderActions,
304    ) -> Result<(), SenderError> {
305        actions.clear();
306
307        match event {
308            SenderEvent::SendRequest { target, data } => {
309                self.handle_send_request(target, data, actions)?;
310            }
311            SenderEvent::AckReceived {
312                cumulative_ack,
313                selective_bitmap,
314            } => {
315                self.handle_ack(cumulative_ack, selective_bitmap, actions);
316            }
317            SenderEvent::RetransmitTimeout { seq } => {
318                self.handle_timeout(seq, actions);
319            }
320        }
321        Ok(())
322    }
323
324    /// Segment data if needed and queue packets for transmission.
325    fn handle_send_request(
326        &mut self,
327        target: Address,
328        data: &[u8],
329        actions: &mut SenderActions,
330    ) -> Result<(), SenderError> {
331        let max_payload = self.max_payload_per_packet();
332
333        if data.len() <= max_payload {
334            self.queue_packet(target, data, SequenceFlag::Unsegmented, false, actions)?;
335        } else {
336            let mut offset = 0;
337            let mut is_first = true;
338
339            while offset < data.len() {
340                let remaining = data.len() - offset;
341                let chunk_size = remaining.min(max_payload);
342                let is_last = offset + chunk_size >= data.len();
343
344                let flags = if is_first {
345                    is_first = false;
346                    SequenceFlag::First
347                } else if is_last {
348                    SequenceFlag::Last
349                } else {
350                    SequenceFlag::Continuation
351                };
352
353                self.queue_packet(
354                    target,
355                    &data[offset..offset + chunk_size],
356                    flags,
357                    true,
358                    actions,
359                )?;
360                offset += chunk_size;
361            }
362        }
363
364        Ok(())
365    }
366
367    /// Allocate a slot and buffer space, then emit a Transmit action.
368    fn queue_packet(
369        &mut self,
370        target: Address,
371        payload: &[u8],
372        flags: SequenceFlag,
373        is_segmented: bool,
374        actions: &mut SenderActions,
375    ) -> Result<(), SenderError> {
376        let slot_idx = self
377            .meta
378            .iter()
379            .position(|m| m.state == SlotState::Empty)
380            .ok_or(SenderError::WindowFull)?;
381
382        if payload.len() > self.available_bytes() {
383            self.compact();
384            if payload.len() > self.available_bytes() {
385                return Err(SenderError::BufferFull);
386            }
387        }
388
389        let offset = self.write_pos;
390        self.data[offset..offset + payload.len()].copy_from_slice(payload);
391        self.write_pos += payload.len();
392
393        let seq = SequenceCount::from(self.next_seq);
394
395        self.meta[slot_idx] = PacketMeta {
396            state: SlotState::PendingTransmit,
397            seq: self.next_seq,
398            flags,
399            target: RawAddress::from(target),
400            retransmit_count: 0,
401            offset,
402            len: payload.len(),
403            is_segmented,
404        };
405
406        self.next_seq = (self.next_seq + 1) & SequenceCount::MAX;
407
408        actions.push(SenderAction::Transmit {
409            seq,
410            rto_ticks: self.config.rto_ticks,
411        });
412
413        Ok(())
414    }
415
416    /// Process cumulative and selective ACKs, freeing acknowledged slots.
417    fn handle_ack(
418        &mut self,
419        cumulative_ack: SequenceCount,
420        selective_bitmap: u16,
421        actions: &mut SenderActions,
422    ) {
423        let ack_val = cumulative_ack.value();
424        let mut freed_bytes = 0usize;
425
426        // Clear acked packets (cumulative)
427        for meta in &mut self.meta {
428            if meta.state != SlotState::Empty {
429                let diff = ack_val.wrapping_sub(meta.seq) & SequenceCount::MAX;
430                if diff < (SequenceCount::MAX / 2) {
431                    freed_bytes += meta.len;
432                    let seq = SequenceCount::from(meta.seq);
433                    actions.push(SenderAction::StopTimer { seq });
434                    meta.state = SlotState::Empty;
435                }
436            }
437        }
438
439        // Handle selective ACKs
440        for bit_pos in 0..16u16 {
441            if selective_bitmap & (1 << bit_pos) != 0 {
442                let acked_seq = (ack_val + 1 + bit_pos) & SequenceCount::MAX;
443                for meta in &mut self.meta {
444                    if meta.state != SlotState::Empty && meta.seq == acked_seq {
445                        freed_bytes += meta.len;
446                        let seq = SequenceCount::from(meta.seq);
447                        actions.push(SenderAction::StopTimer { seq });
448                        meta.state = SlotState::Empty;
449                        break;
450                    }
451                }
452            }
453        }
454
455        self.update_send_base();
456
457        if freed_bytes > 0 {
458            actions.push(SenderAction::SpaceAvailable {
459                bytes: self.available_bytes(),
460            });
461        }
462    }
463
464    /// Retransmit or declare loss when a retransmission timer expires.
465    fn handle_timeout(&mut self, seq: SequenceCount, actions: &mut SenderActions) {
466        let seq_val = seq.value();
467        let mut lost_segmented = false;
468
469        for i in 0..self.meta.len() {
470            if self.meta[i].state == SlotState::AwaitingAck && self.meta[i].seq == seq_val {
471                if self.meta[i].retransmit_count >= self.config.max_retransmits {
472                    actions.push(SenderAction::PacketLost { seq });
473                    lost_segmented = self.meta[i].is_segmented;
474                    self.meta[i].state = SlotState::Empty;
475                } else {
476                    self.meta[i].state = SlotState::PendingTransmit;
477                    self.meta[i].retransmit_count += 1;
478                    actions.push(SenderAction::Transmit {
479                        seq,
480                        rto_ticks: self.config.rto_ticks,
481                    });
482                }
483                break;
484            }
485        }
486
487        if lost_segmented {
488            actions.push(SenderAction::MessageLost);
489        }
490
491        self.update_send_base();
492    }
493
494    /// Mark a packet as transmitted (awaiting ACK).
495    ///
496    /// Call this after sending a packet from a `Transmit` action.
497    pub fn mark_transmitted(&mut self, seq: SequenceCount) {
498        for meta in &mut self.meta {
499            if meta.state == SlotState::PendingTransmit && meta.seq == seq.value() {
500                meta.state = SlotState::AwaitingAck;
501                break;
502            }
503        }
504    }
505
506    /// Recalculate `send_base` from the lowest unacknowledged sequence.
507    fn update_send_base(&mut self) {
508        let mut min_unacked = self.next_seq;
509        let mut found = false;
510
511        for meta in &self.meta {
512            if meta.state != SlotState::Empty {
513                found = true;
514                let diff = min_unacked.wrapping_sub(meta.seq) & SequenceCount::MAX;
515                if diff < (SequenceCount::MAX / 2) && diff > 0 {
516                    min_unacked = meta.seq;
517                }
518            }
519        }
520
521        self.send_base = if found { min_unacked } else { self.next_seq };
522    }
523
524    /// Compact the send buffer by shifting live payloads to the front.
525    fn compact(&mut self) {
526        let mut indices: [Option<usize>; WIN] = [None; WIN];
527        let mut count = 0;
528
529        for (i, m) in self.meta.iter().enumerate() {
530            if m.state != SlotState::Empty {
531                let offset = m.offset;
532                let mut insert_pos = count;
533                for j in 0..count {
534                    if let Some(idx) = indices[j] {
535                        if self.meta[idx].offset > offset {
536                            insert_pos = j;
537                            break;
538                        }
539                    }
540                }
541                for j in (insert_pos..count).rev() {
542                    indices[j + 1] = indices[j];
543                }
544                indices[insert_pos] = Some(i);
545                count += 1;
546            }
547        }
548
549        if count == 0 {
550            self.write_pos = 0;
551            return;
552        }
553
554        let mut new_pos = 0usize;
555        for idx_opt in indices.iter().take(count) {
556            if let Some(idx) = *idx_opt {
557                let old_offset = self.meta[idx].offset;
558                let len = self.meta[idx].len;
559
560                if new_pos != old_offset {
561                    self.data.copy_within(old_offset..old_offset + len, new_pos);
562                }
563
564                self.meta[idx].offset = new_pos;
565                new_pos += len;
566            }
567        }
568
569        self.write_pos = new_pos;
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use crate::transport::srspp::packet::SrsppDataPacket;
577
578    fn make_config() -> SenderConfig {
579        SenderConfig {
580            source_address: Address::satellite(1, 5),
581            apid: Apid::new(0x42).unwrap(),
582            function_code: 0,
583            rto_ticks: 100,
584            max_retransmits: 3,
585            header_overhead: SrsppDataPacket::HEADER_SIZE,
586        }
587    }
588
589    fn target() -> Address {
590        Address::satellite(2, 3)
591    }
592
593    #[test]
594    fn test_send_emits_transmit_action() {
595        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
596        let mut actions = SenderActions::new();
597
598        sender
599            .handle(
600                SenderEvent::SendRequest {
601                    target: target(),
602                    data: &[1, 2, 3],
603                },
604                &mut actions,
605            )
606            .unwrap();
607
608        // Should have one Transmit action
609        assert_eq!(actions.len(), 1);
610        let action = actions.iter().next().unwrap();
611        assert!(matches!(
612            action,
613            SenderAction::Transmit { seq, rto_ticks: 100 } if seq.value() == 0
614        ));
615
616        let info = sender.get_payload(SequenceCount::from(0)).unwrap();
617        assert!(!info.payload.is_empty());
618    }
619
620    #[test]
621    fn test_mark_transmitted() {
622        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
623        let mut actions = SenderActions::new();
624
625        sender
626            .handle(
627                SenderEvent::SendRequest {
628                    target: target(),
629                    data: &[1, 2, 3],
630                },
631                &mut actions,
632            )
633            .unwrap();
634
635        // Before marking - packet is PendingTransmit
636        assert_eq!(sender.unacked_count(), 1);
637
638        // Mark as transmitted
639        sender.mark_transmitted(SequenceCount::from(0));
640
641        // Still unacked (now AwaitingAck)
642        assert_eq!(sender.unacked_count(), 1);
643    }
644
645    #[test]
646    fn test_ack_stops_timer() {
647        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
648        let mut actions = SenderActions::new();
649
650        sender
651            .handle(
652                SenderEvent::SendRequest {
653                    target: target(),
654                    data: &[1, 2, 3],
655                },
656                &mut actions,
657            )
658            .unwrap();
659        sender.mark_transmitted(SequenceCount::from(0));
660
661        sender
662            .handle(
663                SenderEvent::AckReceived {
664                    cumulative_ack: SequenceCount::from(0),
665                    selective_bitmap: 0,
666                },
667                &mut actions,
668            )
669            .unwrap();
670
671        assert!(
672            actions
673                .iter()
674                .any(|a| matches!(a, SenderAction::StopTimer { seq } if seq.value() == 0))
675        );
676        assert!(
677            actions
678                .iter()
679                .any(|a| matches!(a, SenderAction::SpaceAvailable { .. }))
680        );
681        assert!(sender.is_idle());
682    }
683
684    #[test]
685    fn test_timeout_emits_retransmit() {
686        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
687        let mut actions = SenderActions::new();
688
689        sender
690            .handle(
691                SenderEvent::SendRequest {
692                    target: target(),
693                    data: &[1, 2, 3],
694                },
695                &mut actions,
696            )
697            .unwrap();
698        sender.mark_transmitted(SequenceCount::from(0));
699
700        // Timeout
701        sender
702            .handle(
703                SenderEvent::RetransmitTimeout {
704                    seq: SequenceCount::from(0),
705                },
706                &mut actions,
707            )
708            .unwrap();
709
710        // Should emit another Transmit
711        assert!(
712            actions
713                .iter()
714                .any(|a| matches!(a, SenderAction::Transmit { seq, .. } if seq.value() == 0))
715        );
716    }
717
718    #[test]
719    fn test_max_retransmits_emits_packet_lost() {
720        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
721        let mut actions = SenderActions::new();
722
723        sender
724            .handle(
725                SenderEvent::SendRequest {
726                    target: target(),
727                    data: &[1, 2, 3],
728                },
729                &mut actions,
730            )
731            .unwrap();
732        sender.mark_transmitted(SequenceCount::from(0));
733
734        // Timeout multiple times (max_retransmits = 3)
735        for _ in 0..3 {
736            sender
737                .handle(
738                    SenderEvent::RetransmitTimeout {
739                        seq: SequenceCount::from(0),
740                    },
741                    &mut actions,
742                )
743                .unwrap();
744            sender.mark_transmitted(SequenceCount::from(0));
745        }
746
747        // One more timeout should trigger PacketLost
748        sender
749            .handle(
750                SenderEvent::RetransmitTimeout {
751                    seq: SequenceCount::from(0),
752                },
753                &mut actions,
754            )
755            .unwrap();
756
757        assert!(
758            actions
759                .iter()
760                .any(|a| matches!(a, SenderAction::PacketLost { seq } if seq.value() == 0))
761        );
762        assert!(sender.is_idle());
763    }
764
765    #[test]
766    fn test_segmentation() {
767        let mut sender: SenderMachine<8, 4096, 64> = SenderMachine::new(make_config());
768        let mut actions = SenderActions::new();
769
770        let data = [0u8; 150];
771        sender
772            .handle(
773                SenderEvent::SendRequest {
774                    target: target(),
775                    data: &data,
776                },
777                &mut actions,
778            )
779            .unwrap();
780
781        let max_payload = 64 - SrsppDataPacket::HEADER_SIZE;
782        let expected = (150 + max_payload - 1) / max_payload;
783        let transmit_count = actions
784            .iter()
785            .filter(|a| matches!(a, SenderAction::Transmit { .. }))
786            .count();
787        assert_eq!(transmit_count, expected);
788    }
789
790    #[test]
791    fn test_selective_ack() {
792        let mut sender: SenderMachine<8, 4096, 512> = SenderMachine::new(make_config());
793        let mut actions = SenderActions::new();
794
795        // Send 3 packets
796        for i in 0..3 {
797            sender
798                .handle(
799                    SenderEvent::SendRequest {
800                        target: target(),
801                        data: &[i as u8; 10],
802                    },
803                    &mut actions,
804                )
805                .unwrap();
806            sender.mark_transmitted(SequenceCount::from(i));
807        }
808
809        // ACK packet 0 cumulatively, packet 2 selectively (bitmap bit 1 = seq 2)
810        sender
811            .handle(
812                SenderEvent::AckReceived {
813                    cumulative_ack: SequenceCount::from(0),
814                    selective_bitmap: 0b0010, // bit 1 = cumulative + 2 = seq 2
815                },
816                &mut actions,
817            )
818            .unwrap();
819
820        // Should have StopTimer for 0 and 2, but not 1
821        let stopped: heapless::Vec<u16, 8> = actions
822            .iter()
823            .filter_map(|a| {
824                if let SenderAction::StopTimer { seq } = a {
825                    Some(seq.value())
826                } else {
827                    None
828                }
829            })
830            .collect();
831
832        assert!(stopped.contains(&0));
833        assert!(stopped.contains(&2));
834        assert!(!stopped.contains(&1));
835
836        // Packet 1 still pending
837        assert!(!sender.is_idle());
838        assert!(sender.get_payload(SequenceCount::from(1)).is_some());
839    }
840
841    #[test]
842    fn test_message_lost_on_segmented_packet_loss() {
843        let mut sender: SenderMachine<8, 4096, 64> = SenderMachine::new(make_config());
844        let mut actions = SenderActions::new();
845
846        let data = [0u8; 150];
847        sender
848            .handle(
849                SenderEvent::SendRequest {
850                    target: target(),
851                    data: &data,
852                },
853                &mut actions,
854            )
855            .unwrap();
856
857        let transmit_count = actions
858            .iter()
859            .filter(|a| matches!(a, SenderAction::Transmit { .. }))
860            .count();
861        assert!(transmit_count >= 3);
862
863        for i in 0..transmit_count as u16 {
864            sender.mark_transmitted(SequenceCount::from(i));
865        }
866
867        for _ in 0..3 {
868            sender
869                .handle(
870                    SenderEvent::RetransmitTimeout {
871                        seq: SequenceCount::from(1),
872                    },
873                    &mut actions,
874                )
875                .unwrap();
876            sender.mark_transmitted(SequenceCount::from(1));
877        }
878
879        sender
880            .handle(
881                SenderEvent::RetransmitTimeout {
882                    seq: SequenceCount::from(1),
883                },
884                &mut actions,
885            )
886            .unwrap();
887
888        assert!(
889            actions
890                .iter()
891                .any(|a| matches!(a, SenderAction::PacketLost { seq } if seq.value() == 1))
892        );
893        assert!(
894            actions
895                .iter()
896                .any(|a| matches!(a, SenderAction::MessageLost))
897        );
898    }
899}