leodos_protocols/datalink/link/cfs/
pipe.rs1use leodos_libcfs::cfe::sb::msg::MsgId;
2use leodos_libcfs::cfe::sb::pipe::Pipe;
3use leodos_libcfs::cfe::sb::send_buf::SendBuffer;
4
5use leodos_libcfs::error::CfsError as CfsError;
6
7use crate::datalink::{DatalinkRead, DatalinkWrite};
8
9pub struct PipeFrameWriter {
11 msg_id: MsgId,
12}
13
14impl PipeFrameWriter {
15 pub fn new(msg_id: MsgId) -> Self {
17 Self { msg_id }
18 }
19}
20
21impl DatalinkWrite for PipeFrameWriter {
22 type Error = CfsError;
23
24 async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
25 let header_size = 8;
26 let total_size = header_size + data.len();
27
28 let mut buf = SendBuffer::new(total_size)?;
29
30 {
31 let mut msg = buf.view();
32 msg.init(self.msg_id, total_size)?;
33 let slice = buf.as_mut_slice();
34 slice[header_size..].copy_from_slice(data);
35 }
36
37 buf.send(true)?;
38 Ok(())
39 }
40}
41
42pub struct PipeFrameReader<'a> {
44 pipe: &'a mut Pipe,
45 header_size: usize,
46}
47
48impl<'a> PipeFrameReader<'a> {
49 pub fn new(pipe: &'a mut Pipe) -> Self {
51 Self {
52 pipe,
53 header_size: 8,
54 }
55 }
56
57 pub fn with_header_size(mut self, size: usize) -> Self {
59 self.header_size = size;
60 self
61 }
62}
63
64impl DatalinkRead for PipeFrameReader<'_> {
65 type Error = CfsError;
66
67 async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
68 let header_size = self.header_size;
69 let total_size = header_size + buffer.len();
70
71 let mut recv_buf = heapless::Vec::<u8, 2048>::new();
72 recv_buf.resize(total_size, 0).ok();
73
74 let len = self.pipe.recv(&mut recv_buf).await?;
75
76 if len <= header_size {
77 return Ok(0);
78 }
79
80 let payload_len = len - header_size;
81 let copy_len = payload_len.min(buffer.len());
82 buffer[..copy_len]
83 .copy_from_slice(&recv_buf[header_size..header_size + copy_len]);
84
85 Ok(copy_len)
86 }
87}