Skip to main content

leodos_libcfs/cfe/sb/
pipe.rs

1//! Software Bus pipe management for receiving messages.
2use crate::cfe::sb::msg::MsgId;
3use crate::error::{CfsError, OsalError, SbError, Result};
4use crate::ffi::{self, CFE_SB_DEFAULT_QOS};
5use crate::cstring;
6use crate::status::check;
7use bitflags::bitflags;
8use core::future::Future;
9use core::mem::MaybeUninit;
10use core::slice;
11use core::task::Poll;
12use heapless::String;
13
14/// A type-safe, zero-cost wrapper for a cFE Software Bus Pipe ID.
15#[derive(Debug, Clone, Copy)]
16#[repr(transparent)]
17pub struct PipeId(pub ffi::CFE_SB_PipeId_t);
18
19impl PartialEq for PipeId {
20    fn eq(&self, other: &Self) -> bool {
21        self.0 == other.0
22    }
23}
24impl Eq for PipeId {}
25
26impl PipeId {
27    /// Converts the Pipe ID into a zero-based integer suitable for array indexing.
28    pub fn to_index(&self) -> Result<u32> {
29        let mut index = MaybeUninit::uninit();
30        check(unsafe { ffi::CFE_SB_PipeId_ToIndex(self.0, index.as_mut_ptr()) })?;
31        Ok(unsafe { index.assume_init() })
32    }
33}
34
35bitflags! {
36    /// Options to alter a pipe's behavior.
37    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38    pub struct PipeOptions: u8 {
39        /// When set, prevents messages sent from the same application from being
40        /// received on this pipe.
41        const IGNORE_MINE = ffi::CFE_SB_PIPEOPTS_IGNOREMINE as u8;
42    }
43}
44
45/// Quality of Service options for a software bus subscription.
46#[derive(Debug, Clone, Copy)]
47#[repr(transparent)]
48pub struct Qos(pub(crate) ffi::CFE_SB_Qos_t);
49
50impl Default for Qos {
51    fn default() -> Self {
52        Self(CFE_SB_DEFAULT_QOS)
53    }
54}
55
56impl Qos {
57    /// Creates new QoS settings.
58    pub fn new(priority: u8, reliability: u8) -> Self {
59        Self(ffi::CFE_SB_Qos_t {
60            Priority: priority,
61            Reliability: reliability,
62        })
63    }
64
65    /// Gets the priority level.
66    pub fn priority(&self) -> u8 {
67        self.0.Priority
68    }
69
70    /// Gets the reliability level.
71    pub fn reliability(&self) -> u8 {
72        self.0.Reliability
73    }
74}
75
76/// A CFE Software Bus pipe.
77///
78/// When dropped, it automatically cleans up the underlying CFE resource.
79#[derive(Debug)]
80pub struct Pipe {
81    id: PipeId,
82}
83
84/// Timeout options for receiving messages from a pipe.
85pub enum Timeout {
86    /// Block indefinitely until a message is received.
87    PendForever,
88    /// Perform a non-blocking poll for a message.
89    Poll,
90    /// Wait for the specified number of milliseconds.
91    Milliseconds(u32),
92}
93
94impl Pipe {
95    /// Creates a new software bus pipe.
96    ///
97    /// # Arguments
98    /// * `name` - A unique string to identify the pipe.
99    /// * `depth` - The maximum number of messages the pipe can hold.
100    pub fn new(name: &str, depth: u16) -> Result<Self> {
101        let c_name = cstring::<{ ffi::OS_MAX_API_NAME as usize }>(name)?;
102
103        let mut pipe_id_uninit = MaybeUninit::<ffi::CFE_SB_PipeId_t>::uninit();
104        let status =
105            unsafe { ffi::CFE_SB_CreatePipe(pipe_id_uninit.as_mut_ptr(), depth, c_name.as_ptr()) };
106
107        check(status)?;
108
109        let id = PipeId(unsafe { pipe_id_uninit.assume_init() });
110        Ok(Self { id })
111    }
112
113    /// Subscribes this pipe to messages with the specified `MsgId` and extended options.
114    ///
115    /// # Arguments
116    /// * `msg_id`: The message ID of the message to be subscribed to.
117    /// * `qos`: The requested Quality of Service.
118    /// * `msg_lim`: The maximum number of messages with this Message ID to
119    ///   allow in this pipe at the same time.
120    pub fn subscribe_ex(&self, msg_id: MsgId, qos: Qos, msg_lim: u16) -> Result<()> {
121        check(unsafe { ffi::CFE_SB_SubscribeEx(msg_id.0, self.id.0, qos.0, msg_lim) })?;
122        Ok(())
123    }
124
125    /// Subscribes this pipe to messages with the specified `MsgId`.
126    ///
127    /// Subscriptions are added to the head of an internal linked
128    /// list, so messages are delivered in LIFO order (last
129    /// subscriber receives first).
130    ///
131    /// # Arguments
132    /// * `msg_id`: The message ID of the message to be subscribed to.
133    pub fn subscribe(&self, msg_id: MsgId) -> Result<()> {
134        check(unsafe { ffi::CFE_SB_Subscribe(msg_id.0, self.id.0) })?;
135        Ok(())
136    }
137
138    /// Unsubscribes this pipe from messages with the specified `MsgId`.
139    pub fn unsubscribe(&self, msg_id: MsgId) -> Result<()> {
140        check(unsafe { ffi::CFE_SB_Unsubscribe(msg_id.0, self.id.0) })?;
141        Ok(())
142    }
143
144    /// Unsubscribes this pipe from messages, keeping the request local to this CPU.
145    ///
146    /// This is typically only used by a Software Bus Network (SBN) application.
147    pub fn unsubscribe_local(&self, msg_id: MsgId) -> Result<()> {
148        check(unsafe { ffi::CFE_SB_UnsubscribeLocal(msg_id.0, self.id.0) })?;
149        Ok(())
150    }
151
152    /// Receives a message from this pipe, copying it into a user-provided buffer.
153    ///
154    /// This method receives a message from the CFE-managed internal buffer and safely copies it into the provided `buf`.
155    ///
156    /// # Arguments
157    /// * `timeout`: Timeout in milliseconds. Use `sb::pipe::PEND_FOREVER` to block
158    ///   indefinitely or `sb::pipe::POLL` for a non-blocking check.
159    /// * `buffer` - A mutable byte slice to copy the message into.
160    ///
161    /// # Returns
162    /// A `MessageRef` containing the message data, tied to the lifetime of `buffer`.
163    ///
164    /// # Errors
165    /// Returns `Error::SbTimeOut` or `Error::SbNoMessage` if no message is received within the timeout.
166    /// Returns `Error::SbBadArgument` if the timeout value is invalid.
167    /// Returns `Error::OsErrInvalidSize` if the received message is larger than `buf`.
168    pub fn timed_recv(&mut self, buf: &mut [u8], timeout: Timeout) -> Result<usize> {
169        let mut buf_ptr = MaybeUninit::uninit();
170
171        let timeout = match timeout {
172            Timeout::PendForever => ffi::CFE_SB_PEND_FOREVER,
173            Timeout::Poll => ffi::CFE_SB_POLL as i32,
174            Timeout::Milliseconds(ms) => {
175                // Convert to i32, ensuring it fits.
176                if ms > i32::MAX as u32 {
177                    return Err(CfsError::Sb(SbError::BadArgument));
178                } else {
179                    ms as i32
180                }
181            }
182        };
183        check(unsafe { ffi::CFE_SB_ReceiveBuffer(buf_ptr.as_mut_ptr(), self.id.0, timeout) })?;
184
185        let buf_ptr = unsafe { buf_ptr.assume_init() };
186
187        let mut size = 0;
188        check(unsafe {
189            ffi::CFE_MSG_GetSize(buf_ptr as *const ffi::CFE_MSG_Message_t, &mut size)
190        })?;
191
192        if size > buf.len() {
193            // We must release the buffer back to SB if we can't copy it, to prevent a leak.
194            unsafe {
195                check(ffi::CFE_SB_ReleaseMessageBuffer(buf_ptr))?;
196            }
197            return Err(CfsError::Osal(OsalError::InvalidSize));
198        }
199
200        let src_slice = unsafe { slice::from_raw_parts(buf_ptr as *const u8, size) };
201        buf[..size].copy_from_slice(src_slice);
202
203        Ok(size)
204    }
205
206    /// Sets options for the pipe, see `PipeOptions` for the available options.
207    pub fn set_opts(&self, opts: PipeOptions) -> Result<()> {
208        check(unsafe { ffi::CFE_SB_SetPipeOpts(self.id.0, opts.bits()) })?;
209        Ok(())
210    }
211
212    /// Gets the current options for the pipe, see `PipeOptions` for the available options.
213    pub fn get_opts(&self) -> Result<PipeOptions> {
214        let mut opts = MaybeUninit::uninit();
215        check(unsafe { ffi::CFE_SB_GetPipeOpts(self.id.0, opts.as_mut_ptr()) })?;
216        Ok(PipeOptions::from_bits_truncate(unsafe {
217            opts.assume_init()
218        }))
219    }
220
221    /// Returns the underlying `PipeId` for this pipe.
222    pub fn id(&self) -> PipeId {
223        self.id
224    }
225
226    /// Gets the registered name of this pipe.
227    pub fn name(&self) -> Result<String<{ ffi::OS_MAX_API_NAME as usize }>> {
228        let mut buffer = [0u8; ffi::OS_MAX_API_NAME as usize];
229        check(unsafe {
230            ffi::CFE_SB_GetPipeName(
231                buffer.as_mut_ptr() as *mut libc::c_char,
232                buffer.len(),
233                self.id.0,
234            )
235        })?;
236        let len = buffer.iter().position(|&b| b == 0).unwrap_or(buffer.len());
237        let vec = heapless::Vec::from_slice(&buffer[..len]).map_err(|_| CfsError::Osal(OsalError::NameTooLong))?;
238        String::from_utf8(vec).map_err(|_| CfsError::InvalidString)
239    }
240
241    /// Finds the `PipeId` for a pipe with the given name.
242    pub fn get_id_by_name(name: &str) -> Result<PipeId> {
243        let c_name = cstring::<{ ffi::OS_MAX_API_NAME as usize }>(name)?;
244
245        let mut pipe_id = MaybeUninit::uninit();
246        check(unsafe { ffi::CFE_SB_GetPipeIdByName(pipe_id.as_mut_ptr(), c_name.as_ptr()) })?;
247        Ok(PipeId(unsafe { pipe_id.assume_init() }))
248    }
249
250    /// Subscribes this pipe to messages, keeping the request local to this CPU.
251    ///
252    /// This is typically only used by a Software Bus Network (SBN) application.
253    ///
254    /// # Arguments
255    /// * `msg_id`: The message ID of the message to be subscribed to.
256    /// * `msg_lim`: The maximum number of messages with this Message ID to
257    ///   allow in this pipe at the same time.
258    pub fn subscribe_local(&self, msg_id: MsgId, msg_lim: u16) -> Result<()> {
259        check(unsafe { ffi::CFE_SB_SubscribeLocal(msg_id.0, self.id.0, msg_lim) })?;
260        Ok(())
261    }
262}
263
264impl Drop for Pipe {
265    /// Automatically deletes the CFE software bus pipe when the `Pipe` object
266    /// goes out of scope.
267    fn drop(&mut self) {
268        let _ = unsafe { ffi::CFE_SB_DeletePipe(self.id.0) };
269    }
270}
271
272impl Pipe {
273    /// Asynchronously receives a single datagram message on the socket.
274    pub fn recv<'a>(
275        &'a mut self,
276        buf: &'a mut [u8],
277    ) -> impl Future<Output = Result<usize>> + use<'a> {
278        core::future::poll_fn(|_| {
279            let recv_future = self.timed_recv(buf, Timeout::Poll);
280            match recv_future {
281                Err(CfsError::Sb(SbError::NoMessage | SbError::TimeOut))
282                | Err(CfsError::Osal(OsalError::Timeout | OsalError::QueueEmpty)) => Poll::Pending,
283                Ok(result) => Poll::Ready(Ok(result)),
284                Err(e) => Poll::Ready(Err(e)),
285            }
286        })
287    }
288}