scuffle_rtmp/messages/
reader.rs

1//! Reading [`MessageData`].
2
3use super::{MessageData, MessageType, UnknownMessage};
4use crate::chunk::Chunk;
5use crate::command_messages::Command;
6use crate::protocol_control_messages::{
7    ProtocolControlMessageSetChunkSize, ProtocolControlMessageWindowAcknowledgementSize,
8};
9
10impl MessageData<'_> {
11    /// Reads [`MessageData`] from the given chunk.
12    pub fn read(chunk: &Chunk) -> Result<Self, crate::error::RtmpError> {
13        match chunk.message_header.msg_type_id {
14            // Protocol Control Messages
15            MessageType::SetChunkSize => {
16                let data = ProtocolControlMessageSetChunkSize::read(&chunk.payload)?;
17                Ok(Self::SetChunkSize(data))
18            }
19            MessageType::Abort => Ok(Self::Abort), // Not implemented
20            MessageType::Acknowledgement => Ok(Self::Acknowledgement), // Not implemented
21            MessageType::UserControlEvent => Ok(Self::UserControlEvent), // Not implemented
22            MessageType::WindowAcknowledgementSize => {
23                let data = ProtocolControlMessageWindowAcknowledgementSize::read(&chunk.payload)?;
24                Ok(Self::SetAcknowledgementWindowSize(data))
25            }
26            MessageType::SetPeerBandwidth => Ok(Self::SetPeerBandwidth), // Not implemented
27            // RTMP Command Messages
28            MessageType::Audio => Ok(Self::AudioData {
29                data: chunk.payload.clone(),
30            }),
31            MessageType::Video => Ok(Self::VideoData {
32                data: chunk.payload.clone(),
33            }),
34            MessageType::DataAMF3 => Ok(Self::DataAmf3), // Not implemented
35            MessageType::SharedObjAMF3 => Ok(Self::SharedObjAmf3), // Not implemented
36            MessageType::CommandAMF3 => Ok(Self::CommandAmf3), // Not implemented
37            // Metadata
38            MessageType::DataAMF0 => Ok(Self::DataAmf0 {
39                data: chunk.payload.clone(),
40            }),
41            MessageType::SharedObjAMF0 => Ok(Self::SharedObjAmf0), // Not implemented
42            MessageType::CommandAMF0 => Ok(Self::Amf0Command(Command::read(chunk.payload.clone())?)),
43            MessageType::Aggregate => Ok(Self::Aggregate), // Not implemented
44            msg_type_id => Ok(Self::Unknown(UnknownMessage {
45                msg_type_id,
46                data: chunk.payload.clone(),
47            })),
48        }
49    }
50}
51
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    use bytes::Bytes;
    use scuffle_amf0::encoder::Amf0Encoder;
    use scuffle_amf0::{Amf0Object, Amf0Value};

    use super::*;
    use crate::command_messages::CommandType;
    use crate::command_messages::netconnection::NetConnectionCommand;

    /// Builds a chunk of the given message type around `payload`, passing `0`
    /// for every other `Chunk::new` argument.
    fn chunk_of(msg_type_id: MessageType, payload: Bytes) -> Chunk {
        Chunk::new(0, 0, msg_type_id, 0, payload)
    }

    /// An AMF0-encoded `connect` command is read as a NetConnection connect command.
    #[test]
    fn test_parse_command() {
        let mut encoded = Vec::new();
        let mut encoder = Amf0Encoder::new(&mut encoded);

        encoder.encode_string("connect").unwrap();
        encoder.encode_number(1.0).unwrap();
        let command_object: Amf0Object = [("app".into(), Amf0Value::String("testapp".into()))].into_iter().collect();
        encoder.encode_object(&command_object).unwrap();

        let chunk = chunk_of(MessageType::CommandAMF0, Bytes::from(encoded));

        let MessageData::Amf0Command(command) = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type")
        };
        assert_eq!(command.transaction_id, 1.0);

        let CommandType::NetConnection(NetConnectionCommand::Connect(connect)) = command.command_type else {
            panic!("wrong command");
        };
        assert_eq!(connect.app, "testapp");
    }

    /// Audio chunks hand their payload through unchanged.
    #[test]
    fn test_parse_audio_packet() {
        let chunk = chunk_of(MessageType::Audio, vec![0x00, 0x00, 0x00, 0x00].into());

        let MessageData::AudioData { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type")
        };
        assert_eq!(data, vec![0x00, 0x00, 0x00, 0x00]);
    }

    /// Video chunks hand their payload through unchanged.
    #[test]
    fn test_parse_video_packet() {
        let chunk = chunk_of(MessageType::Video, vec![0x00, 0x00, 0x00, 0x00].into());

        let MessageData::VideoData { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type")
        };
        assert_eq!(data, vec![0x00, 0x00, 0x00, 0x00]);
    }

    /// A set-chunk-size chunk is decoded into its chunk size.
    #[test]
    fn test_parse_set_chunk_size() {
        let chunk = chunk_of(MessageType::SetChunkSize, vec![0x00, 0xFF, 0xFF, 0xFF].into());

        let MessageData::SetChunkSize(ProtocolControlMessageSetChunkSize { chunk_size }) =
            MessageData::read(&chunk).expect("no errors")
        else {
            unreachable!("wrong message type")
        };
        assert_eq!(chunk_size, 0x00FFFFFF);
    }

    /// A window-acknowledgement-size chunk is decoded into its window size.
    #[test]
    fn test_parse_window_acknowledgement_size() {
        let chunk = chunk_of(
            MessageType::WindowAcknowledgementSize,
            vec![0x00, 0xFF, 0xFF, 0xFF].into(),
        );

        let MessageData::SetAcknowledgementWindowSize(ProtocolControlMessageWindowAcknowledgementSize {
            acknowledgement_window_size,
        }) = MessageData::read(&chunk).expect("no errors")
        else {
            unreachable!("wrong message type")
        };
        assert_eq!(acknowledgement_window_size, 0x00FFFFFF);
    }

    /// AMF0 data chunks keep the encoded bytes as they were received.
    #[test]
    fn test_parse_metadata() {
        let mut encoded = Vec::new();

        let mut encoder = Amf0Encoder::new(&mut encoded);
        encoder.encode_string("onMetaData").unwrap();
        let metadata: Amf0Object = [("duration".into(), Amf0Value::Number(0.0))].into_iter().collect();
        encoder.encode_object(&metadata).unwrap();

        let amf_data = Bytes::from(encoded);
        let chunk = chunk_of(MessageType::DataAMF0, amf_data.clone());

        let MessageData::DataAmf0 { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type")
        };
        assert_eq!(data, amf_data);
    }

    /// A message type id without a dedicated arm is reported as `Unknown` with its id.
    #[test]
    fn test_unsupported_message_type() {
        let chunk = chunk_of(MessageType(42), vec![0x00, 0x00, 0x00, 0x00].into());

        assert!(matches!(
            MessageData::read(&chunk).expect("no errors"),
            MessageData::Unknown(UnknownMessage {
                msg_type_id: MessageType(42),
                ..
            })
        ));
    }
}