1pub mod action;
11pub mod event;
13pub mod transaction;
15
16use crate::transport::cfdp::CfdpError;
17use crate::transport::cfdp::class2::machine::FILE_DATA_CHUNK_SIZE;
18use crate::transport::cfdp::class2::machine::MAX_CONCURRENT_TRANSACTIONS;
19use crate::transport::cfdp::class2::machine::PromptType;
20use crate::transport::cfdp::class2::machine::TimerType;
21use crate::transport::cfdp::class2::machine::sender::action::Action;
22use crate::transport::cfdp::class2::machine::sender::action::Actions;
23use crate::transport::cfdp::class2::machine::sender::event::Event;
24use crate::transport::cfdp::class2::machine::sender::transaction::Transaction;
25use crate::transport::cfdp::class2::machine::sender::transaction::TransactionState;
26use crate::transport::cfdp::class2::machine::transaction::TransactionConfig;
27use crate::transport::cfdp::class2::machine::transaction::TransactionId;
28use crate::transport::cfdp::filestore::FileId;
29use crate::transport::cfdp::pdu::EntityId;
30use crate::transport::cfdp::pdu::Pdu;
31use crate::transport::cfdp::pdu::PduVariant;
32use crate::transport::cfdp::pdu::TransactionSeqNum;
33use crate::transport::cfdp::pdu::file_directive::ConditionCode;
34use crate::transport::cfdp::pdu::file_directive::ack::AckPdu;
35use crate::transport::cfdp::pdu::file_directive::ack::AckedDirectiveCode;
36use crate::transport::cfdp::pdu::file_directive::ack::TransactionStatus;
37use crate::transport::cfdp::pdu::file_directive::finished::FinishedPdu;
38use crate::transport::cfdp::pdu::file_directive::keepalive::KeepAlivePdu;
39use crate::transport::cfdp::pdu::file_directive::metadata::ChecksumType;
40use crate::transport::cfdp::pdu::file_directive::nak::NakPdu;
41use crate::transport::cfdp::pdu::tlv::fault_handler_override::HandlerCode;
42use heapless::LinearMap;
43
44#[derive(Debug)]
50pub struct SenderMachine {
51 transactions: LinearMap<TransactionId, Transaction, MAX_CONCURRENT_TRANSACTIONS>,
53 next_seq_num: TransactionSeqNum,
55 id: EntityId,
57 keep_alive_limit: u64,
59 ack_timeout_secs: u16,
61}
62
63impl SenderMachine {
64 pub fn new(id: EntityId, ack_timeout_secs: u16, keep_alive_limit: u64) -> Self {
66 Self {
67 transactions: LinearMap::new(),
68 next_seq_num: TransactionSeqNum::default(),
69 id,
70 ack_timeout_secs,
71 keep_alive_limit,
72 }
73 }
74
75 fn create_transation<'a>(
77 &mut self,
78 destination_id: EntityId,
79 source_file_id: FileId,
80 destination_file_id: FileId,
81 file_size: u64,
82 checksum_type: ChecksumType,
83 ) -> Result<TransactionId, CfdpError> {
84 let transaction_id = TransactionId {
85 source_id: self.id,
86 seq_num: self.next_seq_num,
87 };
88 self.next_seq_num.increment();
89 let config = TransactionConfig {
90 transaction_id,
91 destination_id,
92 source_file_id,
93 destination_file_id,
94 file_size,
95 inactivity_timeout_secs: 30,
96 checksum_type,
97 fault_handlers: Default::default(),
98 };
99 let transaction = Transaction {
100 config,
101 state: TransactionState::SendingFileData,
102 suspended: false,
103 progress: 0,
104 ack_retries: 0,
105 file_checksum: None,
106 last_receiver_progress: 0,
107 ack_limit: 5,
108 keep_alive_limit: self.keep_alive_limit,
109 };
110 self.transactions
111 .insert(transaction_id, transaction)
112 .map_err(|_| CfdpError::TooManyConcurrentTransactions)?;
113 Ok(transaction_id)
114 }
115
116 pub fn handle<'a>(
121 &mut self,
122 actions: &mut Actions<'a>,
123 event: Event<'a>,
124 ) -> Result<(), CfdpError> {
125 actions.clear();
126 match event {
127 Event::PutRequest {
128 source_file_name,
129 destination_file_name,
130 destination_id,
131 file_size,
132 checksum_type,
133 } => self.handle_put_request(
134 destination_id,
135 source_file_name,
136 destination_file_name,
137 file_size,
138 checksum_type,
139 actions,
140 )?,
141 Event::DataSegmentReady {
142 transaction_id,
143 data,
144 offset,
145 } => self.handle_data_segment_ready(transaction_id, offset, data, actions)?,
146 Event::ChecksumReady {
147 transaction_id,
148 checksum,
149 } => self.handle_checksum(transaction_id, checksum, actions)?,
150 Event::PduReceived {
151 pdu,
152 transaction_id,
153 } => self.handle_pdu_received(pdu, transaction_id, actions)?,
154 Event::TimerExpired {
155 timer_type,
156 transaction_id,
157 } => self.handle_timer_expired(transaction_id, timer_type, actions)?,
158 Event::PromptRequest {
159 transaction_id,
160 prompt_type,
161 } => self.handle_prompt(transaction_id, prompt_type, actions)?,
162 Event::SuspendRequest { transaction_id } => {
163 self.handle_suspend(transaction_id, actions)?
164 }
165 Event::ResumeRequest { transaction_id } => {
166 self.handle_resume(transaction_id, actions)?
167 }
168 }
169 Ok(())
170 }
171
172 fn handle_prompt<'a>(
174 &mut self,
175 transaction_id: TransactionId,
176 prompt_type: PromptType,
177 actions: &mut Actions<'a>,
178 ) -> Result<(), CfdpError> {
179 match prompt_type {
180 PromptType::Nak => self.handle_prompt_nak(transaction_id, actions)?,
181 PromptType::KeepAlive => self.handle_prompt_keep_alive(transaction_id, actions)?,
182 }
183 Ok(())
184 }
185
186 fn handle_prompt_keep_alive<'a>(
188 &mut self,
189 transaction_id: TransactionId,
190 actions: &mut Actions<'a>,
191 ) -> Result<(), CfdpError> {
192 let Some(transaction) = self.transactions.get(&transaction_id) else {
193 return Ok(());
194 };
195
196 match transaction.state {
199 TransactionState::SendingFileData if !transaction.suspended => {
200 actions.push(Action::SendPrompt {
201 transaction_id,
202 destination_id: transaction.config.destination_id,
203 prompt_type: PromptType::KeepAlive,
204 })?;
205 }
206 _ => {}
207 }
208
209 Ok(())
210 }
211
212 fn handle_pdu_received<'a>(
214 &mut self,
215 pdu: &'a Pdu,
216 transaction_id: TransactionId,
217 actions: &mut Actions<'a>,
218 ) -> Result<(), CfdpError> {
219 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
220 return Ok(());
221 };
222 match pdu.variant()? {
223 PduVariant::Finished(v) => self.handle_finished(transaction_id, v, actions)?,
224 PduVariant::Ack(v) => self.handle_ack(transaction_id, v, actions)?,
225 PduVariant::Nak(v) if !transaction.suspended => {
227 self.handle_nak(transaction_id, v, actions)?
228 }
229 PduVariant::KeepAlive(v) if !transaction.suspended => {
230 self.handle_keep_alive(transaction_id, v, actions)?
231 }
232 _ => {}
233 }
234 Ok(())
235 }
236
237 fn handle_data_segment_ready<'a>(
239 &mut self,
240 transaction_id: TransactionId,
241 offset: u64,
242 data: &'a [u8],
243 actions: &mut Actions<'a>,
244 ) -> Result<(), CfdpError> {
245 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
246 return Ok(());
247 };
248
249 if transaction.suspended {
250 return Ok(());
251 }
252
253 actions.push(Action::SendFileData {
254 destination_id: transaction.config.destination_id,
255 transaction_id,
256 offset,
257 data,
258 })?;
259
260 transaction.progress = offset + data.len() as u64;
261
262 if transaction.progress >= transaction.config.file_size {
263 actions.push(Action::CalculateChecksum {
264 transaction_id,
265 checksum_type: transaction.config.checksum_type,
266 })?;
267 } else {
268 actions.push(Action::ReadDataSegment {
269 transaction_id,
270 start_offset: transaction.progress,
271 end_offset: transaction.progress + FILE_DATA_CHUNK_SIZE as u64,
272 })?;
273 }
274 Ok(())
275 }
276
277 fn handle_put_request<'a>(
279 &mut self,
280 destination_id: EntityId,
281 source_file_name: FileId,
282 destination_file_name: FileId,
283 file_size: u64,
284 checksum_type: ChecksumType,
285 actions: &mut Actions<'a>,
286 ) -> Result<(), CfdpError> {
287 let transaction_id = self.create_transation(
288 destination_id,
289 source_file_name,
290 destination_file_name,
291 file_size,
292 checksum_type,
293 )?;
294 actions.push(Action::SendMetadata {
295 destination_id,
296 transaction_id,
297 file_size: 0,
298 source_file_name,
299 destination_file_name,
300 checksum_type,
301 })?;
302 actions.push(Action::ReadDataSegment {
303 transaction_id,
304 start_offset: 0,
305 end_offset: FILE_DATA_CHUNK_SIZE as u64,
306 })?;
307 Ok(())
308 }
309
310 fn handle_checksum<'a>(
312 &mut self,
313 transaction_id: TransactionId,
314 checksum: u32,
315 actions: &mut Actions<'a>,
316 ) -> Result<(), CfdpError> {
317 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
318 return Ok(());
319 };
320 transaction.file_checksum = Some(checksum);
321 self.send_eof(transaction_id, checksum, actions)
322 }
323
324 fn handle_timer_expired<'a>(
326 &mut self,
327 transaction_id: TransactionId,
328 timer_type: TimerType,
329 actions: &mut Actions<'a>,
330 ) -> Result<(), CfdpError> {
331 match timer_type {
332 TimerType::Ack => {
333 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
334 return Ok(());
335 };
336 if transaction.ack_retries >= transaction.ack_limit {
337 self.handle_terminate(transaction_id, ConditionCode::AckLimitReached, actions)?;
339 return Ok(());
340 }
341 transaction.ack_retries += 1;
343 if transaction.state != TransactionState::WaitingForEofAck {
344 return Ok(());
345 }
346 let checksum = transaction
347 .file_checksum
348 .ok_or_else(|| CfdpError::Custom("Checksum not calculated"))?;
349 self.send_eof(transaction_id, checksum, actions)?;
350 }
351 TimerType::Inactivity => {
352 let Some(transaction) = self.transactions.get(&transaction_id) else {
353 return Ok(());
354 };
355 if transaction.state != TransactionState::WaitingForFinishedPdu {
356 return Ok(());
357 }
358 self.handle_fault(transaction_id, ConditionCode::InactivityDetected, actions)?;
359 }
360 TimerType::Nak => {}
361 TimerType::KeepAlive => {}
362 }
363 Ok(())
364 }
365
366 fn handle_prompt_nak<'a>(
368 &mut self,
369 transaction_id: TransactionId,
370 actions: &mut Actions<'a>,
371 ) -> Result<(), CfdpError> {
372 let Some(transaction) = self.transactions.get(&transaction_id) else {
373 return Ok(());
374 };
375 if transaction.state != TransactionState::SendingFileData {
376 return Ok(());
377 }
378 actions.push(Action::SendPrompt {
379 destination_id: transaction.config.destination_id,
380 transaction_id,
381 prompt_type: PromptType::Nak,
382 })?;
383 Ok(())
384 }
385
386 fn handle_suspend<'a>(
388 &mut self,
389 transaction_id: TransactionId,
390 actions: &mut Actions<'a>,
391 ) -> Result<(), CfdpError> {
392 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
393 return Ok(());
394 };
395 if transaction.suspended {
396 return Ok(());
397 }
398 transaction.suspended = true;
399 actions.push(Action::StopTimer {
400 transaction_id,
401 timer_type: None,
402 })?;
403 actions.push(Action::NotifySuspended { transaction_id })?;
404 Ok(())
405 }
406
407 fn handle_resume<'a>(
409 &mut self,
410 transaction_id: TransactionId,
411 actions: &mut Actions<'a>,
412 ) -> Result<(), CfdpError> {
413 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
414 return Ok(());
415 };
416 if !transaction.suspended {
417 return Ok(());
418 }
419 let progress = transaction.progress;
420 transaction.suspended = false;
421 match transaction.state {
422 TransactionState::SendingFileData => {
423 actions.push(Action::ReadDataSegment {
424 transaction_id,
425 start_offset: transaction.progress,
426 end_offset: transaction.progress + FILE_DATA_CHUNK_SIZE as u64,
427 })?;
428 }
429 TransactionState::WaitingForEofAck => {
430 let checksum = transaction
431 .file_checksum
432 .ok_or(CfdpError::Custom("Checksum missing on resume"))?;
433 self.send_eof(transaction_id, checksum, actions)?;
434 }
435 TransactionState::WaitingForFinishedPdu => {
436 actions.push(Action::StartTimer {
437 timer_type: TimerType::Inactivity,
438 seconds: transaction.config.inactivity_timeout_secs,
439 transaction_id,
440 })?;
441 }
442 TransactionState::WaitingForChecksum => {
443 actions.push(Action::CalculateChecksum {
444 transaction_id,
445 checksum_type: transaction.config.checksum_type,
446 })?;
447 }
448 }
449 actions.push(Action::NotifyResumed {
450 transaction_id,
451 progress,
452 })?;
453 Ok(())
454 }
455
456 fn handle_finished<'a>(
458 &mut self,
459 transaction_id: TransactionId,
460 fin: &FinishedPdu,
461 actions: &mut Actions<'a>,
462 ) -> Result<(), CfdpError> {
463 let Some(transaction) = self.transactions.get(&transaction_id) else {
464 return Ok(());
465 };
466 if transaction.state != TransactionState::WaitingForFinishedPdu {
467 return Err(CfdpError::Custom(
468 "Received Finished PDU in unexpected transaction state",
469 ));
470 }
471
472 actions.push(Action::StopTimer {
474 transaction_id,
475 timer_type: None,
476 })?;
477
478 actions.push(Action::SendAck {
481 transaction_id,
482 destination_id: transaction.config.destination_id,
483 acked_directive_code: AckedDirectiveCode::Finished,
484 condition_code: fin.condition_code()?,
485 transaction_status: TransactionStatus::Terminated,
486 })?;
487
488 self.handle_terminate(transaction_id, fin.condition_code()?, actions)?;
489 Ok(())
490 }
491
492 fn send_eof<'a>(
494 &mut self,
495 transaction_id: TransactionId,
496 checksum: u32,
497 actions: &mut Actions<'a>,
498 ) -> Result<(), CfdpError> {
499 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
500 return Ok(());
501 };
502 actions.push(Action::SendEof {
503 destination_id: transaction.config.destination_id,
504 transaction_id,
505 condition_code: ConditionCode::NoError,
506 file_size: transaction.config.file_size,
507 checksum,
508 })?;
509 transaction.state = TransactionState::WaitingForEofAck;
510 actions.push(Action::StartTimer {
511 timer_type: TimerType::Ack,
512 seconds: self.ack_timeout_secs,
513 transaction_id,
514 })?;
515 Ok(())
516 }
517
518 fn handle_ack<'a>(
520 &mut self,
521 transaction_id: TransactionId,
522 ack: &AckPdu,
523 actions: &mut Actions<'a>,
524 ) -> Result<(), CfdpError> {
525 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
526 return Ok(());
527 };
528 match (transaction.state, ack.acked_directive_code()?) {
529 (TransactionState::WaitingForEofAck, AckedDirectiveCode::Eof) => {
530 actions.push(Action::StopTimer {
531 timer_type: Some(TimerType::Ack),
532 transaction_id,
533 })?;
534 transaction.state = TransactionState::WaitingForFinishedPdu;
535 actions.push(Action::StartTimer {
536 timer_type: TimerType::Inactivity,
537 seconds: transaction.config.inactivity_timeout_secs,
538 transaction_id,
539 })?;
540 }
541 _ => {}
542 }
543 Ok(())
544 }
545
546 fn handle_nak<'a>(
548 &mut self,
549 transaction_id: TransactionId,
550 nak: NakPdu<'a>,
551 actions: &mut Actions<'a>,
552 ) -> Result<(), CfdpError> {
553 if self.transactions.get(&transaction_id).is_none() {
554 return Ok(());
555 }
556 actions.push(Action::StopTimer {
557 timer_type: Some(TimerType::Ack),
558 transaction_id,
559 })?;
560 actions.push(Action::ReadDataSegmentBatch {
561 transaction_id,
562 segments: nak.segment_requests()?,
563 })?;
564 Ok(())
565 }
566
567 fn handle_keep_alive<'a>(
569 &mut self,
570 transaction_id: TransactionId,
571 keepalive: KeepAlivePdu<'_>,
572 actions: &mut Actions<'a>,
573 ) -> Result<(), CfdpError> {
574 let Some(transaction) = self.transactions.get_mut(&transaction_id) else {
575 return Ok(());
576 };
577
578 let receiver_progress = keepalive.progress();
579
580 if receiver_progress > transaction.last_receiver_progress {
581 transaction.last_receiver_progress = receiver_progress;
582 }
583
584 let discrepancy = transaction
585 .progress
586 .saturating_sub(transaction.last_receiver_progress);
587
588 if discrepancy > transaction.keep_alive_limit {
589 self.handle_fault(
590 transaction_id,
591 ConditionCode::KeepAliveLimitReached,
592 actions,
593 )?;
594 }
595
596 Ok(())
598 }
599
600 fn handle_terminate<'a>(
602 &mut self,
603 id: TransactionId,
604 condition: ConditionCode,
605 actions: &mut Actions<'a>,
606 ) -> Result<(), CfdpError> {
607 let Some(transaction) = self.transactions.remove(&id) else {
608 return Ok(());
609 };
610 actions.push(Action::TerminateTransaction {
611 transaction_id: transaction.config.transaction_id,
612 condition_code: condition,
613 })?;
614 Ok(())
615 }
616
617 fn handle_fault<'a>(
619 &mut self,
620 transaction_id: TransactionId,
621 condition_code: ConditionCode,
622 actions: &mut Actions<'a>,
623 ) -> Result<(), CfdpError> {
624 let Some(transaction) = self.transactions.get(&transaction_id) else {
625 return Ok(());
626 };
627 match transaction
628 .config
629 .fault_handlers
630 .get_handler(condition_code)
631 {
632 HandlerCode::Cancel => {
633 self.handle_terminate(transaction_id, condition_code, actions)?;
634 }
635 HandlerCode::Ignore => {
636 actions.push(Action::NotifyFault {
637 transaction_id,
638 condition_code,
639 })?;
640 }
641 HandlerCode::Suspend => {
642 self.handle_suspend(transaction_id, actions)?;
643 }
644 HandlerCode::Abandon => {
645 self.handle_terminate(transaction_id, condition_code, actions)?;
648 }
649 }
650 Ok(())
651 }
652}