scuffle_rtmp/command_messages/netstream/
reader.rs

1//! Reading [`NetStreamCommand`].
2
3use bytes::Bytes;
4use scuffle_amf0::decoder::Amf0Decoder;
5use scuffle_bytes_util::zero_copy::BytesBuf;
6
7use super::NetStreamCommand;
8use crate::command_messages::error::CommandError;
9
10impl NetStreamCommand<'_> {
11    /// Reads a [`NetStreamCommand`] from the given decoder.
12    ///
13    /// Returns `Ok(None)` if the `command_name` is not recognized.
14    pub fn read(command_name: &str, decoder: &mut Amf0Decoder<BytesBuf<Bytes>>) -> Result<Option<Self>, CommandError> {
15        match command_name {
16            "play" => {
17                // skip command object
18                decoder.decode_null()?;
19
20                let values = decoder.decode_all()?;
21                Ok(Some(Self::Play { values }))
22            }
23            "play2" => {
24                // skip command object
25                decoder.decode_null()?;
26
27                let parameters = decoder.decode_object()?;
28                Ok(Some(Self::Play2 { parameters }))
29            }
30            "deleteStream" => {
31                // skip command object
32                decoder.decode_null()?;
33
34                let stream_id = decoder.decode_number()?;
35                Ok(Some(Self::DeleteStream { stream_id }))
36            }
37            "closeStream" => Ok(Some(Self::CloseStream)),
38            "receiveAudio" => {
39                // skip command object
40                decoder.decode_null()?;
41
42                let receive_audio = decoder.decode_boolean()?;
43                Ok(Some(Self::ReceiveAudio { receive_audio }))
44            }
45            "receiveVideo" => {
46                // skip command object
47                decoder.decode_null()?;
48
49                let receive_video = decoder.decode_boolean()?;
50                Ok(Some(Self::ReceiveVideo { receive_video }))
51            }
52            "publish" => {
53                // skip command object
54                decoder.decode_null()?;
55
56                let publishing_name = decoder.decode_string()?;
57                let publishing_type = decoder.deserialize()?;
58
59                Ok(Some(Self::Publish {
60                    publishing_name,
61                    publishing_type,
62                }))
63            }
64            "seek" => {
65                // skip command object
66                decoder.decode_null()?;
67
68                let milliseconds = decoder.decode_number()?;
69                Ok(Some(Self::Seek { milliseconds }))
70            }
71            "pause" => {
72                // skip command object
73                decoder.decode_null()?;
74
75                let pause = decoder.decode_boolean()?;
76                let milliseconds = decoder.decode_number()?;
77                Ok(Some(Self::Pause { pause, milliseconds }))
78            }
79            _ => Ok(None),
80        }
81    }
82}
83
84#[cfg(test)]
85#[cfg_attr(all(test, coverage_nightly), coverage(off))]
86mod tests {
87    use bytes::Bytes;
88    use scuffle_amf0::decoder::Amf0Decoder;
89    use scuffle_amf0::encoder::Amf0Encoder;
90    use scuffle_amf0::{Amf0Marker, Amf0Object};
91    use scuffle_bytes_util::StringCow;
92
93    use crate::command_messages::netstream::{NetStreamCommand, NetStreamCommandPublishPublishingType};
94
95    #[test]
96    fn test_command_no_payload() {
97        let command = NetStreamCommand::read("closeStream", &mut Amf0Decoder::from_buf(Bytes::new()))
98            .unwrap()
99            .unwrap();
100        assert_eq!(command, NetStreamCommand::CloseStream);
101    }
102
103    #[test]
104    fn play_command() {
105        let mut payload = Vec::new();
106
107        let mut encoder = Amf0Encoder::new(&mut payload);
108        encoder.encode_null().unwrap();
109        encoder.encode_number(0.0).unwrap();
110        encoder.encode_string("test").unwrap();
111
112        let command = NetStreamCommand::read("play", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
113            .unwrap()
114            .unwrap();
115
116        assert_eq!(
117            command,
118            NetStreamCommand::Play {
119                values: vec![0.0f64.into(), StringCow::from("test").into(),],
120            }
121        );
122    }
123
124    #[test]
125    fn play2_command() {
126        let mut payload = Vec::new();
127
128        let mut encoder = Amf0Encoder::new(&mut payload);
129        encoder.encode_null().unwrap();
130
131        let object: Amf0Object = [
132            ("name".into(), StringCow::from("test").into()),
133            ("value".into(), 0.0f64.into()),
134        ]
135        .into_iter()
136        .collect();
137        encoder.encode_object(&object).unwrap();
138
139        let command = NetStreamCommand::read("play2", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
140            .unwrap()
141            .unwrap();
142
143        assert_eq!(command, NetStreamCommand::Play2 { parameters: object });
144    }
145
146    #[test]
147    fn receive_audio() {
148        let mut payload = Vec::new();
149        let mut encoder = Amf0Encoder::new(&mut payload);
150        encoder.encode_null().unwrap();
151        encoder.encode_boolean(true).unwrap();
152
153        let command = NetStreamCommand::read("receiveAudio", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
154            .unwrap()
155            .unwrap();
156        assert_eq!(command, NetStreamCommand::ReceiveAudio { receive_audio: true });
157    }
158
159    #[test]
160    fn receive_video() {
161        let mut payload = Vec::new();
162        let mut encoder = Amf0Encoder::new(&mut payload);
163        encoder.encode_null().unwrap();
164        encoder.encode_boolean(true).unwrap();
165
166        let command = NetStreamCommand::read("receiveVideo", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
167            .unwrap()
168            .unwrap();
169        assert_eq!(command, NetStreamCommand::ReceiveVideo { receive_video: true });
170    }
171
172    #[test]
173    fn delete_stream() {
174        let mut payload = vec![Amf0Marker::Null as u8, Amf0Marker::Number as u8];
175        payload.extend_from_slice(0.0f64.to_be_bytes().as_ref());
176
177        let command = NetStreamCommand::read("deleteStream", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
178            .unwrap()
179            .unwrap();
180        assert_eq!(command, NetStreamCommand::DeleteStream { stream_id: 0.0 });
181    }
182
183    #[test]
184    fn publish() {
185        let mut payload = Vec::new();
186
187        let mut encoder = Amf0Encoder::new(&mut payload);
188        encoder.encode_null().unwrap();
189        encoder.encode_string("live").unwrap();
190        encoder.serialize(NetStreamCommandPublishPublishingType::Record).unwrap();
191
192        let command = NetStreamCommand::read("publish", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
193            .unwrap()
194            .unwrap();
195
196        assert_eq!(
197            command,
198            NetStreamCommand::Publish {
199                publishing_name: "live".into(),
200                publishing_type: NetStreamCommandPublishPublishingType::Record
201            }
202        );
203    }
204
205    #[test]
206    fn seek() {
207        let mut payload = Vec::new();
208        let mut encoder = Amf0Encoder::new(&mut payload);
209        encoder.encode_null().unwrap();
210        encoder.encode_number(0.0).unwrap();
211
212        let command = NetStreamCommand::read("seek", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
213            .unwrap()
214            .unwrap();
215        assert_eq!(command, NetStreamCommand::Seek { milliseconds: 0.0 });
216    }
217
218    #[test]
219    fn pause() {
220        let mut payload = Vec::new();
221        let mut encoder = Amf0Encoder::new(&mut payload);
222        encoder.encode_null().unwrap();
223        encoder.encode_boolean(true).unwrap();
224        encoder.encode_number(0.0).unwrap();
225
226        let command = NetStreamCommand::read("pause", &mut Amf0Decoder::from_buf(Bytes::from_owner(payload)))
227            .unwrap()
228            .unwrap();
229        assert_eq!(
230            command,
231            NetStreamCommand::Pause {
232                pause: true,
233                milliseconds: 0.0
234            }
235        );
236    }
237}