Skip to main content

leodos_protocols/transport/cfdp/class2/machine/sender/
mod.rs

1//! Synchronous state machine for the sending-side of a CFDP transaction.
2//!
3//! This module contains the [`SenderMachine`], which processes [`Event`]s and
4//! produces [`Action`]s. It is responsible for initiating file transfers,
5//! sending file data, and handling acknowledgments from the receiver. It is
6//! designed to be completely independent of the underlying I/O and timing mechanisms,
7//! making it portable and easily testable.
8
9/// Output actions produced by the sender state machine.
10pub mod action;
11/// Input events that drive the sender state machine.
12pub mod event;
13/// Transaction state and configuration for the sender.
14pub mod transaction;
15
16use crate::transport::cfdp::CfdpError;
17use crate::transport::cfdp::class2::machine::FILE_DATA_CHUNK_SIZE;
18use crate::transport::cfdp::class2::machine::MAX_CONCURRENT_TRANSACTIONS;
19use crate::transport::cfdp::class2::machine::PromptType;
20use crate::transport::cfdp::class2::machine::TimerType;
21use crate::transport::cfdp::class2::machine::sender::action::Action;
22use crate::transport::cfdp::class2::machine::sender::action::Actions;
23use crate::transport::cfdp::class2::machine::sender::event::Event;
24use crate::transport::cfdp::class2::machine::sender::transaction::Transaction;
25use crate::transport::cfdp::class2::machine::sender::transaction::TransactionState;
26use crate::transport::cfdp::class2::machine::transaction::TransactionConfig;
27use crate::transport::cfdp::class2::machine::transaction::TransactionId;
28use crate::transport::cfdp::filestore::FileId;
29use crate::transport::cfdp::pdu::EntityId;
30use crate::transport::cfdp::pdu::Pdu;
31use crate::transport::cfdp::pdu::PduVariant;
32use crate::transport::cfdp::pdu::TransactionSeqNum;
33use crate::transport::cfdp::pdu::file_directive::ConditionCode;
34use crate::transport::cfdp::pdu::file_directive::ack::AckPdu;
35use crate::transport::cfdp::pdu::file_directive::ack::AckedDirectiveCode;
36use crate::transport::cfdp::pdu::file_directive::ack::TransactionStatus;
37use crate::transport::cfdp::pdu::file_directive::finished::FinishedPdu;
38use crate::transport::cfdp::pdu::file_directive::keepalive::KeepAlivePdu;
39use crate::transport::cfdp::pdu::file_directive::metadata::ChecksumType;
40use crate::transport::cfdp::pdu::file_directive::nak::NakPdu;
41use crate::transport::cfdp::pdu::tlv::fault_handler_override::HandlerCode;
42use heapless::LinearMap;
43
/// Manages the state of all active sending ('source') transactions.
///
/// This struct holds a map of all transactions for which this entity is the sender.
/// It should be driven by a `Runner` which feeds it events and executes the
/// resulting actions.
///
/// The map is a fixed-capacity [`LinearMap`], so at most
/// `MAX_CONCURRENT_TRANSACTIONS` transactions can be tracked at the same time.
#[derive(Debug)]
pub struct SenderMachine {
    /// A map of active transactions, keyed by their unique `TransactionId`.
    transactions: LinearMap<TransactionId, Transaction, MAX_CONCURRENT_TRANSACTIONS>,
    /// The next transaction sequence number to be used for a new `PutRequest`.
    next_seq_num: TransactionSeqNum,
    /// The CFDP Entity ID of this local entity; used as the `source_id` of
    /// every transaction created by this machine.
    id: EntityId,
    /// The keep-alive limit copied into every new transaction: the largest
    /// tolerated gap between the sender's progress and the progress last
    /// reported by the receiver before a `KeepAliveLimitReached` fault is raised.
    keep_alive_limit: u64,
    /// The timeout in seconds to wait for an expected ACK PDU.
    ack_timeout_secs: u16,
}
62
63impl SenderMachine {
64    /// Creates a new `SenderMachine` with the given local entity ID.
65    pub fn new(id: EntityId, ack_timeout_secs: u16, keep_alive_limit: u64) -> Self {
66        Self {
67            transactions: LinearMap::new(),
68            next_seq_num: TransactionSeqNum::default(),
69            id,
70            ack_timeout_secs,
71            keep_alive_limit,
72        }
73    }
74
75    /// Creates a new transaction and registers it in the active transactions map.
76    fn create_transation<'a>(
77        &mut self,
78        destination_id: EntityId,
79        source_file_id: FileId,
80        destination_file_id: FileId,
81        file_size: u64,
82        checksum_type: ChecksumType,
83    ) -> Result<TransactionId, CfdpError> {
84        let transaction_id = TransactionId {
85            source_id: self.id,
86            seq_num: self.next_seq_num,
87        };
88        self.next_seq_num.increment();
89        let config = TransactionConfig {
90            transaction_id,
91            destination_id,
92            source_file_id,
93            destination_file_id,
94            file_size,
95            inactivity_timeout_secs: 30,
96            checksum_type,
97            fault_handlers: Default::default(),
98        };
99        let transaction = Transaction {
100            config,
101            state: TransactionState::SendingFileData,
102            suspended: false,
103            progress: 0,
104            ack_retries: 0,
105            file_checksum: None,
106            last_receiver_progress: 0,
107            ack_limit: 5,
108            keep_alive_limit: self.keep_alive_limit,
109        };
110        self.transactions
111            .insert(transaction_id, transaction)
112            .map_err(|_| CfdpError::TooManyConcurrentTransactions)?;
113        Ok(transaction_id)
114    }
115
116    /// The primary state machine logic for the sender.
117    ///
118    /// It takes an `Event` as input and returns a `Vec` of `Action`s for the
119    /// `Runner` to execute. This function is pure and has no side effects.
120    pub fn handle<'a>(
121        &mut self,
122        actions: &mut Actions<'a>,
123        event: Event<'a>,
124    ) -> Result<(), CfdpError> {
125        actions.clear();
126        match event {
127            Event::PutRequest {
128                source_file_name,
129                destination_file_name,
130                destination_id,
131                file_size,
132                checksum_type,
133            } => self.handle_put_request(
134                destination_id,
135                source_file_name,
136                destination_file_name,
137                file_size,
138                checksum_type,
139                actions,
140            )?,
141            Event::DataSegmentReady {
142                transaction_id,
143                data,
144                offset,
145            } => self.handle_data_segment_ready(transaction_id, offset, data, actions)?,
146            Event::ChecksumReady {
147                transaction_id,
148                checksum,
149            } => self.handle_checksum(transaction_id, checksum, actions)?,
150            Event::PduReceived {
151                pdu,
152                transaction_id,
153            } => self.handle_pdu_received(pdu, transaction_id, actions)?,
154            Event::TimerExpired {
155                timer_type,
156                transaction_id,
157            } => self.handle_timer_expired(transaction_id, timer_type, actions)?,
158            Event::PromptRequest {
159                transaction_id,
160                prompt_type,
161            } => self.handle_prompt(transaction_id, prompt_type, actions)?,
162            Event::SuspendRequest { transaction_id } => {
163                self.handle_suspend(transaction_id, actions)?
164            }
165            Event::ResumeRequest { transaction_id } => {
166                self.handle_resume(transaction_id, actions)?
167            }
168        }
169        Ok(())
170    }
171
172    /// Dispatches a prompt request to the appropriate handler.
173    fn handle_prompt<'a>(
174        &mut self,
175        transaction_id: TransactionId,
176        prompt_type: PromptType,
177        actions: &mut Actions<'a>,
178    ) -> Result<(), CfdpError> {
179        match prompt_type {
180            PromptType::Nak => self.handle_prompt_nak(transaction_id, actions)?,
181            PromptType::KeepAlive => self.handle_prompt_keep_alive(transaction_id, actions)?,
182        }
183        Ok(())
184    }
185
186    /// Sends a Prompt(KeepAlive) if the transaction is actively sending data.
187    fn handle_prompt_keep_alive<'a>(
188        &mut self,
189        transaction_id: TransactionId,
190        actions: &mut Actions<'a>,
191    ) -> Result<(), CfdpError> {
192        let Some(transaction) = self.transactions.get(&transaction_id) else {
193            return Ok(());
194        };
195
196        // It only makes sense to send a Prompt(KeepAlive) while we are actively sending
197        // data and before the EOF has been sent.
198        match transaction.state {
199            TransactionState::SendingFileData if !transaction.suspended => {
200                actions.push(Action::SendPrompt {
201                    transaction_id,
202                    destination_id: transaction.config.destination_id,
203                    prompt_type: PromptType::KeepAlive,
204                })?;
205            }
206            _ => {}
207        }
208
209        Ok(())
210    }
211
212    /// Routes a received PDU to the appropriate handler based on its variant.
213    fn handle_pdu_received<'a>(
214        &mut self,
215        pdu: &'a Pdu,
216        transaction_id: TransactionId,
217        actions: &mut Actions<'a>,
218    ) -> Result<(), CfdpError> {
219        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
220            return Ok(());
221        };
222        match pdu.variant()? {
223            PduVariant::Finished(v) => self.handle_finished(transaction_id, v, actions)?,
224            PduVariant::Ack(v) => self.handle_ack(transaction_id, v, actions)?,
225            // Process active data transfers only if not suspended
226            PduVariant::Nak(v) if !transaction.suspended => {
227                self.handle_nak(transaction_id, v, actions)?
228            }
229            PduVariant::KeepAlive(v) if !transaction.suspended => {
230                self.handle_keep_alive(transaction_id, v, actions)?
231            }
232            _ => {}
233        }
234        Ok(())
235    }
236
237    /// Processes a ready data segment by sending it and requesting the next chunk.
238    fn handle_data_segment_ready<'a>(
239        &mut self,
240        transaction_id: TransactionId,
241        offset: u64,
242        data: &'a [u8],
243        actions: &mut Actions<'a>,
244    ) -> Result<(), CfdpError> {
245        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
246            return Ok(());
247        };
248
249        if transaction.suspended {
250            return Ok(());
251        }
252
253        actions.push(Action::SendFileData {
254            destination_id: transaction.config.destination_id,
255            transaction_id,
256            offset,
257            data,
258        })?;
259
260        transaction.progress = offset + data.len() as u64;
261
262        if transaction.progress >= transaction.config.file_size {
263            actions.push(Action::CalculateChecksum {
264                transaction_id,
265                checksum_type: transaction.config.checksum_type,
266            })?;
267        } else {
268            actions.push(Action::ReadDataSegment {
269                transaction_id,
270                start_offset: transaction.progress,
271                end_offset: transaction.progress + FILE_DATA_CHUNK_SIZE as u64,
272            })?;
273        }
274        Ok(())
275    }
276
277    /// Initiates a new file transfer by creating a transaction and sending metadata.
278    fn handle_put_request<'a>(
279        &mut self,
280        destination_id: EntityId,
281        source_file_name: FileId,
282        destination_file_name: FileId,
283        file_size: u64,
284        checksum_type: ChecksumType,
285        actions: &mut Actions<'a>,
286    ) -> Result<(), CfdpError> {
287        let transaction_id = self.create_transation(
288            destination_id,
289            source_file_name,
290            destination_file_name,
291            file_size,
292            checksum_type,
293        )?;
294        actions.push(Action::SendMetadata {
295            destination_id,
296            transaction_id,
297            file_size: 0,
298            source_file_name,
299            destination_file_name,
300            checksum_type,
301        })?;
302        actions.push(Action::ReadDataSegment {
303            transaction_id,
304            start_offset: 0,
305            end_offset: FILE_DATA_CHUNK_SIZE as u64,
306        })?;
307        Ok(())
308    }
309
310    /// Stores the computed checksum and sends the EOF PDU.
311    fn handle_checksum<'a>(
312        &mut self,
313        transaction_id: TransactionId,
314        checksum: u32,
315        actions: &mut Actions<'a>,
316    ) -> Result<(), CfdpError> {
317        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
318            return Ok(());
319        };
320        transaction.file_checksum = Some(checksum);
321        self.send_eof(transaction_id, checksum, actions)
322    }
323
324    /// Handles timer expiry by retrying or faulting based on timer type.
325    fn handle_timer_expired<'a>(
326        &mut self,
327        transaction_id: TransactionId,
328        timer_type: TimerType,
329        actions: &mut Actions<'a>,
330    ) -> Result<(), CfdpError> {
331        match timer_type {
332            TimerType::Ack => {
333                let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
334                    return Ok(());
335                };
336                if transaction.ack_retries >= transaction.ack_limit {
337                    // Exceeded ACK retries, fault the transaction
338                    self.handle_terminate(transaction_id, ConditionCode::AckLimitReached, actions)?;
339                    return Ok(());
340                }
341                // Retry sending the EOF PDU
342                transaction.ack_retries += 1;
343                if transaction.state != TransactionState::WaitingForEofAck {
344                    return Ok(());
345                }
346                let checksum = transaction
347                    .file_checksum
348                    .ok_or_else(|| CfdpError::Custom("Checksum not calculated"))?;
349                self.send_eof(transaction_id, checksum, actions)?;
350            }
351            TimerType::Inactivity => {
352                let Some(transaction) = self.transactions.get(&transaction_id) else {
353                    return Ok(());
354                };
355                if transaction.state != TransactionState::WaitingForFinishedPdu {
356                    return Ok(());
357                }
358                self.handle_fault(transaction_id, ConditionCode::InactivityDetected, actions)?;
359            }
360            TimerType::Nak => {}
361            TimerType::KeepAlive => {}
362        }
363        Ok(())
364    }
365
366    /// Sends a Prompt(NAK) if the transaction is in the data-sending phase.
367    fn handle_prompt_nak<'a>(
368        &mut self,
369        transaction_id: TransactionId,
370        actions: &mut Actions<'a>,
371    ) -> Result<(), CfdpError> {
372        let Some(transaction) = self.transactions.get(&transaction_id) else {
373            return Ok(());
374        };
375        if transaction.state != TransactionState::SendingFileData {
376            return Ok(());
377        }
378        actions.push(Action::SendPrompt {
379            destination_id: transaction.config.destination_id,
380            transaction_id,
381            prompt_type: PromptType::Nak,
382        })?;
383        Ok(())
384    }
385
386    /// Suspends a transaction by stopping its timers and marking it suspended.
387    fn handle_suspend<'a>(
388        &mut self,
389        transaction_id: TransactionId,
390        actions: &mut Actions<'a>,
391    ) -> Result<(), CfdpError> {
392        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
393            return Ok(());
394        };
395        if transaction.suspended {
396            return Ok(());
397        }
398        transaction.suspended = true;
399        actions.push(Action::StopTimer {
400            transaction_id,
401            timer_type: None,
402        })?;
403        actions.push(Action::NotifySuspended { transaction_id })?;
404        Ok(())
405    }
406
407    /// Resumes a suspended transaction from its current state.
408    fn handle_resume<'a>(
409        &mut self,
410        transaction_id: TransactionId,
411        actions: &mut Actions<'a>,
412    ) -> Result<(), CfdpError> {
413        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
414            return Ok(());
415        };
416        if !transaction.suspended {
417            return Ok(());
418        }
419        let progress = transaction.progress;
420        transaction.suspended = false;
421        match transaction.state {
422            TransactionState::SendingFileData => {
423                actions.push(Action::ReadDataSegment {
424                    transaction_id,
425                    start_offset: transaction.progress,
426                    end_offset: transaction.progress + FILE_DATA_CHUNK_SIZE as u64,
427                })?;
428            }
429            TransactionState::WaitingForEofAck => {
430                let checksum = transaction
431                    .file_checksum
432                    .ok_or(CfdpError::Custom("Checksum missing on resume"))?;
433                self.send_eof(transaction_id, checksum, actions)?;
434            }
435            TransactionState::WaitingForFinishedPdu => {
436                actions.push(Action::StartTimer {
437                    timer_type: TimerType::Inactivity,
438                    seconds: transaction.config.inactivity_timeout_secs,
439                    transaction_id,
440                })?;
441            }
442            TransactionState::WaitingForChecksum => {
443                actions.push(Action::CalculateChecksum {
444                    transaction_id,
445                    checksum_type: transaction.config.checksum_type,
446                })?;
447            }
448        }
449        actions.push(Action::NotifyResumed {
450            transaction_id,
451            progress,
452        })?;
453        Ok(())
454    }
455
456    /// Processes a Finished PDU by sending an ACK and terminating the transaction.
457    fn handle_finished<'a>(
458        &mut self,
459        transaction_id: TransactionId,
460        fin: &FinishedPdu,
461        actions: &mut Actions<'a>,
462    ) -> Result<(), CfdpError> {
463        let Some(transaction) = self.transactions.get(&transaction_id) else {
464            return Ok(());
465        };
466        if transaction.state != TransactionState::WaitingForFinishedPdu {
467            return Err(CfdpError::Custom(
468                "Received Finished PDU in unexpected transaction state",
469            ));
470        }
471
472        // Per Spec 4.11.1.1.2.b, a sender's timers should be terminated upon completion.
473        actions.push(Action::StopTimer {
474            transaction_id,
475            timer_type: None,
476        })?;
477
478        // Per Spec 4.7.2, upon receiving a PDU requiring an ACK (which Finished does, per 4.6.4.3.5),
479        // we must immediately issue the Expected Response.
480        actions.push(Action::SendAck {
481            transaction_id,
482            destination_id: transaction.config.destination_id,
483            acked_directive_code: AckedDirectiveCode::Finished,
484            condition_code: fin.condition_code()?,
485            transaction_status: TransactionStatus::Terminated,
486        })?;
487
488        self.handle_terminate(transaction_id, fin.condition_code()?, actions)?;
489        Ok(())
490    }
491
492    /// Sends an EOF PDU and starts the ACK timer.
493    fn send_eof<'a>(
494        &mut self,
495        transaction_id: TransactionId,
496        checksum: u32,
497        actions: &mut Actions<'a>,
498    ) -> Result<(), CfdpError> {
499        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
500            return Ok(());
501        };
502        actions.push(Action::SendEof {
503            destination_id: transaction.config.destination_id,
504            transaction_id,
505            condition_code: ConditionCode::NoError,
506            file_size: transaction.config.file_size,
507            checksum,
508        })?;
509        transaction.state = TransactionState::WaitingForEofAck;
510        actions.push(Action::StartTimer {
511            timer_type: TimerType::Ack,
512            seconds: self.ack_timeout_secs,
513            transaction_id,
514        })?;
515        Ok(())
516    }
517
518    /// Processes an ACK PDU, transitioning state on EOF acknowledgment.
519    fn handle_ack<'a>(
520        &mut self,
521        transaction_id: TransactionId,
522        ack: &AckPdu,
523        actions: &mut Actions<'a>,
524    ) -> Result<(), CfdpError> {
525        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
526            return Ok(());
527        };
528        match (transaction.state, ack.acked_directive_code()?) {
529            (TransactionState::WaitingForEofAck, AckedDirectiveCode::Eof) => {
530                actions.push(Action::StopTimer {
531                    timer_type: Some(TimerType::Ack),
532                    transaction_id,
533                })?;
534                transaction.state = TransactionState::WaitingForFinishedPdu;
535                actions.push(Action::StartTimer {
536                    timer_type: TimerType::Inactivity,
537                    seconds: transaction.config.inactivity_timeout_secs,
538                    transaction_id,
539                })?;
540            }
541            _ => {}
542        }
543        Ok(())
544    }
545
546    /// Handles a NAK by scheduling retransmission of the requested segments.
547    fn handle_nak<'a>(
548        &mut self,
549        transaction_id: TransactionId,
550        nak: NakPdu<'a>,
551        actions: &mut Actions<'a>,
552    ) -> Result<(), CfdpError> {
553        if self.transactions.get(&transaction_id).is_none() {
554            return Ok(());
555        }
556        actions.push(Action::StopTimer {
557            timer_type: Some(TimerType::Ack),
558            transaction_id,
559        })?;
560        actions.push(Action::ReadDataSegmentBatch {
561            transaction_id,
562            segments: nak.segment_requests()?,
563        })?;
564        Ok(())
565    }
566
567    /// Processes a KeepAlive PDU and checks for keep-alive limit violations.
568    fn handle_keep_alive<'a>(
569        &mut self,
570        transaction_id: TransactionId,
571        keepalive: KeepAlivePdu<'_>,
572        actions: &mut Actions<'a>,
573    ) -> Result<(), CfdpError> {
574        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
575            return Ok(());
576        };
577
578        let receiver_progress = keepalive.progress();
579
580        if receiver_progress > transaction.last_receiver_progress {
581            transaction.last_receiver_progress = receiver_progress;
582        }
583
584        let discrepancy = transaction
585            .progress
586            .saturating_sub(transaction.last_receiver_progress);
587
588        if discrepancy > transaction.keep_alive_limit {
589            self.handle_fault(
590                transaction_id,
591                ConditionCode::KeepAliveLimitReached,
592                actions,
593            )?;
594        }
595
596        // If the limit is not exceeded, we do nothing further. The progress is updated and the transaction continues.
597        Ok(())
598    }
599
600    /// Removes a transaction and emits a termination action.
601    fn handle_terminate<'a>(
602        &mut self,
603        id: TransactionId,
604        condition: ConditionCode,
605        actions: &mut Actions<'a>,
606    ) -> Result<(), CfdpError> {
607        let Some(transaction) = self.transactions.remove(&id) else {
608            return Ok(());
609        };
610        actions.push(Action::TerminateTransaction {
611            transaction_id: transaction.config.transaction_id,
612            condition_code: condition,
613        })?;
614        Ok(())
615    }
616
617    /// Dispatches a fault condition to the configured fault handler.
618    fn handle_fault<'a>(
619        &mut self,
620        transaction_id: TransactionId,
621        condition_code: ConditionCode,
622        actions: &mut Actions<'a>,
623    ) -> Result<(), CfdpError> {
624        let Some(transaction) = self.transactions.get(&transaction_id) else {
625            return Ok(());
626        };
627        match transaction
628            .config
629            .fault_handlers
630            .get_handler(condition_code)
631        {
632            HandlerCode::Cancel => {
633                self.handle_terminate(transaction_id, condition_code, actions)?;
634            }
635            HandlerCode::Ignore => {
636                actions.push(Action::NotifyFault {
637                    transaction_id,
638                    condition_code,
639                })?;
640            }
641            HandlerCode::Suspend => {
642                self.handle_suspend(transaction_id, actions)?;
643            }
644            HandlerCode::Abandon => {
645                // Abandon is similar to Cancel but implies no further PDUs will be sent.
646                // For a receiver, it's effectively the same as Cancel.
647                self.handle_terminate(transaction_id, condition_code, actions)?;
648            }
649        }
650        Ok(())
651    }
652}