1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-08-18 13:22:49 +00:00

Support protocol v3 in the rust terminal sender

This commit is contained in:
Marcus Hanestad 2025-06-04 15:34:56 +00:00 committed by Michael Hollister
parent a83f92d874
commit bfa61907c7
13 changed files with 1532 additions and 492 deletions

View file

@ -13,3 +13,4 @@ ctrlc = "3.1.9"
tungstenite = { version = "0.21.0" } tungstenite = { version = "0.21.0" }
url = "2.5.0" url = "2.5.0"
tiny_http = "0.12.0" tiny_http = "0.12.0"
serde_repr = "0.1.20"

View file

@ -50,4 +50,13 @@ cat dash.mpd | ./fcast -h localhost play --mime_type application/dash+xml
# Set speed to double # Set speed to double
./fcast -h localhost setspeed -s 2.0 ./fcast -h localhost setspeed -s 2.0
# Receive keyboard events
./fcast -h localhost -s KeyDown,KeyUp listen
# Show image playlist
cat image_playlist_example.json | ./fcast -h localhost play --mime_type application/json
# Play from video playlist
cat image_playlist_example.json | ./fcast -h localhost play --mime_type application/json
``` ```

View file

@ -0,0 +1,14 @@
{
"contentType": 0,
"items": [
{
"container": "image/jpeg",
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/90/Everest%2C_Himalayas.jpg/640px-Everest%2C_Himalayas.jpg"
},
{
"container": "image/png",
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/6/62/NTV7_Testpattern.png/640px-NTV7_Testpattern.png"
}
],
"offset": 0
}

View file

