leodos_protocols/datalink/link/
framed.rs1use bon::bon;
9
10use crate::coding::CodingRead;
11use crate::coding::CodingWrite;
12use crate::datalink::DatalinkRead;
13use crate::datalink::DatalinkWrite;
14use crate::datalink::framing::FrameRead;
15use crate::datalink::framing::FrameWrite;
16use crate::datalink::framing::PushError;
17use crate::datalink::security::SecurityProcessor;
18use crate::datalink::spp::SpacePacket;
19
20#[derive(Debug, Clone, thiserror::Error)]
22pub enum DatalinkError<E> {
23 #[error("link error: {0}")]
25 Link(E),
26 #[error("frame too large")]
28 FrameTooLarge,
29 #[error("invalid frame")]
31 InvalidFrame,
32 #[error("frame build error")]
34 BuildError,
35 #[error("security error")]
37 Security,
38}
39
40pub struct DatalinkWriter<F, W, S> {
46 frame_writer: F,
47 security: S,
48 coding_writer: W,
49 scratch: [u8; 2048],
50}
51
52#[bon]
53impl<F, W, S> DatalinkWriter<F, W, S>
54where
55 F: FrameWrite,
56 S: SecurityProcessor,
57 W: CodingWrite,
58{
59 #[builder]
61 pub fn new(frame_writer: F, coding_writer: W, security: S) -> Self {
62 Self {
63 frame_writer,
64 security,
65 coding_writer,
66 scratch: [0u8; 2048],
67 }
68 }
69
70 pub async fn flush(&mut self) -> Result<(), DatalinkError<W::Error>> {
73 if self.frame_writer.is_empty() {
74 return Ok(());
75 }
76
77 let frame = self
78 .frame_writer
79 .finish()
80 .map_err(|_| DatalinkError::BuildError)?;
81
82 let len = frame.len().min(2048);
83 self.scratch[..len].copy_from_slice(&frame[..len]);
84
85 let secured_len = self
86 .security
87 .apply(&mut self.scratch[..len])
88 .map_err(|_| DatalinkError::Security)?;
89
90 self.coding_writer
91 .write(&self.scratch[..secured_len])
92 .await
93 .map_err(DatalinkError::Link)
94 }
95}
96
97impl<F, W, S> DatalinkWrite for DatalinkWriter<F, W, S>
98where
99 F: FrameWrite,
100 S: SecurityProcessor,
101 W: CodingWrite,
102{
103 type Error = DatalinkError<W::Error>;
104
105 async fn write(&mut self, data: &[u8]) -> Result<(), Self::Error> {
106 match self.frame_writer.push(data) {
107 Ok(()) => Ok(()),
108 Err(PushError::TooLarge) => Err(DatalinkError::FrameTooLarge),
109 Err(PushError::Full) => {
110 self.flush().await?;
111 self.frame_writer
112 .push(data)
113 .map_err(|_| DatalinkError::FrameTooLarge)
114 }
115 }
116 }
117}
118
119pub struct DatalinkReader<F, R, S> {
125 frame_reader: F,
126 security: S,
127 coding_reader: R,
128 pos: usize,
129}
130
131#[bon]
132impl<F, R, S> DatalinkReader<F, R, S>
133where
134 F: FrameRead,
135 S: SecurityProcessor,
136 R: CodingRead,
137{
138 #[builder]
140 pub fn new(frame_reader: F, coding_reader: R, security: S) -> Self {
141 Self {
142 frame_reader,
143 security,
144 coding_reader,
145 pos: 0,
146 }
147 }
148
149 fn extract_packet(&mut self, buffer: &mut [u8]) -> Option<usize> {
150 let data = self.frame_reader.data_field();
151 if self.pos >= data.len() {
152 return None;
153 }
154 let remaining = &data[self.pos..];
155 let pkt = SpacePacket::parse(remaining).ok()?;
156 let len = pkt.primary_header.packet_len();
157 let n = len.min(buffer.len());
158 buffer[..n].copy_from_slice(&remaining[..n]);
159 self.pos += len;
160 Some(n)
161 }
162}
163
164impl<F, R, S> DatalinkRead for DatalinkReader<F, R, S>
165where
166 F: FrameRead,
167 S: SecurityProcessor,
168 R: CodingRead,
169{
170 type Error = DatalinkError<R::Error>;
171
172 async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
173 if let Some(n) = self.extract_packet(buffer) {
174 return Ok(n);
175 }
176
177 let frame_buf = self.frame_reader.buffer_mut();
178 let len = self
179 .coding_reader
180 .read(frame_buf)
181 .await
182 .map_err(DatalinkError::Link)?;
183
184 if len == 0 {
185 return Ok(0);
186 }
187
188 let _secured_len = self
189 .security
190 .process(&mut frame_buf[..len])
191 .map_err(|_| DatalinkError::Security)?;
192
193 self.frame_reader
194 .feed(len)
195 .map_err(|_| DatalinkError::InvalidFrame)?;
196 self.pos = 0;
197
198 match self.extract_packet(buffer) {
199 Some(n) => Ok(n),
200 None => Ok(0),
201 }
202 }
203}