Skip to main content

leodos_protocols/datalink/link/cfs/
sb.rs

1use leodos_libcfs::cfe::sb::msg::MsgId;
2use leodos_libcfs::cfe::sb::pipe::Pipe;
3use leodos_libcfs::cfe::sb::send_buf::SendBuffer;
4use leodos_libcfs::error::CfsError as CfsError;
5
6use crate::datalink::{DatalinkRead, DatalinkWrite};
7
8const HEADER_SIZE: usize = 8;
9
10/// A bidirectional data link over the cFS Software Bus.
11///
12/// Sends packets by publishing to `send_mid` and receives
13/// packets by subscribing to `recv_mid` on a private pipe.
14pub struct SbDatalink {
15    pipe: Pipe,
16    send_mid: MsgId,
17}
18
19impl SbDatalink {
20    /// Creates a new Software Bus data link.
21    ///
22    /// Opens a pipe named `name` with queue depth `depth`,
23    /// subscribes to `recv_mid`, and sends outbound data
24    /// to `send_mid`.
25    pub fn new(
26        name: &str,
27        depth: u16,
28        recv_mid: MsgId,
29        send_mid: MsgId,
30    ) -> Result<Self, CfsError> {
31        let pipe = Pipe::new(name, depth)?;
32        pipe.subscribe(recv_mid)?;
33        Ok(Self { pipe, send_mid })
34    }
35}
36
37impl DatalinkWrite for SbDatalink {
38    type Error = CfsError;
39
40    async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
41        let total_size = HEADER_SIZE + data.len();
42        let mut buf = SendBuffer::new(total_size)?;
43        {
44            let mut msg = buf.view();
45            msg.init(self.send_mid, total_size)?;
46            let slice = buf.as_mut_slice();
47            slice[HEADER_SIZE..].copy_from_slice(data);
48        }
49        buf.send(true)?;
50        Ok(())
51    }
52}
53
54impl DatalinkRead for SbDatalink {
55    type Error = CfsError;
56
57    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
58        let total_size = HEADER_SIZE + buffer.len();
59        let mut recv_buf = heapless::Vec::<u8, 2048>::new();
60        recv_buf.resize(total_size, 0).ok();
61        let len = self.pipe.recv(&mut recv_buf).await?;
62        if len <= HEADER_SIZE {
63            return Ok(0);
64        }
65        let payload_len = len - HEADER_SIZE;
66        let copy_len = payload_len.min(buffer.len());
67        buffer[..copy_len].copy_from_slice(
68            &recv_buf[HEADER_SIZE..HEADER_SIZE + copy_len],
69        );
70        Ok(copy_len)
71    }
72}