@ -1,13 +1,27 @@
use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use crate::{models::{PlaybackUpdateMessage, VolumeUpdateMessage, PlaybackErrorMessage, VersionMessage}, transport::Transport}; use crate::{
models::{v2, v3, PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage},
transport::Transport,
};
use serde::Serialize; use serde::Serialize;
#[derive(Debug)] #[derive(Debug, PartialEq, Eq)]
enum ProtoVersion {
V2,
V3,
}
#[derive(Debug, PartialEq, Eq)]
enum SessionState { enum SessionState {
Idle = 0, Idle,
WaitingForLength, Connected(ProtoVersion),
WaitingForData,
Disconnected, Disconnected,
} }
@ -26,7 +40,13 @@ pub enum Opcode {
SetSpeed = 10, SetSpeed = 10,
Version = 11, Version = 11,
Ping = 12, Ping = 12,
Pong = 13 Pong = 13,
Initial = 14,
PlayUpdate = 15,
SetPlaylistItem = 16,
SubscribeEvent = 17,
UnsubscribeEvent = 18,
Event = 19,
} }
impl Opcode { impl Opcode {
@ -46,6 +66,12 @@ impl Opcode {
11 => Opcode::Version, 11 => Opcode::Version,
12 => Opcode::Ping, 12 => Opcode::Ping,
13 => Opcode::Pong, 13 => Opcode::Pong,
14 => Opcode::Initial,
15 => Opcode::PlayUpdate,
16 => Opcode::SetPlaylistItem,
17 => Opcode::SubscribeEvent,
18 => Opcode::UnsubscribeEvent,
19 => Opcode::Event,
_ => panic!("Unknown value: {}", value), _ => panic!("Unknown value: {}", value),
} }
} }
@ -56,26 +82,100 @@ const MAXIMUM_PACKET_LENGTH: usize = 32000;
pub struct FCastSession<'a> { pub struct FCastSession<'a> {
buffer: Vec<u8>, buffer: Vec<u8>,
bytes_read: usize,
packet_length: usize,
stream: Box<dyn Transport + 'a>, stream: Box<dyn Transport + 'a>,
state: SessionState state: SessionState,
} }
impl<'a> FCastSession<'a> { impl<'a> FCastSession<'a> {
pub fn new<T: Transport + 'a>(stream: T) -> Self { pub fn new<T: Transport + 'a>(stream: T) -> Self {
return FCastSession { Self {
buffer: vec![0; MAXIMUM_PACKET_LENGTH], buffer: vec![0; MAXIMUM_PACKET_LENGTH],
bytes_read: 0,
packet_length: 0,
stream: Box::new(stream), stream: Box::new(stream),
state: SessionState::Idle state: SessionState::Idle,
} }
} }
}
impl FCastSession<'_> { pub fn connect<T: Transport + 'a>(stream: T) -> Result<Self, Box<dyn std::error::Error>> {
pub fn send_message<T: Serialize>(&mut self, opcode: Opcode, message: T) -> Result<(), Box<dyn std::error::Error>> { let mut session = Self::new(stream);
session.send_message(
Opcode::Version,
crate::models::VersionMessage { version: 3 },
)?;
let (opcode, body) = session.read_packet()?;
if opcode != Opcode::Version {
return Err(format!("Expected Opcode::Version, got {opcode:?}").into());
}
let msg: VersionMessage =
serde_json::from_str(&body.ok_or("Version requires body".to_owned())?)?;
if msg.version == 3 {
let initial = v3::InitialSenderMessage {
display_name: None,
app_name: Some(env!("CARGO_PKG_NAME").to_owned()),
app_version: Some(env!("CARGO_PKG_VERSION").to_owned()),
};
session.send_message(Opcode::Initial, initial)?;
let (opcode, body) = session.read_packet()?;
if opcode != Opcode::Initial {
return Err(format!("Expected Opcode::Initial, got {opcode:?}").into());
}
let inital_receiver: v3::InitialReceiverMessage =
serde_json::from_str(&body.ok_or("InitialReceiverMessage requires body")?)?;
println!("Got inital message from sender: {inital_receiver:?}");
session.state = SessionState::Connected(ProtoVersion::V3);
} else {
session.state = SessionState::Connected(ProtoVersion::V2);
}
Ok(session)
}
fn read_packet(&mut self) -> Result<(Opcode, Option<String>), Box<dyn std::error::Error>> {
let mut header_buf = [0u8; 5];
self.stream.transport_read_exact(&mut header_buf)?;
let opcode = Opcode::from_u8(header_buf[4]);
let body_length =
u32::from_le_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
as usize
- 1;
if body_length > MAXIMUM_PACKET_LENGTH {
println!(
"Maximum packet length is 32kB, killing stream: {}",
body_length,
);
self.stream.transport_shutdown()?;
self.state = SessionState::Disconnected;
return Err(format!(
"Stream killed due to packet length ({}) exceeding maximum 32kB packet size.",
body_length,
)
.into());
}
self.stream
.transport_read_exact(&mut self.buffer[0..body_length])?;
let body_json = if body_length > 0 {
Some(String::from_utf8(self.buffer[0..body_length].to_vec())?)
} else {
None
};
Ok((opcode, body_json))
}
pub fn send_message<T: Serialize>(
&mut self,
opcode: Opcode,
message: T,
) -> Result<(), Box<dyn std::error::Error>> {
let json = serde_json::to_string(&message)?; let json = serde_json::to_string(&message)?;
let data = json.as_bytes(); let data = json.as_bytes();
let size = 1 + data.len(); let size = 1 + data.len();
@ -85,11 +185,45 @@ impl FCastSession<'_> {
header[LENGTH_BYTES] = opcode as u8; header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat(); let packet = [header, data.to_vec()].concat();
println!("Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", packet.len(), opcode, header_size, data.len(), json); println!(
"Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).",
packet.len(),
opcode,
header_size,
data.len(),
json
);
self.stream.transport_write(&packet)?; self.stream.transport_write(&packet)?;
Ok(()) Ok(())
} }
pub fn subscribe(&mut self, event: v3::EventType) -> Result<(), Box<dyn std::error::Error>> {
if self.state != SessionState::Connected(ProtoVersion::V3) {
return Err(format!(
"Cannot subscribe to events in the current state ({:?})",
self.state
)
.into());
}
let obj = match event {
v3::EventType::MediaItemStart => v3::EventSubscribeObject::MediaItemStart,
v3::EventType::MediaItemEnd => v3::EventSubscribeObject::MediaItemEnd,
v3::EventType::MediaItemChange => v3::EventSubscribeObject::MediaItemChanged,
v3::EventType::KeyDown => v3::EventSubscribeObject::KeyDown {
keys: v3::KeyNames::all(),
},
v3::EventType::KeyUp => v3::EventSubscribeObject::KeyUp {
keys: v3::KeyNames::all(),
},
};
self.send_message(
Opcode::SubscribeEvent,
v3::SubscribeEventMessage { event: obj },
)
}
pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), Box<dyn std::error::Error>> { pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), Box<dyn std::error::Error>> {
let json = String::new(); let json = String::new();
let data = json.as_bytes(); let data = json.as_bytes();
@ -103,123 +237,90 @@ impl FCastSession<'_> {
Ok(()) Ok(())
} }
pub fn receive_loop(&mut self, running: &Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> { pub fn receive_loop(
&mut self,
running: &Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Start receiving."); println!("Start receiving.");
self.state = SessionState::WaitingForLength;
let mut buffer = [0u8; 1024];
while running.load(Ordering::SeqCst) { while running.load(Ordering::SeqCst) {
let bytes_read = self.stream.transport_read(&mut buffer)?; let (opcode, body) = self.read_packet()?;
self.process_bytes(&buffer[..bytes_read])?; self.handle_packet(opcode, body)?;
} }
self.state = SessionState::Idle;
Ok(()) Ok(())
} }
fn process_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> { pub fn send_play_message(
if received_bytes.is_empty() { &mut self,
return Ok(()); mime_type: String,
} url: Option<String>,
content: Option<String>,
println!("{} bytes received", received_bytes.len()); time: Option<f64>,
speed: Option<f64>,
headers: Option<HashMap<String, String>>,
) -> Result<(), Box<dyn std::error::Error>> {
match self.state { match self.state {
SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?, SessionState::Connected(ProtoVersion::V2) => {
SessionState::WaitingForData => self.handle_packet_bytes(received_bytes)?, let msg = v2::PlayMessage {
_ => println!("Data received is unhandled in current session state {:?}", self.state), container: mime_type,
url,
content,
time,
speed,
headers,
};
self.send_message(Opcode::Play, msg)?;
}
SessionState::Connected(ProtoVersion::V3) => {
let msg = v3::PlayMessage {
container: mime_type,
url,
content,
time,
volume: Some(1.0),
speed,
headers,
metadata: None,
};
self.send_message(Opcode::Play, msg)?;
}
_ => return Err("invalid state for sending play message".into()),
} }
Ok(()) Ok(())
} }
fn handle_packet(
fn handle_length_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> { &mut self,
let bytes_to_read = std::cmp::min(LENGTH_BYTES, received_bytes.len()); opcode: Opcode,
let bytes_remaining = received_bytes.len() - bytes_to_read; body: Option<String>,
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read] ) -> Result<(), Box<dyn std::error::Error>> {
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= LENGTH_BYTES {
self.state = SessionState::WaitingForData;
self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize;
self.bytes_read = 0;
println!("Packet length header received from: {}", self.packet_length);
if self.packet_length > MAXIMUM_PACKET_LENGTH {
println!("Maximum packet length is 32kB, killing stream: {}", self.packet_length);
self.stream.transport_shutdown()?;
self.state = SessionState::Disconnected;
return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into());
}
if bytes_remaining > 0 {
println!("{} remaining bytes pushed to handlePacketBytes", bytes_remaining);
self.handle_packet_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_packet_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let bytes_to_read = std::cmp::min(self.packet_length, received_bytes.len());
let bytes_remaining = received_bytes.len() - bytes_to_read;
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read]
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= self.packet_length {
println!("Packet finished receiving of {} bytes.", self.packet_length);
self.handle_next_packet()?;
self.state = SessionState::WaitingForLength;
self.packet_length = 0;
self.bytes_read = 0;
if bytes_remaining > 0 {
println!("{} remaining bytes pushed to handleLengthBytes", bytes_remaining);
self.handle_length_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_next_packet(&mut self) -> Result<(), Box<dyn std::error::Error>> {
println!("Processing packet of {} bytes", self.bytes_read);
let opcode = Opcode::from_u8(self.buffer[0]);
let packet_length = self.packet_length;
let body = if packet_length > 1 {
Some(std::str::from_utf8(&self.buffer[1..packet_length])?.to_string())
} else {
None
};
println!("Received body: {:?}", body);
self.handle_packet(opcode, body)
}
fn handle_packet(&mut self, opcode: Opcode, body: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
println!("Received message with opcode {:?}.", opcode); println!("Received message with opcode {:?}.", opcode);
match opcode { match opcode {
Opcode::PlaybackUpdate => { Opcode::PlaybackUpdate => {
if let Some(body_str) = body { if let Some(body_str) = body {
if let Ok(playback_update_msg) = serde_json::from_str::<PlaybackUpdateMessage>(body_str.as_str()) { match self.state {
println!("Received playback update {:?}", playback_update_msg); SessionState::Connected(ProtoVersion::V2) => {
} else { if let Ok(playback_update_msg) =
println!("Received playback update with malformed body."); serde_json::from_str::<v2::PlaybackUpdateMessage>(body_str.as_str())
{
println!("Received playback update {:?}", playback_update_msg);
} else {
println!("Received playback update with malformed body.");
}
}
SessionState::Connected(ProtoVersion::V3) => {
if let Ok(playback_update_msg) =
serde_json::from_str::<v3::PlaybackUpdateMessage>(body_str.as_str())
{
println!("Received playback update {:?}", playback_update_msg);
} else {
println!("Received playback update with malformed body.");
}
}
_ => unreachable!(),
} }
} else { } else {
println!("Received playback update with no body."); println!("Received playback update with no body.");
@ -227,7 +328,9 @@ impl FCastSession<'_> {
} }
Opcode::VolumeUpdate => { Opcode::VolumeUpdate => {
if let Some(body_str) = body { if let Some(body_str) = body {
if let Ok(volume_update_msg) = serde_json::from_str::<VolumeUpdateMessage>(body_str.as_str()) { if let Ok(volume_update_msg) =
serde_json::from_str::<VolumeUpdateMessage>(body_str.as_str())
{
println!("Received volume update {:?}", volume_update_msg); println!("Received volume update {:?}", volume_update_msg);
} else { } else {
println!("Received volume update with malformed body."); println!("Received volume update with malformed body.");
@ -238,7 +341,9 @@ impl FCastSession<'_> {
} }
Opcode::PlaybackError => { Opcode::PlaybackError => {
if let Some(body_str) = body { if let Some(body_str) = body {
if let Ok(playback_error_msg) = serde_json::from_str::<PlaybackErrorMessage>(body_str.as_str()) { if let Ok(playback_error_msg) =
serde_json::from_str::<PlaybackErrorMessage>(body_str.as_str())
{
println!("Received playback error {:?}", playback_error_msg); println!("Received playback error {:?}", playback_error_msg);
} else { } else {
println!("Received playback error with malformed body."); println!("Received playback error with malformed body.");
@ -249,7 +354,9 @@ impl FCastSession<'_> {
} }
Opcode::Version => { Opcode::Version => {
if let Some(body_str) = body { if let Some(body_str) = body {
if let Ok(version_msg) = serde_json::from_str::<VersionMessage>(body_str.as_str()) { if let Ok(version_msg) =
serde_json::from_str::<VersionMessage>(body_str.as_str())
{
println!("Received version {:?}", version_msg); println!("Received version {:?}", version_msg);
} else { } else {
println!("Received version with malformed body."); println!("Received version with malformed body.");
@ -263,6 +370,31 @@ impl FCastSession<'_> {
self.send_empty(Opcode::Pong)?; self.send_empty(Opcode::Pong)?;
println!("Sent pong"); println!("Sent pong");
} }
Opcode::Pong => println!("Received pong"),
Opcode::PlayUpdate => {
if let Some(body_str) = body {
if let Ok(play_update_msg) =
serde_json::from_str::<v3::PlayUpdateMessage>(&body_str)
{
println!("Received play update {play_update_msg:?}");
} else {
println!("Received play update with malformed body.");
}
} else {
println!("Received play update with no body.");
}
}
Opcode::Event => {
if let Some(body_str) = body {
if let Ok(event_msg) = serde_json::from_str::<v3::EventMessage>(&body_str) {
println!("Received event {event_msg:?}");
} else {
println!("Received event with malformed body.");
}
} else {
println!("Received event with no body.");
}
}
_ => { _ => {
println!("Error handling packet"); println!("Error handling packet");
} }
@ -272,6 +404,6 @@ impl FCastSession<'_> {
} }
pub fn shutdown(&mut self) -> Result<(), std::io::Error> { pub fn shutdown(&mut self) -> Result<(), std::io::Error> {
return self.stream.transport_shutdown(); self.stream.transport_shutdown()
} }
} }

View file

@ -1,36 +0,0 @@
struct FileServer {
base_url: String,
base_path: String,
}
impl FileServer {
fn new(base_url: String, base_path: String) -> Self {
FileServer { base_url, base_path }
}
async fn serve(&self) {
let file_server = warp::fs::dir(&self.base_path);
warp::serve(file_server).run(([127, 0, 0, 1], 3030)).await;
}
fn get_url(&self, file_name: &str) -> String {
format!("{}/{}", self.base_url, file_name)
}
}
pub async fn host_file_and_get_url(file_path: &str) -> Result<String, Box<dyn std::error::Error>> {
let file_name = Path::new(file_path).file_name().ok_or("Invalid file path")?.to_str().ok_or("Invalid file name")?;
let file_server = FileServer::new("http://127.0.0.1:3030".to_string(), "path/to/hosted/files".to_string());
// Copy the file to the hosting directory
let destination = Path::new(&file_server.base_path).join(file_name);
tokio::fs::copy(file_path, &destination).await?;
// Start the server if not already running
// This part needs synchronization in a real-world scenario
tokio::spawn(async move {
file_server.serve().await;
});
Ok(file_server.get_url(file_name))
}

View file

@ -0,0 +1,3 @@
pub mod fcastsession;
pub mod models;
pub mod transport;

View file

@ -1,27 +1,27 @@
mod models;
mod fcastsession;
mod transport;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use tiny_http::{Server, Response, ListenAddr, Header}; use fcast::models::v3;
use tungstenite::stream::MaybeTlsStream; use fcast::transport::WebSocket;
use url::Url;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex; use std::sync::Mutex;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::{thread, fs};
use std::time::Instant; use std::time::Instant;
use std::{fs, thread};
use std::{io::Read, net::TcpStream}; use std::{io::Read, net::TcpStream};
use std::sync::atomic::{ AtomicBool, Ordering};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tiny_http::{Header, ListenAddr, Response, Server};
use tungstenite::stream::MaybeTlsStream;
use url::Url;
use crate::fcastsession::Opcode; use fcast::fcastsession::Opcode;
use crate::models::{SetVolumeMessage, SetSpeedMessage}; use fcast::{
use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession}; fcastsession::FCastSession,
models::{SeekMessage, SetSpeedMessage, SetVolumeMessage},
};
fn main() { fn main() {
if let Err(e) = run() { if let Err(e) = run() {
println!("Failed due to error: {}", e) println!("Failed due to error: {}", e)
} }
@ -30,127 +30,173 @@ fn main() {
fn run() -> Result<(), Box<dyn std::error::Error>> { fn run() -> Result<(), Box<dyn std::error::Error>> {
let app = App::new("Media Control") let app = App::new("Media Control")
.about("Control media playback") .about("Control media playback")
.arg(Arg::with_name("connection_type") .arg(
.short('c') Arg::with_name("connection_type")
.long("connection_type")
.value_name("CONNECTION_TYPE")
.help("Type of connection: tcp or ws (websocket)")
.required(false)
.default_value("tcp")
.takes_value(true))
.arg(Arg::with_name("host")
.short('h')
.long("host")
.value_name("Host")
.help("The host address to send the command to")
.required(true)
.takes_value(true))
.arg(Arg::with_name("port")
.short('p')
.long("port")
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.takes_value(true))
.subcommand(SubCommand::with_name("play")
.about("Play media")
.arg(Arg::with_name("mime_type")
.short('m')
.long("mime_type")
.value_name("MIME_TYPE")
.help("Mime type (e.g., video/mp4)")
.required_unless_present("file")
.takes_value(true)
)
.arg(Arg::with_name("file")
.short('f')
.long("file")
.value_name("File")
.help("File content to play")
.required(false)
.takes_value(true))
.arg(Arg::with_name("url")
.short('u')
.long("url")
.value_name("URL")
.help("URL to the content")
.required(false)
.takes_value(true)
)
.arg(Arg::with_name("content")
.short('c') .short('c')
.long("content") .long("connection_type")
.value_name("CONTENT") .value_name("CONNECTION_TYPE")
.help("The actual content") .help("Type of connection: tcp or ws (websocket)")
.required(false) .required(false)
.takes_value(true) .default_value("tcp")
) .takes_value(true),
.arg(Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(false)
.default_value("0")
.takes_value(true)
)
.arg(Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(false)
.default_value("1")
.takes_value(true)
)
.arg(Arg::with_name("header")
.short('H')
.long("header")
.value_name("HEADER")
.help("Custom request headers in key:value format")
.required(false)
.multiple_occurrences(true)
)
) )
.subcommand(SubCommand::with_name("seek") .arg(
.about("Seek to a timestamp") Arg::with_name("host")
.arg(Arg::with_name("timestamp") .short('h')
.short('t') .long("host")
.long("timestamp") .value_name("Host")
.value_name("TIMESTAMP") .help("The host address to send the command to")
.help("Timestamp to start playing") .default_value("127.0.0.1")
.required(true) .required(false)
.takes_value(true) .takes_value(true),
), )
.arg(
Arg::with_name("port")
.short('p')
.long("port")
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("subscribe")
.short('s')
.long("subscribe")
.value_name("EVENTS")
.help("A comma separated list of events to subscribe to (e.g. MediaItemStart,KeyDown). \
Available events: [MediaItemStart, MediaItemEnd, MediaItemChange, KeyDown, KeyUp]")
.required(false)
.takes_value(true),
)
.subcommand(
SubCommand::with_name("play")
.about("Play media")
.arg(
Arg::with_name("mime_type")
.short('m')
.long("mime_type")
.value_name("MIME_TYPE")
.help("Mime type (e.g., video/mp4)")
.takes_value(true),
)
.arg(
Arg::with_name("file")
.short('f')
.long("file")
.value_name("File")
.help("File content to play")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("url")
.short('u')
.long("url")
.value_name("URL")
.help("URL to the content")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("content")
.short('c')
.long("content")
.value_name("CONTENT")
.help("The actual content")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(false)
.default_value("0")
.takes_value(true),
)
.arg(
Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(false)
.default_value("1")
.takes_value(true),
)
.arg(
Arg::with_name("header")
.short('H')
.long("header")
.value_name("HEADER")
.help("Custom request headers in key:value format")
.required(false)
.multiple_occurrences(true),
),
)
.subcommand(
SubCommand::with_name("seek")
.about("Seek to a timestamp")
.arg(
Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(true)
.takes_value(true),
),
) )
.subcommand(SubCommand::with_name("pause").about("Pause media")) .subcommand(SubCommand::with_name("pause").about("Pause media"))
.subcommand(SubCommand::with_name("resume").about("Resume media")) .subcommand(SubCommand::with_name("resume").about("Resume media"))
.subcommand(SubCommand::with_name("stop").about("Stop media")) .subcommand(SubCommand::with_name("stop").about("Stop media"))
.subcommand(SubCommand::with_name("listen").about("Listen to incoming events")) .subcommand(SubCommand::with_name("listen").about("Listen to incoming events"))
.subcommand(SubCommand::with_name("setvolume").about("Set the volume") .subcommand(
.arg(Arg::with_name("volume") SubCommand::with_name("setvolume")
.short('v') .about("Set the volume")
.long("volume") .arg(
.value_name("VOLUME") Arg::with_name("volume")
.help("Volume level (0-1)") .short('v')
.required(true) .long("volume")
.takes_value(true))) .value_name("VOLUME")
.subcommand(SubCommand::with_name("setspeed").about("Set the playback speed") .help("Volume level (0-1)")
.arg(Arg::with_name("speed") .required(true)
.short('s') .takes_value(true),
.long("speed") ),
.value_name("SPEED") )
.help("Factor to multiply playback speed by") .subcommand(
.required(true) SubCommand::with_name("setspeed")
.takes_value(true)) .about("Set the playback speed")
.arg(
Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(true)
.takes_value(true),
),
)
.subcommand(
SubCommand::with_name("set_playlist_item")
.about("")
.arg(
Arg::with_name("item_index")
.short('i')
.long("item_index")
.value_name("INDEX")
.help("Index of the item in the playlist that should be play")
.required(true)
.takes_value(true),
)
); );
let matches = app.get_matches(); let matches = app.get_matches();
let host = match matches.value_of("host") { let host = matches.value_of("host").expect("host has default value");
Some(s) => s,
_ => return Err("Host is required.".into())
};
let connection_type = matches.value_of("connection_type").unwrap_or("tcp"); let connection_type = matches.value_of("connection_type").unwrap_or("tcp");
@ -159,8 +205,10 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
_ => match connection_type { _ => match connection_type {
"tcp" => "46899", "tcp" => "46899",
"ws" => "46898", "ws" => "46898",
_ => return Err("Unknown connection type, cannot automatically determine port.".into()) _ => {
} return Err("Unknown connection type, cannot automatically determine port.".into())
}
},
}; };
let local_ip: Option<IpAddr>; let local_ip: Option<IpAddr>;
@ -169,141 +217,147 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Connecting via TCP to host={} port={}...", host, port); println!("Connecting via TCP to host={} port={}...", host, port);
let stream = TcpStream::connect(format!("{}:{}", host, port))?; let stream = TcpStream::connect(format!("{}:{}", host, port))?;
local_ip = Some(stream.local_addr()?.ip()); local_ip = Some(stream.local_addr()?.ip());
FCastSession::new(stream) FCastSession::connect(stream)?
}, }
"ws" => { "ws" => {
println!("Connecting via WebSocket to host={} port={}...", host, port); println!("Connecting via WebSocket to host={} port={}...", host, port);
let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?; let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?;
let (stream, _) = tungstenite::connect(url)?; let (stream, _) = tungstenite::connect(url)?;
local_ip = match stream.get_ref() { local_ip = match stream.get_ref() {
MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()), MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()),
_ => return Err("Established connection type is not plain.".into()) _ => return Err("Established connection type is not plain.".into()),
}; };
FCastSession::new(stream) let stream = WebSocket::new(stream);
FCastSession::connect(stream)?
} }
_ => return Err("Invalid connection type.".into()), _ => return Err("Invalid connection type.".into()),
}; };
println!("Connection established."); println!("Connection established.");
if let Some(subscriptions) = matches.value_of("subscribe") {
let subs = subscriptions.split(',');
for sub in subs {
let event = match sub.to_lowercase().as_str() {
"mediaitemstart" => v3::EventType::MediaItemStart,
"mediaitemend" => v3::EventType::MediaItemEnd,
"mediaitemchange" => v3::EventType::MediaItemChange,
"keydown" => v3::EventType::KeyDown,
"keyup" => v3::EventType::KeyUp,
_ => {
println!("Invalid event in subscriptions list: {sub}");
continue;
}
};
session.subscribe(event)?;
println!("Subscribed to {event:?} events");
}
}
let mut join_handle: Option<JoinHandle<Result<(), String>>> = None; let mut join_handle: Option<JoinHandle<Result<(), String>>> = None;
if let Some(play_matches) = matches.subcommand_matches("play") { if let Some(play_matches) = matches.subcommand_matches("play") {
let file_path = play_matches.value_of("file"); let file_path = play_matches.value_of("file");
fn default_mime_type() -> String {
println!("No mime type provided via the `--mime_type` argument. Using default (application/octet-stream)");
"application/octet-stream".to_string()
}
let mime_type = match play_matches.value_of("mime_type") { let mime_type = match play_matches.value_of("mime_type") {
Some(s) => s.to_string(), Some(s) => s.to_string(),
_ => { _ => match file_path {
if file_path.is_none() { Some(path) => match path.split('.').next_back() {
return Err("MIME type is required.".into());
}
match file_path.unwrap().split('.').last() {
Some("mkv") => "video/x-matroska".to_string(), Some("mkv") => "video/x-matroska".to_string(),
Some("mov") => "video/quicktime".to_string(), Some("mov") => "video/quicktime".to_string(),
Some("mp4") | Some("m4v") => "video/mp4".to_string(), Some("mp4") | Some("m4v") => "video/mp4".to_string(),
Some("mpg") | Some("mpeg") => "video/mpeg".to_string(), Some("mpg") | Some("mpeg") => "video/mpeg".to_string(),
Some("webm") => "video/webm".to_string(), Some("webm") => "video/webm".to_string(),
_ => return Err("MIME type is required.".into()), _ => default_mime_type(),
} },
} None => default_mime_type(),
},
}; };
let time = match play_matches.value_of("timestamp") { let time = match play_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>().ok(), Some(s) => s.parse::<f64>().ok(),
_ => None _ => None,
}; };
let speed = match play_matches.value_of("speed") { let speed = match play_matches.value_of("speed") {
Some(s) => s.parse::<f64>().ok(), Some(s) => s.parse::<f64>().ok(),
_ => None _ => None,
}; };
let headers = play_matches.values_of("header") let headers = play_matches.values_of("header").map(|values| {
.map(|values| values values
.filter_map(|s| { .filter_map(|s| {
let mut parts = s.splitn(2, ':'); let mut parts = s.splitn(2, ':');
if let (Some(key), Some(value)) = (parts.next(), parts.next()) { if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
Some((key.trim().to_string(), value.trim().to_string())) Some((key.trim().to_string(), value.trim().to_string()))
} else { } else {
None None
} }
} })
).collect::<HashMap<String, String>>()); .collect::<HashMap<String, String>>()
});
let mut play_message = if let Some(file_path) = file_path { #[allow(unused_assignments)]
let mut url = None;
let mut content = None;
if let Some(file_path) = file_path {
match local_ip { match local_ip {
Some(lip) => { Some(lip) => {
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let r = running.clone(); let r = running.clone();
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
println!("Ctrl+C triggered, server will stop when onging request finishes..."); println!(
"Ctrl+C triggered, server will stop when onging request finishes..."
);
r.store(false, Ordering::SeqCst); r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler"); })
.expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C..."); println!("Waiting for Ctrl+C...");
let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?; let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?;
let url = result.0; url = Some(result.0);
join_handle = Some(result.1); join_handle = Some(result.1);
}
PlayMessage::new( _ => return Err("Local IP was not able to be resolved.".into()),
mime_type,
Some(url),
None,
time,
speed,
headers
)
},
_ => return Err("Local IP was not able to be resolved.".into())
} }
} else { } else {
PlayMessage::new( url = play_matches.value_of("url").map(|s| s.to_owned());
mime_type, content = play_matches.value_of("content").map(|s| s.to_owned());
match play_matches.value_of("url") { }
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("content") {
Some(s) => Some(s.to_string()),
_ => None
},
time,
speed,
headers
)
};
if play_message.content.is_none() && play_message.url.is_none() { if content.is_none() && url.is_none() {
println!("Reading content from stdin..."); println!("Reading content from stdin...");
let mut buffer = String::new(); let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?; std::io::stdin().read_to_string(&mut buffer)?;
play_message.content = Some(buffer); content = Some(buffer);
} }
let json = serde_json::to_string(&play_message); session.send_play_message(mime_type, url, content, time, speed, headers)?;
println!("Sent play {:?}", json);
session.send_message(Opcode::Play, &play_message)?;
} else if let Some(seek_matches) = matches.subcommand_matches("seek") { } else if let Some(seek_matches) = matches.subcommand_matches("seek") {
let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") { let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>()?, Some(s) => s.parse::<f64>()?,
_ => return Err("Timestamp is required.".into()) _ => return Err("Timestamp is required.".into()),
}); });
println!("Sent seek {:?}", seek_message); println!("Sent seek {:?}", seek_message);
session.send_message(Opcode::Seek, &seek_message)?; session.send_message(Opcode::Seek, &seek_message)?;
} else if let Some(_) = matches.subcommand_matches("pause") { } else if matches.subcommand_matches("pause").is_some() {
println!("Sent pause"); println!("Sent pause");
session.send_empty(Opcode::Pause)?; session.send_empty(Opcode::Pause)?;
} else if let Some(_) = matches.subcommand_matches("resume") { } else if matches.subcommand_matches("resume").is_some() {
println!("Sent resume"); println!("Sent resume");
session.send_empty(Opcode::Resume)?; session.send_empty(Opcode::Resume)?;
} else if let Some(_) = matches.subcommand_matches("stop") { } else if matches.subcommand_matches("stop").is_some() {
println!("Sent stop"); println!("Sent stop");
session.send_empty(Opcode::Stop)?; session.send_empty(Opcode::Stop)?;
} else if let Some(_) = matches.subcommand_matches("listen") { } else if matches.subcommand_matches("listen").is_some() {
println!("Starter listening to events..."); println!("Starter listening to events...");
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
@ -312,7 +366,8 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
println!("Ctrl+C triggered..."); println!("Ctrl+C triggered...");
r.store(false, Ordering::SeqCst); r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler"); })
.expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C..."); println!("Waiting for Ctrl+C...");
@ -322,17 +377,23 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
} else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") { } else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") {
let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") { let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") {
Some(s) => s.parse::<f64>()?, Some(s) => s.parse::<f64>()?,
_ => return Err("Timestamp is required.".into()) _ => return Err("Timestamp is required.".into()),
}); });
println!("Sent setvolume {:?}", setvolume_message); println!("Sent setvolume {:?}", setvolume_message);
session.send_message(Opcode::SetVolume, &setvolume_message)?; session.send_message(Opcode::SetVolume, &setvolume_message)?;
} else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") { } else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") {
let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") { let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") {
Some(s) => s.parse::<f64>()?, Some(s) => s.parse::<f64>()?,
_ => return Err("Speed is required.".into()) _ => return Err("Speed is required.".into()),
}); });
println!("Sent setspeed {:?}", setspeed_message); println!("Sent setspeed {:?}", setspeed_message);
session.send_message(Opcode::SetSpeed, &setspeed_message)?; session.send_message(Opcode::SetSpeed, &setspeed_message)?;
} else if let Some(set_playlist_item_matches) = matches.subcommand_matches("set_playlist_item") {
let message = v3::SetPlaylistItemMessage {
item_index: set_playlist_item_matches.value_of("item_index").expect("item_index required").parse::<u64>()?,
};
session.send_message(Opcode::SetPlaylistItem, &message)?;
println!("Sent set_playlist_item {message:?}");
} else { } else {
println!("Invalid command. Use --help for more information."); println!("Invalid command. Use --help for more information.");
std::process::exit(1); std::process::exit(1);
@ -340,7 +401,7 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Waiting on other threads..."); println!("Waiting on other threads...");
if let Some(v) = join_handle { if let Some(v) = join_handle {
if let Err(_) = v.join() { if v.join().is_err() {
return Err("Failed to join thread.".into()); return Err("Failed to join thread.".into());
} }
} }
@ -364,19 +425,19 @@ impl ServerState {
} }
} }
fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, running: &Arc<AtomicBool>) -> Result<(String, thread::JoinHandle<Result<(), String>>), String> { fn host_file_and_get_url(
local_ip: &IpAddr,
file_path: &str,
mime_type: &String,
running: &Arc<AtomicBool>,
) -> Result<(String, thread::JoinHandle<Result<(), String>>), String> {
let local_ip_str = if local_ip.is_ipv6() { let local_ip_str = if local_ip.is_ipv6() {
format!("[{}]", local_ip) format!("[{}]", local_ip)
} else { } else {
format!("{}", local_ip) format!("{}", local_ip)
}; };
let server = { let server = Server::http(format!("{local_ip_str}:0"))
let this = Server::http(format!("{local_ip_str}:0")); .map_err(|err| format!("Failed to create server: {err}"))?;
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Failed to create server: {}", e))(e)),
}
}?;
let url = match server.server_addr() { let url = match server.server_addr() {
ListenAddr::IP(addr) => format!("http://{local_ip_str}:{}/", addr.port()), ListenAddr::IP(addr) => format!("http://{local_ip_str}:{}/", addr.port()),
@ -399,14 +460,9 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String,
} }
let should_break = { let should_break = {
let state = { let state = state.lock().unwrap();
let this = state.lock(); state.active_connections == 0
match this { && state.last_request_time.elapsed() > Duration::from_secs(300)
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
state.active_connections == 0 && state.last_request_time.elapsed() > Duration::from_secs(300)
}; };
if should_break { if should_break {
@ -418,34 +474,18 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String,
Ok(Some(request)) => { Ok(Some(request)) => {
println!("Request received."); println!("Request received.");
let mut state = { let mut state = state.lock().unwrap();
let this = state.lock();
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
state.active_connections += 1; state.active_connections += 1;
state.last_request_time = Instant::now(); state.last_request_time = Instant::now();
let file = { let file = fs::File::open(&file_path_clone)
let this = fs::File::open(&file_path_clone); .map_err(|_| "Failed to open file.".to_owned())?;
match this {
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let content_type_header = { let content_type_header =
let this = Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str()); Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str())
match this { .map_err(|_| "Failed to open file.".to_owned())?;
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let response = Response::from_file(file) let response = Response::from_file(file).with_header(content_type_header);
.with_header(content_type_header);
if let Err(e) = request.respond(response) { if let Err(e) = request.respond(response) {
println!("Failed to respond to request: {}", e); println!("Failed to respond to request: {}", e);

View file

@ -1,79 +0,0 @@
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Debug)]
pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<f64>,
pub speed: Option<f64>,
pub headers: Option<HashMap<String, String>>
}
impl PlayMessage {
pub fn new(container: String, url: Option<String>, content: Option<String>, time: Option<f64>, speed: Option<f64>, headers: Option<HashMap<String, String>>) -> Self {
Self { container, url, content, time, speed, headers }
}
}
#[derive(Serialize, Debug)]
pub struct SeekMessage {
pub time: f64,
}
impl SeekMessage {
pub fn new(time: f64) -> Self {
Self { time }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub time: f64,
pub duration: f64,
pub speed: f64,
pub state: u8 //0 = None, 1 = Playing, 2 = Paused
}
#[derive(Deserialize, Debug)]
pub struct VolumeUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub volume: f64 //(0-1)
}
#[derive(Serialize, Debug)]
pub struct SetVolumeMessage {
pub volume: f64,
}
impl SetVolumeMessage {
pub fn new(volume: f64) -> Self {
Self { volume }
}
}
#[derive(Serialize, Debug)]
pub struct SetSpeedMessage {
pub speed: f64,
}
impl SetSpeedMessage {
pub fn new(speed: f64) -> Self {
Self { speed }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackErrorMessage {
pub message: String,
}
#[derive(Deserialize, Debug)]
pub struct VersionMessage {
pub version: u64,
}

View file

@ -0,0 +1,54 @@
use serde::{Deserialize, Serialize};
pub mod v2;
pub mod v3;
#[derive(Deserialize, Debug)]
pub struct PlaybackErrorMessage {
pub message: String,
}
#[derive(Deserialize, Serialize, Debug)]
pub struct VersionMessage {
pub version: u64,
}
#[derive(Serialize, Debug)]
pub struct SetSpeedMessage {
pub speed: f64,
}
impl SetSpeedMessage {
pub fn new(speed: f64) -> Self {
Self { speed }
}
}
#[derive(Deserialize, Debug)]
pub struct VolumeUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub volume: f64, //(0-1)
}
#[derive(Serialize, Debug)]
pub struct SetVolumeMessage {
pub volume: f64,
}
impl SetVolumeMessage {
pub fn new(volume: f64) -> Self {
Self { volume }
}
}
#[derive(Serialize, Debug)]
pub struct SeekMessage {
pub time: f64,
}
impl SeekMessage {
pub fn new(time: f64) -> Self {
Self { time }
}
}

View file

@ -0,0 +1,43 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Debug)]
pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<f64>,
pub speed: Option<f64>,
pub headers: Option<HashMap<String, String>>,
}
impl PlayMessage {
pub fn new(
container: String,
url: Option<String>,
content: Option<String>,
time: Option<f64>,
speed: Option<f64>,
headers: Option<HashMap<String, String>>,
) -> Self {
Self {
container,
url,
content,
time,
speed,
headers,
}
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub time: f64,
pub duration: f64,
pub speed: f64,
pub state: u8, //0 = None, 1 = Playing, 2 = Paused
}

View file

@ -0,0 +1,723 @@
use std::collections::HashMap;
use serde::{de, ser, Deserialize, Serialize};
use serde_json::{json, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
macro_rules! get_from_map {
($map:expr, $key:expr) => {
$map.get($key).ok_or(de::Error::missing_field($key))
};
}
#[derive(Debug, PartialEq, Clone)]
pub enum MetadataObject {
Generic {
title: Option<String>,
thumbnail_url: Option<String>,
custom: Value,
},
}
impl Serialize for MetadataObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
MetadataObject::Generic {
title,
thumbnail_url,
custom,
} => {
let mut map = serde_json::Map::new();
map.insert("type".to_owned(), json!(0u64));
map.insert(
"title".to_owned(),
match title {
Some(t) => Value::String(t.to_owned()),
None => Value::Null,
},
);
map.insert(
"thumbnailUrl".to_owned(),
match thumbnail_url {
Some(t) => Value::String(t.to_owned()),
None => Value::Null,
},
);
map.insert("custom".to_owned(), custom.clone());
map.serialize(serializer)
}
}
}
}
impl<'de> Deserialize<'de> for MetadataObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
0 => {
let title = match rest.get("title") {
Some(t) => Some(
t.as_str()
.ok_or(de::Error::custom("`title` is not a string"))?
.to_owned(),
),
None => None,
};
let thumbnail_url = match rest.get("thumbnailUrl") {
Some(t) => Some(
t.as_str()
.ok_or(de::Error::custom("`thumbnailUrl` is not a string"))?
.to_owned(),
),
None => None,
};
Ok(Self::Generic {
title,
thumbnail_url,
custom: rest
.get("custom")
.ok_or(de::Error::missing_field("custom"))?
.clone(),
})
}
_ => Err(de::Error::custom(format!("Unknown metadata type {type_}"))),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PlayMessage {
/// The MIME type (video/mp4)
pub container: String,
// The URL to load (optional)
pub url: Option<String>,
// The content to load (i.e. a DASH manifest, json content, optional)
pub content: Option<String>,
// The time to start playing in seconds
pub time: Option<f64>,
// The desired volume (0-1)
pub volume: Option<f64>,
// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
// HTTP request headers to add to the play request Map<string, string>
pub headers: Option<HashMap<String, String>>,
pub metadata: Option<MetadataObject>,
}
#[derive(Deserialize_repr, Serialize_repr, Debug)]
#[repr(u8)]
pub enum ContentType {
Playlist = 0,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct MediaItem {
/// The MIME type (video/mp4)
pub container: String,
/// The URL to load (optional)
pub url: Option<String>,
/// The content to load (i.e. a DASH manifest, json content, optional)
pub content: Option<String>,
/// The time to start playing in seconds
pub time: Option<f64>,
/// The desired volume (0-1)
pub volume: Option<f64>,
/// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
/// Indicates if the receiver should preload the media item
pub cache: Option<bool>,
/// Indicates how long the item content is presented on screen in seconds
#[serde(rename = "showDuration")]
pub show_duration: Option<f64>,
/// HTTP request headers to add to the play request Map<string, string>
pub headers: Option<HashMap<String, String>>,
pub metadata: Option<MetadataObject>,
}
#[derive(Serialize, Debug)]
pub struct PlaylistContent {
#[serde(rename = "type")]
variant: ContentType,
pub items: Vec<MediaItem>,
/// Start position of the first item to play from the playlist
pub offset: Option<u64>, // int or float?
/// The desired volume (0-1)
pub volume: Option<f64>,
/// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
/// Count of media items should be pre-loaded forward from the current view index
#[serde(rename = "forwardCache")]
pub forward_cache: Option<u64>,
/// Count of media items should be pre-loaded backward from the current view index
#[serde(rename = "backwardCache")]
pub backward_cache: Option<u64>,
pub metadata: Option<MetadataObject>,
}
#[derive(Serialize_repr, Deserialize_repr, Debug)]
#[repr(u8)]
pub enum PlaybackState {
Idle = 0,
Playing = 1,
Paused = 2,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
// The time the packet was generated (unix time milliseconds)
#[serde(rename = "generationTime")]
pub generation_time: u64,
// The playback state
pub state: PlaybackState,
// The current time playing in seconds
pub time: Option<f64>,
// The duration in seconds
pub duration: Option<f64>,
// The playback speed factor
pub speed: Option<f64>,
// The playlist item index currently being played on receiver
#[serde(rename = "itemIndex")]
pub item_index: Option<u64>,
}
#[derive(Serialize, Debug)]
pub struct InitialSenderMessage {
#[serde(rename = "displayName")]
pub display_name: Option<String>,
#[serde(rename = "appName")]
pub app_name: Option<String>,
#[serde(rename = "appVersion")]
pub app_version: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct InitialReceiverMessage {
#[serde(rename = "displayName")]
pub display_name: Option<String>,
#[serde(rename = "appName")]
pub app_name: Option<String>,
#[serde(rename = "appVersion")]
pub app_version: Option<String>,
#[serde(rename = "playData")]
pub play_data: Option<PlayMessage>,
}
#[derive(Deserialize, Debug)]
pub struct PlayUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: Option<u64>,
#[serde(rename = "playData")]
pub play_data: Option<PlayMessage>,
}
#[derive(Serialize, Debug)]
pub struct SetPlaylistItemMessage {
#[serde(rename = "itemIndex")]
pub item_index: u64,
}
#[derive(Deserialize, Debug)]
pub enum KeyNames {
ArrowLeft,
ArrowRight,
ArrowUp,
ArrowDown,
Enter,
}
impl KeyNames {
pub fn all() -> Vec<String> {
vec![
"ArrowLeft".to_owned(),
"ArrowRight".to_owned(),
"ArrowUp".to_owned(),
"ArrowDown".to_owned(),
"Enter".to_owned(),
]
}
}
#[derive(Debug, PartialEq)]
pub enum EventSubscribeObject {
MediaItemStart,
MediaItemEnd,
MediaItemChanged,
KeyDown { keys: Vec<String> },
KeyUp { keys: Vec<String> },
}
impl Serialize for EventSubscribeObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serde_json::Map::new();
let type_val: u64 = match self {
EventSubscribeObject::MediaItemStart => 0,
EventSubscribeObject::MediaItemEnd => 1,
EventSubscribeObject::MediaItemChanged => 2,
EventSubscribeObject::KeyDown { .. } => 3,
EventSubscribeObject::KeyUp { .. } => 4,
};
map.insert("type".to_owned(), json!(type_val));
let keys = match self {
EventSubscribeObject::KeyDown { keys } => Some(keys),
EventSubscribeObject::KeyUp { keys } => Some(keys),
_ => None,
};
if let Some(keys) = keys {
map.insert("keys".to_owned(), json!(keys));
}
map.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for EventSubscribeObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
0 => Ok(Self::MediaItemStart),
1 => Ok(Self::MediaItemEnd),
2 => Ok(Self::MediaItemChanged),
3 | 4 => {
let keys = get_from_map!(rest, "keys")?
.as_array()
.ok_or(de::Error::custom("`type` is not an array"))?
.iter()
.map(|v| v.as_str().map(|s| s.to_owned()))
.collect::<Option<Vec<String>>>()
.ok_or(de::Error::custom("`type` is not an array of strings"))?;
if type_ == 3 {
Ok(Self::KeyDown { keys })
} else {
Ok(Self::KeyUp { keys })
}
}
_ => Err(de::Error::custom(format!("Unknown event type {type_}"))),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SubscribeEventMessage {
pub event: EventSubscribeObject,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct UnsubscribeEventMessage {
pub event: EventSubscribeObject,
}
#[derive(Debug, PartialEq, Copy, Clone)]
#[repr(u8)]
pub enum EventType {
MediaItemStart = 0,
MediaItemEnd = 1,
MediaItemChange = 2,
KeyDown = 3,
KeyUp = 4,
}
#[derive(Debug, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum EventObject {
MediaItem {
variant: EventType,
item: MediaItem,
},
Key {
variant: EventType,
key: String,
repeat: bool,
handled: bool,
},
}
impl Serialize for EventObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serde_json::Map::new();
match self {
EventObject::MediaItem { variant, item } => {
map.insert("type".to_owned(), json!(*variant as u8));
map.insert(
"item".to_owned(),
serde_json::to_value(item).map_err(ser::Error::custom)?,
);
}
EventObject::Key {
variant,
key,
repeat,
handled,
} => {
map.insert("type".to_owned(), json!(*variant as u8));
map.insert(
"key".to_owned(),
serde_json::to_value(key).map_err(ser::Error::custom)?,
);
map.insert(
"repeat".to_owned(),
serde_json::to_value(repeat).map_err(ser::Error::custom)?,
);
map.insert(
"handled".to_owned(),
serde_json::to_value(handled).map_err(ser::Error::custom)?,
);
}
}
map.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for EventObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
#[allow(clippy::manual_range_patterns)]
0 | 1 | 2 => {
let variant = match type_ {
0 => EventType::MediaItemStart,
1 => EventType::MediaItemEnd,
_ => EventType::MediaItemChange,
};
let item = get_from_map!(rest, "item")?;
Ok(Self::MediaItem {
variant,
item: MediaItem::deserialize(item).map_err(de::Error::custom)?,
})
}
3 | 4 => {
let variant = if type_ == 3 {
EventType::KeyDown
} else {
EventType::KeyUp
};
Ok(Self::Key {
variant,
key: get_from_map!(rest, "key")?
.as_str()
.ok_or(de::Error::custom("`key` is not a string"))?
.to_owned(),
repeat: get_from_map!(rest, "repeat")?
.as_bool()
.ok_or(de::Error::custom("`repeat` is not a bool"))?,
handled: get_from_map!(rest, "handled")?
.as_bool()
.ok_or(de::Error::custom("`handled` is not a bool"))?,
})
}
_ => Err(de::Error::custom(format!("Unknown event type {type_}"))),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EventMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
event: EventObject,
}
#[cfg(test)]
mod tests {
use super::*;
macro_rules! s {
($s:expr) => {
($s).to_string()
};
}
#[test]
fn serialize_metadata_object() {
assert_eq!(
&serde_json::to_string(&MetadataObject::Generic {
title: Some(s!("abc")),
thumbnail_url: Some(s!("def")),
custom: serde_json::Value::Null,
})
.unwrap(),
r#"{"custom":null,"thumbnailUrl":"def","title":"abc","type":0}"#
);
assert_eq!(
&serde_json::to_string(&MetadataObject::Generic {
title: None,
thumbnail_url: None,
custom: serde_json::Value::Null,
})
.unwrap(),
r#"{"custom":null,"thumbnailUrl":null,"title":null,"type":0}"#
);
}
#[test]
fn deserialize_metadata_object() {
assert_eq!(
serde_json::from_str::<MetadataObject>(
r#"{"type":0,"title":"abc","thumbnailUrl":"def","custom":null}"#
)
.unwrap(),
MetadataObject::Generic {
title: Some(s!("abc")),
thumbnail_url: Some(s!("def")),
custom: serde_json::Value::Null,
}
);
assert_eq!(
serde_json::from_str::<MetadataObject>(r#"{"type":0,"custom":null}"#).unwrap(),
MetadataObject::Generic {
title: None,
thumbnail_url: None,
custom: serde_json::Value::Null,
}
);
assert!(serde_json::from_str::<MetadataObject>(r#"{"type":1"#).is_err());
}
#[test]
fn serialize_event_sub_obj() {
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemStart).unwrap(),
r#"{"type":0}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemEnd).unwrap(),
r#"{"type":1}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemChanged).unwrap(),
r#"{"type":2}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(),
r#"{"keys":[],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(),
r#"{"keys":[],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyUp {
keys: vec![s!("abc"), s!("def")]
})
.unwrap(),
r#"{"keys":["abc","def"],"type":4}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown {
keys: vec![s!("abc"), s!("def")]
})
.unwrap(),
r#"{"keys":["abc","def"],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown {
keys: vec![s!("\"\"")]
})
.unwrap(),
r#"{"keys":["\"\""],"type":3}"#
);
}
#[test]
fn deserialize_event_sub_obj() {
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":0}"#).unwrap(),
EventSubscribeObject::MediaItemStart
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":1}"#).unwrap(),
EventSubscribeObject::MediaItemEnd
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":2}"#).unwrap(),
EventSubscribeObject::MediaItemChanged
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":[],"type":3}"#).unwrap(),
EventSubscribeObject::KeyDown { keys: vec![] }
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":[],"type":4}"#).unwrap(),
EventSubscribeObject::KeyUp { keys: vec![] }
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":["abc","def"],"type":3}"#)
.unwrap(),
EventSubscribeObject::KeyDown {
keys: vec![s!("abc"), s!("def")]
}
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":["abc","def"],"type":4}"#)
.unwrap(),
EventSubscribeObject::KeyUp {
keys: vec![s!("abc"), s!("def")]
}
);
assert!(serde_json::from_str::<EventSubscribeObject>(r#""type":5}"#).is_err());
}
const EMPTY_TEST_MEDIA_ITEM: MediaItem = MediaItem {
container: String::new(),
url: None,
content: None,
time: None,
volume: None,
speed: None,
cache: None,
show_duration: None,
headers: None,
metadata: None,
};
const TEST_MEDIA_ITEM_JSON: &str = r#"{"cache":null,"container":"","content":null,"headers":null,"metadata":null,"showDuration":null,"speed":null,"time":null,"url":null,"volume":null}"#;
#[test]
fn serialize_event_obj() {
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemStart,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#),
);
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemEnd,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#),
);
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemChange,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#),
);
assert_eq!(
&serde_json::to_string(&EventObject::Key {
variant: EventType::KeyDown,
key: s!(""),
repeat: false,
handled: false,
})
.unwrap(),
r#"{"handled":false,"key":"","repeat":false,"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventObject::Key {
variant: EventType::KeyUp,
key: s!(""),
repeat: false,
handled: false,
})
.unwrap(),
r#"{"handled":false,"key":"","repeat":false,"type":4}"#
);
}
#[test]
fn deserialize_event_obj() {
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemStart,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemEnd,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemChange,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(
r#"{"handled":false,"key":"","repeat":false,"type":3}"#
)
.unwrap(),
EventObject::Key {
variant: EventType::KeyDown,
key: s!(""),
repeat: false,
handled: false,
}
);
assert_eq!(
serde_json::from_str::<EventObject>(
r#"{"handled":false,"key":"","repeat":false,"type":4}"#
)
.unwrap(),
EventObject::Key {
variant: EventType::KeyUp,
key: s!(""),
repeat: false,
handled: false,
}
);
assert!(serde_json::from_str::<EventObject>(r#"{"type":5}"#).is_err());
}
}

