Skip to main content

leodos_protocols/transport/cfdp/class2/machine/receiver/
mod.rs

1//! The synchronous state machine for the receiving-side of a CFDP transaction.
2//!
3//! This module contains the [`ReceiverMachine`], which processes [`Event`]s and
4//! produces [`Action`]s. It is responsible for handling incoming file data,
5//! tracking missing segments, and sending acknowledgments back to the sender.
6//! It is designed to be completely independent of the underlying I/O and timing
7//! mechanisms, making it portable and easily testable.
8
9/// Output actions produced by the receiver state machine.
10pub mod action;
11/// Input events that drive the receiver state machine.
12pub mod event;
13/// Transaction state and configuration for the receiver.
14pub mod transaction;
15
16use crate::transport::cfdp::CfdpError;
17use crate::transport::cfdp::class2::machine::MAX_CONCURRENT_TRANSACTIONS;
18use crate::transport::cfdp::class2::machine::TimerType;
19use crate::transport::cfdp::class2::machine::receiver::action::Action;
20use crate::transport::cfdp::class2::machine::receiver::action::Actions;
21use crate::transport::cfdp::class2::machine::receiver::event::Event;
22use crate::transport::cfdp::class2::machine::receiver::transaction::Transaction;
23use crate::transport::cfdp::class2::machine::receiver::transaction::TransactionState;
24use crate::transport::cfdp::class2::machine::tracker::SegmentTracker;
25use crate::transport::cfdp::class2::machine::transaction::TransactionConfig;
26use crate::transport::cfdp::class2::machine::transaction::TransactionId;
27use crate::transport::cfdp::filestore::FileStore;
28use crate::transport::cfdp::pdu::EntityId;
29use crate::transport::cfdp::pdu::Pdu;
30use crate::transport::cfdp::pdu::PduVariant;
31use crate::transport::cfdp::pdu::file_data::FileDataPdu;
32use crate::transport::cfdp::pdu::file_directive::ConditionCode;
33use crate::transport::cfdp::pdu::file_directive::DirectiveCode;
34use crate::transport::cfdp::pdu::file_directive::ack::TransactionStatus;
35use crate::transport::cfdp::pdu::file_directive::eof::EofPdu;
36use crate::transport::cfdp::pdu::file_directive::metadata::ChecksumType;
37use crate::transport::cfdp::pdu::file_directive::metadata::MetadataPdu;
38use crate::transport::cfdp::pdu::file_directive::prompt::PromptResponse;
39use crate::transport::cfdp::pdu::tlv::fault_handler_override::FaultHandlerSet;
40use crate::transport::cfdp::pdu::tlv::fault_handler_override::HandlerCode;
41use crate::transport::cfdp::pdu::tlv::filestore_request::FilestoreRequest;
42use crate::transport::cfdp::pdu::tlv::filestore_request::TlvFilestoreRequest;
43use crate::transport::cfdp::pdu::tlv::filestore_response::FilestoreResponse;
44use heapless::LinearMap;
45use heapless::Vec;
46
/// Manages the state of all active receiving ('destination') transactions.
///
/// This struct holds a map of all transactions for which this entity is the receiver.
/// It should be driven by a `Runner` which feeds it events and executes the
/// resulting actions.
///
/// The map is a fixed-capacity `heapless::LinearMap`, so at most
/// `MAX_CONCURRENT_TRANSACTIONS` transactions can be tracked at once.
#[derive(Debug, Default)]
pub struct ReceiverMachine {
    /// A map of active transactions, keyed by their unique `TransactionId`.
    transactions: LinearMap<TransactionId, Transaction, MAX_CONCURRENT_TRANSACTIONS>,
    /// The CFDP Entity ID of this local entity; recorded as the
    /// `destination_id` of every transaction this machine creates.
    id: EntityId,
}
59
60impl ReceiverMachine {
61    /// Creates a new `ReceiverMachine`.
62    pub fn new(id: EntityId) -> Self {
63        Self {
64            transactions: LinearMap::new(),
65            id,
66        }
67    }
68
69    /// The primary state machine logic for the receiver.
70    ///
71    /// It takes an `Event` as input and returns a `Vec` of `Action`s for the
72    /// `Runner` to execute. This function is pure and has no side effects.
73    pub fn handle<'a>(
74        &mut self,
75        file_store: &mut impl FileStore,
76        event: Event<'a>,
77        actions: &mut Actions<'a>,
78    ) -> Result<(), CfdpError> {
79        actions.clear();
80        match event {
81            Event::PduReceived {
82                pdu,
83                transaction_id,
84            } => self.handle_pdu_received(file_store, transaction_id, pdu, actions)?,
85            Event::FileDataWritten {
86                transaction_id,
87                offset,
88                len,
89            } => self.handle_file_data_written(transaction_id, offset, len, actions)?,
90            Event::TimerExpired {
91                timer_type,
92                transaction_id,
93            } => self.handle_timer_expired(timer_type, transaction_id, actions)?,
94            Event::ChecksumVerified {
95                transaction_id,
96                is_valid,
97            } => self.handle_checksum_verified(transaction_id, is_valid, actions)?,
98            Event::SuspendRequest { transaction_id } => {
99                self.handle_suspend_request(transaction_id, actions)?
100            }
101            Event::ResumeRequest { transaction_id } => {
102                self.handle_resume_request(transaction_id, actions)?
103            }
104            Event::FilestoreResponsesReceived {
105                transaction_id,
106                responses,
107            } => self.handle_filestore_responses(transaction_id, responses, actions)?,
108        }
109        Ok(())
110    }
111
112    /// Processes filestore responses and completes the transaction.
113    fn handle_filestore_responses<'a>(
114        &mut self,
115        transaction_id: TransactionId,
116        responses: Vec<FilestoreResponse, 4, u8>,
117        actions: &mut Actions<'a>,
118    ) -> Result<(), CfdpError> {
119        let Some(transaction) = self.transactions.get(&transaction_id) else {
120            return Ok(());
121        };
122
123        if transaction.state != TransactionState::WaitingForFilestoreResponses {
124            return Ok(()); // Ignore stale event
125        }
126
127        actions.push(Action::NotifyFileReceived {
128            transaction_id,
129            file_name: transaction.config.destination_file_id.clone(),
130            file_size: transaction.config.file_size,
131        })?;
132
133        actions.push(Action::SendAck {
134            destination_id: transaction.config.transaction_id.source_id,
135            transaction_id,
136            directive_code: DirectiveCode::Eof,
137            condition_code: ConditionCode::NoError,
138            transaction_status: TransactionStatus::Active,
139        })?;
140
141        actions.push(Action::SendFinished {
142            destination_id: transaction.config.transaction_id.source_id,
143            transaction_id,
144            condition_code: ConditionCode::NoError,
145            filestore_responses: responses,
146        })?;
147
148        self.terminate_transaction(transaction_id, ConditionCode::NoError, actions)?;
149        Ok(())
150    }
151
152    /// Suspends a transaction by stopping its timers and marking it suspended.
153    fn handle_suspend_request<'a>(
154        &mut self,
155        transaction_id: TransactionId,
156        actions: &mut Actions<'a>,
157    ) -> Result<(), CfdpError> {
158        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
159            return Ok(());
160        };
161        if transaction.suspended {
162            return Ok(());
163        }
164        transaction.suspended = true;
165        actions.push(Action::StopTimer {
166            transaction_id,
167            timer_type: None,
168        })?;
169        actions.push(Action::NotifySuspended { transaction_id })?;
170        Ok(())
171    }
172
173    /// Resumes a suspended transaction by restarting the appropriate timers.
174    fn handle_resume_request<'a>(
175        &mut self,
176        transaction_id: TransactionId,
177        actions: &mut Actions<'a>,
178    ) -> Result<(), CfdpError> {
179        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
180            return Ok(());
181        };
182        transaction.suspended = false;
183        match transaction.state {
184            TransactionState::WaitingForNakData => {
185                actions.push(Action::StartTimer {
186                    timer_type: TimerType::Nak,
187                    seconds: transaction.nak_timeout_secs,
188                    transaction_id,
189                })?;
190                actions.push(Action::StartTimer {
191                    timer_type: TimerType::Inactivity,
192                    seconds: transaction.config.inactivity_timeout_secs,
193                    transaction_id,
194                })?;
195            }
196            TransactionState::ReceivingFileData => {
197                if transaction.keep_alive_interval_secs != 0 {
198                    actions.push(Action::StartTimer {
199                        timer_type: TimerType::KeepAlive,
200                        seconds: transaction.keep_alive_interval_secs,
201                        transaction_id,
202                    })?;
203                }
204                actions.push(Action::StartTimer {
205                    timer_type: TimerType::Inactivity,
206                    seconds: transaction.config.inactivity_timeout_secs,
207                    transaction_id,
208                })?;
209            }
210            _ => {}
211        }
212        actions.push(Action::NotifyResumed {
213            transaction_id,
214            progress: transaction.progress,
215        })?;
216        Ok(())
217    }
218
219    /// Dispatches a timer expiry to the appropriate handler based on timer type.
220    fn handle_timer_expired<'a>(
221        &mut self,
222        timer_type: TimerType,
223        transaction_id: TransactionId,
224        actions: &mut Actions<'a>,
225    ) -> Result<(), CfdpError> {
226        match timer_type {
227            TimerType::Nak => self.handle_nak_timeout(transaction_id, actions)?,
228            TimerType::Inactivity => self.handle_inactivity(transaction_id, actions)?,
229            TimerType::Ack => {}
230            TimerType::KeepAlive => {
231                if let Some(transaction) = self.transactions.get(&transaction_id) {
232                    actions.push(Action::SendKeepAlive {
233                        transaction_id,
234                        destination_id: transaction.config.transaction_id.source_id,
235                        progress: transaction.progress,
236                    })?;
237
238                    if transaction.keep_alive_interval_secs != 0 {
239                        actions.push(Action::StartTimer {
240                            timer_type: TimerType::KeepAlive,
241                            seconds: transaction.keep_alive_interval_secs,
242                            transaction_id,
243                        })?;
244                    }
245                }
246            }
247        }
248        Ok(())
249    }
250
251    /// Updates the transaction's progress and segment tracker after a write.
252    fn handle_file_data_written<'a>(
253        &mut self,
254        transaction_id: TransactionId,
255        offset: u64,
256        len: usize,
257        _actions: &mut Actions<'a>,
258    ) -> Result<(), CfdpError> {
259        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
260            return Ok(());
261        };
262        transaction.progress = offset + len as u64;
263        transaction.tracker.add_segment(offset, len as u64)?;
264        Ok(())
265    }
266
267    /// Routes a received PDU to the appropriate handler based on its variant.
268    fn handle_pdu_received<'a>(
269        &mut self,
270        file_store: &mut impl FileStore,
271        transaction_id: TransactionId,
272        pdu: &'a Pdu,
273        actions: &mut Actions<'a>,
274    ) -> Result<(), CfdpError> {
275        if let Some(transaction) = self.transactions.get(&transaction_id) {
276            match pdu.variant()? {
277                PduVariant::FileData(data) => {
278                    self.handle_file_data(transaction_id, pdu.large_file_flag(), data, actions)?;
279                }
280                PduVariant::Eof(eof) => {
281                    self.handle_eof(transaction_id, pdu, eof, actions)?;
282                }
283                PduVariant::Prompt(prompt)
284                    if !transaction.suspended
285                        && prompt.prompt_response() == PromptResponse::KeepAlive =>
286                {
287                    actions.push(Action::SendKeepAlive {
288                        transaction_id,
289                        destination_id: transaction.config.transaction_id.source_id,
290                        progress: transaction.progress,
291                    })?;
292                }
293                _ => {}
294            }
295        } else {
296            match pdu.variant()? {
297                PduVariant::Metadata(meta) => self.handle_metadata(
298                    file_store,
299                    transaction_id,
300                    meta,
301                    actions,
302                    pdu.large_file_flag(),
303                )?,
304                PduVariant::Eof(eof) => {
305                    actions.push(Action::SendAck {
306                        transaction_id,
307                        destination_id: pdu.source_entity_id()?,
308                        directive_code: DirectiveCode::Eof,
309                        condition_code: eof.condition_code()?,
310                        transaction_status: TransactionStatus::Unrecognized,
311                    })?;
312                }
313                _ => {}
314            }
315        }
316        Ok(())
317    }
318
319    /// Creates a new receiving transaction and registers it in the active map.
320    fn create_transaction(
321        &mut self,
322        file_store: &mut impl FileStore,
323        transaction_id: TransactionId,
324        source_file_name: &[u8],
325        destination_file_name: &[u8],
326        file_size: u64,
327        checksum_type: ChecksumType,
328        fault_handlers: FaultHandlerSet,
329    ) -> Result<(), CfdpError> {
330        let config = TransactionConfig {
331            transaction_id,
332            destination_id: self.id,
333            source_file_id: file_store.intern(source_file_name)?,
334            destination_file_id: file_store.intern(destination_file_name)?,
335            file_size,
336            inactivity_timeout_secs: 30,
337            checksum_type,
338            fault_handlers,
339        };
340        let transaction = Transaction {
341            config,
342            state: TransactionState::ReceivingFileData,
343            suspended: false,
344            progress: 0,
345            tracker: SegmentTracker::new(file_size),
346            nak_retries: 0,
347            nak_limit: 0,
348            nak_timeout_secs: 10,
349            keep_alive_interval_secs: 60,
350            filestore_requests: Vec::new(),
351        };
352        self.transactions
353            .insert(transaction_id, transaction)
354            .map_err(|_| CfdpError::Custom("Too many concurrent transactions"))?;
355        Ok(())
356    }
357
358    /// Processes a Metadata PDU by creating a transaction and starting timers.
359    fn handle_metadata<'a>(
360        &mut self,
361        file_store: &mut impl FileStore,
362        transaction_id: TransactionId,
363        meta: &MetadataPdu,
364        actions: &mut Actions<'a>,
365        large_file_flag: bool,
366    ) -> Result<(), CfdpError> {
367        if self.transactions.contains_key(&transaction_id) {
368            return Ok(());
369        }
370        let (source_file_name, destination_file_name, _options) =
371            meta.variable_fields(large_file_flag)?;
372        let file_size = meta.file_size(large_file_flag)?;
373        let checksum_type = meta.checksum_type()?;
374        let fault_handlers = meta.fault_handler_overrides(large_file_flag)?;
375        self.create_transaction(
376            file_store,
377            transaction_id,
378            source_file_name,
379            destination_file_name,
380            file_size,
381            checksum_type,
382            fault_handlers,
383        )?;
384        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
385            return Ok(());
386        };
387        let mut fs_requests = Vec::new();
388        for tlv in meta.filestore_requests(large_file_flag)? {
389            let req = TlvFilestoreRequest::from_tlv(tlv)?;
390            let id1 = file_store.intern(req.first_file_name()?)?;
391            let id2 = req.second_file_name()?.map(|name| file_store.intern(name));
392            let fs_request = FilestoreRequest {
393                action: req.action()?,
394                first_file_name: id1,
395                second_file_name: id2.transpose()?,
396            };
397            fs_requests
398                .push(fs_request)
399                .map_err(|_| CfdpError::Custom("Too many filestore requests in Metadata PDU"))?;
400        }
401        transaction.filestore_requests = fs_requests;
402        if transaction.keep_alive_interval_secs != 0 {
403            actions.push(Action::StartTimer {
404                timer_type: TimerType::KeepAlive,
405                seconds: transaction.keep_alive_interval_secs,
406                transaction_id,
407            })?;
408        }
409        actions.push(Action::StartTimer {
410            timer_type: TimerType::Inactivity,
411            seconds: 30,
412            transaction_id,
413        })?;
414        Ok(())
415    }
416
417    /// Processes incoming file data by scheduling a write to the filestore.
418    fn handle_file_data<'a>(
419        &mut self,
420        transaction_id: TransactionId,
421        large_file_flag: bool,
422        file_data: FileDataPdu<'a>,
423        actions: &mut Actions<'a>,
424    ) -> Result<(), CfdpError> {
425        let Some(transaction) = self.transactions.get(&transaction_id) else {
426            return Ok(());
427        };
428        actions.push(Action::StopTimer {
429            timer_type: Some(TimerType::Inactivity),
430            transaction_id,
431        })?;
432        actions.push(Action::WriteFileData {
433            transaction_id,
434            data: file_data.file_data(large_file_flag)?,
435            offset: file_data.offset(large_file_flag)?,
436        })?;
437        if !transaction.suspended {
438            actions.push(Action::StartTimer {
439                timer_type: TimerType::Inactivity,
440                seconds: transaction.config.inactivity_timeout_secs,
441                transaction_id,
442            })?;
443        }
444        Ok(())
445    }
446
447    /// Processes an EOF PDU by verifying completeness or requesting missing data.
448    fn handle_eof<'a>(
449        &mut self,
450        transaction_id: TransactionId,
451        pdu: &Pdu,
452        eof: &EofPdu,
453        actions: &mut Actions<'a>,
454    ) -> Result<(), CfdpError> {
455        let condition_code = eof.condition_code()?;
456
457        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
458            actions.push(Action::SendAck {
459                transaction_id,
460                destination_id: pdu.source_entity_id()?,
461                directive_code: DirectiveCode::Eof,
462                condition_code,
463                transaction_status: TransactionStatus::Active,
464            })?;
465            return Ok(());
466        };
467
468        // Spec 4.11.2.{6,7} imply timers are suspended/stopped during state changes.
469        actions.push(Action::StopTimer {
470            timer_type: Some(TimerType::Inactivity),
471            transaction_id,
472        })?;
473
474        // Per Spec 4.7.2, upon receiving an EOF PDU (which requires an ACK per 4.6.4.3.5 and 4.6.6.1.2),
475        // we must immediately issue the Expected Response.
476        actions.push(Action::SendAck {
477            transaction_id,
478            destination_id: transaction.config.transaction_id.source_id,
479            directive_code: DirectiveCode::Eof,
480            condition_code,
481            transaction_status: TransactionStatus::Active,
482        })?;
483
484        // --- STEP 2: Decide what to do NEXT based on the condition code ---
485        match condition_code {
486            ConditionCode::NoError => {
487                if transaction.tracker.is_complete() {
488                    // We have the whole file. Proceed to checksum verification.
489                    // (Spec 4.6.1.2.8)
490                    actions.push(Action::VerifyChecksum {
491                        transaction_id,
492                        checksum_type: transaction.config.checksum_type,
493                        expected_checksum: eof.file_checksum(),
494                    })?;
495                    transaction.state = TransactionState::VerifyingChecksum;
496                } else {
497                    // We are missing data. In addition to the ACK we just sent,
498                    // we must now request the missing segments. (Spec 4.6.4.3.3)
499                    self.send_nak(transaction_id, actions)?;
500                }
501            }
502            _ => {
503                // This is the CANCELLATION case (e.g., condition is CancelReceived, etc.).
504                // Per Spec 4.6.6.1.1, receiving an EOF(cancel) causes a Notice of Completion (Canceled).
505                // Our 'handle_fault' function will trigger this termination.
506                self.handle_fault(transaction_id, condition_code, actions)?;
507            }
508        }
509
510        Ok(())
511    }
512
513    /// Handles the result of checksum verification, completing or faulting.
514    fn handle_checksum_verified<'a>(
515        &mut self,
516        transaction_id: TransactionId,
517        is_valid: bool,
518        actions: &mut Actions<'a>,
519    ) -> Result<(), CfdpError> {
520        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
521            return Ok(());
522        };
523        if transaction.state != TransactionState::VerifyingChecksum {
524            return Ok(());
525        }
526
527        if !is_valid {
528            self.handle_fault(transaction_id, ConditionCode::FileChecksumFailure, actions)?;
529            return Ok(());
530        }
531
532        if !transaction.filestore_requests.is_empty() {
533            actions.push(Action::ExecuteFilestoreRequests {
534                transaction_id,
535                requests: transaction.filestore_requests.clone(),
536            })?;
537            transaction.state = TransactionState::WaitingForFilestoreResponses;
538        } else {
539            // No filestore requests, so we can finish immediately.
540            actions.push(Action::NotifyFileReceived {
541                transaction_id,
542                file_name: transaction.config.destination_file_id.clone(),
543                file_size: transaction.config.file_size,
544            })?;
545            actions.push(Action::SendAck {
546                destination_id: transaction.config.transaction_id.source_id,
547                transaction_id,
548                directive_code: DirectiveCode::Eof,
549                condition_code: ConditionCode::NoError,
550                transaction_status: TransactionStatus::Active,
551            })?;
552            actions.push(Action::SendFinished {
553                destination_id: transaction.config.transaction_id.source_id,
554                transaction_id,
555                condition_code: ConditionCode::NoError,
556                filestore_responses: Vec::new(),
557            })?;
558            self.terminate_transaction(transaction_id, ConditionCode::NoError, actions)?;
559        }
560        Ok(())
561    }
562
563    /// Sends a NAK PDU for missing segments and starts the NAK timer.
564    fn send_nak<'a>(
565        &mut self,
566        transaction_id: TransactionId,
567        actions: &mut Actions<'a>,
568    ) -> Result<(), CfdpError> {
569        let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
570            return Ok(());
571        };
572        actions.push(Action::SendNak {
573            destination_id: transaction.config.transaction_id.source_id,
574            transaction_id,
575            start_of_scope: 0,
576            end_of_scope: transaction.config.file_size,
577        })?;
578        transaction.state = TransactionState::WaitingForNakData;
579        actions.push(Action::StartTimer {
580            timer_type: TimerType::Nak,
581            seconds: transaction.nak_timeout_secs,
582            transaction_id: transaction.config.transaction_id,
583        })?;
584        Ok(())
585    }
586
587    /// Retries the NAK or faults if the retry limit is exceeded.
588    fn handle_nak_timeout<'a>(
589        &mut self,
590        id: TransactionId,
591        actions: &mut Actions<'a>,
592    ) -> Result<(), CfdpError> {
593        let Some(transaction) = self.transactions.get_mut(&id) else {
594            return Ok(());
595        };
596        if transaction.nak_retries >= transaction.nak_limit {
597            self.handle_fault(id, ConditionCode::NakLimitReached, actions)?;
598        } else {
599            transaction.nak_retries += 1;
600            self.send_nak(id, actions)?;
601        }
602        Ok(())
603    }
604
605    /// Raises a keep-alive limit fault due to inactivity timeout.
606    fn handle_inactivity<'a>(
607        &mut self,
608        id: TransactionId,
609        actions: &mut Actions<'a>,
610    ) -> Result<(), CfdpError> {
611        self.handle_fault(id, ConditionCode::KeepAliveLimitReached, actions)
612    }
613
614    /// Removes a transaction and emits a termination action.
615    fn terminate_transaction<'a>(
616        &mut self,
617        id: TransactionId,
618        condition_code: ConditionCode,
619        actions: &mut Actions<'a>,
620    ) -> Result<(), CfdpError> {
621        let Some(transaction) = self.transactions.remove(&id) else {
622            return Ok(());
623        };
624        actions.push(Action::TerminateTransaction {
625            transaction_id: transaction.config.transaction_id,
626            condition_code,
627        })?;
628        Ok(())
629    }
630
631    /// Dispatches a fault condition to the configured fault handler.
632    fn handle_fault<'a>(
633        &mut self,
634        transaction_id: TransactionId,
635        condition_code: ConditionCode,
636        actions: &mut Actions<'a>,
637    ) -> Result<(), CfdpError> {
638        let Some(transaction) = self.transactions.get(&transaction_id) else {
639            return Ok(());
640        };
641        match transaction
642            .config
643            .fault_handlers
644            .get_handler(condition_code)
645        {
646            HandlerCode::Cancel => {
647                self.terminate_transaction(transaction_id, condition_code, actions)?;
648            }
649            HandlerCode::Ignore => {
650                actions.push(Action::NotifyFault {
651                    transaction_id,
652                    condition_code,
653                })?;
654            }
655            HandlerCode::Suspend => {
656                self.handle_suspend_request(transaction_id, actions)?;
657            }
658            HandlerCode::Abandon => {
659                // Abandon is similar to Cancel but implies no further PDUs will be sent.
660                // For a receiver, it's effectively the same as Cancel.
661                self.terminate_transaction(transaction_id, condition_code, actions)?;
662            }
663        }
664        Ok(())
665    }
666}