1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68
69pub mod chunk;
70pub mod command_messages;
71pub mod error;
72pub mod handshake;
73pub mod messages;
74pub mod protocol_control_messages;
75pub mod session;
76pub mod user_control_messages;
77
78pub use session::server::ServerSession;
79
80#[cfg(feature = "docs")]
82#[scuffle_changelog::changelog]
83pub mod changelog {}
84
85#[cfg(test)]
86#[cfg_attr(all(test, coverage_nightly), coverage(off))]
87mod tests {
88 use std::path::PathBuf;
89 use std::time::Duration;
90
91 use scuffle_future_ext::FutureExt;
92 use tokio::process::Command;
93 use tokio::sync::{mpsc, oneshot};
94
95 use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
96
97 enum Event {
98 Publish {
99 stream_id: u32,
100 app_name: String,
101 stream_name: String,
102 response: oneshot::Sender<Result<(), ServerSessionError>>,
103 },
104 Unpublish {
105 stream_id: u32,
106 response: oneshot::Sender<Result<(), ServerSessionError>>,
107 },
108 Data {
109 stream_id: u32,
110 data: SessionData,
111 response: oneshot::Sender<Result<(), ServerSessionError>>,
112 },
113 }
114
115 struct Handler(mpsc::Sender<Event>);
116
117 impl SessionHandler for Handler {
118 async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
119 let (response, reciever) = oneshot::channel();
120
121 self.0
122 .send(Event::Publish {
123 stream_id,
124 app_name: app_name.to_string(),
125 stream_name: stream_name.to_string(),
126 response,
127 })
128 .await
129 .unwrap();
130
131 reciever.await.unwrap()
132 }
133
134 async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
135 let (response, reciever) = oneshot::channel();
136
137 self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
138
139 reciever.await.unwrap()
140 }
141
142 async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
143 let (response, reciever) = oneshot::channel();
144 self.0
145 .send(Event::Data {
146 stream_id,
147 data,
148 response,
149 })
150 .await
151 .unwrap();
152
153 reciever.await.unwrap()
154 }
155 }
156
157 #[cfg(not(valgrind))] #[tokio::test]
159 async fn test_basic_rtmp_clean() {
160 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
161 let addr = listener.local_addr().unwrap();
162
163 let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
164
165 let _ffmpeg = Command::new("ffmpeg")
166 .args([
167 "-loglevel",
168 "debug",
169 "-re",
170 "-i",
171 dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
172 "-r",
173 "30",
174 "-t",
175 "1", "-c",
177 "copy",
178 "-f",
179 "flv",
180 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
181 ])
182 .stdout(std::process::Stdio::inherit())
183 .stderr(std::process::Stdio::inherit())
184 .spawn()
185 .expect("failed to execute ffmpeg");
186
187 let (ffmpeg_stream, _) = listener
188 .accept()
189 .with_timeout(Duration::from_millis(1000))
190 .await
191 .expect("timed out")
192 .expect("failed to accept");
193
194 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
195 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
196 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
197
198 (
199 tokio::spawn(async move {
200 let r = session.run().await;
201 println!("ffmpeg session ended: {r:?}");
202 r
203 }),
204 ffmpeg_event_reciever,
205 )
206 };
207
208 let event = ffmpeg_event_reciever
209 .recv()
210 .with_timeout(Duration::from_millis(1000))
211 .await
212 .expect("timed out")
213 .expect("failed to recv event");
214
215 match event {
216 Event::Publish {
217 stream_id,
218 app_name,
219 stream_name,
220 response,
221 } => {
222 assert_eq!(stream_id, 1);
223 assert_eq!(app_name, "live");
224 assert_eq!(stream_name, "stream-key");
225 response.send(Ok(())).expect("failed to send response");
226 }
227 _ => panic!("unexpected event"),
228 }
229
230 let mut got_video = false;
231 let mut got_audio = false;
232 let mut got_metadata = false;
233
234 while let Some(data) = ffmpeg_event_reciever
235 .recv()
236 .with_timeout(Duration::from_millis(1000))
237 .await
238 .expect("timed out")
239 {
240 match data {
241 Event::Data {
242 stream_id,
243 response,
244 data,
245 ..
246 } => {
247 match data {
248 SessionData::Video { .. } => got_video = true,
249 SessionData::Audio { .. } => got_audio = true,
250 SessionData::Amf0 { .. } => got_metadata = true,
251 }
252 response.send(Ok(())).expect("failed to send response");
253 assert_eq!(stream_id, 1);
254 }
255 Event::Unpublish { stream_id, response } => {
256 assert_eq!(stream_id, 1);
257 response.send(Ok(())).expect("failed to send response");
258 break;
259 }
260 _ => panic!("unexpected event"),
261 }
262 }
263
264 assert!(got_video);
265 assert!(got_audio);
266 assert!(got_metadata);
267
268 if ffmpeg_event_reciever
269 .recv()
270 .with_timeout(Duration::from_millis(1000))
271 .await
272 .expect("timed out")
273 .is_some()
274 {
275 panic!("unexpected event");
276 }
277
278 assert!(
279 ffmpeg_handle
280 .await
281 .expect("failed to join handle")
282 .expect("failed to handle ffmpeg connection")
283 );
284
285 }
288
289 #[cfg(all(not(valgrind), not(windows)))]
292 #[tokio::test]
293 async fn test_basic_rtmp_unclean() {
294 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
295 let addr = listener.local_addr().unwrap();
296
297 let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
298
299 let mut ffmpeg = Command::new("ffmpeg")
300 .args([
301 "-loglevel",
302 "debug",
303 "-re",
304 "-i",
305 dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
306 "-r",
307 "30",
308 "-t",
309 "1", "-c",
311 "copy",
312 "-f",
313 "flv",
314 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
315 ])
316 .stdout(std::process::Stdio::inherit())
317 .stderr(std::process::Stdio::inherit())
318 .spawn()
319 .expect("failed to execute ffmpeg");
320
321 let (ffmpeg_stream, _) = listener
322 .accept()
323 .with_timeout(Duration::from_millis(1000))
324 .await
325 .expect("timed out")
326 .expect("failed to accept");
327
328 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
329 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
330 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
331
332 (
333 tokio::spawn(async move {
334 let r = session.run().await;
335 println!("ffmpeg session ended: {r:?}");
336 r
337 }),
338 ffmpeg_event_reciever,
339 )
340 };
341
342 let event = ffmpeg_event_reciever
343 .recv()
344 .with_timeout(Duration::from_millis(1000))
345 .await
346 .expect("timed out")
347 .expect("failed to recv event");
348
349 match event {
350 Event::Publish {
351 stream_id,
352 app_name,
353 stream_name,
354 response,
355 } => {
356 assert_eq!(stream_id, 1);
357 assert_eq!(app_name, "live");
358 assert_eq!(stream_name, "stream-key");
359 response.send(Ok(())).expect("failed to send response");
360 }
361 _ => panic!("unexpected event"),
362 }
363
364 let mut got_video = false;
365 let mut got_audio = false;
366 let mut got_metadata = false;
367
368 while let Some(data) = ffmpeg_event_reciever
369 .recv()
370 .with_timeout(Duration::from_millis(1000))
371 .await
372 .expect("timed out")
373 {
374 match data {
375 Event::Data {
376 stream_id,
377 response,
378 data,
379 ..
380 } => {
381 assert_eq!(stream_id, 1);
382 match data {
383 SessionData::Video { .. } => got_video = true,
384 SessionData::Audio { .. } => got_audio = true,
385 SessionData::Amf0 { .. } => got_metadata = true,
386 }
387 response.send(Ok(())).expect("failed to send response");
388 }
389 _ => panic!("unexpected event"),
390 }
391
392 if got_video && got_audio && got_metadata {
393 break;
394 }
395 }
396
397 assert!(got_video);
398 assert!(got_audio);
399 assert!(got_metadata);
400
401 ffmpeg.kill().await.expect("failed to kill ffmpeg");
402
403 while let Some(data) = ffmpeg_event_reciever
404 .recv()
405 .with_timeout(Duration::from_millis(1000))
406 .await
407 .expect("timed out")
408 {
409 match data {
410 Event::Data { response, .. } => {
411 response.send(Ok(())).expect("failed to send response");
412 }
413 _ => panic!("unexpected event"),
414 }
415 }
416
417 assert!(
419 !ffmpeg_handle
420 .await
421 .expect("failed to join handle")
422 .expect("failed to handle ffmpeg connection")
423 );
424 }
425}