scuffle_rtmp/
lib.rs

1//! A crate for handling RTMP server connections.
2#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3//! ## Specifications
4//!
5//! | Name | Version | Link | Comments |
6//! | --- | --- | --- | --- |
7//! | Adobe’s Real Time Messaging Protocol | `1.0` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/legacy/rtmp-v1-0-spec.pdf> | Referred to as 'Legacy RTMP spec' in this documentation |
8//! | Enhancing RTMP, FLV | `v1-2024-02-29-r1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v1.pdf> | |
9//! | Enhanced RTMP | `v2-2024-10-22-b1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v2.pdf> | Referred to as 'Enhanced RTMP spec' in this documentation |
10#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12//! ## Example
13//!
14//! ```no_run
15//! # use std::io::Cursor;
16//! #
17//! # use scuffle_rtmp::ServerSession;
18//! # use scuffle_rtmp::session::server::{ServerSessionError, SessionData, SessionHandler};
19//! # use tokio::net::TcpListener;
20//! #
21//! struct Handler;
22//!
23//! impl SessionHandler for Handler {
24//!     async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
25//!         // Handle incoming video/audio/meta data
26//!         Ok(())
27//!     }
28//!
29//!     async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
30//!         // Handle the publish event
31//!         Ok(())
32//!     }
33//!
34//!     async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
35//!         // Handle the unpublish event
36//!         Ok(())
37//!     }
38//! }
39//!
40//! #[tokio::main]
41//! async fn main() {
42//!     let listener = TcpListener::bind("[::]:1935").await.unwrap();
43//!     // listening on [::]:1935
44//!
45//!     while let Ok((stream, addr)) = listener.accept().await {
46//!         let session = ServerSession::new(stream, Handler);
47//!
48//!         tokio::spawn(async move {
49//!             if let Err(err) = session.run().await {
50//!                 // Handle the session error
51//!             }
52//!         });
53//!     }
54//! }
55//! ```
56//!
57//! ## License
58//!
59//! This project is licensed under the MIT or Apache-2.0 license.
60//! You can choose between one of them if you use this work.
61//!
62//! `SPDX-License-Identifier: MIT OR Apache-2.0`
63#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68
69pub mod chunk;
70pub mod command_messages;
71pub mod error;
72pub mod handshake;
73pub mod messages;
74pub mod protocol_control_messages;
75pub mod session;
76pub mod user_control_messages;
77
78pub use session::server::ServerSession;
79
80/// Changelogs generated by [scuffle_changelog]
///
/// This module is only compiled when the `docs` feature is enabled. Its body
/// is empty in source.
///
/// NOTE(review): the contents are presumably supplied by the
/// `#[scuffle_changelog::changelog]` attribute applied below; confirm against
/// that crate's documentation.
81#[cfg(feature = "docs")]
82#[scuffle_changelog::changelog]
83pub mod changelog {}
84
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    //! End-to-end tests that drive a `ServerSession` with a real `ffmpeg`
    //! publisher.
    //!
    //! They require an `ffmpeg` binary on `PATH` and the `avc_aac.mp4` asset in
    //! `../../assets` relative to the crate manifest directory.

    use std::fmt::Debug;
    use std::net::SocketAddr;
    use std::path::PathBuf;
    use std::process::Stdio;
    use std::time::Duration;

    use scuffle_future_ext::FutureExt;
    use tokio::net::{TcpListener, TcpStream};
    use tokio::process::{Child, Command};
    use tokio::sync::{mpsc, oneshot};
    use tokio::task::JoinHandle;

    use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};

    /// Upper bound on every individual wait performed by these tests.
    const TIMEOUT: Duration = Duration::from_millis(1000);

    /// Channel through which the test body answers a forwarded handler callback.
    type Responder = oneshot::Sender<Result<(), ServerSessionError>>;

    /// A `SessionHandler` callback forwarded to the test body.
    ///
    /// Each variant carries the callback's arguments plus a [`Responder`]; the
    /// value sent on it becomes the callback's return value.
    enum Event {
        Publish {
            stream_id: u32,
            app_name: String,
            stream_name: String,
            response: Responder,
        },
        Unpublish {
            stream_id: u32,
            response: Responder,
        },
        Data {
            stream_id: u32,
            data: SessionData,
            response: Responder,
        },
    }

    /// Handler that turns every callback into an [`Event`] on an mpsc channel
    /// and blocks the callback until the test replies.
    struct Handler(mpsc::Sender<Event>);

    impl Handler {
        /// Builds an event around a fresh response channel, sends it to the
        /// test body and returns whatever the test replies with.
        ///
        /// # Panics
        ///
        /// Panics if the test dropped either end of the conversation.
        async fn forward(&self, into_event: impl FnOnce(Responder) -> Event) -> Result<(), ServerSessionError> {
            let (response, receiver) = oneshot::channel();

            self.0
                .send(into_event(response))
                .await
                .expect("test dropped the event receiver");

            receiver.await.expect("test dropped the response sender")
        }
    }

    impl SessionHandler for Handler {
        async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
            // The event outlives the borrowed arguments, so take owned copies.
            let app_name = app_name.to_string();
            let stream_name = stream_name.to_string();

            self.forward(move |response| Event::Publish {
                stream_id,
                app_name,
                stream_name,
                response,
            })
            .await
        }

        async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
            self.forward(move |response| Event::Unpublish { stream_id, response }).await
        }

        async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
            self.forward(move |response| Event::Data {
                stream_id,
                data,
                response,
            })
            .await
        }
    }

    /// Tracks which kinds of [`SessionData`] a test has received so far.
    #[derive(Default)]
    struct SeenMedia {
        video: bool,
        audio: bool,
        metadata: bool,
    }

    impl SeenMedia {
        /// Marks the kind of `data` as seen.
        fn record(&mut self, data: &SessionData) {
            match data {
                SessionData::Video { .. } => self.video = true,
                SessionData::Audio { .. } => self.audio = true,
                SessionData::Amf0 { .. } => self.metadata = true,
            }
        }

        /// Panics unless video, audio and AMF0 metadata were all recorded.
        fn assert_all_seen(&self) {
            assert!(self.video, "no video data received");
            assert!(self.audio, "no audio data received");
            assert!(self.metadata, "no metadata received");
        }
    }

    /// Spawns `ffmpeg` publishing one second of the `avc_aac.mp4` asset to
    /// `rtmp://<addr>/live/stream-key`.
    ///
    /// The child is configured with `kill_on_drop`, so a test that panics or
    /// times out does not leave an `ffmpeg` process running in the background.
    ///
    /// # Panics
    ///
    /// Panics if the asset path is not valid UTF-8 or `ffmpeg` cannot be started.
    fn spawn_ffmpeg(addr: SocketAddr) -> Child {
        let input = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("../../assets")
            .join("avc_aac.mp4");
        let url = format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port());

        let mut command = Command::new("ffmpeg");
        command
            .args([
                "-loglevel",
                "debug",
                "-re",
                "-i",
                input.to_str().expect("failed to get path"),
                "-r",
                "30",
                "-t",
                "1", // just for the test so it doesn't take too long
                "-c",
                "copy",
                "-f",
                "flv",
                url.as_str(),
            ])
            .stdout(Stdio::inherit())
            .stderr(Stdio::inherit())
            .kill_on_drop(true);

        command.spawn().expect("failed to execute ffmpeg")
    }

    /// Accepts the next connection on `listener`, panicking if none arrives
    /// within [`TIMEOUT`].
    async fn accept_stream(listener: &TcpListener) -> TcpStream {
        let (stream, _) = listener
            .accept()
            .with_timeout(TIMEOUT)
            .await
            .expect("timed out")
            .expect("failed to accept");

        stream
    }

    /// Runs a `ServerSession` over `stream` on a background task.
    ///
    /// Returns the task handle, which yields the result of `session.run()`, and
    /// the receiver on which the session's handler callbacks arrive as
    /// [`Event`]s. The channel has capacity 1 and every callback waits for its
    /// reply, so the test sees events strictly one at a time.
    fn start_session(stream: TcpStream) -> (JoinHandle<Result<bool, impl Debug>>, mpsc::Receiver<Event>) {
        let (event_producer, event_receiver) = mpsc::channel(1);
        let session = ServerSession::new(stream, Handler(event_producer));

        let handle = tokio::spawn(async move {
            let result = session.run().await;
            println!("ffmpeg session ended: {result:?}");
            result
        });

        (handle, event_receiver)
    }

    /// Waits up to [`TIMEOUT`] for the next event.
    ///
    /// Returns `None` once the handler side of the channel has been dropped.
    async fn next_event(events: &mut mpsc::Receiver<Event>) -> Option<Event> {
        events.recv().with_timeout(TIMEOUT).await.expect("timed out")
    }

    /// Expects the next event to be the publish of `live/stream-key` on stream 1
    /// and accepts it.
    async fn expect_publish(events: &mut mpsc::Receiver<Event>) {
        match next_event(events).await.expect("failed to recv event") {
            Event::Publish {
                stream_id,
                app_name,
                stream_name,
                response,
            } => {
                assert_eq!(stream_id, 1);
                assert_eq!(app_name, "live");
                assert_eq!(stream_name, "stream-key");
                response.send(Ok(())).expect("failed to send response");
            }
            Event::Unpublish { .. } | Event::Data { .. } => panic!("unexpected event"),
        }
    }

    /// Publishes a full clip and lets `ffmpeg` finish on its own: the session
    /// must report video, audio and metadata, then an unpublish, and end with
    /// `Ok(true)`.
    #[cfg(not(valgrind))] // test is time-sensitive, consider refactoring?
    #[tokio::test]
    async fn test_basic_rtmp_clean() {
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
        let addr = listener.local_addr().expect("failed to get local addr");

        // Kept alive until the end of the test so that `kill_on_drop` can only
        // fire after the session has already finished.
        let _ffmpeg = spawn_ffmpeg(addr);

        let (ffmpeg_handle, mut events) = start_session(accept_stream(&listener).await);

        expect_publish(&mut events).await;

        let mut seen = SeenMedia::default();

        while let Some(event) = next_event(&mut events).await {
            match event {
                Event::Data {
                    stream_id,
                    data,
                    response,
                } => {
                    assert_eq!(stream_id, 1);
                    seen.record(&data);
                    response.send(Ok(())).expect("failed to send response");
                }
                Event::Unpublish { stream_id, response } => {
                    assert_eq!(stream_id, 1);
                    response.send(Ok(())).expect("failed to send response");
                    break;
                }
                Event::Publish { .. } => panic!("unexpected event"),
            }
        }

        seen.assert_all_seen();

        // Nothing may follow the unpublish; the channel should simply close.
        assert!(next_event(&mut events).await.is_none(), "unexpected event");

        assert!(
            ffmpeg_handle
                .await
                .expect("failed to join handle")
                .expect("failed to handle ffmpeg connection")
        );

        // TODO: Fix this assertion
        // assert!(ffmpeg.try_wait().expect("failed to wait for ffmpeg").is_none());
    }

    /// Kills `ffmpeg` mid-stream: the session must end with `Ok(false)` and
    /// without ever reporting an unpublish.
    // test is time-sensitive, consider refactoring?
    // windows seems to not let us kill ffmpeg without it cleaning up the stream.
    #[cfg(all(not(valgrind), not(windows)))]
    #[tokio::test]
    async fn test_basic_rtmp_unclean() {
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
        let addr = listener.local_addr().expect("failed to get local addr");

        let mut ffmpeg = spawn_ffmpeg(addr);

        let (ffmpeg_handle, mut events) = start_session(accept_stream(&listener).await);

        expect_publish(&mut events).await;

        let mut seen = SeenMedia::default();

        while let Some(event) = next_event(&mut events).await {
            match event {
                Event::Data {
                    stream_id,
                    data,
                    response,
                } => {
                    assert_eq!(stream_id, 1);
                    seen.record(&data);
                    response.send(Ok(())).expect("failed to send response");
                }
                Event::Publish { .. } | Event::Unpublish { .. } => panic!("unexpected event"),
            }

            // Stop as soon as every kind of data has shown up so that ffmpeg
            // is killed while it is still publishing.
            if seen.video && seen.audio && seen.metadata {
                break;
            }
        }

        seen.assert_all_seen();

        ffmpeg.kill().await.expect("failed to kill ffmpeg");

        // Drain whatever was still in flight; only data events are acceptable
        // because the client never got the chance to unpublish.
        while let Some(event) = next_event(&mut events).await {
            match event {
                Event::Data { response, .. } => {
                    response.send(Ok(())).expect("failed to send response");
                }
                Event::Publish { .. } | Event::Unpublish { .. } => panic!("unexpected event"),
            }
        }

        // the server should have detected the ffmpeg process has died uncleanly
        assert!(
            !ffmpeg_handle
                .await
                .expect("failed to join handle")
                .expect("failed to handle ffmpeg connection")
        );
    }
}