View file

@ -1,12 +1,14 @@
use std::collections::VecDeque;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use tungstenite::protocol::WebSocket as TWebSocket;
use tungstenite::Message; use tungstenite::Message;
use tungstenite::protocol::WebSocket;
pub trait Transport { pub trait Transport {
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>; fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>; fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>;
fn transport_shutdown(&mut self) -> Result<(), std::io::Error>; fn transport_shutdown(&mut self) -> Result<(), std::io::Error>;
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error>;
} }
impl Transport for TcpStream { impl Transport for TcpStream {
@ -21,36 +23,156 @@ impl Transport for TcpStream {
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.shutdown(std::net::Shutdown::Both) self.shutdown(std::net::Shutdown::Both)
} }
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> {
self.read_exact(buf)
}
} }
impl<T: Read + Write> Transport for WebSocket<T> { pub struct WebSocket<T>
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { where
match self.read() { T: Read + Write,
Ok(Message::Binary(data)) => { {
let len = std::cmp::min(buf.len(), data.len()); inner: TWebSocket<T>,
buf[..len].copy_from_slice(&data[..len]); buffer: VecDeque<u8>,
Ok(len) }
},
_ => Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid message type")) impl<T> WebSocket<T>
where
T: Read + Write,
{
pub fn new(web_socket: TWebSocket<T>) -> Self {
Self {
inner: web_socket,
buffer: VecDeque::new(),
} }
} }
pub fn read_buffered(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
if !self.buffer.is_empty() {
let bytes_to_read = buf.len().min(self.buffer.len());
assert!(buf.len() >= bytes_to_read);
assert!(self.buffer.len() >= bytes_to_read);
for b in buf.iter_mut().take(bytes_to_read) {
*b = self.buffer.pop_front().unwrap(); // Safe unwrap as bounds was checked previously
}
} else {
match self.inner.read() {
Ok(Message::Binary(data)) => {
let bytes_to_read = buf.len().min(data.len());
buf.copy_from_slice(&data[..bytes_to_read]);
for rest in data[bytes_to_read..].iter() {
self.buffer.push_back(*rest);
}
}
_ => return Err(std::io::Error::other("Invalid message type")),
}
}
Ok(buf.len())
}
}
impl<T> Transport for WebSocket<T>
where
T: Read + Write,
{
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.read_buffered(buf)
}
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> { fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
self.write(Message::Binary(buf.to_vec())) self.inner
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .write(Message::Binary(buf.to_vec()))
self.flush().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) .map_err(std::io::Error::other)?;
self.inner.flush().map_err(std::io::Error::other)
} }
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.close(None).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; self.inner.close(None).map_err(std::io::Error::other)?;
loop { loop {
match self.read() { match self.inner.read() {
Ok(_) => continue, Ok(_) => continue,
Err(tungstenite::Error::ConnectionClosed) => break, Err(tungstenite::Error::ConnectionClosed) => break,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)), Err(e) => return Err(std::io::Error::other(e)),
} }
} }
Ok(()) Ok(())
} }
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> {
let mut total_read = 0;
while total_read < buf.len() {
total_read += self.read_buffered(&mut buf[total_read..])?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::net::TcpListener;
use super::*;
#[test]
fn websocket_read_buffered() {
let jh = std::thread::spawn(|| {
let server = TcpListener::bind("127.0.0.1:51232").unwrap();
let stream = server.incoming().next().unwrap().unwrap();
let mut websocket = tungstenite::accept(stream).unwrap();
websocket
.send(tungstenite::Message::binary([1, 2, 3, 4]))
.unwrap();
websocket
.send(tungstenite::Message::binary([5, 6, 7, 8]))
.unwrap();
});
let (websocket, _) = tungstenite::connect("ws://127.0.0.1:51232").unwrap();
let mut websocket = WebSocket::new(websocket);
let mut buf = [0u8; 2];
assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 2);
assert_eq!(buf, [1, 2]);
assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 2);
assert_eq!(buf, [3, 4]);
let mut buf = [0u8; 4];
assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 4);
assert_eq!(buf, [5, 6, 7, 8]);
let _ = websocket.transport_shutdown();
jh.join().unwrap();
}
#[test]
fn websocket_read_exact() {
let jh = std::thread::spawn(|| {
let server = TcpListener::bind("127.0.0.1:51234").unwrap();
let stream = server.incoming().next().unwrap().unwrap();
let mut websocket = tungstenite::accept(stream).unwrap();
websocket
.send(tungstenite::Message::binary([1, 2, 3]))
.unwrap();
});
let (websocket, _) = tungstenite::connect("ws://127.0.0.1:51234").unwrap();
let mut websocket = WebSocket::new(websocket);
fn read_exact<T: Transport>(stream: &mut T) {
let mut buf = [0u8; 3];
stream.transport_read_exact(&mut buf).unwrap();
assert_eq!(buf, [1, 2, 3]);
}
read_exact(&mut websocket);
let _ = websocket.transport_shutdown();
jh.join().unwrap();
}
} }

View file

@ -0,0 +1,14 @@
{
"contentType": 0,
"items": [
{
"container": "video/webm",
"url": "https://upload.wikimedia.org/wikipedia/commons/7/70/EVEREST.webm"
},
{
"container": "image/png",
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/6/62/NTV7_Testpattern.png/640px-NTV7_Testpattern.png"
}
],
"offset": 0
}