scuffle_rtmp/protocol_control_messages/
writer.rs1use std::io;
4
5use byteorder::{BigEndian, WriteBytesExt};
6use bytes::Bytes;
7
8use super::{
9 ProtocolControlMessageAcknowledgement, ProtocolControlMessageSetChunkSize, ProtocolControlMessageSetPeerBandwidth,
10 ProtocolControlMessageWindowAcknowledgementSize,
11};
12use crate::chunk::Chunk;
13use crate::chunk::writer::ChunkWriter;
14use crate::messages::MessageType;
15
16impl ProtocolControlMessageSetChunkSize {
17 pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
19 let chunk_size = self.chunk_size & 0x7FFFFFFF; writer.write_chunk(
23 io,
24 Chunk::new(
25 2, 0, MessageType::SetChunkSize,
28 0, Bytes::from(chunk_size.to_be_bytes().to_vec()),
30 ),
31 )?;
32
33 Ok(())
34 }
35}
36
37impl ProtocolControlMessageAcknowledgement {
38 pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
40 writer.write_chunk(
41 io,
42 Chunk::new(
43 2, 0, MessageType::Acknowledgement,
46 0, Bytes::from(self.sequence_number.to_be_bytes().to_vec()),
48 ),
49 )?;
50
51 Ok(())
52 }
53}
54
55impl ProtocolControlMessageWindowAcknowledgementSize {
56 pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
58 writer.write_chunk(
59 io,
60 Chunk::new(
61 2, 0, MessageType::WindowAcknowledgementSize,
64 0, Bytes::from(self.acknowledgement_window_size.to_be_bytes().to_vec()),
66 ),
67 )?;
68
69 Ok(())
70 }
71}
72
73impl ProtocolControlMessageSetPeerBandwidth {
74 pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
76 let mut data = Vec::new();
77 data.write_u32::<BigEndian>(self.acknowledgement_window_size)
78 .expect("Failed to write window size");
79 data.write_u8(self.limit_type as u8).expect("Failed to write limit type");
80
81 writer.write_chunk(
82 io,
83 Chunk::new(
84 2, 0, MessageType::SetPeerBandwidth,
87 0, Bytes::from(data),
89 ),
90 )?;
91
92 Ok(())
93 }
94}
95
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    use bytes::{BufMut, BytesMut};

    use super::*;
    use crate::chunk::reader::ChunkReader;
    use crate::protocol_control_messages::ProtocolControlMessageSetPeerBandwidthLimitType;

    /// Decodes the next chunk from `buf` with a fresh `ChunkReader`,
    /// panicking if reading fails or no chunk is available.
    fn read_single_chunk(buf: &mut BytesMut) -> Chunk {
        let mut reader = ChunkReader::default();
        reader.read_chunk(buf).expect("read chunk").expect("chunk")
    }

    /// Set Chunk Size round-trips as type 0x01 with a 4-byte big-endian payload.
    #[test]
    fn write_set_chunk_size() {
        let mut buf = BytesMut::new();
        let chunk_writer = ChunkWriter::default();
        let message = ProtocolControlMessageSetChunkSize { chunk_size: 1 };

        message.write(&mut (&mut buf).writer(), &chunk_writer).unwrap();

        let chunk = read_single_chunk(&mut buf);
        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
        assert_eq!(chunk.message_header.msg_type_id.0, 0x01);
        assert_eq!(chunk.message_header.msg_stream_id, 0);
        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
    }

    /// Acknowledgement round-trips as type 0x03 with a 4-byte big-endian payload.
    #[test]
    fn write_acknowledgement() {
        let mut buf = BytesMut::new();
        let chunk_writer = ChunkWriter::default();
        let message = ProtocolControlMessageAcknowledgement { sequence_number: 1 };

        message.write(&mut (&mut buf).writer(), &chunk_writer).unwrap();

        let chunk = read_single_chunk(&mut buf);
        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
        assert_eq!(chunk.message_header.msg_type_id.0, 0x03);
        assert_eq!(chunk.message_header.msg_stream_id, 0);
        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
    }

    /// Window Acknowledgement Size round-trips as type 0x05 with a 4-byte payload.
    #[test]
    fn window_acknowledgement_size() {
        let mut buf = BytesMut::new();
        let chunk_writer = ChunkWriter::default();
        let message = ProtocolControlMessageWindowAcknowledgementSize {
            acknowledgement_window_size: 1,
        };

        message.write(&mut (&mut buf).writer(), &chunk_writer).unwrap();

        let chunk = read_single_chunk(&mut buf);
        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
        assert_eq!(chunk.message_header.msg_type_id.0, 0x05);
        assert_eq!(chunk.message_header.msg_stream_id, 0);
        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
    }

    /// Set Peer Bandwidth round-trips as type 0x06 with the window size
    /// followed by the limit type byte.
    #[test]
    fn set_peer_bandwidth() {
        let mut buf = BytesMut::new();
        let chunk_writer = ChunkWriter::default();
        let message = ProtocolControlMessageSetPeerBandwidth {
            acknowledgement_window_size: 1,
            limit_type: ProtocolControlMessageSetPeerBandwidthLimitType::Dynamic,
        };

        message.write(&mut (&mut buf).writer(), &chunk_writer).unwrap();

        let chunk = read_single_chunk(&mut buf);
        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
        assert_eq!(chunk.message_header.msg_type_id.0, 0x06);
        assert_eq!(chunk.message_header.msg_stream_id, 0);
        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01, 0x02]);
    }
}