scuffle_rtmp/command_messages/netstream/
reader.rs1use bytes::Bytes;
4use scuffle_amf0::decoder::Amf0Decoder;
5use scuffle_bytes_util::zero_copy::BytesBuf;
6
7use super::NetStreamCommand;
8use crate::command_messages::error::CommandError;
9
10impl NetStreamCommand<'_> {
11 pub fn read(command_name: &str, decoder: &mut Amf0Decoder<BytesBuf<Bytes>>) -> Result<Option<Self>, CommandError> {
15 match command_name {
16 "play" => {
17 decoder.decode_null()?;
19
20 let values = decoder.decode_all()?;
21 Ok(Some(Self::Play { values }))
22 }
23 "play2" => {
24 decoder.decode_null()?;
26
27 let parameters = decoder.decode_object()?;
28 Ok(Some(Self::Play2 { parameters }))
29 }
30 "deleteStream" => {
31 decoder.decode_null()?;
33
34 let stream_id = decoder.decode_number()?;
35 Ok(Some(Self::DeleteStream { stream_id }))
36 }
37 "closeStream" => Ok(Some(Self::CloseStream)),
38 "receiveAudio" => {
39 decoder.decode_null()?;
41
42 let receive_audio = decoder.decode_boolean()?;
43 Ok(Some(Self::ReceiveAudio { receive_audio }))
44 }
45 "receiveVideo" => {
46 decoder.decode_null()?;
48
49 let receive_video = decoder.decode_boolean()?;
50 Ok(Some(Self::ReceiveVideo { receive_video }))
51 }
52 "publish" => {
53 decoder.decode_null()?;
55
56 let publishing_name = decoder.decode_string()?;
57 let publishing_type = decoder.deserialize()?;
58
59 Ok(Some(Self::Publish {
60 publishing_name,
61 publishing_type,
62 }))
63 }
64 "seek" => {
65 decoder.decode_null()?;
67
68 let milliseconds = decoder.decode_number()?;
69 Ok(Some(Self::Seek { milliseconds }))
70 }
71 "pause" => {
72 decoder.decode_null()?;
74
75 let pause = decoder.decode_boolean()?;
76 let milliseconds = decoder.decode_number()?;
77 Ok(Some(Self::Pause { pause, milliseconds }))
78 }
79 _ => Ok(None),
80 }
81 }
82}
83
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    use bytes::Bytes;
    use scuffle_amf0::decoder::Amf0Decoder;
    use scuffle_amf0::encoder::Amf0Encoder;
    use scuffle_amf0::{Amf0Marker, Amf0Object};
    use scuffle_bytes_util::StringCow;

    use crate::command_messages::netstream::{NetStreamCommand, NetStreamCommandPublishPublishingType};

    /// Feeds `payload` to [`NetStreamCommand::read`] under the given command
    /// name and unwraps both the `Result` and the `Option`, panicking if the
    /// read fails or the name is not recognized.
    fn read_command(name: &str, payload: Bytes) -> NetStreamCommand<'static> {
        let mut decoder = Amf0Decoder::from_buf(payload);
        NetStreamCommand::read(name, &mut decoder).unwrap().unwrap()
    }

    // `closeStream` must parse from a completely empty payload.
    #[test]
    fn test_command_no_payload() {
        let parsed = read_command("closeStream", Bytes::new());
        assert_eq!(parsed, NetStreamCommand::CloseStream);
    }

    // `play` collects every value following the leading null.
    #[test]
    fn play_command() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_number(0.0).unwrap();
            enc.encode_string("test").unwrap();
        }

        let parsed = read_command("play", Bytes::from_owner(buf));

        let expected = NetStreamCommand::Play {
            values: vec![0.0f64.into(), StringCow::from("test").into()],
        };
        assert_eq!(parsed, expected);
    }

    // `play2` round-trips the parameter object that follows the leading null.
    #[test]
    fn play2_command() {
        let object: Amf0Object = [
            ("name".into(), StringCow::from("test").into()),
            ("value".into(), 0.0f64.into()),
        ]
        .into_iter()
        .collect();

        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_object(&object).unwrap();
        }

        let parsed = read_command("play2", Bytes::from_owner(buf));

        assert_eq!(parsed, NetStreamCommand::Play2 { parameters: object });
    }

    #[test]
    fn receive_audio() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_boolean(true).unwrap();
        }

        let parsed = read_command("receiveAudio", Bytes::from_owner(buf));
        assert_eq!(parsed, NetStreamCommand::ReceiveAudio { receive_audio: true });
    }

    #[test]
    fn receive_video() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_boolean(true).unwrap();
        }

        let parsed = read_command("receiveVideo", Bytes::from_owner(buf));
        assert_eq!(parsed, NetStreamCommand::ReceiveVideo { receive_video: true });
    }

    // Payload is assembled by hand: null marker, number marker, then the
    // big-endian bytes of 0.0.
    #[test]
    fn delete_stream() {
        let raw: Vec<u8> = [Amf0Marker::Null as u8, Amf0Marker::Number as u8]
            .into_iter()
            .chain(0.0f64.to_be_bytes())
            .collect();

        let parsed = read_command("deleteStream", Bytes::from_owner(raw));
        assert_eq!(parsed, NetStreamCommand::DeleteStream { stream_id: 0.0 });
    }

    #[test]
    fn publish() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_string("live").unwrap();
            enc.serialize(NetStreamCommandPublishPublishingType::Record).unwrap();
        }

        let parsed = read_command("publish", Bytes::from_owner(buf));

        let expected = NetStreamCommand::Publish {
            publishing_name: "live".into(),
            publishing_type: NetStreamCommandPublishPublishingType::Record,
        };
        assert_eq!(parsed, expected);
    }

    #[test]
    fn seek() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_number(0.0).unwrap();
        }

        let parsed = read_command("seek", Bytes::from_owner(buf));
        assert_eq!(parsed, NetStreamCommand::Seek { milliseconds: 0.0 });
    }

    #[test]
    fn pause() {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut enc = Amf0Encoder::new(&mut buf);
            enc.encode_null().unwrap();
            enc.encode_boolean(true).unwrap();
            enc.encode_number(0.0).unwrap();
        }

        let parsed = read_command("pause", Bytes::from_owner(buf));

        let expected = NetStreamCommand::Pause {
            pause: true,
            milliseconds: 0.0,
        };
        assert_eq!(parsed, expected);
    }
}