1pub mod action;
11pub mod event;
13pub mod transaction;
15
16use crate::transport::cfdp::CfdpError;
17use crate::transport::cfdp::class2::machine::MAX_CONCURRENT_TRANSACTIONS;
18use crate::transport::cfdp::class2::machine::TimerType;
19use crate::transport::cfdp::class2::machine::receiver::action::Action;
20use crate::transport::cfdp::class2::machine::receiver::action::Actions;
21use crate::transport::cfdp::class2::machine::receiver::event::Event;
22use crate::transport::cfdp::class2::machine::receiver::transaction::Transaction;
23use crate::transport::cfdp::class2::machine::receiver::transaction::TransactionState;
24use crate::transport::cfdp::class2::machine::tracker::SegmentTracker;
25use crate::transport::cfdp::class2::machine::transaction::TransactionConfig;
26use crate::transport::cfdp::class2::machine::transaction::TransactionId;
27use crate::transport::cfdp::filestore::FileStore;
28use crate::transport::cfdp::pdu::EntityId;
29use crate::transport::cfdp::pdu::Pdu;
30use crate::transport::cfdp::pdu::PduVariant;
31use crate::transport::cfdp::pdu::file_data::FileDataPdu;
32use crate::transport::cfdp::pdu::file_directive::ConditionCode;
33use crate::transport::cfdp::pdu::file_directive::DirectiveCode;
34use crate::transport::cfdp::pdu::file_directive::ack::TransactionStatus;
35use crate::transport::cfdp::pdu::file_directive::eof::EofPdu;
36use crate::transport::cfdp::pdu::file_directive::metadata::ChecksumType;
37use crate::transport::cfdp::pdu::file_directive::metadata::MetadataPdu;
38use crate::transport::cfdp::pdu::file_directive::prompt::PromptResponse;
39use crate::transport::cfdp::pdu::tlv::fault_handler_override::FaultHandlerSet;
40use crate::transport::cfdp::pdu::tlv::fault_handler_override::HandlerCode;
41use crate::transport::cfdp::pdu::tlv::filestore_request::FilestoreRequest;
42use crate::transport::cfdp::pdu::tlv::filestore_request::TlvFilestoreRequest;
43use crate::transport::cfdp::pdu::tlv::filestore_response::FilestoreResponse;
44use heapless::LinearMap;
45use heapless::Vec;
46
47#[derive(Debug, Default)]
53pub struct ReceiverMachine {
54 transactions: LinearMap<TransactionId, Transaction, MAX_CONCURRENT_TRANSACTIONS>,
56 id: EntityId,
58}
59
60impl ReceiverMachine {
61 pub fn new(id: EntityId) -> Self {
63 Self {
64 transactions: LinearMap::new(),
65 id,
66 }
67 }
68
69 pub fn handle<'a>(
74 &mut self,
75 file_store: &mut impl FileStore,
76 event: Event<'a>,
77 actions: &mut Actions<'a>,
78 ) -> Result<(), CfdpError> {
79 actions.clear();
80 match event {
81 Event::PduReceived {
82 pdu,
83 transaction_id,
84 } => self.handle_pdu_received(file_store, transaction_id, pdu, actions)?,
85 Event::FileDataWritten {
86 transaction_id,
87 offset,
88 len,
89 } => self.handle_file_data_written(transaction_id, offset, len, actions)?,
90 Event::TimerExpired {
91 timer_type,
92 transaction_id,
93 } => self.handle_timer_expired(timer_type, transaction_id, actions)?,
94 Event::ChecksumVerified {
95 transaction_id,
96 is_valid,
97 } => self.handle_checksum_verified(transaction_id, is_valid, actions)?,
98 Event::SuspendRequest { transaction_id } => {
99 self.handle_suspend_request(transaction_id, actions)?
100 }
101 Event::ResumeRequest { transaction_id } => {
102 self.handle_resume_request(transaction_id, actions)?
103 }
104 Event::FilestoreResponsesReceived {
105 transaction_id,
106 responses,
107 } => self.handle_filestore_responses(transaction_id, responses, actions)?,
108 }
109 Ok(())
110 }
111
112 fn handle_filestore_responses<'a>(
114 &mut self,
115 transaction_id: TransactionId,
116 responses: Vec<FilestoreResponse, 4, u8>,
117 actions: &mut Actions<'a>,
118 ) -> Result<(), CfdpError> {
119 let Some(transaction) = self.transactions.get(&transaction_id) else {
120 return Ok(());
121 };
122
123 if transaction.state != TransactionState::WaitingForFilestoreResponses {
124 return Ok(()); }
126
127 actions.push(Action::NotifyFileReceived {
128 transaction_id,
129 file_name: transaction.config.destination_file_id.clone(),
130 file_size: transaction.config.file_size,
131 })?;
132
133 actions.push(Action::SendAck {
134 destination_id: transaction.config.transaction_id.source_id,
135 transaction_id,
136 directive_code: DirectiveCode::Eof,
137 condition_code: ConditionCode::NoError,
138 transaction_status: TransactionStatus::Active,
139 })?;
140
141 actions.push(Action::SendFinished {
142 destination_id: transaction.config.transaction_id.source_id,
143 transaction_id,
144 condition_code: ConditionCode::NoError,
145 filestore_responses: responses,
146 })?;
147
148 self.terminate_transaction(transaction_id, ConditionCode::NoError, actions)?;
149 Ok(())
150 }
151
152 fn handle_suspend_request<'a>(
154 &mut self,
155 transaction_id: TransactionId,
156 actions: &mut Actions<'a>,
157 ) -> Result<(), CfdpError> {
158 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
159 return Ok(());
160 };
161 if transaction.suspended {
162 return Ok(());
163 }
164 transaction.suspended = true;
165 actions.push(Action::StopTimer {
166 transaction_id,
167 timer_type: None,
168 })?;
169 actions.push(Action::NotifySuspended { transaction_id })?;
170 Ok(())
171 }
172
173 fn handle_resume_request<'a>(
175 &mut self,
176 transaction_id: TransactionId,
177 actions: &mut Actions<'a>,
178 ) -> Result<(), CfdpError> {
179 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
180 return Ok(());
181 };
182 transaction.suspended = false;
183 match transaction.state {
184 TransactionState::WaitingForNakData => {
185 actions.push(Action::StartTimer {
186 timer_type: TimerType::Nak,
187 seconds: transaction.nak_timeout_secs,
188 transaction_id,
189 })?;
190 actions.push(Action::StartTimer {
191 timer_type: TimerType::Inactivity,
192 seconds: transaction.config.inactivity_timeout_secs,
193 transaction_id,
194 })?;
195 }
196 TransactionState::ReceivingFileData => {
197 if transaction.keep_alive_interval_secs != 0 {
198 actions.push(Action::StartTimer {
199 timer_type: TimerType::KeepAlive,
200 seconds: transaction.keep_alive_interval_secs,
201 transaction_id,
202 })?;
203 }
204 actions.push(Action::StartTimer {
205 timer_type: TimerType::Inactivity,
206 seconds: transaction.config.inactivity_timeout_secs,
207 transaction_id,
208 })?;
209 }
210 _ => {}
211 }
212 actions.push(Action::NotifyResumed {
213 transaction_id,
214 progress: transaction.progress,
215 })?;
216 Ok(())
217 }
218
219 fn handle_timer_expired<'a>(
221 &mut self,
222 timer_type: TimerType,
223 transaction_id: TransactionId,
224 actions: &mut Actions<'a>,
225 ) -> Result<(), CfdpError> {
226 match timer_type {
227 TimerType::Nak => self.handle_nak_timeout(transaction_id, actions)?,
228 TimerType::Inactivity => self.handle_inactivity(transaction_id, actions)?,
229 TimerType::Ack => {}
230 TimerType::KeepAlive => {
231 if let Some(transaction) = self.transactions.get(&transaction_id) {
232 actions.push(Action::SendKeepAlive {
233 transaction_id,
234 destination_id: transaction.config.transaction_id.source_id,
235 progress: transaction.progress,
236 })?;
237
238 if transaction.keep_alive_interval_secs != 0 {
239 actions.push(Action::StartTimer {
240 timer_type: TimerType::KeepAlive,
241 seconds: transaction.keep_alive_interval_secs,
242 transaction_id,
243 })?;
244 }
245 }
246 }
247 }
248 Ok(())
249 }
250
251 fn handle_file_data_written<'a>(
253 &mut self,
254 transaction_id: TransactionId,
255 offset: u64,
256 len: usize,
257 _actions: &mut Actions<'a>,
258 ) -> Result<(), CfdpError> {
259 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
260 return Ok(());
261 };
262 transaction.progress = offset + len as u64;
263 transaction.tracker.add_segment(offset, len as u64)?;
264 Ok(())
265 }
266
267 fn handle_pdu_received<'a>(
269 &mut self,
270 file_store: &mut impl FileStore,
271 transaction_id: TransactionId,
272 pdu: &'a Pdu,
273 actions: &mut Actions<'a>,
274 ) -> Result<(), CfdpError> {
275 if let Some(transaction) = self.transactions.get(&transaction_id) {
276 match pdu.variant()? {
277 PduVariant::FileData(data) => {
278 self.handle_file_data(transaction_id, pdu.large_file_flag(), data, actions)?;
279 }
280 PduVariant::Eof(eof) => {
281 self.handle_eof(transaction_id, pdu, eof, actions)?;
282 }
283 PduVariant::Prompt(prompt)
284 if !transaction.suspended
285 && prompt.prompt_response() == PromptResponse::KeepAlive =>
286 {
287 actions.push(Action::SendKeepAlive {
288 transaction_id,
289 destination_id: transaction.config.transaction_id.source_id,
290 progress: transaction.progress,
291 })?;
292 }
293 _ => {}
294 }
295 } else {
296 match pdu.variant()? {
297 PduVariant::Metadata(meta) => self.handle_metadata(
298 file_store,
299 transaction_id,
300 meta,
301 actions,
302 pdu.large_file_flag(),
303 )?,
304 PduVariant::Eof(eof) => {
305 actions.push(Action::SendAck {
306 transaction_id,
307 destination_id: pdu.source_entity_id()?,
308 directive_code: DirectiveCode::Eof,
309 condition_code: eof.condition_code()?,
310 transaction_status: TransactionStatus::Unrecognized,
311 })?;
312 }
313 _ => {}
314 }
315 }
316 Ok(())
317 }
318
319 fn create_transaction(
321 &mut self,
322 file_store: &mut impl FileStore,
323 transaction_id: TransactionId,
324 source_file_name: &[u8],
325 destination_file_name: &[u8],
326 file_size: u64,
327 checksum_type: ChecksumType,
328 fault_handlers: FaultHandlerSet,
329 ) -> Result<(), CfdpError> {
330 let config = TransactionConfig {
331 transaction_id,
332 destination_id: self.id,
333 source_file_id: file_store.intern(source_file_name)?,
334 destination_file_id: file_store.intern(destination_file_name)?,
335 file_size,
336 inactivity_timeout_secs: 30,
337 checksum_type,
338 fault_handlers,
339 };
340 let transaction = Transaction {
341 config,
342 state: TransactionState::ReceivingFileData,
343 suspended: false,
344 progress: 0,
345 tracker: SegmentTracker::new(file_size),
346 nak_retries: 0,
347 nak_limit: 0,
348 nak_timeout_secs: 10,
349 keep_alive_interval_secs: 60,
350 filestore_requests: Vec::new(),
351 };
352 self.transactions
353 .insert(transaction_id, transaction)
354 .map_err(|_| CfdpError::Custom("Too many concurrent transactions"))?;
355 Ok(())
356 }
357
358 fn handle_metadata<'a>(
360 &mut self,
361 file_store: &mut impl FileStore,
362 transaction_id: TransactionId,
363 meta: &MetadataPdu,
364 actions: &mut Actions<'a>,
365 large_file_flag: bool,
366 ) -> Result<(), CfdpError> {
367 if self.transactions.contains_key(&transaction_id) {
368 return Ok(());
369 }
370 let (source_file_name, destination_file_name, _options) =
371 meta.variable_fields(large_file_flag)?;
372 let file_size = meta.file_size(large_file_flag)?;
373 let checksum_type = meta.checksum_type()?;
374 let fault_handlers = meta.fault_handler_overrides(large_file_flag)?;
375 self.create_transaction(
376 file_store,
377 transaction_id,
378 source_file_name,
379 destination_file_name,
380 file_size,
381 checksum_type,
382 fault_handlers,
383 )?;
384 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
385 return Ok(());
386 };
387 let mut fs_requests = Vec::new();
388 for tlv in meta.filestore_requests(large_file_flag)? {
389 let req = TlvFilestoreRequest::from_tlv(tlv)?;
390 let id1 = file_store.intern(req.first_file_name()?)?;
391 let id2 = req.second_file_name()?.map(|name| file_store.intern(name));
392 let fs_request = FilestoreRequest {
393 action: req.action()?,
394 first_file_name: id1,
395 second_file_name: id2.transpose()?,
396 };
397 fs_requests
398 .push(fs_request)
399 .map_err(|_| CfdpError::Custom("Too many filestore requests in Metadata PDU"))?;
400 }
401 transaction.filestore_requests = fs_requests;
402 if transaction.keep_alive_interval_secs != 0 {
403 actions.push(Action::StartTimer {
404 timer_type: TimerType::KeepAlive,
405 seconds: transaction.keep_alive_interval_secs,
406 transaction_id,
407 })?;
408 }
409 actions.push(Action::StartTimer {
410 timer_type: TimerType::Inactivity,
411 seconds: 30,
412 transaction_id,
413 })?;
414 Ok(())
415 }
416
417 fn handle_file_data<'a>(
419 &mut self,
420 transaction_id: TransactionId,
421 large_file_flag: bool,
422 file_data: FileDataPdu<'a>,
423 actions: &mut Actions<'a>,
424 ) -> Result<(), CfdpError> {
425 let Some(transaction) = self.transactions.get(&transaction_id) else {
426 return Ok(());
427 };
428 actions.push(Action::StopTimer {
429 timer_type: Some(TimerType::Inactivity),
430 transaction_id,
431 })?;
432 actions.push(Action::WriteFileData {
433 transaction_id,
434 data: file_data.file_data(large_file_flag)?,
435 offset: file_data.offset(large_file_flag)?,
436 })?;
437 if !transaction.suspended {
438 actions.push(Action::StartTimer {
439 timer_type: TimerType::Inactivity,
440 seconds: transaction.config.inactivity_timeout_secs,
441 transaction_id,
442 })?;
443 }
444 Ok(())
445 }
446
447 fn handle_eof<'a>(
449 &mut self,
450 transaction_id: TransactionId,
451 pdu: &Pdu,
452 eof: &EofPdu,
453 actions: &mut Actions<'a>,
454 ) -> Result<(), CfdpError> {
455 let condition_code = eof.condition_code()?;
456
457 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
458 actions.push(Action::SendAck {
459 transaction_id,
460 destination_id: pdu.source_entity_id()?,
461 directive_code: DirectiveCode::Eof,
462 condition_code,
463 transaction_status: TransactionStatus::Active,
464 })?;
465 return Ok(());
466 };
467
468 actions.push(Action::StopTimer {
470 timer_type: Some(TimerType::Inactivity),
471 transaction_id,
472 })?;
473
474 actions.push(Action::SendAck {
477 transaction_id,
478 destination_id: transaction.config.transaction_id.source_id,
479 directive_code: DirectiveCode::Eof,
480 condition_code,
481 transaction_status: TransactionStatus::Active,
482 })?;
483
484 match condition_code {
486 ConditionCode::NoError => {
487 if transaction.tracker.is_complete() {
488 actions.push(Action::VerifyChecksum {
491 transaction_id,
492 checksum_type: transaction.config.checksum_type,
493 expected_checksum: eof.file_checksum(),
494 })?;
495 transaction.state = TransactionState::VerifyingChecksum;
496 } else {
497 self.send_nak(transaction_id, actions)?;
500 }
501 }
502 _ => {
503 self.handle_fault(transaction_id, condition_code, actions)?;
507 }
508 }
509
510 Ok(())
511 }
512
513 fn handle_checksum_verified<'a>(
515 &mut self,
516 transaction_id: TransactionId,
517 is_valid: bool,
518 actions: &mut Actions<'a>,
519 ) -> Result<(), CfdpError> {
520 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
521 return Ok(());
522 };
523 if transaction.state != TransactionState::VerifyingChecksum {
524 return Ok(());
525 }
526
527 if !is_valid {
528 self.handle_fault(transaction_id, ConditionCode::FileChecksumFailure, actions)?;
529 return Ok(());
530 }
531
532 if !transaction.filestore_requests.is_empty() {
533 actions.push(Action::ExecuteFilestoreRequests {
534 transaction_id,
535 requests: transaction.filestore_requests.clone(),
536 })?;
537 transaction.state = TransactionState::WaitingForFilestoreResponses;
538 } else {
539 actions.push(Action::NotifyFileReceived {
541 transaction_id,
542 file_name: transaction.config.destination_file_id.clone(),
543 file_size: transaction.config.file_size,
544 })?;
545 actions.push(Action::SendAck {
546 destination_id: transaction.config.transaction_id.source_id,
547 transaction_id,
548 directive_code: DirectiveCode::Eof,
549 condition_code: ConditionCode::NoError,
550 transaction_status: TransactionStatus::Active,
551 })?;
552 actions.push(Action::SendFinished {
553 destination_id: transaction.config.transaction_id.source_id,
554 transaction_id,
555 condition_code: ConditionCode::NoError,
556 filestore_responses: Vec::new(),
557 })?;
558 self.terminate_transaction(transaction_id, ConditionCode::NoError, actions)?;
559 }
560 Ok(())
561 }
562
563 fn send_nak<'a>(
565 &mut self,
566 transaction_id: TransactionId,
567 actions: &mut Actions<'a>,
568 ) -> Result<(), CfdpError> {
569 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
570 return Ok(());
571 };
572 actions.push(Action::SendNak {
573 destination_id: transaction.config.transaction_id.source_id,
574 transaction_id,
575 start_of_scope: 0,
576 end_of_scope: transaction.config.file_size,
577 })?;
578 transaction.state = TransactionState::WaitingForNakData;
579 actions.push(Action::StartTimer {
580 timer_type: TimerType::Nak,
581 seconds: transaction.nak_timeout_secs,
582 transaction_id: transaction.config.transaction_id,
583 })?;
584 Ok(())
585 }
586
587 fn handle_nak_timeout<'a>(
589 &mut self,
590 id: TransactionId,
591 actions: &mut Actions<'a>,
592 ) -> Result<(), CfdpError> {
593 let Some(transaction) = self.transactions.get_mut(&id) else {
594 return Ok(());
595 };
596 if transaction.nak_retries >= transaction.nak_limit {
597 self.handle_fault(id, ConditionCode::NakLimitReached, actions)?;
598 } else {
599 transaction.nak_retries += 1;
600 self.send_nak(id, actions)?;
601 }
602 Ok(())
603 }
604
605 fn handle_inactivity<'a>(
607 &mut self,
608 id: TransactionId,
609 actions: &mut Actions<'a>,
610 ) -> Result<(), CfdpError> {
611 self.handle_fault(id, ConditionCode::KeepAliveLimitReached, actions)
612 }
613
614 fn terminate_transaction<'a>(
616 &mut self,
617 id: TransactionId,
618 condition_code: ConditionCode,
619 actions: &mut Actions<'a>,
620 ) -> Result<(), CfdpError> {
621 let Some(transaction) = self.transactions.remove(&id) else {
622 return Ok(());
623 };
624 actions.push(Action::TerminateTransaction {
625 transaction_id: transaction.config.transaction_id,
626 condition_code,
627 })?;
628 Ok(())
629 }
630
631 fn handle_fault<'a>(
633 &mut self,
634 transaction_id: TransactionId,
635 condition_code: ConditionCode,
636 actions: &mut Actions<'a>,
637 ) -> Result<(), CfdpError> {
638 let Some(transaction) = self.transactions.get(&transaction_id) else {
639 return Ok(());
640 };
641 match transaction
642 .config
643 .fault_handlers
644 .get_handler(condition_code)
645 {
646 HandlerCode::Cancel => {
647 self.terminate_transaction(transaction_id, condition_code, actions)?;
648 }
649 HandlerCode::Ignore => {
650 actions.push(Action::NotifyFault {
651 transaction_id,
652 condition_code,
653 })?;
654 }
655 HandlerCode::Suspend => {
656 self.handle_suspend_request(transaction_id, actions)?;
657 }
658 HandlerCode::Abandon => {
659 self.terminate_transaction(transaction_id, condition_code, actions)?;
662 }
663 }
664 Ok(())
665 }
666}