scuffle_rtmp/handshake/
mod.rs

1//! RTMP handshake logic.
2//!
3//! Order of messages:
4//! ```txt
5//! Client -> C0 -> Server
6//! Client -> C1 -> Server
7//! Client <- S0 <- Server
8//! Client <- S1 <- Server
9//! Client <- S2 <- Server
10//! Client -> C2 -> Server
11//! ```
12
13use std::io::{self, Seek};
14use std::time::SystemTime;
15
16use bytes::Bytes;
17use complex::ComplexHandshakeServer;
18use simple::SimpleHandshakeServer;
19
20pub mod complex;
21pub mod simple;
22
23/// This is the total size of the C1/S1 C2/S2 packets.
24pub const RTMP_HANDSHAKE_SIZE: usize = 1536;
25
26/// This is the length of the time and version.
27/// The time is 4 bytes and the version is 4 bytes.
28pub const TIME_VERSION_LENGTH: usize = 8;
29
30/// This is the length of the chunk.
31/// The chunk is 764 bytes. or (1536 - 8) / 2 = 764
32pub const CHUNK_LENGTH: usize = (RTMP_HANDSHAKE_SIZE - TIME_VERSION_LENGTH) / 2;
33
34nutype_enum::nutype_enum! {
35    /// The RTMP version.
36    ///
37    /// We only support version 3.
38    pub enum RtmpVersion(u8) {
39        /// RTMP version 3.
40        Version3 = 0x3,
41    }
42}
43
44/// The state of the handshake.
45///
46/// This is used to determine what the next step is.
47#[derive(Debug, Copy, Clone, PartialEq, Eq)]
48pub enum ServerHandshakeState {
49    /// Next step is to read C0 and C1.
50    ReadC0C1,
51    /// Next step is to read C2.
52    ReadC2,
53    /// Handshake is finished.
54    Finish,
55}
56
57/// The server side of the handshake.
58pub enum HandshakeServer {
59    /// Simple handshake.
60    Simple(SimpleHandshakeServer),
61    /// Complex handshake.
62    Complex(ComplexHandshakeServer),
63}
64
65impl Default for HandshakeServer {
66    fn default() -> Self {
67        Self::Complex(ComplexHandshakeServer::default())
68    }
69}
70
71impl HandshakeServer {
72    /// Returns true if the handshake is finished.
73    pub fn is_finished(&self) -> bool {
74        match self {
75            HandshakeServer::Simple(handshaker) => handshaker.is_finished(),
76            HandshakeServer::Complex(handshaker) => handshaker.is_finished(),
77        }
78    }
79
80    /// Perform the handshake.
81    pub fn handshake(&mut self, input: &mut io::Cursor<Bytes>, writer: &mut Vec<u8>) -> Result<(), crate::error::RtmpError> {
82        match self {
83            HandshakeServer::Complex(handshaker) => {
84                // We need to be able to go back if the handshake isn't complex.
85                let position = input.position();
86
87                let result = handshaker.handshake(input, writer);
88                if result.is_err() {
89                    // Complex handshake failed, switch to simple handshake.
90                    let mut simple = SimpleHandshakeServer::default();
91
92                    // We seek back to the position where we started.
93                    input.seek(io::SeekFrom::Start(position))?;
94
95                    // We then perform the handshake.
96                    simple.handshake(input, writer)?;
97
98                    // We then set the handshake to simple.
99                    *self = HandshakeServer::Simple(simple);
100                }
101            }
102            HandshakeServer::Simple(handshaker) => {
103                handshaker.handshake(input, writer)?;
104            }
105        }
106
107        Ok(())
108    }
109}
110
111/// Returns the current unix epoch time in nanoseconds.
112pub fn current_time() -> u32 {
113    let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
114    match duration {
115        Ok(result) => result.as_nanos() as u32,
116        _ => 0,
117    }
118}
119
120#[cfg(test)]
121#[cfg_attr(all(test, coverage_nightly), coverage(off))]
122mod tests {
123    use std::io::{Read, Write};
124
125    use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
126    use bytes::Bytes;
127
128    use crate::handshake::HandshakeServer;
129    use crate::handshake::complex::digest::DigestProcessor;
130    use crate::handshake::complex::{
131        RTMP_CLIENT_KEY_FIRST_HALF, RTMP_SERVER_KEY, RTMP_SERVER_KEY_FIRST_HALF, RTMP_SERVER_VERSION, SchemaVersion,
132    };
133
134    #[test]
135    fn test_simple_handshake() {
136        let mut handshake_server = HandshakeServer::default();
137
138        let mut c0c1 = Vec::with_capacity(1528 + 8);
139        c0c1.write_u8(3).unwrap(); // version
140        c0c1.write_u32::<BigEndian>(123).unwrap(); // timestamp
141        c0c1.write_u32::<BigEndian>(0).unwrap(); // zero
142
143        for i in 0..1528 {
144            c0c1.write_u8((i % 256) as u8).unwrap();
145        }
146
147        let c0c1 = Bytes::from(c0c1);
148
149        let mut writer = Vec::new();
150        handshake_server
151            .handshake(&mut std::io::Cursor::new(c0c1.clone()), &mut writer)
152            .unwrap();
153
154        let mut reader = std::io::Cursor::new(writer);
155        assert_eq!(reader.read_u8().unwrap(), 3); // version
156        let timestamp = reader.read_u32::<BigEndian>().unwrap(); // timestamp
157        assert_eq!(reader.read_u32::<BigEndian>().unwrap(), 0); // zero
158
159        let mut server_random = vec![0; 1528];
160        reader.read_exact(&mut server_random).unwrap();
161
162        assert_eq!(reader.read_u32::<BigEndian>().unwrap(), 123); // our timestamp
163        let timestamp2 = reader.read_u32::<BigEndian>().unwrap(); // server timestamp
164
165        assert!(timestamp2 >= timestamp);
166
167        let mut read_client_random = vec![0; 1528];
168        reader.read_exact(&mut read_client_random).unwrap();
169
170        assert_eq!(&c0c1[9..], &read_client_random);
171
172        let mut c2 = Vec::with_capacity(1528 + 8);
173        c2.write_u32::<BigEndian>(timestamp).unwrap(); // timestamp
174        c2.write_u32::<BigEndian>(124).unwrap(); // our timestamp
175        c2.write_all(&server_random).unwrap();
176
177        let mut writer = Vec::new();
178        handshake_server
179            .handshake(&mut std::io::Cursor::new(Bytes::from(c2)), &mut writer)
180            .unwrap();
181
182        assert!(handshake_server.is_finished());
183    }
184
185    #[test]
186    fn test_complex_handshake() {
187        let mut handshake_server = HandshakeServer::default();
188
189        let mut writer = Vec::with_capacity(3073);
190        writer.write_u8(3).unwrap(); // version
191
192        let mut c0c1 = Vec::with_capacity(1528 + 8);
193        c0c1.write_u32::<BigEndian>(123).unwrap(); // timestamp
194        c0c1.write_u32::<BigEndian>(100).unwrap(); // client version
195
196        for i in 0..1528 {
197            c0c1.write_u8((i % 256) as u8).unwrap();
198        }
199
200        let data_digest = DigestProcessor::new(Bytes::from(c0c1), RTMP_CLIENT_KEY_FIRST_HALF);
201
202        let res = data_digest.generate_and_fill_digest(SchemaVersion::Schema1).unwrap();
203
204        res.write_to(&mut writer).unwrap();
205
206        let mut bytes = Vec::new();
207        handshake_server
208            .handshake(&mut std::io::Cursor::new(Bytes::from(writer)), &mut bytes)
209            .unwrap();
210
211        let s0 = &bytes[0..1];
212        let s1 = &bytes[1..1537];
213        let s2 = &bytes[1537..3073];
214
215        assert_eq!(s0[0], 3); // version
216        assert_ne!((&s1[..4]).read_u32::<BigEndian>().unwrap(), 0); // timestamp should not be zero
217        assert_eq!((&s1[4..8]).read_u32::<BigEndian>().unwrap(), RTMP_SERVER_VERSION); // RTMP version
218
219        let data_digest = DigestProcessor::new(Bytes::copy_from_slice(s1), RTMP_SERVER_KEY_FIRST_HALF);
220
221        let (digest, schema) = data_digest.read_digest().unwrap();
222        assert_eq!(schema, SchemaVersion::Schema1);
223
224        assert_ne!((&s2[..4]).read_u32::<BigEndian>().unwrap(), 0); // timestamp should not be zero
225        assert_eq!((&s2[4..8]).read_u32::<BigEndian>().unwrap(), 123); // our timestamp
226
227        let key_digest = DigestProcessor::new(Bytes::new(), RTMP_SERVER_KEY);
228
229        let key = key_digest.make_digest(&res.digest, &[]).unwrap();
230        let data_digest = DigestProcessor::new(Bytes::new(), &key);
231
232        assert_eq!(data_digest.make_digest(&s2[..1504], &[]).unwrap(), s2[1504..]);
233
234        let key = key_digest.make_digest(&digest, &[]).unwrap();
235        let data_digest = DigestProcessor::new(Bytes::new(), &key);
236
237        let mut c2 = Vec::new();
238        for i in 0..1528 {
239            c2.write_u8((i % 256) as u8).unwrap();
240        }
241
242        let digest = data_digest.make_digest(&c2, &[]).unwrap();
243
244        let mut c2 = Vec::with_capacity(1528 + 8);
245        c2.write_u32::<BigEndian>(123).unwrap(); // timestamp
246        c2.write_u32::<BigEndian>(124).unwrap(); // our timestamp
247        c2.write_all(&digest).unwrap();
248
249        let mut writer = Vec::new();
250        handshake_server
251            .handshake(&mut std::io::Cursor::new(Bytes::from(c2)), &mut writer)
252            .unwrap();
253
254        assert!(handshake_server.is_finished());
255    }
256}