scuffle_rtmp/chunk/
writer.rs

1//! Types and functions for writing RTMP chunks.
2
3use std::io;
4
5use byteorder::{BigEndian, LittleEndian, WriteBytesExt};
6
7use super::{Chunk, ChunkMessageHeader, ChunkType, INIT_CHUNK_SIZE};
8
9/// A chunk writer.
10///
11/// This is used to write chunks into a stream.
12pub struct ChunkWriter {
13    chunk_size: usize,
14}
15
16impl Default for ChunkWriter {
17    fn default() -> Self {
18        Self {
19            chunk_size: INIT_CHUNK_SIZE,
20        }
21    }
22}
23
24impl ChunkWriter {
25    /// Set the chunk size.
26    pub fn set_chunk_size(&mut self, chunk_size: usize) {
27        self.chunk_size = chunk_size;
28    }
29
30    /// Internal function to write the basic header.
31    #[inline]
32    fn write_basic_header(io: &mut impl io::Write, fmt: ChunkType, csid: u32) -> io::Result<()> {
33        let fmt = fmt as u8;
34
35        if csid >= 64 + 255 {
36            io.write_u8((fmt << 6) | 1)?;
37            let csid = csid - 64;
38
39            let div = csid / 256;
40            let rem = csid % 256;
41
42            io.write_u8(rem as u8)?;
43            io.write_u8(div as u8)?;
44        } else if csid >= 64 {
45            io.write_u8(fmt << 6)?;
46            io.write_u8((csid - 64) as u8)?;
47        } else {
48            io.write_u8((fmt << 6) | csid as u8)?;
49        }
50
51        Ok(())
52    }
53
54    /// Internal function to write the message header.
55    #[inline]
56    fn write_message_header(io: &mut impl io::Write, message_header: &ChunkMessageHeader) -> io::Result<()> {
57        let timestamp = if message_header.timestamp >= 0xFFFFFF {
58            0xFFFFFF
59        } else {
60            message_header.timestamp
61        };
62
63        io.write_u24::<BigEndian>(timestamp)?;
64        io.write_u24::<BigEndian>(message_header.msg_length)?;
65        io.write_u8(message_header.msg_type_id.0)?;
66        io.write_u32::<LittleEndian>(message_header.msg_stream_id)?;
67
68        if message_header.is_extended_timestamp() {
69            Self::write_extened_timestamp(io, message_header.timestamp)?;
70        }
71
72        Ok(())
73    }
74
75    /// Internal function to write the extended timestamp.
76    #[inline]
77    fn write_extened_timestamp(io: &mut impl io::Write, timestamp: u32) -> io::Result<()> {
78        io.write_u32::<BigEndian>(timestamp)?;
79
80        Ok(())
81    }
82
83    /// Write a chunk into some writer.
84    pub fn write_chunk(&self, io: &mut impl io::Write, mut chunk_info: Chunk) -> io::Result<()> {
85        Self::write_basic_header(io, ChunkType::Type0, chunk_info.basic_header.chunk_stream_id)?;
86
87        Self::write_message_header(io, &chunk_info.message_header)?;
88
89        while !chunk_info.payload.is_empty() {
90            let cur_payload_size = if chunk_info.payload.len() > self.chunk_size {
91                self.chunk_size
92            } else {
93                chunk_info.payload.len()
94            };
95
96            let payload_bytes = chunk_info.payload.split_to(cur_payload_size);
97            io.write_all(&payload_bytes[..])?;
98
99            if !chunk_info.payload.is_empty() {
100                Self::write_basic_header(io, ChunkType::Type3, chunk_info.basic_header.chunk_stream_id)?;
101
102                if chunk_info.message_header.is_extended_timestamp() {
103                    Self::write_extened_timestamp(io, chunk_info.message_header.timestamp)?;
104                }
105            }
106        }
107
108        Ok(())
109    }
110}
111
112#[cfg(test)]
113#[cfg_attr(all(test, coverage_nightly), coverage(off))]
114mod tests {
115    use bytes::Bytes;
116
117    use super::*;
118    use crate::messages::MessageType;
119
120    #[test]
121    fn test_writer_write_small_chunk() {
122        let writer = ChunkWriter::default();
123        let mut buf = Vec::new();
124
125        let chunk = Chunk::new(
126            0,
127            0,
128            MessageType::Abort,
129            0,
130            Bytes::from(vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]),
131        );
132
133        writer.write_chunk(&mut buf, chunk).unwrap();
134
135        #[rustfmt::skip]
136        assert_eq!(
137            buf,
138            vec![
139                (0x00 << 6), // chunk basic header - fmt: 0, csid: 0
140                0x00, 0x00, 0x00, // timestamp (0)
141                0x00, 0x00, 0x08, // message length (8 bytes)
142                0x02, // message type id (abort)
143                0x00, 0x00, 0x00, 0x00, // message stream id (0)
144                0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // message payload
145            ]
146        );
147    }
148
149    #[test]
150    fn test_writer_write_large_chunk() {
151        let writer = ChunkWriter::default();
152        let mut buf = Vec::new();
153
154        let mut payload = Vec::new();
155        for i in 0..129 {
156            payload.push(i);
157        }
158
159        let chunk = Chunk::new(10, 100, MessageType::Audio, 13, Bytes::from(payload));
160
161        writer.write_chunk(&mut buf, chunk).unwrap();
162
163        #[rustfmt::skip]
164        let mut expected = vec![
165            0x0A, // chunk basic header - fmt: 0, csid: 10 (the format should have been fixed to 0)
166            0x00, 0x00, 0x64, // timestamp (100)
167            0x00, 0x00, 0x81, // message length (129 bytes)
168            0x08, // message type id (audio)
169            0x0D, 0x00, 0x00, 0x00, // message stream id (13)
170        ];
171
172        for i in 0..128 {
173            expected.push(i);
174        }
175
176        expected.push((0x03 << 6) | 0x0A); // chunk basic header - fmt: 3, csid: 10
177        expected.push(128); // The rest of the payload should have been written
178
179        assert_eq!(buf, expected);
180    }
181
182    #[test]
183    fn test_writer_extended_timestamp() {
184        let writer = ChunkWriter::default();
185        let mut buf = Vec::new();
186
187        let chunk = Chunk::new(
188            0,
189            0xFFFFFFFF,
190            MessageType::Abort,
191            0,
192            Bytes::from(vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]),
193        );
194
195        writer.write_chunk(&mut buf, chunk).unwrap();
196
197        #[rustfmt::skip]
198        assert_eq!(
199            buf,
200            vec![
201                (0x00 << 6), // chunk basic header - fmt: 0, csid: 0
202                0xFF, 0xFF, 0xFF, // timestamp (0xFFFFFF)
203                0x00, 0x00, 0x08, // message length (8 bytes)
204                0x02, // message type id (abort)
205                0x00, 0x00, 0x00,
206                0x00, // message stream id (0)
207                0xFF, 0xFF, 0xFF,
208                0xFF, // extended timestamp (1)
209                0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // message payload
210            ]
211        );
212    }
213
214    #[test]
215    fn test_writer_extended_timestamp_ext() {
216        let writer = ChunkWriter::default();
217        let mut buf = Vec::new();
218
219        let mut payload = Vec::new();
220        for i in 0..129 {
221            payload.push(i);
222        }
223
224        let chunk = Chunk::new(0, 0xFFFFFFFF, MessageType::Abort, 0, Bytes::from(payload));
225
226        writer.write_chunk(&mut buf, chunk).unwrap();
227
228        #[rustfmt::skip]
229        let mut expected = vec![
230            (0x00 << 6), // chunk basic header - fmt: 0, csid: 0
231            0xFF, 0xFF, 0xFF, // timestamp (0xFFFFFF)
232            0x00, 0x00, 0x81, // message length (8 bytes)
233            0x02, // message type id (abort)
234            0x00, 0x00, 0x00, 0x00, // message stream id (0)
235            0xFF, 0xFF, 0xFF, 0xFF, // extended timestamp (1)
236        ];
237
238        for i in 0..128 {
239            expected.push(i);
240        }
241
242        expected.push(0x03 << 6); // chunk basic header - fmt: 3, csid: 0
243        expected.extend(vec![0xFF, 0xFF, 0xFF, 0xFF]); // extended timestamp
244        expected.push(128); // The rest of the payload should have been written
245
246        assert_eq!(buf, expected);
247    }
248
249    #[test]
250    fn test_writer_extended_csid() {
251        let writer = ChunkWriter::default();
252        let mut buf = Vec::new();
253
254        let chunk = Chunk::new(
255            64,
256            0,
257            MessageType::Abort,
258            0,
259            Bytes::from(vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]),
260        );
261
262        writer.write_chunk(&mut buf, chunk).unwrap();
263
264        #[rustfmt::skip]
265        assert_eq!(
266            buf,
267            vec![
268                (0x00 << 6), // chunk basic header - fmt: 0, csid: 0
269                0x00, // extended csid (64 + 0) = 64
270                0x00, 0x00, 0x00, // timestamp (0)
271                0x00, 0x00, 0x08, // message length (8 bytes)
272                0x02, // message type id (abort)
273                0x00, 0x00, 0x00, 0x00, // message stream id (0)
274                0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // message payload
275            ]
276        );
277    }
278
279    #[test]
280    fn test_writer_extended_csid_ext() {
281        let writer = ChunkWriter::default();
282        let mut buf = Vec::new();
283
284        let chunk = Chunk::new(
285            320,
286            0,
287            MessageType::Abort,
288            0,
289            Bytes::from(vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]),
290        );
291
292        writer.write_chunk(&mut buf, chunk).unwrap();
293
294        #[rustfmt::skip]
295        assert_eq!(
296            buf,
297            vec![
298                0x01, // chunk basic header - fmt: 0, csid: 1
299                0x00, // extended csid (64 + 0) = 64
300                0x01, // extended csid (256 * 1) = 256 + 64 + 0 = 320
301                0x00, 0x00, 0x00, // timestamp (0)
302                0x00, 0x00, 0x08, // message length (8 bytes)
303                0x02, // message type id (abort)
304                0x00, 0x00, 0x00, 0x00, // message stream id (0)
305                0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // message payload
306            ]
307        );
308    }
309}