leodos_libcfs/cfe/sb/
pipe.rs1use crate::cfe::sb::msg::MsgId;
3use crate::error::{CfsError, OsalError, SbError, Result};
4use crate::ffi::{self, CFE_SB_DEFAULT_QOS};
5use crate::cstring;
6use crate::status::check;
7use bitflags::bitflags;
8use core::future::Future;
9use core::mem::MaybeUninit;
10use core::slice;
11use core::task::Poll;
12use heapless::String;
13
14#[derive(Debug, Clone, Copy)]
16#[repr(transparent)]
17pub struct PipeId(pub ffi::CFE_SB_PipeId_t);
18
19impl PartialEq for PipeId {
20 fn eq(&self, other: &Self) -> bool {
21 self.0 == other.0
22 }
23}
24impl Eq for PipeId {}
25
26impl PipeId {
27 pub fn to_index(&self) -> Result<u32> {
29 let mut index = MaybeUninit::uninit();
30 check(unsafe { ffi::CFE_SB_PipeId_ToIndex(self.0, index.as_mut_ptr()) })?;
31 Ok(unsafe { index.assume_init() })
32 }
33}
34
35bitflags! {
36 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38 pub struct PipeOptions: u8 {
39 const IGNORE_MINE = ffi::CFE_SB_PIPEOPTS_IGNOREMINE as u8;
42 }
43}
44
45#[derive(Debug, Clone, Copy)]
47#[repr(transparent)]
48pub struct Qos(pub(crate) ffi::CFE_SB_Qos_t);
49
50impl Default for Qos {
51 fn default() -> Self {
52 Self(CFE_SB_DEFAULT_QOS)
53 }
54}
55
56impl Qos {
57 pub fn new(priority: u8, reliability: u8) -> Self {
59 Self(ffi::CFE_SB_Qos_t {
60 Priority: priority,
61 Reliability: reliability,
62 })
63 }
64
65 pub fn priority(&self) -> u8 {
67 self.0.Priority
68 }
69
70 pub fn reliability(&self) -> u8 {
72 self.0.Reliability
73 }
74}
75
76#[derive(Debug)]
80pub struct Pipe {
81 id: PipeId,
82}
83
84pub enum Timeout {
86 PendForever,
88 Poll,
90 Milliseconds(u32),
92}
93
94impl Pipe {
95 pub fn new(name: &str, depth: u16) -> Result<Self> {
101 let c_name = cstring::<{ ffi::OS_MAX_API_NAME as usize }>(name)?;
102
103 let mut pipe_id_uninit = MaybeUninit::<ffi::CFE_SB_PipeId_t>::uninit();
104 let status =
105 unsafe { ffi::CFE_SB_CreatePipe(pipe_id_uninit.as_mut_ptr(), depth, c_name.as_ptr()) };
106
107 check(status)?;
108
109 let id = PipeId(unsafe { pipe_id_uninit.assume_init() });
110 Ok(Self { id })
111 }
112
113 pub fn subscribe_ex(&self, msg_id: MsgId, qos: Qos, msg_lim: u16) -> Result<()> {
121 check(unsafe { ffi::CFE_SB_SubscribeEx(msg_id.0, self.id.0, qos.0, msg_lim) })?;
122 Ok(())
123 }
124
125 pub fn subscribe(&self, msg_id: MsgId) -> Result<()> {
134 check(unsafe { ffi::CFE_SB_Subscribe(msg_id.0, self.id.0) })?;
135 Ok(())
136 }
137
138 pub fn unsubscribe(&self, msg_id: MsgId) -> Result<()> {
140 check(unsafe { ffi::CFE_SB_Unsubscribe(msg_id.0, self.id.0) })?;
141 Ok(())
142 }
143
144 pub fn unsubscribe_local(&self, msg_id: MsgId) -> Result<()> {
148 check(unsafe { ffi::CFE_SB_UnsubscribeLocal(msg_id.0, self.id.0) })?;
149 Ok(())
150 }
151
152 pub fn timed_recv(&mut self, buf: &mut [u8], timeout: Timeout) -> Result<usize> {
169 let mut buf_ptr = MaybeUninit::uninit();
170
171 let timeout = match timeout {
172 Timeout::PendForever => ffi::CFE_SB_PEND_FOREVER,
173 Timeout::Poll => ffi::CFE_SB_POLL as i32,
174 Timeout::Milliseconds(ms) => {
175 if ms > i32::MAX as u32 {
177 return Err(CfsError::Sb(SbError::BadArgument));
178 } else {
179 ms as i32
180 }
181 }
182 };
183 check(unsafe { ffi::CFE_SB_ReceiveBuffer(buf_ptr.as_mut_ptr(), self.id.0, timeout) })?;
184
185 let buf_ptr = unsafe { buf_ptr.assume_init() };
186
187 let mut size = 0;
188 check(unsafe {
189 ffi::CFE_MSG_GetSize(buf_ptr as *const ffi::CFE_MSG_Message_t, &mut size)
190 })?;
191
192 if size > buf.len() {
193 unsafe {
195 check(ffi::CFE_SB_ReleaseMessageBuffer(buf_ptr))?;
196 }
197 return Err(CfsError::Osal(OsalError::InvalidSize));
198 }
199
200 let src_slice = unsafe { slice::from_raw_parts(buf_ptr as *const u8, size) };
201 buf[..size].copy_from_slice(src_slice);
202
203 Ok(size)
204 }
205
206 pub fn set_opts(&self, opts: PipeOptions) -> Result<()> {
208 check(unsafe { ffi::CFE_SB_SetPipeOpts(self.id.0, opts.bits()) })?;
209 Ok(())
210 }
211
212 pub fn get_opts(&self) -> Result<PipeOptions> {
214 let mut opts = MaybeUninit::uninit();
215 check(unsafe { ffi::CFE_SB_GetPipeOpts(self.id.0, opts.as_mut_ptr()) })?;
216 Ok(PipeOptions::from_bits_truncate(unsafe {
217 opts.assume_init()
218 }))
219 }
220
221 pub fn id(&self) -> PipeId {
223 self.id
224 }
225
226 pub fn name(&self) -> Result<String<{ ffi::OS_MAX_API_NAME as usize }>> {
228 let mut buffer = [0u8; ffi::OS_MAX_API_NAME as usize];
229 check(unsafe {
230 ffi::CFE_SB_GetPipeName(
231 buffer.as_mut_ptr() as *mut libc::c_char,
232 buffer.len(),
233 self.id.0,
234 )
235 })?;
236 let len = buffer.iter().position(|&b| b == 0).unwrap_or(buffer.len());
237 let vec = heapless::Vec::from_slice(&buffer[..len]).map_err(|_| CfsError::Osal(OsalError::NameTooLong))?;
238 String::from_utf8(vec).map_err(|_| CfsError::InvalidString)
239 }
240
241 pub fn get_id_by_name(name: &str) -> Result<PipeId> {
243 let c_name = cstring::<{ ffi::OS_MAX_API_NAME as usize }>(name)?;
244
245 let mut pipe_id = MaybeUninit::uninit();
246 check(unsafe { ffi::CFE_SB_GetPipeIdByName(pipe_id.as_mut_ptr(), c_name.as_ptr()) })?;
247 Ok(PipeId(unsafe { pipe_id.assume_init() }))
248 }
249
250 pub fn subscribe_local(&self, msg_id: MsgId, msg_lim: u16) -> Result<()> {
259 check(unsafe { ffi::CFE_SB_SubscribeLocal(msg_id.0, self.id.0, msg_lim) })?;
260 Ok(())
261 }
262}
263
264impl Drop for Pipe {
265 fn drop(&mut self) {
268 let _ = unsafe { ffi::CFE_SB_DeletePipe(self.id.0) };
269 }
270}
271
272impl Pipe {
273 pub fn recv<'a>(
275 &'a mut self,
276 buf: &'a mut [u8],
277 ) -> impl Future<Output = Result<usize>> + use<'a> {
278 core::future::poll_fn(|_| {
279 let recv_future = self.timed_recv(buf, Timeout::Poll);
280 match recv_future {
281 Err(CfsError::Sb(SbError::NoMessage | SbError::TimeOut))
282 | Err(CfsError::Osal(OsalError::Timeout | OsalError::QueueEmpty)) => Poll::Pending,
283 Ok(result) => Poll::Ready(Ok(result)),
284 Err(e) => Poll::Ready(Err(e)),
285 }
286 })
287 }
288}