Skip to main content

leodos_protocols/datalink/link/cfs/
pipe.rs

1use leodos_libcfs::cfe::sb::msg::MsgId;
2use leodos_libcfs::cfe::sb::pipe::Pipe;
3use leodos_libcfs::cfe::sb::send_buf::SendBuffer;
4
5use leodos_libcfs::error::CfsError as CfsError;
6
7use crate::datalink::{DatalinkRead, DatalinkWrite};
8
9/// Sends frames over a CFS software bus pipe.
10pub struct PipeFrameWriter {
11    msg_id: MsgId,
12}
13
14impl PipeFrameWriter {
15    /// Creates a new sender with the given message ID.
16    pub fn new(msg_id: MsgId) -> Self {
17        Self { msg_id }
18    }
19}
20
21impl DatalinkWrite for PipeFrameWriter {
22    type Error = CfsError;
23
24    async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
25        let header_size = 8;
26        let total_size = header_size + data.len();
27
28        let mut buf = SendBuffer::new(total_size)?;
29
30        {
31            let mut msg = buf.view();
32            msg.init(self.msg_id, total_size)?;
33            let slice = buf.as_mut_slice();
34            slice[header_size..].copy_from_slice(data);
35        }
36
37        buf.send(true)?;
38        Ok(())
39    }
40}
41
42/// Receives frames from a CFS software bus pipe.
43pub struct PipeFrameReader<'a> {
44    pipe: &'a mut Pipe,
45    header_size: usize,
46}
47
48impl<'a> PipeFrameReader<'a> {
49    /// Creates a new receiver on the given pipe.
50    pub fn new(pipe: &'a mut Pipe) -> Self {
51        Self {
52            pipe,
53            header_size: 8,
54        }
55    }
56
57    /// Sets a custom header size to skip when receiving.
58    pub fn with_header_size(mut self, size: usize) -> Self {
59        self.header_size = size;
60        self
61    }
62}
63
64impl DatalinkRead for PipeFrameReader<'_> {
65    type Error = CfsError;
66
67    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
68        let header_size = self.header_size;
69        let total_size = header_size + buffer.len();
70
71        let mut recv_buf = heapless::Vec::<u8, 2048>::new();
72        recv_buf.resize(total_size, 0).ok();
73
74        let len = self.pipe.recv(&mut recv_buf).await?;
75
76        if len <= header_size {
77            return Ok(0);
78        }
79
80        let payload_len = len - header_size;
81        let copy_len = payload_len.min(buffer.len());
82        buffer[..copy_len]
83            .copy_from_slice(&recv_buf[header_size..header_size + copy_len]);
84
85        Ok(copy_len)
86    }
87}