Skip to main content

leodos_protocols/transport/srspp/api/tokio/
receiver.rs

1use crate::network::NetworkRead;
2use crate::network::NetworkWrite;
3use crate::network::isl::address::Address;
4use crate::network::spp::Apid;
5use crate::network::spp::SequenceCount;
6use crate::transport::srspp::machine::receiver::AckState;
7use crate::transport::srspp::machine::receiver::HandleResult;
8use crate::transport::srspp::machine::receiver::ReceiverBackend;
9use crate::transport::srspp::machine::receiver::ReceiverConfig;
10use crate::transport::srspp::machine::receiver::TimerAction;
11use crate::transport::srspp::packet::SrsppAckPacket;
12use crate::transport::srspp::packet::SrsppDataPacket;
13use crate::transport::srspp::packet::SrsppPacket;
14use crate::transport::srspp::packet::SrsppType;
15use tokio::time::Instant;
16
17use super::SrsppError;
18use super::sleep_until;
19use super::ticks_to_duration;
20
21/// Async srspp receiver.
22///
23/// Receives messages reliably over the link, handling reordering and reassembly.
24/// Sends ACKs to the remote sender.
25pub struct SrsppReceiver<
26    L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
27    R: ReceiverBackend,
28    const MTU: usize,
29> {
30    /// Network link for receiving data and sending ACKs.
31    link: L,
32    /// Local address used as the source in outgoing ACKs.
33    local_address: Address,
34    /// APID used in outgoing ACK packets.
35    apid: Apid,
36    /// Function code used in outgoing ACK packets.
37    function_code: u8,
38    /// Receiver backend handling reordering and reassembly.
39    machine: R,
40    /// ACK and timer state.
41    ack_state: AckState,
42    /// Deadline for the delayed ACK timer.
43    ack_timer: Option<Instant>,
44    /// Deadline for the progress (inactivity) timer.
45    progress_timer: Option<Instant>,
46    /// Tick rate used to convert timer ticks to durations.
47    ticks_per_sec: u32,
48    /// Buffer for receiving data packets from the link.
49    recv_buffer: [u8; MTU],
50    /// Buffer for building outgoing ACK packets.
51    ack_buffer: [u8; 32],
52}
53
54impl<
55    L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
56    R: ReceiverBackend,
57    const MTU: usize,
58> SrsppReceiver<L, R, MTU>
59{
60    /// Create a new receiver for a specific remote sender.
61    pub fn new(
62        config: ReceiverConfig,
63        remote_address: Address,
64        link: L,
65        ticks_per_sec: u32,
66    ) -> Self {
67        let local_address = config.local_address;
68        let apid = config.apid;
69        let function_code = config.function_code;
70        let ack_state = AckState::new(&config, remote_address);
71        Self {
72            link,
73            local_address,
74            apid,
75            function_code,
76            machine: R::new(),
77            ack_state,
78            ack_timer: None,
79            progress_timer: None,
80            ticks_per_sec,
81            recv_buffer: [0u8; MTU],
82            ack_buffer: [0u8; 32],
83        }
84    }
85
86    /// Receive the next complete message.
87    ///
88    /// Blocks until a message is available.
89    pub async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, SrsppError> {
90        loop {
91            if let Some(msg) = self.machine.take_message() {
92                let len = msg.len().min(buf.len());
93                buf[..len].copy_from_slice(&msg[..len]);
94                return Ok(len);
95            }
96
97            self.poll().await?;
98        }
99    }
100
101    /// Try to receive a message without blocking.
102    ///
103    /// Returns `None` if no complete message is available.
104    pub fn try_recv(&mut self, buf: &mut [u8]) -> Option<usize> {
105        self.machine.take_message().map(|m| {
106            let len = m.len().min(buf.len());
107            buf[..len].copy_from_slice(&m[..len]);
108            len
109        })
110    }
111
112    /// Wait for a complete message to become available.
113    ///
114    /// Returns a [`DeliveryToken`] that borrows `&mut self`,
115    /// preventing further receives while the token is held.
116    /// Call [`DeliveryToken::consume`] with a synchronous closure
117    /// to read the message data without an intermediate copy.
118    pub async fn wait_for_message(&mut self) -> Result<DeliveryToken<'_, L, R, MTU>, SrsppError> {
119        loop {
120            if let Some(len) = self.machine.message_len() {
121                return Ok(DeliveryToken {
122                    rx: self,
123                    msg_len: len,
124                });
125            }
126            self.poll().await?;
127        }
128    }
129
130    /// Wait for a message and process it in-place with a closure.
131    ///
132    /// Equivalent to `wait_for_message().await?.consume(f)` but
133    /// more concise when you don't need the [`DeliveryToken`]
134    /// metadata.
135    pub async fn recv_with<F, Ret>(&mut self, f: F) -> Result<Ret, SrsppError>
136    where
137        F: FnOnce(&[u8]) -> Ret,
138    {
139        let token = self.wait_for_message().await?;
140        Ok(token.consume(f))
141    }
142
143    /// Poll for incoming data and handle timers.
144    pub async fn poll(&mut self) -> Result<(), SrsppError> {
145        tokio::select! {
146            biased;
147
148            result = self.link.read(&mut self.recv_buffer) => {
149                let len = result.map_err(|e| SrsppError::Network(e.to_string()))?;
150                self.handle_incoming(&self.recv_buffer[..len].to_vec()).await?;
151            }
152
153            _ = sleep_until(self.ack_timer) => {
154                self.handle_ack_timeout().await?;
155            }
156
157            _ = sleep_until(self.progress_timer) => {
158                self.handle_progress_timeout().await?;
159            }
160        }
161
162        Ok(())
163    }
164
165    /// Parses an incoming packet and processes it if it is a data packet.
166    async fn handle_incoming(&mut self, packet: &[u8]) -> Result<(), SrsppError> {
167        let parsed =
168            SrsppPacket::parse(packet).map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
169        let srspp_type = parsed
170            .srspp_type()
171            .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
172
173        if srspp_type == SrsppType::Data {
174            let data = SrsppDataPacket::parse(packet)
175                .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
176
177            let outcome = self.machine.handle_data(
178                data.primary.sequence_count(),
179                data.primary.sequence_flag(),
180                &data.payload,
181            )?;
182
183            let result = self.ack_state.on_data(
184                outcome,
185                self.machine.expected_seq(),
186                self.machine.recv_bitmap(),
187            );
188            self.apply_result(result).await?;
189        }
190
191        Ok(())
192    }
193
194    /// Fires when the delayed ACK timer expires and sends an ACK.
195    async fn handle_ack_timeout(&mut self) -> Result<(), SrsppError> {
196        self.ack_timer = None;
197        let result = self.ack_state.on_ack_timeout(
198            self.machine.expected_seq(),
199            self.machine.recv_bitmap(),
200        );
201        self.apply_result(result).await
202    }
203
204    /// Fires when the progress timer expires due to sender inactivity.
205    async fn handle_progress_timeout(&mut self) -> Result<(), SrsppError> {
206        self.progress_timer = None;
207        let outcome = self.machine.skip_gap()?;
208        let result = self.ack_state.on_gap_skip(outcome);
209        self.apply_result(result).await
210    }
211
212    /// Sends ACK and updates timers based on a state machine result.
213    async fn apply_result(&mut self, result: HandleResult) -> Result<(), SrsppError> {
214        if let Some(ack) = result.ack {
215            let ack = SrsppAckPacket::builder()
216                .buffer(&mut self.ack_buffer)
217                .source_address(self.local_address)
218                .target(ack.destination)
219                .apid(self.apid)
220                .function_code(self.function_code)
221                .cumulative_ack(ack.cumulative_ack)
222                .sequence_count(SequenceCount::new())
223                .selective_bitmap(ack.selective_bitmap)
224                .build()
225                .map_err(|e| SrsppError::PacketError(format!("{:?}", e)))?;
226
227            self.link
228                .write(zerocopy::IntoBytes::as_bytes(ack))
229                .await
230                .map_err(|e| SrsppError::Network(e.to_string()))?;
231        }
232
233        if let Some(action) = result.ack_timer {
234            self.ack_timer = match action {
235                TimerAction::Start { ticks } => {
236                    Some(Instant::now() + ticks_to_duration(ticks, self.ticks_per_sec))
237                }
238                TimerAction::Stop => None,
239            };
240        }
241
242        if let Some(action) = result.progress_timer {
243            self.progress_timer = match action {
244                TimerAction::Start { ticks } => {
245                    Some(Instant::now() + ticks_to_duration(ticks, self.ticks_per_sec))
246                }
247                TimerAction::Stop => None,
248            };
249        }
250
251        Ok(())
252    }
253}
254
255/// Zero-copy delivery token returned by
256/// [`SrsppReceiver::wait_for_message`].
257///
258/// Holds `&mut SrsppReceiver`, preventing any I/O while the
259/// token is alive.  Call [`consume`](Self::consume) with a
260/// synchronous closure to read the message and release the
261/// token in one step.
262pub struct DeliveryToken<
263    'a,
264    L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
265    R: ReceiverBackend,
266    const MTU: usize,
267> {
268    rx: &'a mut SrsppReceiver<L, R, MTU>,
269    msg_len: usize,
270}
271
272impl<
273    'a,
274    L: NetworkWrite + NetworkRead<Error = <L as NetworkWrite>::Error>,
275    R: ReceiverBackend,
276    const MTU: usize,
277> DeliveryToken<'a, L, R, MTU>
278{
279    /// Byte length of the pending message.
280    pub fn len(&self) -> usize {
281        self.msg_len
282    }
283
284    /// Pass the message data to `f`, consume the token, and
285    /// return whatever `f` returns.
286    pub fn consume<F, Ret>(self, f: F) -> Ret
287    where
288        F: FnOnce(&[u8]) -> Ret,
289    {
290        self.rx.machine.consume_message(f).unwrap()
291    }
292}